Anyways... that's the choice for now, let's jump into some benchmarks!
Using the JMH Control
Benchmarking the latency of 2 threads communicating via a queue requires using the JMH Control structure. This is demonstrated in the JMH samples measuring thread to thread "ping pong" via an AtomicBoolean.compareAndSet (see original here):
The challenge here is that the CAS operation may fail, and when it does we want to retry until we succeed, BUT if the opposite thread were to be stopped/suspended because the benchmark iteration is done we would be stuck in an infinite loop. To avoid this issue a benchmarked method can take a Control parameter, which exposes a flag indicating the end of a run. This flag can be used to detect benchmark termination and break out of the retry loop. You'll notice other JMH annotations are in use above:
- State: sets the scope of the benchmark state, in this case the Group
- Group: multi-threaded benchmarks can utilize thread groups of different sizes to exercise concurrent data structures with different operations. In the case above we see one group called pingpong. Both methods are exercised by threads from the group, using the default of 1 thread per operation.
- Note that if we were to run the above benchmark with more threads, we would be running several independent groups. Running with 10 threads will result in 5 concurrent benchmark runs, each with its own group of 2 threads.
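To make the role of the termination flag concrete, here is a minimal plain-Java sketch of the ping/pong pattern. It is not the JMH sample itself: the class `PingPongSketch`, the `stop` field and the `run` helper are names I made up for illustration, with `stop` standing in for the flag that Control exposes in JMH.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Plain-Java sketch of ping/pong over a CAS; `stop` is a stand-in for the JMH Control flag. */
final class PingPongSketch {
    private final AtomicBoolean flag = new AtomicBoolean(false);
    private volatile boolean stop; // hypothetical stand-in for the end-of-run flag

    /** Flip false -> true, retrying until the CAS succeeds or the run is over. */
    boolean ping() {
        while (!stop) {
            if (flag.compareAndSet(false, true)) {
                return true;
            }
        }
        return false;
    }

    /** Flip true -> false, retrying until the CAS succeeds or the run is over. */
    boolean pong() {
        while (!stop) {
            if (flag.compareAndSet(true, false)) {
                return true;
            }
        }
        return false;
    }

    /** Runs `rounds` exchanges between two threads and returns the number of completed pongs. */
    static int run(int rounds) {
        PingPongSketch s = new PingPongSketch();
        int[] pongs = new int[1];
        Thread ponger = new Thread(() -> {
            // Without the stop flag this loop would spin forever once the pinger is done.
            while (s.pong()) {
                pongs[0]++;
            }
        });
        ponger.start();
        for (int i = 0; i < rounds; i++) {
            s.ping();
        }
        while (s.flag.get()) {
            Thread.yield(); // wait for the final pong to land
        }
        s.stop = true; // signal the end of the run, releasing the ponger
        try {
            ponger.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pongs[0];
    }

    public static void main(String[] args) {
        System.out.println("completed pongs: " + run(1000));
    }
}
```

The point of the sketch is the shape of the retry loops: each one checks the end-of-run flag on every failed attempt, so neither side can get stuck waiting on a partner that has already left.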
Running with both threads on the same core:
Benchmark                                 Mode  Thr  Count  Sec    Mean  Mean error  Units
i.j.s.l.BaselinePingPong.pingpong         avgt    2     10    1  23.578       0.198  ns/op
i.j.s.l.BaselinePingPong.pingpong:ping    avgt    2     10    1  23.578       0.198  ns/op
i.j.s.l.BaselinePingPong.pingpong:pong    avgt    2     10    1  23.578       0.198  ns/op
And running across different cores:
Benchmark                                 Mode  Thr  Count  Sec    Mean  Mean error  Units
i.j.s.l.BaselinePingPong.pingpong         avgt    2     10    1  60.508       0.217  ns/op
i.j.s.l.BaselinePingPong.pingpong:ping    avgt    2     10    1  60.496       0.195  ns/op
i.j.s.l.BaselinePingPong.pingpong:pong    avgt    2     10    1  60.520       0.248  ns/op
Note from a hardware point of view: see this answer on Intel forums on cache latencies. Latencies stand at roughly:
"L1 CACHE hit, ~4 cycles
L2 CACHE hit, ~10 cycles
L3 CACHE hit, line unshared ~40 cycles
L3 CACHE hit, shared line in another core ~65 cycles
L3 CACHE hit, modified in another core ~75 cycles
remote L3 CACHE ~100-300 cycles
Local Dram ~60 ns
Remote Dram ~100 ns"
To convert cycles to time you need to divide by your CPU clock speed. The delay manifestation will also be dependent on whether or not you require the data to make progress etc. The thing to keep in mind here is that there is a physical limit to the achievable latency. If you think you're going faster... check again ;-).
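As a quick illustration of converting the cycle counts quoted above into time, here is a small sketch. The class and method names are mine, and the 2.4GHz clock is simply the laptop used for the benchmarks in this post, so plug in your own clock speed.

```java
/** Converts cache latencies quoted in cycles into nanoseconds for a given clock speed. */
final class CacheLatency {
    /** cycles / GHz = nanoseconds, since 1GHz is one cycle per nanosecond. */
    static double cyclesToNanos(double cycles, double clockGhz) {
        return cycles / clockGhz;
    }

    public static void main(String[] args) {
        double ghz = 2.4; // assumption: fixed clock, no turbo boost
        System.out.printf("L1 hit (~4 cycles): %.1f ns%n", cyclesToNanos(4, ghz));
        System.out.printf("L3 hit, shared in another core (~65 cycles): %.1f ns%n", cyclesToNanos(65, ghz));
        System.out.printf("L3 hit, modified in another core (~75 cycles): %.1f ns%n", cyclesToNanos(75, ghz));
    }
}
```

At 2.4GHz a line modified in another core (~75 cycles) works out to roughly 31ns, which gives a feel for the floor on any cross core hand-off.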
Queue round trip latency benchmark
- From thread[0] send K objects down queue[0].
- Concurrently: in each thread i (0 < i < N, where N is the chain length) read from queue[i-1] and write to queue[i].
- Thread[0] waits for K objects to be polled from queue[N-1] and returns.
- The number of benchmarking threads is unknown at runtime (in JMH), so chain length must be defined separately.
- Setup/Teardown methods on the Iteration level are called per thread.
- Thread state is allocated and used per thread per iteration. The threads are pooled, so a given thread can switch 'roles' on each iteration. In other words @State(Scope.Thread) is not ThreadLocal.
- Concurrent polls to SPSC queues can leave the queue in an undefined state.
- The benchmark requires that the queues are empty when we start measuring, so at the beginning or end of an iteration we must clear the queues.
- Queues must be cleared from the consumer thread.
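The round trip described above can be sketched in plain Java as follows. This is an illustration of the data flow only, not the JMH benchmark: there is no warmup, no forking and no stats, the class and method names are made up, and I use ConcurrentLinkedQueue throughout so the sketch is safe regardless of which thread touches which queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch: a source thread sends a burst through a chain of link threads and waits for it to return. */
final class RoundTripSketch {
    /** Sends `burst` messages around a chain of `chainLength` threads, returns the round trip in ns. */
    static long roundTripNanos(int chainLength, int burst) {
        List<Queue<Integer>> queues = new ArrayList<>();
        for (int i = 0; i < chainLength; i++) {
            queues.add(new ConcurrentLinkedQueue<>());
        }
        AtomicBoolean stop = new AtomicBoolean(false);
        List<Thread> links = new ArrayList<>();
        // Link i reads from queue[i-1] and writes to queue[i].
        for (int i = 1; i < chainLength; i++) {
            Queue<Integer> in = queues.get(i - 1);
            Queue<Integer> out = queues.get(i);
            Thread link = new Thread(() -> {
                while (!stop.get()) {
                    Integer e = in.poll();
                    if (e != null) {
                        out.offer(e);
                    }
                }
            });
            link.start();
            links.add(link);
        }
        Queue<Integer> first = queues.get(0);
        Queue<Integer> last = queues.get(chainLength - 1);
        long start = System.nanoTime();
        for (int k = 0; k < burst; k++) {
            first.offer(k); // the source sends the whole burst
        }
        for (int k = 0; k < burst; k++) {
            Integer e;
            while ((e = last.poll()) == null) {
                // spin until the next message comes back
            }
            if (e != k) {
                throw new IllegalStateException("out of order: " + e);
            }
        }
        long rtt = System.nanoTime() - start;
        stop.set(true); // end of run: release the links
        for (Thread t : links) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return rtt;
    }

    public static void main(String[] args) {
        System.out.println("chain.length=2, burst=32: " + roundTripNanos(2, 32) + " ns");
    }
}
```

Because the source polls back every message it sent, all queues are empty when the method returns, which is the invariant the real benchmark has to restore explicitly between iterations.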
I'm relying on correct usage for some things, but JMH is basically doing most of the heavy lifting:
- JMH is taking care of all the measurements/measurement modes/reporting/formatting including forked runs and stats summaries and all.
- JMH is taking care of thread management, no need to spin up threads/executors and shut them down.
- JMH is synchronizing iterations and managing per thread state setup/teardown. Note the @TearDown annotation in Link/Source.
- JMH is allowing the group thread balance to be parametrized (just recently) which allows generic asymmetric multi-threaded benchmarks like the above.
- I would like to only specify the chain.length and derive the thread allocation from that. This is possible via the JMH launcher API.
- I would like @State(Scope.Thread) to mean ThreadLocal.
- I would like to be able to pass State down the stack as constructor parameters, so I could avoid the static field for sharing benchmark state between thread state objects.
I can't stress this point enough, the objective here is more than collecting data, it is about enabling, supporting and empowering peer review. I want the benchmarks to be as simple as they can be so people can either be easily convinced or easily point out my errors. The less code I need my peers to read, the easier it is for anyone to build/run/tweak the benchmarks, the less we rely on handrolled/halfassed attempts at benchmark frameworks THE BETTER (having said that, you got to do what you got to do).
Note: If you care about latency you should watch Gil Tene's excellent talk on How Not To Measure Latency, where he discusses coordinated omission and its impact on latency measurement. As you may have noticed this experiment suffers from coordinated omission. I'll have to live with that for now, but I don't feel the data gained from a corrected measurement would impact the queue implementations in any form.
Hard Numbers? Numbers Hard?
We've got the following queues:
- ConcurrentLinkedQueue (CLQ) - Good old JDK lock free queue, for reference.
- InlinedCountersSpscConcurrentArrayQueue (CAQ) - My inlined counter variation on Martin Thompson's ConcurrentArrayQueue previously discussed/explained/explored here and compared to this variation here. It can be run with sparse data as described here.
- FFBuffer (FF1) - A Java port of the FastFlow SPSC implementation with sparse data as described here.
- BQueue (BQ) - An SPSC queue previously discussed here, which introduces offer side batching of the null check (which also serves as a full queue induced false sharing protection) and poll side batching of emptiness testing, which also includes a spinning backoff to avoid the near empty queue issue.
- FFBufferWithOfferBatch (FF2) - A BQueue on the offer side and an FFBuffer on the poll side, this is a new addition to the arsenal.
- AverageTime - Average time per operation (from the command line use '-bm avgt')
- SampleTime - Time distribution, percentile estimation (from the command line use '-bm sample')
I'll start with the average RTT latency for each type, running same core and cross core:
The benchmarks were run on my laptop (JDK7u45/Ubuntu13.10/i7-4700MQ@2.40GHz no turbo boost/benchmark process pinned using taskset) and though I ran them repeatedly I didn't systematically collect run to run variance data, so consider the results subject to some potential run to run variance.
As we can see:
- FF2/FF1 are very similar and take the cake at 60/230ns (FF2 is marginally faster).
- BQ is the worst [392/936ns] because of its backoff on approaching near empty strategy.
- It is perhaps surprising to see that although CAQ/FF1/BQ show very similar throughput in my previous tests their latency characteristics are so distinct.
- Note that CLQ is actually not too bad latency wise (in this particular use case).
- CLQ makes no effort to amortize costs as its RTT grows with the burst size.
- BQ starts off as the worst, but somewhere between burst size 16 and 32 the benefits of its design start to show.
- FF2 and CAQ demonstrate similar growth patterns with the cost per item dropping as the burst size grows. But FF2 remains the clear winner, at burst size 32 the round trip is 640ns.
- 640ns for 32 messages averages at just 20ns per message. This is NOT A MEANINGFUL LATENCY NUMBER! :)
Latency != Time/Throughput
Let me elaborate on this last point. The average cost is not a good representative of the latency. Why not? Latency in this case is the round trip time from each message's perspective. Throughput in a concurrent environment is not an indicator of latency because it assumes a uniformity that is just not there in real life. We know the behaviour for the first message is at best the same, so assuming first message latency is 230ns, how would the other messages' timeline need to look to make the mean latency 20ns? Is that reasonable? What is really happening here is as follows (my understanding in any case):
- The producer pretty much writes all the messages into the queue in one go, with very little time between the first and last message being written. So if we mark T1..T32 the send time of the messages, we can estimate T32 - T1 is within 100ns - 150ns (single threaded read/write to FF2 takes roughly 6ns, assume an even split between write and read and add some margin). Near empty queue contention can add to this cost, as can any direct contention between poll/offer.
- In fact, assuming a baseline cost of 6ns read/write, multiplied by chain length, leads to a base cost of 32*6*2=384ns. Following from this is that the minimum latency experienced by any message can never be less than 32*3=96ns as the burst must be sent before messages are received. 96ns is assuming zero cost for the link, this is how fast this would go offering and polling from the same queue in the same thread.
- With any luck, and depending on the size of the burst, the consumer will see all the messages written by the time it detects data in the queue (i.e. it won't contend with the producer). There is opportunity here for reduced cost which is used by FF2/CAQ and BQ.
- The consumer now reads from the queue, the first message is detected like it would be for burst size 1, i.e. after 115ns or so for FF2. This can only get worse with the contention of potentially other messages being written as the first message is read, adding further latency. The rest of the messages are then read and there is again an opportunity to lower costs. The messages are written into the outbound queue as soon as they are read. Reading and writing each message has a cost, which we can assume is at the very least as high as 6ns.
- The producer now detects the messages coming back. Let's mark the return times for the messages R1..R32. We know R1 - T1 is at least 230ns and we know that R32 - T1 is roughly 640ns. Assuming some fixed costs for writing/reading from the queue, we can estimate that R32 - R1 is also within 100-150ns.
- So, working through the timeline as estimated above gives us the following best case scenario for message 32:
- 3ns - Source sends message 1
- 96ns - Source sends message 32
- 115ns - Link picks up message number 1, has to work through messages 1-31
- 230ns - Source picks up message 1
- 301ns - Link is done transferring 31 messages and is finally picking up message 32
- 307ns - Link has sent message 32
- 346ns - Source has finished processing message 32
- In this hypothetical best case scenario, message 32 which had the best chance of improved latency enjoyed a 150ns RTT. This is the theoretical best case ever. IT DIDN'T HAPPEN. The RTT we saw for a 32 messages burst was 640ns, indicating some contention and an imperfect world. A world in which there's every chance message 32 didn't have a latency as good as message 1.
- The fallacy of the average latency is that it assumes a uniform distribution of the send and receive times, but the reality is that the groups are probably lumped together on a timeline leading to many send times being within a few ns of each other, and many receive times being within a few ns of each other. This is the magic of buffering/queuing.
- The latency experienced by the last element in a burst can be better than the latency experienced by the first, but not significantly so and subject to some very delicate timing. The important thing here is that good queues allow the producer to make progress as the consumer idles or is busy consuming previous items, thus allowing us to capitalize on concurrent processing.
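To put a number on the rhetorical question above (how would the other messages need to behave for the mean latency to be 20ns?), here is the arithmetic as a small sketch. It only uses figures already quoted in this post (640ns for a burst of 32, 230ns for a single message round trip, roughly 3ns per offer); the class and method names are made up for illustration.

```java
/** Worked arithmetic: why burst RTT divided by burst size is not a per-message latency. */
final class MeanLatencyCheck {
    /** Average cost per message: burst round trip time divided by burst size. */
    static double averageCostPerMessage(double burstRttNs, int burst) {
        return burstRttNs / burst;
    }

    /** Mean RTT the remaining messages would need for the whole burst to average targetMeanNs. */
    static double requiredMeanForRest(int burst, double targetMeanNs, double firstRttNs) {
        return (burst * targetMeanNs - firstRttNs) / (burst - 1);
    }

    public static void main(String[] args) {
        int burst = 32;
        double burstRttNs = 640;     // measured RTT for a burst of 32 (FF2)
        double firstRttNs = 230;     // RTT for a single message (FF2, cross core)
        double offerCostNs = 3;      // assumed cost of a single offer

        double avg = averageCostPerMessage(burstRttNs, burst);
        double rest = requiredMeanForRest(burst, avg, firstRttNs);
        double sendWindowNs = burst * offerCostNs;

        System.out.printf("average cost per message: %.1f ns%n", avg);
        System.out.printf("mean RTT required of the other %d messages: %.1f ns%n", burst - 1, rest);
        System.out.printf("time just to send the burst: %.1f ns%n", sendWindowNs);
    }
}
```

For a 20ns mean to be a real latency, the other 31 messages would each need a round trip of roughly 13ns, which is less than the time it takes just to write the burst into the queue. The 20ns figure is a cost per message, not a latency.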
To Be Continued...
There's a lot of experimentation and data to be had here, so in the interest of making progress with the project as well as sharing the results I'll cut this analysis short and continue some other time. To be explored in further posts:
- Compare sparse data impact on CAQ/FF2 for the bursty use case - I have done some experimentation which suggests FF2 gains little from the sparse data setting, while for CAQ the improvements are noticeable for certain burst sizes.
- Compare average time results with the sampling mode results - There are significant differences in the results, which I'm still trying to explain. One would expect the sampling mode to have some fixed overhead of having to call System.nanoTime() twice for each sample, but the difference I see is an x2 increase in the mean.
- Compare queue behaviour for different chain lengths - The results are unstable at the moment because the thread layout is not fixed. For chain length 2-4 the topology can be made static, but from chain length 5 and up the number of cross core crossings can change in mid run (on my hyper threaded quad core anyways), leading to great variance in results (which is legitimate, but annoying). I will need to pin threads and thread 'roles' to stabilize the benchmark for this use case, probably using JTA (Java Thread Affinity - by Sir Lawrey).
Thanks to Darach, Norman, and Peter for reviewing the post, and Aleksey for reviewing the benchmark. Peer review FTW!