Monday, 21 October 2013

A Lock Free Multi Producer Single Consumer Queue - Round 1

{This post is part of a long running series on lock free queues, checkout the full index to get more context here}
Writing a lock free MPSC queue based on the previous presented SPSC queues and exploring some of the bottlenecks inherent in the requirements and strategies to clearing them. 
2 or more...

Having just recovered from a long battle with the last of SPSC queue series posts I'm going to keep this short. This time we are looking at a slightly more common pattern, many producing threads put stuff on a queue which funnels into a single consumer thread. Maybe that queue is the only queue allowed to perform certain tasks (like accessing a Selector, or writing to a socket or a file). What could we possibly do for this use case?

Benchmark

We'll be comparing the performance of the different alternatives using this benchmark which basically does the following:
  1. Allocate a queue of the given size (always 32k in my tests) and type
  2. Start N producer threads:
    1. A producer thread offers 10M/N items (yield if offer fails)
    2. Start time before first item is enqueued
  3. In main thread consume [10M * N] items from queue, keep last item dequeued
  4. Stop timing: END
  5. Overall time is [END - minimum start from producers], print ops/sec and the last dequeued item
  6. Repeat steps 2 to 5, 14 times, keep ops/sec result in an array
  7. Print out summary line with average ops/sec for last 7 iterations
Very similar to previous benchmark for SPSC. The producers all wait for the consumer to give the go ahead, so contention is at it's worse (insert evil laughter, howling dogs). I ran the tests a fair few  times to expose run to run variance on a test machine (Ubuntu13.04/JDK7u40/i7-4700MQ) with 1 to 6 producer threads. To make the graphs a little easier to grok I took a representative run and for now will leave run to run variance to be discussed separately.


Baseline JDK Queues: ArrayBlockingQueue, ConcurrentLinkedQueue and LinkedTransferQueue

These are battle tested long standing favourites and actually support MPMC, so are understandably less specialized. ArrayBlockingQueue is also a blocking implementation, so we can expect it to experience some pain from that. Here's how they do:

We can see that CLQ/LTQ perform best when there's a single producer and quickly degrades to a stable baseline after a few producers join the party. The difference between 1 producer and many is the contention on offering items into the queue. Add to that the fact that the consumer is not suffering from the same amount of contention on it's side and we get a pile up of threads. ABQ gains initially from the addition of threads but then degrades back to initial performance. A full analysis of the behaviour is interesting, but I have no time for that at the moment. Let us compare them to something new instead.

From SPSC to MPSC

There's a few modifications required to make the SPSC implemented in previous posts into an MPSC. Significantly we need to control access to the tail such that publishers don't over write each others elements in the queue and are successful in incrementing the tail in a sequential manner. To do that we will:
  1. Have to replace the prev. lazy setting of the tail to it's new value (the privilege of single writers) with a CAS loop similar to the one employed in AtomicLong. 
  2. Once we successfully claimed our slot we will write to it. Note that this is changing the order of events to "claim then write" from "write then claim". 
  3. To ensure the orderly write of the element we will use a lazy set into the array
Here's the code:

I've dropped the use of a head cache, it's a pain to maintain across multiple threads. Lets start with this and consider reintroducing it later. In theory it should be a help as the queue is mostly empty.
Because of the above reversal of writes we now must contend in the consumer with the possibility of the tail index being visible before the element is:
  1. NOTE: This code does NOT implement Queue::poll according to the contract specified in the interface. In particular the method may return null when the queue is not empty, but the next element is not yet visible. This is expanded, explored and lamented in the following post: Poll me, maybe?. See the JCTools MpscArrayQueue implementation for Queue interface compliant code.
  2. Since the tail is no longer the definitive indicator of presence in the queue we can side step the issue by using the FastFlow algorithm for poll which simply tests the head element is not null without inspecting the tail counter. Note that the index is still the definitive indicator of queue emptiness: (tail != head).
  3. Since the producers are interested in the value of head we must orderly publish it by using lazy set otherwise it will be open to be assigned to a register...

The above code is worth working through and making sure you understand the reasoning behind every line (ask in the comments if it makes no sense). Hopefully I do too ;-). Was it worth it?

We get some improvement beyond CLQ (x2.5 with 1 producer, x1.7-1.3 as more producers are added), and that is maintained as threads are added, but we are basically stuck on the same issue CLQ is as the producers pile up, namely that we are spinning around the CAS. These queue basically scale like AtomicLong increments from multiple threads, this has been discuss previously here, benchmarking a counter is an included sample in JMH so I took the opportunity to checkout the latest and greatest in that awesome project and just tweaked it to my needs. Here is what AtomicLong scales like:
It's even worse than the queues at some point because the producer threads at least have other stuff to do apart from beating each other up. How can we contend with this issue?

