Writing a lock-free MPSC queue based on the previously presented SPSC queues, and exploring some of the bottlenecks inherent in the requirements and strategies for clearing them.
Having just recovered from a long battle with the last of the SPSC queue series posts, I'm going to keep this short. This time we are looking at a slightly more common pattern: many producing threads put stuff on a queue which funnels into a single consumer thread. Maybe that consumer thread is the only thread allowed to perform certain tasks (like accessing a Selector, or writing to a socket or a file). What could we possibly do for this use case?
Benchmark
We'll be comparing the performance of the different alternatives using this benchmark, which basically does the following (a rough sketch of the harness shape follows the list):
1. Allocate a queue of the given size (always 32k in my tests) and type
2. Start N producer threads:
 - A producer thread offers 10M/N items (yielding if the offer fails)
 - The start time is taken before the first item is enqueued
3. In the main thread, consume the 10M items from the queue, keeping the last item dequeued
4. Stop timing: END
5. Overall time is [END - minimum start time from the producers]; print ops/sec and the last dequeued item
6. Repeat steps 2 to 5, 14 times, keeping the ops/sec result in an array
7. Print out a summary line with the average ops/sec for the last 7 iterations
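For reference, here is a rough sketch of the harness shape. This is not the actual benchmark code linked above; the queue factory, the repetition count and the reporting here are simplified stand-ins.

```java
import java.util.Queue;

// Rough sketch of the throughput harness described above (not the actual benchmark code).
public final class MpscThroughputHarness {
    static final int REPETITIONS = 10 * 1000 * 1000;

    public static void main(String[] args) throws Exception {
        int producers = Integer.parseInt(args[0]);
        for (int run = 0; run < 14; run++) {
            Queue<Integer> queue = createQueue(32 * 1024); // plug in the queue under test
            long[] starts = new long[producers];
            Thread[] threads = new Thread[producers];
            int perProducer = REPETITIONS / producers;
            for (int i = 0; i < producers; i++) {
                final int id = i;
                threads[i] = new Thread(() -> {
                    starts[id] = System.nanoTime();   // start time before the first enqueue
                    for (int n = 0; n < perProducer; n++) {
                        while (!queue.offer(n)) {
                            Thread.yield();           // yield if the offer fails
                        }
                    }
                });
                threads[i].start();
            }
            Integer last = null;
            int toConsume = perProducer * producers;
            for (int n = 0; n < toConsume; n++) {     // main thread is the single consumer
                Integer e;
                while ((e = queue.poll()) == null) {
                    Thread.yield();
                }
                last = e;                             // keep the last item dequeued
            }
            long end = System.nanoTime();             // stop timing: END
            for (Thread t : threads) t.join();
            long start = Long.MAX_VALUE;
            for (long s : starts) start = Math.min(start, s);
            long opsPerSec = toConsume * 1_000_000_000L / (end - start);
            // (the real harness also keeps the results and averages the last 7 runs)
            System.out.println(run + ": " + opsPerSec + " ops/sec, last=" + last);
        }
    }

    static Queue<Integer> createQueue(int capacity) {
        return new java.util.concurrent.ArrayBlockingQueue<>(capacity); // stand-in
    }
}
```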
Baseline JDK Queues: ArrayBlockingQueue, ConcurrentLinkedQueue and LinkedTransferQueue
These are battle-tested, long-standing favourites and actually support MPMC, so they are understandably less specialized. ArrayBlockingQueue is also a blocking implementation, so we can expect it to experience some pain from that. Here's how they do:
We can see that CLQ/LTQ perform best when there's a single producer and quickly degrade to a stable baseline after a few producers join the party. The difference between 1 producer and many is the contention on offering items into the queue. Add to that the fact that the consumer is not suffering from the same amount of contention on its side and we get a pile-up of threads. ABQ gains initially from the addition of threads but then degrades back to its initial performance. A full analysis of the behaviour is interesting, but I have no time for that at the moment. Let us compare them to something new instead.
From SPSC to MPSC
There are a few modifications required to make the SPSC queue implemented in the previous posts into an MPSC queue. Most significantly, we need to control access to the tail such that producers don't overwrite each other's elements in the queue and succeed in incrementing the tail in a sequential manner. To do that we will:
- Replace the previous lazy setting of the tail to its new value (the privilege of single writers) with a CAS loop similar to the one employed in AtomicLong.
- Once we have successfully claimed our slot, write to it. Note that this changes the order of events from "write then claim" to "claim then write".
- To ensure the orderly write of the element, use a lazy set into the array.
I've dropped the use of a head cache; it's a pain to maintain across multiple threads. Let's start with this and consider reintroducing it later. In theory it should help, as the queue is mostly empty. The resulting offer is sketched below.
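This is a minimal illustration of the offer described above, using AtomicLong and AtomicReferenceArray as stand-ins for the padded, Unsafe-based fields of the real code; the class and field names are made up for the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Minimal sketch of the MPSC offer described above. AtomicLong/AtomicReferenceArray
// stand in for the padded, Unsafe-based counters and array access of the real code.
public final class MpscSketchQueue<E> {
    private final int capacity;                       // assumed to be a power of two
    private final int mask;
    private final AtomicReferenceArray<E> buffer;
    private final AtomicLong tail = new AtomicLong(); // producers claim slots here
    private final AtomicLong head = new AtomicLong(); // consumer index (no head cache)

    public MpscSketchQueue(int capacity) {
        this.capacity = capacity;
        this.mask = capacity - 1;
        this.buffer = new AtomicReferenceArray<>(capacity);
    }

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        long currentTail;
        do {
            currentTail = tail.get();
            if (currentTail - head.get() >= capacity) {
                return false;                         // queue looks full
            }
            // CAS loop to claim a slot, much like AtomicLong.getAndIncrement
        } while (!tail.compareAndSet(currentTail, currentTail + 1));
        // claim then write: publish the element with an ordered (lazy) set
        buffer.lazySet((int) currentTail & mask, e);
        return true;
    }
}
```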
Because of the above reversal of writes, the consumer must now contend with the possibility of the tail index being visible before the element is:
- NOTE: This code does NOT implement Queue::poll according to the contract specified in the interface. In particular the method may return null when the queue is not empty, but the next element is not yet visible. This is expanded, explored and lamented in the following post: Poll me, maybe?. See the JCTools MpscArrayQueue implementation for Queue interface compliant code.
- Since the tail is no longer the definitive indicator of presence in the queue, we can sidestep the issue by using the FastFlow algorithm for poll, which simply tests that the head element is not null without inspecting the tail counter. Note that the index is still the definitive indicator of queue emptiness: (tail != head).
- Since the producers are interested in the value of head, we must publish it in an orderly fashion by using a lazy set; otherwise it is open to being hoisted into a register... A sketch of the resulting poll follows below.
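Continuing the same sketch class, a FastFlow-style poll might look like this (again an illustration, not the actual code):

```java
    // Added to the sketch class above. The element, not the tail counter, indicates
    // availability; this may return null while tail != head if the element write is
    // not yet visible (see the note above about the Queue::poll contract).
    public E poll() {
        final long currentHead = head.get();
        final int offset = (int) currentHead & mask;
        final E e = buffer.get(offset);
        if (e == null) {
            return null;
        }
        buffer.lazySet(offset, null);    // clear the slot for reuse
        head.lazySet(currentHead + 1);   // lazy set so producers see head progress
        return e;
    }
```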
The above code is worth working through and making sure you understand the reasoning behind every line (ask in the comments if it makes no sense). Hopefully I do too ;-). Was it worth it?
We get some improvement beyond CLQ (x2.5 with 1 producer, x1.7-1.3 as more producers are added), and that is maintained as threads are added, but we are basically stuck on the same issue CLQ has as the producers pile up, namely that we are spinning around the CAS. These queues basically scale like AtomicLong increments from multiple threads, which has been discussed previously here. Benchmarking a counter is an included sample in JMH, so I took the opportunity to check out the latest and greatest in that awesome project and just tweaked it to my needs. Here is what AtomicLong scales like:
It's even worse than the queues at some point because the producer threads at least have other stuff to do apart from beating each other up. How can we contend with this issue?
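For reference, this is roughly the loop AtomicLong.getAndIncrement boiled down to on the JDKs of the time, and it is the same spin our offer is stuck in:

```java
import java.util.concurrent.atomic.AtomicLong;

// Roughly what AtomicLong.getAndIncrement amounted to before the getAndAdd intrinsic:
// the same CAS retry loop the MPSC offer spins on under contention.
static long getAndIncrement(AtomicLong counter) {
    long current;
    do {
        current = counter.get();
    } while (!counter.compareAndSet(current, current + 1)); // retry on contention
    return current;
}
```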
A CAS spin backoff strategy
If you've used the Disruptor before you will find the idea familiar: instead of naively spinning, we allow some configuration of what to do when we are not getting our way, in the hope that time heals all things. I introduced the strategy in the form of an enum, but it's much of a muchness really:
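Something along these lines (the constant names and the park duration here are assumptions for the sketch, not the original enum):

```java
import java.util.concurrent.locks.LockSupport;

// Sketch of a CAS-miss back-off strategy enum as described above.
enum CasBackoffStrategy {
    SPIN {
        @Override
        void backoff() { /* busy spin: just retry immediately */ }
    },
    YIELD {
        @Override
        void backoff() { Thread.yield(); }
    },
    PARK {
        @Override
        void backoff() { LockSupport.parkNanos(1L); }
    };

    abstract void backoff();
}
```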
And this is what the offer looks like with the CAS miss strategy in place:
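Sketched against the same toy class as before, with an assumed strategy field holding one of the enum constants above:

```java
    // Continuing the earlier sketch: the offer now backs off on a CAS miss via an
    // assumed 'strategy' field of the CasBackoffStrategy enum sketched above.
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        long currentTail = tail.get();
        while (true) {
            if (currentTail - head.get() >= capacity) {
                return false;                          // queue looks full
            }
            if (tail.compareAndSet(currentTail, currentTail + 1)) {
                break;                                 // slot claimed
            }
            strategy.backoff();                        // SPIN / YIELD / PARK on a CAS miss
            currentTail = tail.get();
        }
        buffer.lazySet((int) currentTail & mask, e);   // ordered write of the element
        return true;
    }
```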
There's a trade-off to be considered here between a producer's latency and the overall throughput of the system; I have not had time to experiment to that effect. Here's how the different back-off strategies do:
As we can see, the Park strategy offers the best throughput and seems to maintain it as the number of threads increases. The end result is a fairly consistent 50M ops/sec throughput.
Notes and Reservations
I've not had time yet to explore this queue/problem as much as I would like, and as such this post should be seen as a rough first stab rather than an accomplished summary. In particular:
- I'm unhappy with the benchmark/test harness. The producer threads are bound to start and stop out of sync with each other, which will lead to uneven levels of contention on the queue. A better way to measure the throughput would require taking multiple samples during a run and discarding the samples where not all producers are offering.
- Given time I would have liked to test on larger producer counts, at the very least expanding the graphs to 20 or so producers.
- While running the benchmarks I encountered some run-to-run variation. A larger data set of runs would have been the right way to go.
- An MPSC queue with looser FIFO requirements would allow much greater throughput. The challenges there are avoiding producer starvation and offering a degree of timeliness/fairness.
- In producing the MPSC queue I have taken another dive into the Disruptor code; a comparison between the Disruptor and the MPSC queue might prove interesting.
Comments
The throughput "improvements" with the back-off strategies are probably not what you expect. For periods of time you effectively make an uncontended queue while the other threads have disengaged. This can result in massive variations in latency. Imagine 2 threads offering at the same time. One wins and the other is parked for over 50us (not the 1ns the API call suggests) in the best case, rather than a quick retry on the CAS. This type of microbenchmark is very misleading as to real-world performance.
You effectively end up exchanging in large batches while other threads get starved out while parked or yielding. A progressive backoff strategy of spinning, then yielding, then parking can be more useful in the real world.
I agree, the yield/park strategies improve throughput by introducing latency. I've not measured the impact on latency, and you are correct in pointing out that this will induce variations in latency that are not appropriate for certain applications. Still, the strategy is configurable per use case, and as such we can choose a combined spin/yield/park and fine-tune it to our use case. But low latency is not the only use case...
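For illustration, a combined spin-then-yield-then-park along those lines might look something like this sketch (the thresholds are arbitrary, not anything measured here):

```java
import java.util.concurrent.locks.LockSupport;

// Illustration only: a progressive back-off of the kind discussed above --
// spin for a while, then yield for a while, then park. Thresholds are arbitrary.
final class ProgressiveBackoff {
    private static final int SPIN_TRIES = 100;
    private static final int YIELD_TRIES = 200;
    private int tries = 0;

    void backoff() {
        if (tries < SPIN_TRIES) {
            // busy spin: cheapest when the contention clears quickly
        } else if (tries < YIELD_TRIES) {
            Thread.yield();
        } else {
            LockSupport.parkNanos(1L);
        }
        tries++;
    }

    void reset() {
        tries = 0; // call after a successful CAS
    }
}
```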
As stated in the summary this is far from complete work, more an exploration of available alternatives.
Would I recommend people use the PARK strategy in a time-critical program (trading/gaming etc.)? No. In fact I would not encourage anyone to use the above code in its current state. Copy and paste at your own risk, children.
Nitsan, here is a *crazy* idea.
In the backoff strategy, would it make sense to cache the "rate of change" from the producer and make educated guesses as to what type of backoff strategy to use? Sort of a feedback loop system?
The thing you are trying to measure here is the rate of contention/CAS misses. You can certainly keep a last back-off meter around and use it to make a better choice next time around, but you'd have to think long and hard about that heuristic in the context of your application. This is particularly hard as there is no fairness to the contention, so if I have 4 producers the CAS miss rate should be applied to all, but in reality I'm not sure that would happen; you'd need to guard against that.
I'll go into some alternatives in my next post :-)
I had very good results with an MPSC queue built on top of per-thread SPSC queues. This of course relaxes the FIFO semantics, but there's no contention between producers (each being a single writer still).
The MPSC queue maintains a list of SPSC queues in a CopyOnWriteArrayList (updated when a new thread joins/leaves), and the dequeueing operation simply loops through them all, accumulating a larger batch (as the single consumer).
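Something like this rough sketch of the shape (the names and the generic Queue stand-in for the SPSC queues are placeholders, not the actual implementation):

```java
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustration of the layout described above: one SPSC queue per producer thread,
// with the single consumer draining each in turn. Any SPSC Queue implementation works.
final class PerProducerMpsc<E> {
    private final CopyOnWriteArrayList<Queue<E>> queues = new CopyOnWriteArrayList<>();

    // called when a producer thread joins; the producer keeps its queue to itself
    Queue<E> register(Queue<E> spscQueue) {
        queues.add(spscQueue);
        return spscQueue;
    }

    // single consumer: loop over all producer queues, draining up to 'limit' from each
    int drain(Consumer<E> sink, int limit) {
        int drained = 0;
        for (Queue<E> q : queues) {
            for (int i = 0; i < limit; i++) {
                E e = q.poll();
                if (e == null) break;
                sink.accept(e);
                drained++;
            }
        }
        return drained;
    }
}
```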
Nitsan, what are your thoughts on using multiple SPSC queues instead (you sacrifice FIFO of course)?
I've done that, and it is hitting a stable 40M ops/sec. The implementation is on GitHub next to the other ones. I randomised the consumer's choice of queue to read from to avoid starvation, but it offers no guarantees of fairness or degree of reordering or anything. I've also been looking at the queue-per-producer implementation but am not happy with it just yet.
Yeah, I was thinking one SPSC queue per producer and the consumer doing a round-robin read from each.
This implementation definitely has a drawback on the side of latency spikes from starvation of producers; IMHO for latency-sensitive applications doing a round-robin read on all producer queues would keep the latency within SLAs.