1 /** 
2  * This module contains the entire DFlake library, which is mostly centered
3  * around the SnowflakeGenerator.
4  */
5 module dflake;
6 
7 import core.sync.mutex : Mutex;
8 
/**
 * The Snowflake generator is a thread-safe generator of unique "snowflake"
 * ids, which guarantees that ids it generates will always be unique and
 * always increasing in value, provided that there are no two generators with
 * the same node id at the same time.
 *
 * Here's a simple example of how to generate ids:
 * ```d
 * auto gen = SnowflakeGenerator(1);
 * long id = gen.nextId;
 * ```
 *
 * A generator must be created through one of its constructors: a
 * default-initialized `SnowflakeGenerator.init` has no mutex, so calling
 * `nextId` on it dereferences a null reference. Copying a generator copies
 * the mutex reference but not the timestamp/sequence state, so only one copy
 * should ever be used to issue ids (share it by pointer instead).
 *
 * For background info on this type of id, please see [Twitter's original blog
 * post on the topic](https://blog.twitter.com/engineering/en_us/a/2010/announcing-snowflake).
 *
 * Authors: Andrew Lalis, andrewlalisofficial@gmail.com
 */
struct SnowflakeGenerator {
    // Standard definitions of the structure of a snowflake id, from the most
    // significant bit down: 1 unused (sign) bit, 41 bits of millisecond
    // timestamp, 10 bits of node id, and 12 bits of per-millisecond sequence.
    public static const UNUSED_BITS = 1;
    public static const EPOCH_BITS = 41;
    public static const NODE_ID_BITS = 10;
    public static const SEQUENCE_BITS = 12;

    public static const long MAX_NODE_ID = (1L << NODE_ID_BITS) - 1;
    public static const long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;

    // Default epoch in case one isn't given. (January 1, 2015 Midnight UTC = 2015-01-01T00:00:00Z)
    public static const long DEFAULT_CUSTOM_EPOCH = 1_420_070_400_000L;

    /**
     * The node id for this generator, in the range [0, MAX_NODE_ID].
     */
    private const long nodeId = 0;

    /**
     * The epoch offset to use for this generator, in milliseconds since the
     * unix epoch.
     */
    private const long epoch = DEFAULT_CUSTOM_EPOCH;

    /**
     * Tracks the last timestamp at which an id was issued, in milliseconds
     * since `epoch`. Starts at -1 so the first id always begins a new
     * millisecond.
     */
    private long lastTimestamp = -1L;

    /**
     * Tracks the last sequence value we used. If multiple ids are needed
     * within the span of 1 millisecond, this sequence is incremented for each
     * id.
     */
    private long sequence = 0L;

    /**
     * A mutex to ensure that ids are generated sequentially, even if `nextId`
     * is called from multiple threads.
     */
    private shared Mutex mutex;

    /**
     * Constructs the generator using a given node id and epoch offset.
     * Params:
     *   nodeId = The node id, in the range [0, MAX_NODE_ID].
     *   epoch = The epoch offset, in milliseconds since 01-01-1970. It must
     *           not lie in the future.
     */
    this(long nodeId, long epoch) {
        // Every value in [0, MAX_NODE_ID] fits in NODE_ID_BITS bits, so
        // MAX_NODE_ID itself is a valid node id.
        assert(nodeId >= 0 && nodeId <= MAX_NODE_ID, "Node id must be in the range [0, MAX_NODE_ID].");
        // A future epoch would make every timestamp negative.
        assert(epoch >= 0 && epoch < timestampMillis(0), "Epoch must be non-negative and in the past.");
        this.nodeId = nodeId;
        this.epoch = epoch;
        this.mutex = new shared Mutex();
    }

    /**
     * Constructs the generator using a default epoch offset.
     * Params:
     *   nodeId = The node id, in the range [0, MAX_NODE_ID].
     */
    this(long nodeId) {
        this(nodeId, DEFAULT_CUSTOM_EPOCH);
    }