A CAS spin backoff strategy

If you've used the Disruptor before you will find the idea familiar, instead of naively spinning we will allow some configuration of what to do when we are not getting our way, in the hope that time heals all things. I introduced the strategy in the form of an enum, but it's much of a muchness really:

And this is what the offer looks like with the CAS miss strategy in place:

There's a trade off here to be considered between a producers latency and the overall throughput of the system, I have not had time to experiment to that effect. Here's how the different back off strategies do:
As we can see the Park strategy offers the best throughput and seems to maintain it as the number of threads increase. The end result is a fair consistent 50M ops/sec throughput.

Notes and Reservations

I've not had time yet to explore this queue/problem as much as I would like and as such this post should be seen a rough first stab rather than an accomplished summary. In particular:
  1. I'm unhappy with the benchmark/test harness. The producer threads are bound to start and stop out of sync with each other which will lead to uneven levels of contention on the queue. A better way to measure the throughput would require taking multiple samples during a run and discarding the samples where not all producers are offering.
  2. Given time I would have liked to test on larger producer counts, at the very least expanding the graphs to 20 or so producers.
  3. While running the benchmarks I encountered some run to run variation. A larger data set of runs would have been the right way to go.
That said I think the result is interesting and I plan to explore further variations on this theme. In particular:
  1. An MPSC queue with looser FIFO requirements would allow much greater throughput. The challenges there are avoiding producer starvation and offering a degree of timeliness/fairness.
  2. In producing the MPSC queue I have taken another dive into the Disruptor code, a comparison between the Disruptor and the MPSC queue might prove interesting.
Finally, I was intending to port some of these rough experiments into an open source collection of queues for general use (some readers expressed interest). This will happen in the near future, time and space permitting.


Monday, 7 October 2013

SPSC revisited part III - FastFlow + Sparse Data


{This post is part of a long running series on lock free queues, checkout the full index to get more context here}
After I published the last post on this topic (if you want to catchup on the whole series see references below) I got an interesting comment from Pressenna Sockalingasamy who is busy porting the FastFlow library to Java. He shared his current implementation for porting the FastFlow SPSC queue (called FFBuffer) and pointed me at a great paper on it's implementation. The results Pressenna was quoting were very impressive, but when I had a close look at the code I found what I believed to be a concurrency issue, namely there were no memory barriers employed in the Java port. Our exchange is in the comments of the previous post. My thoughts on the significance of memory barriers, and why you can't just not put them in are recorded here.
Apart from omitting the memory barriers the FFBuffer offered two very interesting optimizations:
  1. Simplified offer/poll logic by using the data in the queue as an indicator. This removes the need for tracking the relative head/tail position and using those to detect full/empty conditions.
  2. Sparse use of the queue to reduce contention when the queue is near full or near empty (see musings below). This bloats the queue, but guarantees there are less elements per cache line (this one is an improvement put in by Pressenna himself and not part of the original)
So after getting off my "Memory Barriers are important" high horse, I stopped yapping and got to work seeing what I can squeeze from these new found toys.

FFBuffer - Simplified Logic

The simplified logic is really quite compelling. If you take the time to read the paper, it describes the queue with offer/poll loop I've been using until now as Lamport's circular buffer (the original paper by Lamport is here, not an easy read), full code is here, but the gist is:

I've been using a modified version which I got from a set of examples put forth by Martin Thompson as part of his lock free algos presentation and evolved further as previously discussed(padding is omitted for brevity):

With the FFBuffer variant Pressenna sent my way we no longer need to do all that accounting around the wrap point (padding is omitted for brevity):

The new logic is simpler, has less moving parts (no head/tail caching) and in theory looks like cache coherency traffic should be reduced by only bringing in the array elements which are required in any case.

The Near Empty/Full Queue Problem

