I put FFBufferWithOfferBatch together as part of looking into the performance of BQueue and, what with all the excitement about JAQ, I never introduced it properly. As it's the best-performing SPSC (Single Producer Single Consumer) queue I have thus far, I thought I should quickly cover it and tell you some horrific benchmarking tales of how I determined it's the best thus far.
Offer like a BQueue
The BQueue was previously presented and discussed here. The offer logic is simple and improves on the original FFBuffer by ensuring clearance to write ahead for the next OFFER_BATCH_SIZE elements (a sketch of the idea follows the list below). This improvement has three effects:
- Most of the time we can write without reading and checking for null. We do one volatile read for every OFFER_BATCH_SIZE elements, the rest of the time we compare to the cached field which is a normal field that can be held in a register.
- We avoid hitting the full queue induced contention by keeping a certain distance from the head of the queue. Contention can still happen, but it should be quite rare.
- We potentially miss out on part of the queue capacity, as we will declare the queue full if there are fewer than OFFER_BATCH_SIZE slots available when we hit the cached value.
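A minimal sketch of the batched-offer idea, assuming an AtomicReferenceArray and made-up field names (OFFER_BATCH_SIZE, batchTail); the real implementation uses Unsafe, padded fields and so on, so treat this as an illustration rather than the actual class:

import java.util.concurrent.atomic.AtomicReferenceArray;

class BatchedOfferSketch<E> {
    static final int OFFER_BATCH_SIZE = 32;   // assumed batch size
    final AtomicReferenceArray<E> buffer;     // capacity is a power of 2, larger than the batch size
    final int mask;
    long tail;                                // producer index, written only by the producer
    long batchTail;                           // producer-local cache of how far ahead is known to be clear

    BatchedOfferSketch(int capacity) {
        buffer = new AtomicReferenceArray<E>(capacity);
        mask = capacity - 1;
    }

    public boolean offer(E e) {
        if (tail >= batchTail) {
            // One volatile read per OFFER_BATCH_SIZE elements: probe the slot OFFER_BATCH_SIZE ahead.
            // If it is still occupied we declare the queue full, even though some slots may remain free.
            if (buffer.get((int) (tail + OFFER_BATCH_SIZE) & mask) != null) {
                return false;
            }
            batchTail = tail + OFFER_BATCH_SIZE;
        }
        buffer.lazySet((int) tail & mask, e); // ordered store makes the element visible to the consumer
        tail++;
        return true;
    }
}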
Poll like an FFBuffer
This poll method is again quite simple. Offset hides the sparse data shift trick that is discussed in depth here. The sparse data trick is a way of reducing contention when nearing the empty queue state. With a sparse shift of 2 only 4 elements in the queue buffer actually share the same cache line (as opposed to 16 when data is not sparse) reducing the frequency with which that state occurs. This has a side effect of hurting memory throughput as each cache line we load has less data in it.
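A minimal sketch of the poll side with the sparse shift, again assuming made-up names and an AtomicReferenceArray in place of the Unsafe array access used by the real class:

import java.util.concurrent.atomic.AtomicReferenceArray;

class SparsePollSketch<E> {
    static final int SPARSE_SHIFT = 2;        // each element is 1 << 2 = 4 slots apart
    final AtomicReferenceArray<E> buffer;
    final int mask;
    long head;                                // consumer index, written only by the consumer

    SparsePollSketch(int capacity) {
        buffer = new AtomicReferenceArray<E>(capacity << SPARSE_SHIFT); // capacity is a power of 2
        mask = (capacity << SPARSE_SHIFT) - 1;
    }

    // The sparse shift spreads neighbouring elements out so fewer of them share a cache line.
    private int offset(long index) {
        return (int) (index << SPARSE_SHIFT) & mask;
    }

    public E poll() {
        final int offset = offset(head);
        final E e = buffer.get(offset);       // volatile read: has the producer put anything here yet?
        if (e == null) {
            return null;                      // queue looks empty
        }
        buffer.lazySet(offset, null);         // clear the slot so the producer can reuse it
        head++;
        return e;
    }
}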
Benchmarks: A Misery Wrapped In An Enema
I've been benchmarking these queues over a long period of time and it has been a learning experience (i.e. very painful). Benchmarking is just generally a complex matter, but when it comes to examining and comparing code where a single operation (offer/poll) is in the nanoseconds I quote Mr. Shipilev: "nano-benchmarks are the damned beasts". Now, consider a multi-threaded nano-benchmark...
Let's start with naming the contestants:
- CAQ - InlinedCountersConcurrentArrayQueue - My final version of Martin Thompson's SPSC algorithm, with some extra padding, inlined counters and support for sparse data. This queue was presented at length previously (Intro, benchmarks, inlined vs. floating counters).
- FF1 - FFBuffer - A port to Java of the FastFlow SPSC queue, with full padding and support for sparse data. This queue was previously discussed here.
- FF2 - FFBufferWithOfferBatch - Described above; benchmarked with and without sparse data.
The benchmark measures the time it takes for REPETITIONS number of 'messages' to be sent from the producer to the consumer. If the consumer finds that the queue is empty it will Thread.yield and try again; similarly, if the producer finds that the queue is full it will Thread.yield and try again.
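For reference, a minimal sketch of that structure, assuming the producer runs on its own thread and the main thread consumes; class names and constants here are illustrative, not the actual harness:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ThroughputSketch {
    static final int REPETITIONS = 50 * 1000 * 1000; // illustrative value
    static final Integer TEST_VALUE = 777;           // illustrative message

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the SPSC queues under test; any java.util.Queue works here.
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < REPETITIONS; i++) {
                    while (!queue.offer(TEST_VALUE)) {
                        Thread.yield(); // queue is full, back off and retry
                    }
                }
            }
        });
        long start = System.nanoTime();
        producer.start();
        for (int i = 0; i < REPETITIONS; i++) {
            while (queue.poll() == null) {
                Thread.yield(); // queue is empty, back off and retry
            }
        }
        producer.join(); // note: the join is inside the measured section here
        long duration = System.nanoTime() - start;
        System.out.println(REPETITIONS * 1000L * 1000L * 1000L / duration + " ops/sec");
    }
}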
Here's how the queues perform with that benchmark (running on Ubuntu 13.10/JDK7u45/i7-4300MQ@2.4GHz with no turbo boost, pinned across cores using taskset, with JVM options: '-server -XX:+UseCondCardMark -XX:CompileThreshold=100000'):
CAQ(sparse=0) - 130M ops/sec
FF1(sparse=0) - 155M ops/sec
FF2(sparse=0) - 185M ops/sec
CAQ(sparse=2) - 288M ops/sec
FF1(sparse=2) - 282M ops/sec
FF2(sparse=2) - 290M ops/sec
When I started on JAQ I took the same benchmark, but changed a few things. I moved the start timestamp to the producing thread and moved the thread join out of the measured section; I also added some counters for when the producer fails to offer (queue is full) and the consumer fails to poll (queue is empty). Finally I took this same benchmark and ported it to use my very own ConcurrentQueue interface. Here's how it looked:
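The actual class is in the repository; as a rough, illustrative approximation (not the original harness code), the changes amount to something like this:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class ModifiedThroughputSketch {
    static final int REPETITIONS = 50 * 1000 * 1000; // illustrative value
    static final Integer TEST_VALUE = 777;

    public static void main(String[] args) throws InterruptedException {
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); // stand-in for the queue under test
        final AtomicLong start = new AtomicLong();
        final AtomicLong offerFails = new AtomicLong();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                start.set(System.nanoTime()); // start timestamp taken on the producing thread
                for (int i = 0; i < REPETITIONS; i++) {
                    while (!queue.offer(TEST_VALUE)) {
                        offerFails.incrementAndGet(); // producer found the queue full
                        Thread.yield();
                    }
                }
            }
        });
        producer.start();
        long pollFails = 0;
        for (int i = 0; i < REPETITIONS; i++) {
            while (queue.poll() == null) {
                pollFails++; // consumer found the queue empty
                Thread.yield();
            }
        }
        long duration = System.nanoTime() - start.get();
        producer.join(); // the join is no longer inside the measured section
        System.out.println(REPETITIONS * 1000L * 1000L * 1000L / duration + " ops/sec, poll fails "
                + pollFails + ", offer fails " + offerFails.get());
    }
}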
You would think this sort of change could only make performance slightly worse, if it made any difference at all. Certainly that's what I thought, but I was wrong... With the new benchmark I got the following results:
FF2(sparse=0) - 345M ops/sec
FF2(sparse=2) - 327M ops/sec
WTF? What is the difference?
I looked long and hard at the benchmark and realized that the only difference (one that shouldn't really make a difference, but the only one that does) is the localization of the queue field to a local variable in the producer loop, found by process of elimination. A small snippet illustrating what that means follows the results below. Tweaking the original benchmark to localize the queue reference gives us different results for all queues:
CAQ(sparse=0) - 291M ops/sec
FF1(sparse=0) - 190M ops/sec
FF2(sparse=0) - 348M ops/sec
CAQ(sparse=2) - 303M ops/sec
FF1(sparse=2) - 287M ops/sec
FF2(sparse=2) - 330M ops/sec
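To make concrete what localizing the queue reference means here, the difference is roughly the following (an illustrative snippet, not the benchmark source):

import java.util.Queue;

class ProducerLoops {
    Queue<Integer> queue; // instance field holding the queue under test

    void produceViaField(int reps) {
        for (int i = 0; i < reps; i++) {
            // the queue is loaded from the field each time round the loop (at the bytecode level at least)
            while (!queue.offer(777)) Thread.yield();
        }
    }

    void produceViaLocal(int reps) {
        final Queue<Integer> q = queue; // localized reference, read once before the loop
        for (int i = 0; i < reps; i++) {
            while (!q.offer(777)) Thread.yield();
        }
    }
}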
So we notice things have not improved much for FF1, but for CAQ we now have only a marginal difference between using sparse data and not using it, and for FF2 it is actually better not to bother at all with sparse data. The localization of the queue reference made the producer faster, reducing the number of times the empty queue state is hit and thus reducing the need for sparse data. We can try to validate this claim by running the Queue variation of this benchmark with the counters. With the reference localized:
CAQ(sparse=0) - 291M ops/sec, poll fails 350, offer fails 0
FF1(sparse=0) - 192M ops/sec, poll fails 32000, offer fails 0
FF2(sparse=0) - 348M ops/sec, poll fails 150, offer fails 13000
CAQ(sparse=2) - 303M ops/sec, poll fails 270, offer fails 0
FF1(sparse=2) - 287M ops/sec, poll fails 200, offer fails 0
FF2(sparse=2) - 330M ops/sec, poll fails 170, offer fails 10
So we can see that adding the extra counters made little difference with the reference localized, but when referencing the field directly in the producer loop:
CAQ(sparse=0) - 167M ops/sec, poll fails 2400, offer fails 0
FF1(sparse=0) - 160M ops/sec, poll fails 100000, offer fails 0
FF2(sparse=0) - 220M ops/sec, poll fails 2000, offer fails 0
CAQ(sparse=2) - 164M ops/sec, poll fails 31000, offer fails 0
FF1(sparse=2) - 250M ops/sec, poll fails 2000, offer fails 0
FF2(sparse=2) - 255M ops/sec, poll fails 500, offer fails 0
We get a different picture again, in particular for CAQ. To get to the bottom of the exact effect localizing the reference had on this benchmark I will have to dig into the generated assembly. This will have to wait for another post...
Conclusions And Confusions
The overall winner, with all the different variations on the throughput benchmarks, is the FFBufferWithOfferBatch. It's also the winner of the previously presented/discussed latency benchmarks (part 1, part 2). With turbo boost on it hits a remarkable high of 470M ops/sec. But setting this result to one side, the above results highlight a flaw in the throughput benchmark that is worth considering in the context of other concurrent benchmarks, namely that changing the [queue] implementation under test can change the benchmark.
Let me elaborate on this last point. What I read into the results above is that the benchmark started out as one where the queue full state was never hit, and the queue empty state was hit more or less often depending on the queue implementation. Given that FF2 is only a change to the offer method of FF1, we can see how tilting the balance between the offer cost and the poll cost changed the nature of the test. In particular, when using no sparse data the producer turned out to be significantly faster than the consumer. But... in changing the implementation we have unbalanced the benchmark: it now hardly ever hits an empty queue, which is what would slow the consumer/producer threads down. We have switched to measuring the full speed of consumption as a bottleneck, but only for one queue implementation. So this is, in a way, not the same benchmark for all queues.
While this benchmark is still useful to my mind, it is worth considering that it is in fact benchmarking different use cases for different queues, and that the behaviour of the code using the queue will inevitably balance the producer/consumer costs quite differently. All in all, yet another case of your mileage may vary ;-)
Thanks to Peter, Darach and Mr. Gomez
Thanks Nitsan for the great posts in this series.
The link to the "winner" (FF2) is broken. Is the class available on GIT or somewhere else?
Thanks for pointing it out, fixed all the source links. JAQ is now JCTools and some queues have been renamed and reshuffled...
Let me know if you find more broken links.
First of all, great work, I really enjoy reading it.
I didn't know where exactly to ask this question, so I'll try it here in the comments.
For a project of mine I'm in need of a really fast SPSC queue, and maybe later an MPSC queue too.
Sadly I have some additional requirements, so I cannot use your queues without modification.
As hard as I tried, I could not find a solution, so I hope you will have one.
On to my requirements:
My offer and poll methods take an additional parameter which represents a continuation.
In the case where the offer or poll fails because the queue was full or empty, I want to store the continuation in a field in the queue, and as soon as an element is added or removed the Continuation.schedule function should be called (which schedules the continuation into a ForkJoinPool) and the continuation should then be removed from the queue.
I have to guarantee that both sides never have their continuations saved in the queue at the same time (one would get lost, because there is only one field to store them), and that whenever the queue is non-full and non-empty no continuation is stored.
In all my attempts I often ended up in a situation where there were elements in the queue but the reader was suspended (it had put its suspension into the field but schedule was never called), and the writer had already produced its whole workload but failed to call schedule.
Any idea how to design a fast SPSC queue with suspension?
I'm not sure I'm getting your requirement correctly:
- On offer/poll you want to detect the transition from empty/full. If the transition is detected you will schedule a runnable somewhere. The queues currently do not detect this transition; detecting it is not hard, in the SPSC case at least.
- Only one side can have a 'continuation' stored. This sounds like a CAS.
Send me your code and I'll see if I can help.
Because I failed, I have currently fallen back to a synchronized variant so I can continue; now I want to make it faster :).
Importantly, the runnable I schedule is the other side of the queue, which stopped when it set the runnable.
This is what I currently have:
public class SyncChannel<T> implements Channel { // <T> assumed; the generic parameter appears to have been lost in the comment formatting
private final int mask;
private final Object[] buffer; //will be power of 2 in size
private long tail = 0;
private long head = 0;
private Suspension parked;
//max size will be power of 2 in size
public SyncChannel(int maxSize){
mask = maxSize - 1;
buffer = new Object[maxSize];
}
public synchronized boolean offer(Object t, Suspension susp) {
if((tail - head) == buffer.length){
parked = susp;
return false; //suspended (Task will abort / and continue when susp.schedule() is called)
}
buffer[((int)tail) & mask] = t;
if(tail++ == head && parked != null){
Suspension s = parked;
parked = null;
s.schedule();
}
return true; //succeeded
}
@Override
public synchronized T poll(Suspension susp){
if(tail == head){
parked = susp;
return null; //suspended (Task will abort / and continue when susp.schedule() is called)
}
if(profiler != null)profiler.profileReceive();
T res = (T)buffer[((int)head) & mask];
buffer[((int)head) & mask] = null;
if(tail - head++) == buffer.length && parked != null){
Suspension s = parked;
parked = null;
s.schedule();
}
assert res != null;
return res; //succeeded
}
}
Note: by removing some asserts a little mistake has slipped in:
if(tail - head++) == buffer.length && parked != null)
should be:
if((tail - head++) == buffer.length && parked != null)
it was:
head++;
if(parked != null){
assert((tail - (head-1)) == buffer.length)
...
but I changed it in both methods because I thought it would be clearer; since parked only gets set when (tail - head++) == buffer.length, and after that the producer stops doing anything until schedule() is called, (tail - head++) == buffer.length should always hold.
and I overlooked the line if(profiler != null)profiler.profileReceive(); which should be removed. As you see, I tried to simplify so no unnecessary stuff is there, but I failed in my hurry.
Sorry Markus, I don't have a lot of time on my hands at the moment. It would be easier for me to review the code if you posted it as a compilable Java class somewhere...