    /**
     * Generates a new unique id. Calls to this method are synchronized such
     * that ids from this generator are generated sequentially, and each id is
     * strictly greater than the previous one.
     *
     * If the system clock steps backwards, ids keep being issued against the
     * last used timestamp until the clock catches up, rather than producing
     * smaller (and possibly duplicate) ids.
     * Returns: The id.
     */
    public long nextId() {
        // Acquire a lock so that only one id can be generated at once by a single generator.
        this.mutex.lock_nothrow();
        scope(exit) this.mutex.unlock_nothrow(); // Unlock when we're done.

        long currentTimestamp = timestampMillis(this.epoch);
        if (currentTimestamp < this.lastTimestamp) {
            // The wall clock moved backwards (e.g. a system time adjustment).
            // Continue in the last used millisecond so ids stay unique and
            // increasing, even in release builds where asserts are removed.
            currentTimestamp = this.lastTimestamp;
        }

        if (currentTimestamp == this.lastTimestamp) {
            this.sequence = (this.sequence + 1) & MAX_SEQUENCE;
            if (this.sequence == 0) {
                // All sequence values for this millisecond are used up.
                currentTimestamp = waitNextMillis(currentTimestamp);
            }
        } else {
            this.sequence = 0;
        }

        this.lastTimestamp = currentTimestamp;
        return (currentTimestamp << (NODE_ID_BITS + SEQUENCE_BITS))
            | (this.nodeId << SEQUENCE_BITS)
            | this.sequence;
    }

    import std.typecons : Tuple, tuple;

    /**
     * An alias for a tuple containing the basic information of a snowflake id.
     */
    alias SnowflakeIdInfo = Tuple!(long, "timestamp", long, "nodeId", long, "sequence");

    /**
     * Parses a snowflake id into its constituent parts.
     * Params:
     *   id = The id to parse.
     * Returns: A tuple containing the timestamp (in milliseconds since the
     * unix epoch, i.e. with this generator's epoch added back), node id, and
     * sequence value of the snowflake id.
     */
    public SnowflakeIdInfo parseId(long id) const {
        immutable long timestamp = (id >> (NODE_ID_BITS + SEQUENCE_BITS)) + this.epoch;
        immutable long nodeIdLocal = (id >> SEQUENCE_BITS) & MAX_NODE_ID;
        immutable long sequenceLocal = id & MAX_SEQUENCE;
        return SnowflakeIdInfo(timestamp, nodeIdLocal, sequenceLocal);
    }

    /**
     * Gets the current timestamp in milliseconds.
     * Params:
     *   offset = The offset to subtract from unix time.
     * Returns: The milliseconds that have elapsed since the given offset from
     * unix time.
     */
    private static long timestampMillis(long offset) {
        import std.datetime.systime : Clock, unixTimeToStdTime;
        import core.time : convert;

        // Clock.currStdTime counts hnsecs since 0001-01-01T00:00:00Z (UTC);
        // this is the same count for 1970-01-01T00:00:00Z, computed once at
        // compile time instead of on every call.
        enum long unixEpochStdTime = unixTimeToStdTime(0);
        immutable long hnsecsSinceUnixEpoch = Clock.currStdTime() - unixEpochStdTime;
        return convert!("hnsecs", "msecs")(hnsecsSinceUnixEpoch) - offset;
    }