At some point while testing the different queues I figured the test harness itself may be to blame for the run to run variance I was seeing. The fun and games I had are a story to be told some other time, but one of the thing that emerged from these variations on the test harness was the conclusion that testing a near empty queue produces very different results. By reversing the thread relationship (Main is producer, other thread is consumer, code here) in the test harness I could see great performance gains, the difference being that the producer was getting a head start in the race, creating more slack to work with. This was happening because the consumer and the producer are experiencing contention and false sharing when the queue has 1 to 16 elements in it. The fact that the consumer is reading a cache row that is being changed is bad enough, but it is also changing the same line by setting the old element to null. The nulling out of elements as they are read is required to avoid memory leaks. Also note that when the consumer/producer run out of space to read/write as indicated by their cached view of the tail/head they will most likely experience a read miss with the implications discussed previously here.
Several solutions present themselves:
  • Batch publication: This is a problem for latency sensitive applications and would require the introduction of some timely flush mechanism to the producer. This is not an issue for throughput focused applications. Implementing this solution will involve the introduction of some batch array on the producer side which is 'flushed' into the queue when full. If the batch array is longer than a cache line the sharing issue is resolved. An alternative solution is to maintain a constant distance between producer and consumer by changing the expectation on the consumer side to stop reading at a given distance.
  • Consumer batch nulling: Delay the nulling out of read elements, this would reduce the contention but could also result in a memory leak if some elements are never nulled out. This also means the near empty queue is still subject to a high level of coherency traffic as the line mutates under the feet of the reader.
  • Padding the used elements: We've taken this approach thus far to avoid contention around fields, we can apply the same within the data buffer itself. This will require us to use only one in N cells in the array. We will be trading space for contention, again. This is the approach taken above and it proves to work well.

Good fences make good neighbours

As mentioned earlier, the code above has no memory barriers, which made it very fast but also incorrect in a couple of rather important ways. Having no memory barriers in place means this queue can:
  • Publish half constructed objects, which is generally not what people expect from a queue. In a way it means that notionally stuff can appear on the other side of the queue before you sent it (because the code is free to be reordered by the compiler). 
  • The code can be reordered such that the consumer ends up in an infinite busy loop (the read value from the array can be hoisted out of the reading loop).
I introduced the memory barriers by making the array read volatile and the array write lazy. Note that in the C variation only a Write Memory Barrier is used, I have implemented variations to that effect but while testing those Pressenna informed me they hang when reading/writing in a busy loop instead of yielding (more on this later). Here's the final version (padding is omitted for brevity):

There are a few subtleties explored here:
  • I'm using Unsafe for array access, this is nothing new and is a cut and paste job from the AtomicReferenceArray JDK class. This means I've opted out of the array bound checks we get when we use arrays in Java, but in this case it's fine since the ring buffer wrapping logic already assures correctness there. This is necessary in order to gain access to getVolatile/putOrdered.
  • I switched Pressanas original field padding method with mine, mostly for consistency but also it's a more stable method of padding fields (read more on memory layout here).
  • I doubled the padding to protect against pre-fetch caused false sharing (padding is omitted above, but have a look at the code).
  • I replaced the POW final field with a constant (ELEMENT_SHIFT). This proved surprisingly significant in this case. Final fields are not optimized as aggressively as constants, partly due to the exploited backdoor in Java allowing the modification of final fields (here's Cliff Click's rant on the topic). ELEMENT_SHIFT is the shift for the sparse data and the shift for elements in the array (inferred from Unsafe.arrayIndexScale(Object[].class)) combined.
How well did this work? Here is a chart comparing it to the the latest queue implementation I had (Y5 with/Y4 without Unsafe array access), with sparse data shift set to 0 and 2 (benchmarked on Ubuntu13.04/JDK7u40/i7-4700MQ@2.40GHz using JVM params: -XX:+UseCondCardMark -XX:CompileThreshold=100000 and pinned to run across cores, results from 30 runs are sorted to make the the charts more readable):
Y=Ops per second, X=benchmark run
Shazzam! This is a damn fine improvement note that the algorithm switch is far less significant than the use of sparse data.
To further examine the source of the gain I benchmarked a variant with the element shift parametrized and a further variant with single padding (same machine and settings as above, sparse shift is 2 for all):
Y=Ops per second, X=benchmark run
Single padding seems to make little difference but parametrization is quite significant.

Applying lessons learnt

Along the way, as I was refining my understanding of the FFBuffer optimizations made above I applied the same to the original implementation I had. Here are the incremental steps:

  1. Y4 - Starting point shown above.
  2. Y5 - Use Unsafe to access array (regression).
  3. Y6 - Use sparse elements in the buffer.
  4. Y7 - Double pad the fields/class/array.
  5. Y8 - Put head/tailCache on same line, put tail/headCache on another line.
  6. Y81 - From Y8, Parametrize sparse shift instead of constant (regression).
  7. Y82 - From Y8, Revert to using array instead of Unsafe access (regression).
  8. Y83 - From Y8, Revert double padding of fields/class/array (regression).

