Writing a lock free MPSC queue based on the previous presented SPSC queues and exploring some of the bottlenecks inherent in the requirements and strategies to clearing them.
2 or more... |
Having just recovered from a long battle with the last of SPSC queue series posts I'm going to keep this short. This time we are looking at a slightly more common pattern, many producing threads put stuff on a queue which funnels into a single consumer thread. Maybe that queue is the only queue allowed to perform certain tasks (like accessing a Selector, or writing to a socket or a file). What could we possibly do for this use case?
Benchmark
We'll be comparing the performance of the different alternatives using this benchmark which basically does the following:
- Allocate a queue of the given size (always 32k in my tests) and type
- Start N producer threads:
- A producer thread offers 10M/N items (yield if offer fails)
- Start time before first item is enqueued
- In main thread consume [10M * N] items from queue, keep last item dequeued
- Stop timing: END
- Overall time is [END - minimum start from producers], print ops/sec and the last dequeued item
- Repeat steps 2 to 5, 14 times, keep ops/sec result in an array
- Print out summary line with average ops/sec for last 7 iterations
Baseline JDK Queues: ArrayBlockingQueue, ConcurrentLinkedQueue and LinkedTransferQueue
These are battle tested long standing favourites and actually support MPMC, so are understandably less specialized. ArrayBlockingQueue is also a blocking implementation, so we can expect it to experience some pain from that. Here's how they do:
We can see that CLQ/LTQ perform best when there's a single producer and quickly degrades to a stable baseline after a few producers join the party. The difference between 1 producer and many is the contention on offering items into the queue. Add to that the fact that the consumer is not suffering from the same amount of contention on it's side and we get a pile up of threads. ABQ gains initially from the addition of threads but then degrades back to initial performance. A full analysis of the behaviour is interesting, but I have no time for that at the moment. Let us compare them to something new instead.
From SPSC to MPSC
There's a few modifications required to make the SPSC implemented in previous posts into an MPSC. Significantly we need to control access to the tail such that publishers don't over write each others elements in the queue and are successful in incrementing the tail in a sequential manner. To do that we will:
- Have to replace the prev. lazy setting of the tail to it's new value (the privilege of single writers) with a CAS loop similar to the one employed in AtomicLong.
- Once we successfully claimed our slot we will write to it. Note that this is changing the order of events to "claim then write" from "write then claim".
- To ensure the orderly write of the element we will use a lazy set into the array
I've dropped the use of a head cache, it's a pain to maintain across multiple threads. Lets start with this and consider reintroducing it later. In theory it should be a help as the queue is mostly empty.
Because of the above reversal of writes we now must contend in the consumer with the possibility of the tail index being visible before the element is:
- NOTE: This code does NOT implement Queue::poll according to the contract specified in the interface. In particular the method may return null when the queue is not empty, but the next element is not yet visible. This is expanded, explored and lamented in the following post: Poll me, maybe?. See the JCTools MpscArrayQueue implementation for Queue interface compliant code.
- Since the tail is no longer the definitive indicator of presence in the queue we can side step the issue by using the FastFlow algorithm for poll which simply tests the head element is not null without inspecting the tail counter. Note that the index is still the definitive indicator of queue emptiness: (tail != head).
- Since the producers are interested in the value of head we must orderly publish it by using lazy set otherwise it will be open to be assigned to a register...
The above code is worth working through and making sure you understand the reasoning behind every line (ask in the comments if it makes no sense). Hopefully I do too ;-). Was it worth it?
We get some improvement beyond CLQ (x2.5 with 1 producer, x1.7-1.3 as more producers are added), and that is maintained as threads are added, but we are basically stuck on the same issue CLQ is as the producers pile up, namely that we are spinning around the CAS. These queue basically scale like AtomicLong increments from multiple threads, this has been discuss previously here, benchmarking a counter is an included sample in JMH so I took the opportunity to checkout the latest and greatest in that awesome project and just tweaked it to my needs. Here is what AtomicLong scales like:
It's even worse than the queues at some point because the producer threads at least have other stuff to do apart from beating each other up. How can we contend with this issue?
It's even worse than the queues at some point because the producer threads at least have other stuff to do apart from beating each other up. How can we contend with this issue?
A CAS spin backoff strategy
If you've used the Disruptor before you will find the idea familiar, instead of naively spinning we will allow some configuration of what to do when we are not getting our way, in the hope that time heals all things. I introduced the strategy in the form of an enum, but it's much of a muchness really:
And this is what the offer looks like with the CAS miss strategy in place:
There's a trade off here to be considered between a producers latency and the overall throughput of the system, I have not had time to experiment to that effect. Here's how the different back off strategies do:
As we can see the Park strategy offers the best throughput and seems to maintain it as the number of threads increase. The end result is a fair consistent 50M ops/sec throughput.
Notes and Reservations
I've not had time yet to explore this queue/problem as much as I would like and as such this post should be seen a rough first stab rather than an accomplished summary. In particular:- I'm unhappy with the benchmark/test harness. The producer threads are bound to start and stop out of sync with each other which will lead to uneven levels of contention on the queue. A better way to measure the throughput would require taking multiple samples during a run and discarding the samples where not all producers are offering.
- Given time I would have liked to test on larger producer counts, at the very least expanding the graphs to 20 or so producers.
- While running the benchmarks I encountered some run to run variation. A larger data set of runs would have been the right way to go.
- An MPSC queue with looser FIFO requirements would allow much greater throughput. The challenges there are avoiding producer starvation and offering a degree of timeliness/fairness.
- In producing the MPSC queue I have taken another dive into the Disruptor code, a comparison between the Disruptor and the MPSC queue might prove interesting.