    /**
     * Performs a busy-wait until the clock reaches a millisecond later than
     * the last issued timestamp.
     * Params:
     *   currentTimestamp = The current timestamp.
     * Returns: The next timestamp, once we reach it.
     */
    private long waitNextMillis(long currentTimestamp) {
        // `<=` rather than `==`: if the clock steps backwards while we spin,
        // we must not return a timestamp earlier than one already issued.
        while (currentTimestamp <= this.lastTimestamp) {
            currentTimestamp = timestampMillis(this.epoch);
        }
        return currentTimestamp;
    }
}
173 
// Tests for the correctness of the generator.
unittest {
    import core.thread : Thread;
    import std.algorithm.sorting : sort;
    import std.array : array;
    import std.format;
    import std.range : iota;
    import std.stdio;

    // Check that each next id is unique and increasing. Because "<" is
    // transitive, checking each adjacent pair proves both properties for
    // every pair, in linear rather than quadratic time.
    auto gen = SnowflakeGenerator(123);
    const iterations = 5000;

    long[iterations] ids;
    foreach (i; 0 .. iterations) {
        ids[i] = gen.nextId;
    }

    foreach (i; 1 .. ids.length) {
        assert(ids[i - 1] != ids[i], "Ids generated by the same generator should be unique.");
        assert(ids[i - 1] < ids[i], "Ids should be increasing in value over time.");
    }

    // Check that ids generated from separate threads are unique.
    const iterationsPerThread = 1000;
    const threadCount = 50;
    const multiIdsSize = iterationsPerThread * threadCount;
    long[] multiIds = new long[multiIdsSize];
    Thread[threadCount] threads;
    auto gen2 = SnowflakeGenerator(345);

    // Fills ids[startIdx .. endIdx] with ids taken from a shared generator.
    class GenThread : Thread {
        private SnowflakeGenerator* gen;
        private const int startIdx;
        private const int endIdx;
        private long[] ids;
        this(SnowflakeGenerator* gen, int startIdx, int endIdx, ref long[] ids) {
            super(&run);
            this.gen = gen;
            this.startIdx = startIdx;
            this.endIdx = endIdx;
            this.ids = ids;
        }
        private void run() {
            foreach (i; startIdx .. endIdx) {
                ids[i] = this.gen.nextId();
            }
        }
    }

    writefln!"Generating %d ids using %d threads"(multiIdsSize, threadCount);
    foreach (i; 0 .. threadCount) {
        int start = i * iterationsPerThread;
        int end = (i + 1) * iterationsPerThread;
        threads[i] = new GenThread(&gen2, start, end, multiIds);
        threads[i].start();
    }

    foreach (t; threads) t.join();
    writeln("Finished generating ids.");

    // Each thread calls nextId sequentially, so its own slice must be
    // strictly increasing.
    foreach (i; 0 .. threadCount) {
        const sliceStart = i * iterationsPerThread;
        foreach (k; sliceStart + 1 .. sliceStart + iterationsPerThread) {
            assert(multiIds[k - 1] < multiIds[k], format!"Ids from one thread should increase, but not at index %d"(k));
        }
    }

    writeln("Checking for duplicates in ids generated by multiple threads.");
    // Sort the indexes by id value so any duplicate ids end up adjacent;
    // O(n log n) instead of comparing all ~1.25 billion pairs.
    size_t[] order = iota(multiIds.length).array;
    order.sort!((a, b) => multiIds[a] < multiIds[b]);
    foreach (k; 1 .. order.length) {
        const i = order[k - 1];
        const j = order[k];
        assert(multiIds[i] != multiIds[j], format!"Found duplicate at indexes %d and %d: %d"(i, j, multiIds[j]));
    }
}
241 
// Performance measurement: times how long a single generator takes to
// produce a fixed number of ids and prints the resulting throughput.
unittest {
    import std.stdio;
    import std.datetime.stopwatch;

    enum int totalIds = 1_000_000;
    auto generator = SnowflakeGenerator(123);
    writefln!"Measuring the time taken to generate %d ids..."(totalIds);

    // Timing starts only after the setup and the announcement above.
    auto timer = StopWatch(AutoStart.yes);
    foreach (_; 0 .. totalIds) {
        generator.nextId();
    }
    timer.stop();

    const elapsed = timer.peek();
    const long elapsedMs = elapsed.total!"msecs";
    const double nsEach = elapsed.total!"nsecs" / cast(double) totalIds;
    const double perSecond = 1_000_000_000.0 / nsEach;
    writefln!"Generated %d ids in %d ms, or %.6f ns per id, or %.0f ids per second."(totalIds, elapsedMs, nsEach, perSecond);
}