Lets start with the journey from Y4 to Y8:
Y=Ops per second, X=benchmark run
Note the end result is slightly better than the FF one, though slightly less stable. Here is a comparison between Y8 and it's regressions Y81 to Y83:
Y=Ops per second, X=benchmark run
So... what does that tell us?

  • The initial switch to using Unsafe for array access was a regression as a first step (Y5), but proves a major boost in the final version. This is the Y82 line. The step change in its performance I think is down to the JIT compiler lucking out on array boundary check elimination, but I didn't check the assembly to verify.
  • Parametrizing is an issue here as well, more so than for the FFBuffer implementation for some reason.
  • I expected Y83 (single padding) to be the same or worse than Y6. The shape is similar but it is in fact much better. I'm not sure why...
  • There was a significant hiccup in the field layout of Y4/5/6 resulting in the bump we see in going from Y6 to Y7 and the difference between Y83 and Y6. As of now I'm not entirely certain what is in play here. It's clear that pre-fetch is part of it, but I still feel I'm missing something.
So certainly not all is clear in this race to the top but the end result is a shiny one, let's take a closer look at the differences between Y8 and FFO3. Here's a comparison of these 2 with sparse data set to 0,1,2,3,4 running cross cores:
From these results it seems that apart from peaking for the s=2 value, Y8 is generally slower than FFO3. FFO3 seems to improve up to s=3, then drop again for s=4. To me this suggests FFO3 is less sensitive to the effect of the near empty queue as an algorithm.
Next I compared the relative performance of the 2 queue when running on the same core, this time only s=0, 2 where considered:
This demonstrates that FFO3 is superior when threads are running on the same core and still benefits from sparse data when running in that mode. Y8 is less successful and seem to actually degrade in this scenario when sparse data is used. Y8 offers a further optimization direction I have yet to fully explore in the form of batching consumer reads, I will get back to it in a further post.

On Testing And A Word Of Warning

I've been toying around with several test harnesses which are all on GitHub for you to enjoy. The scripts folder also allows running the tests in different modes (you may want to adjust the taskset values to match your architecture). Please have a play. Mostly the harnesses differ in the way they measure the scope of execution and which thread (producer/consumer) acts as the trigger for starting the test and so on. Importantly you will find there is the BusyQueuePerfTest which removes the Thread.yield() call when an unsuccessful offer/poll happens.
During the long period in which this post has been in the making (I was busy, wrote other posts... I have a day job... it took forever) Pressana has not been idle and got back to me with news that all the implementations which utilize Unsafe to access the array (Y5/SPSCQueue5 and later,  most of the FFBuffer implementations) were in fact broken when running the busy spin test and would regularly hang indefinitely. I tested locally and found I could not reproduce the issue at all for the SPSCQueue variations and could only reproduce it for the FFBuffer implementations which used no fences (the original) or only the STORE/STORE barrier (FFO1/2). Between us we have managed to verify that this is not an issue for JDK7u40 which had a great number of concurrency related bug fixes in it, but is an issue for previous versions! I strongly suggest you keep this in mind and test thoroughly if you intend to use this code in production.


Summary

This post has been written over a long period, please forgive any inconsistencies in style. It covers a long period of experimentation and is perhaps not detailed enough... please ask for clarifications in comments and I will do my best to  answer. Ultimately, the source is there for you to examine and play with and has the steps broken down such that a diff between step should be helpful.
This is also now part 5 in a series of long running posts, so if you missed any or forgotten it all by now here they are:
  1. Single Producer/Consumer lock free Queue step by step: This is covering the initial move from tradition JDK queues to the lock free variation used as a basis for most of the other posts
  2. 135 Million messages a second between processes in pure Java: Taking the queue presented in the first post and taking it offheap and demonstrating its use as an IPC transport via a memory mapped file
  3. Single Producer Single Consumer Queue Revisited: An empiricist tale - part I: Looking at run to run variation in performance in the original implementation and trying to cure it by inlining the counter fields into the queue class
  4. SPSC Revisited - part II: Floating vs Inlined Counters: Here I tried to access to impact of inlining beyond fixing up the initial implementations exposure to false sharing on one side.
And here we are :-), mostly done I think.
I would like to thank Pressana for his feedback, contribution and helpful discussion. He has a website with some of his stuff on, he is awesome! As mentioned above the idea for using sparse data is his own (quoting from an e-mail):
"All I tried was to cache align access to the elements in the array as it is done in plenty of c/c++ implementations. I myself use arrays of structs to ensure cache alignment in C. The same principle is applied here in Java where we don't have structs but only an array of pointer. "
Finally, the end result (FFO3/Y8) is incredibly sensitive to code changes as the difference between using a constant and field demonstrates, but also performs very consistently between runs. As such I feel there is little scope for improvement left there... so maybe now I can get on to some other topics ;-).