1 /** 
2  * This module contains the entire DFlake library, which is mostly centered
3  * around the SnowflakeGenerator.
4  */
5 module dflake;
6 
7 import core.sync.mutex : Mutex;
8 
9 /** 
10  * The Snowflake generator is a thread-safe generator of unique "snowflake"
11  * ids, which guarantees that ids it generates will always be unique and
12  * always increasing in value, provided that there are no two generators with
13  * the same node id at the same time.
14  *
15  * Here's a simple example of how to generate ids:
16  * ```d
17  * auto gen = SnowflakeGenerator(1);
18  * long id = gen.nextId;
19  * ```
20  *
21  * For background info on this type of id, please see [Twitter's original blog
22  * post on the topic](https://blog.twitter.com/engineering/en_us/a/2010/announcing-snowflake).
23  *
24  * Authors: Andrew Lalis, andrewlalisofficial@gmail.com
25  */
26 struct SnowflakeGenerator {
27     // Standard definitions of the structure of a snowflake id.
28     public static const UNUSED_BITS = 1;
29     public static const EPOCH_BITS = 41;
30     public static const NODE_ID_BITS = 10;
31     public static const SEQUENCE_BITS = 12;
32 
33     public static const long MAX_NODE_ID = (1L << NODE_ID_BITS) - 1;
34     public static const long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;
35 
36     // Default epoch in case one isn't given. (January 1, 2015 Midnight UTC = 2015-01-01T00:00:00Z)
37     public static const long DEFAULT_CUSTOM_EPOCH = 1_420_070_400_000L;
38 
39     /** 
40      * The node id for this generator.
41      */
42     private const long nodeId = 0;
43 
44     /** 
45      * The epoch offset to use for this generator, in milliseconds since the
46      * unix epoch.
47      */
48     private const long epoch = DEFAULT_CUSTOM_EPOCH;
49 
50     /** 
51      * Tracks the last timestamp at which an id was issued, in milliseconds
52      * since the epoch.
53      */
54     private long lastTimestamp = -1L;
55 
56     /** 
57      * Tracks the last sequence value we used. If multiple ids are needed
58      * within the span of 1 millisecond, this sequence is incremented for each
59      * id.
60      */
61     private long sequence = 0L;
62 
63     /** 
64      * A mutex to ensure that ids are generated sequentially, even if `nextId`
65      * is called from multiple threads.
66      */
67     private shared Mutex mutex;
68 
69     /** 
70      * Constructs the generator using a given node id and epoch offset.
71      * Params:
72      *   nodeId = The node id.
73      *   epoch = The epoch offset, in milliseconds since 01-01-1970
74      */
75     this(long nodeId, long epoch) {
76         assert(nodeId >= 0 && nodeId < MAX_NODE_ID);
77         assert(epoch >= 0 && epoch < timestampMillis(0));
78         this.nodeId = nodeId;
79         this.epoch = epoch;
80         this.mutex = new shared Mutex();
81     }
82 
83     /** 
84      * Constructs the generator using a default epoch offset.
85      * Params:
86      *   nodeId = The node id.
87      */
88     this(long nodeId) {
89         this(nodeId, DEFAULT_CUSTOM_EPOCH);
90     }
91 
92     /** 
93      * Generates a new unique id. Calls to this method are synchronized such
94      * that ids from this generator are generated sequentially.
95      * Returns: The id.
96      */
97     public long nextId() {
98         this.mutex.lock_nothrow();
99         scope(exit) this.mutex.unlock_nothrow();
100 
101         long currentTimestamp = timestampMillis(this.epoch);
102         assert(currentTimestamp >= this.lastTimestamp);
103         if (currentTimestamp == this.lastTimestamp) {
104             this.sequence = (this.sequence + 1) & MAX_SEQUENCE;
105             if (this.sequence == 0) {
106                 currentTimestamp = waitNextMillis(currentTimestamp);
107             }
108         } else {
109             this.sequence = 0;
110         }
111 
112         this.lastTimestamp = currentTimestamp;
113         long id = currentTimestamp << (NODE_ID_BITS + SEQUENCE_BITS)
114             | (nodeId << SEQUENCE_BITS)
115             | this.sequence;
116         return id;
117     }
118 
119     import std.typecons : Tuple, tuple;
120     alias SnowflakeIdInfo = Tuple!(long, "timestamp", long, "nodeId", long, "sequence");
121 
122     /** 
123      * Parses a snowflake id into its constituent parts.
124      * Params:
125      *   id = The id to parse.
126      * Returns: A tuple containing the timestamp, node id, and sequence value
127      * of the snowflake id.
128      */
129     public SnowflakeIdInfo parseId(long id) {
130         long maskNodeId = ((1L << NODE_ID_BITS) - 1) << SEQUENCE_BITS;
131         long maskSequence = (1L << SEQUENCE_BITS) - 1;
132 
133         long timestamp = (id >> (NODE_ID_BITS + SEQUENCE_BITS)) + this.epoch;
134         long nodeIdLocal = (id & maskNodeId) >> SEQUENCE_BITS;
135         long sequenceLocal = id & maskSequence;
136         return tuple!("timestamp", "nodeId", "sequence")(timestamp, nodeIdLocal, sequenceLocal);
137     }
138 
139     /** 
140      * Gets the current timestamp in milliseconds.
141      * Params:
142      *   offset = The offset to subtract from unix time.
143      * Returns: The milliseconds that have elapsed since the given offset from
144      * unix time.
145      */
146     private static long timestampMillis(long offset) {
147         import std.datetime : Clock, SysTime, DateTime, UTC, Duration;
148         SysTime now = Clock.currStdTime();
149         SysTime unixEpoch = SysTime(DateTime(1970, 1, 1), UTC());
150         Duration diff = now - unixEpoch;
151         return diff.total!"msecs" - offset;
152     }
153 
154     /** 
155      * Performs a busy-wait until a single millisecond has elapsed since the
156      * given timestamp.
157      * Params:
158      *   currentTimestamp = The current timestamp.
159      * Returns: The next timestamp, once we reach it.
160      */
161     private long waitNextMillis(long currentTimestamp) {
162         while (currentTimestamp == lastTimestamp) {
163             currentTimestamp = timestampMillis(this.epoch);
164         }
165         return currentTimestamp;
166     }
167 }
168 
169 // Tests for the correctness of the generator.
170 unittest {
171     // Check that each next id is unique.
172     auto gen = SnowflakeGenerator(123);
173     const iterations = 5000;
174 
175     long[iterations] ids;
176     foreach (i; 0 .. iterations) {
177         ids[i] = gen.nextId;
178     }
179 
180     foreach (i; 0 .. ids.length) {
181         foreach (j; i + 1 .. ids.length) {
182             assert(ids[i] != ids[j]);
183         }
184     }
185 
186     // Check that ids generated from separate threads are unique.
187     import core.thread : Thread;
188     import std.format;
189     import std.stdio;
190 
191     const iterationsPerThread = 1000;
192     const threadCount = 50;
193     const multiIdsSize = iterationsPerThread * threadCount;
194     long[] multiIds = new long[multiIdsSize];
195     Thread[threadCount] threads;
196     auto gen2 = SnowflakeGenerator(345);
197 
198     class GenThread : Thread {
199         private SnowflakeGenerator* gen;
200         private const int startIdx;
201         private const int endIdx;
202         private long[] ids;
203         this(SnowflakeGenerator* gen, int startIdx, int endIdx, ref long[] ids) {
204             super(&run);
205             this.gen = gen;
206             this.startIdx = startIdx;
207             this.endIdx = endIdx;
208             this.ids = ids;
209         }
210         private void run() {
211             foreach (i; startIdx .. endIdx) {
212                 ids[i] = this.gen.nextId();
213             }
214         }
215     }
216 
217     writefln!"Generating %d ids using %d threads"(multiIdsSize, threadCount);
218     foreach (i; 0 .. threadCount) {
219         int start = i * iterationsPerThread;
220         int end = (i + 1) * iterationsPerThread;
221         threads[i] = new GenThread(&gen2, start, end, multiIds);
222         threads[i].start();
223     }
224 
225     foreach (t; threads) t.join();
226     writeln("Finished generating ids.");
227 
228     writeln("Checking for duplicates in ids generated by multiple threads.");
229     foreach (i; 0 .. multiIds.length) {
230         foreach (j; i + 1 .. multiIds.length) {
231             assert(multiIds[i] != multiIds[j], format!"Found duplicate at indexes %d and %d: %d"(i, j, multiIds[j]));
232         }
233     }
234 }
235 
236 // Separate test for performance testing.
237 unittest {
238     import std.stdio;
239     import std.datetime.stopwatch;
240 
241     auto sw = StopWatch(AutoStart.no);
242 
243     void genIdsTest() {
244         SnowflakeGenerator gen = SnowflakeGenerator(123);
245         int iterationCount = 1_000_000;
246         writefln!"Measuring the time taken to generate %d ids..."(iterationCount);
247         sw.start();
248         for (int i = 0; i < iterationCount; i++) {
249             gen.nextId();
250         }
251         sw.stop();
252         long ms = sw.peek().total!"msecs";
253         double nsPerId = sw.peek().total!"nsecs" / cast(double) iterationCount;
254         double idsPerSecond = 1_000_000_000.0 / nsPerId;
255         writefln!"Generated %d ids in %d ms, or %.6f ns per id, or %.0f ids per second."(iterationCount, ms, nsPerId, idsPerSecond);
256         sw.reset();
257     }
258 
259     genIdsTest();
260 }