The merging queue is a useful construct for slow consumers. It allows a bounded queue to keep receiving updates, with the space requirement limited by the number of distinct keys. It also allows the consumer to skip stale data. This is of particular interest for systems dealing with fast-moving data, where old data is not relevant and will just slow you down. I've seen this requirement in many pricing systems over the past few years, but there are other variations.
Here's the interface:
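The original gist isn't reproduced here, so below is a minimal sketch of what the rest of the post implies; the MergingQueue name and generic signature are my assumption, not the original.

```java
// Hypothetical reconstruction of the merging queue contract.
public interface MergingQueue<K, V> {
    // Enqueue a value for a key; if the key is already pending, replace
    // (merge) its value in place instead of growing the queue.
    void offer(K key, V value);

    // Remove and return the oldest pending value, or null if empty.
    V poll();
}
```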
What about LinkedHashMap?
Now it is true that LinkedHashMap offers similar functionality and you could use it to implement a merging queue, as sketched below. This works, but the way we have to implement poll() is clumsy. What I mean by that is that it looks like we are asking for a lot more than we want, to work around some missing functionality. If you dig into the machinery behind the expression "lastValMap.remove(lastValMap.keySet().iterator().next())" there's an awful lot of intermediate structure we need to jump through before we get where we are going. LinkedHashMap is simply not geared toward being a queue; we are abusing it to get what we want.
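A sketch of that LinkedHashMap approach, built around the exact poll() expression quoted above and assuming the MergingQueue interface sketched earlier:

```java
import java.util.LinkedHashMap;

// Sketch only - the original snippet is not reproduced here.
public class LinkedHashMapMergingQueue<K, V> implements MergingQueue<K, V> {
    private final LinkedHashMap<K, V> lastValMap = new LinkedHashMap<>();

    @Override
    public void offer(K key, V value) {
        // New keys join the back of the insertion order; existing keys
        // keep their position and simply have their value replaced.
        lastValMap.put(key, value);
    }

    @Override
    public V poll() {
        if (lastValMap.isEmpty()) {
            return null;
        }
        // The clumsy bit: to reach the oldest entry we must spin up an
        // iterator over the key set just to fetch its first element.
        return lastValMap.remove(lastValMap.keySet().iterator().next());
    }
}
```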
ArrayDeque to the rescue!
ArrayDeque is one of the unsung heroes of the Java collections. If you ever need a non-concurrent queue or stack, look no further than this class. In its guts you'll find the familiar ring buffer. It doesn't allocate or copy anything when you pop elements out or put them in (unless you exceed the capacity). It's cache friendly (unlike a linked list). It's LOVELY! Here's a merging queue implementation using a HashMap and an ArrayDeque combined:
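The original snippet isn't reproduced here; the following is a sketch consistent with the surrounding description, including the null-out optimization discussed next (it assumes non-null values, plus the MergingQueue interface from above):

```java
import java.util.ArrayDeque;
import java.util.HashMap;

// Sketch only - one map for the latest value per key, one deque for key order.
public class ArrayDequeMergingQueue<K, V> implements MergingQueue<K, V> {
    private final HashMap<K, V> lastValMap = new HashMap<>();
    private final ArrayDeque<K> keyQueue = new ArrayDeque<>();

    @Override
    public void offer(K key, V value) {
        // Only enqueue the key if it has no pending value; otherwise the
        // put merely replaces (merges) the value already in flight.
        if (lastValMap.put(key, value) == null) {
            keyQueue.offer(key);
        }
    }

    @Override
    public V poll() {
        K key = keyQueue.poll();
        if (key == null) {
            return null;
        }
        // NOTE: we set the entry to null rather than removing it, trading
        // map growth (bounded by the key range) for fewer entry
        // allocations and less GC churn.
        return lastValMap.put(key, null);
    }
}
```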
You can replace the HashMap with an open-addressing implementation to get more cache-friendly behaviour for key collisions if you like, but in the name of KISS we won't go down that particular rabbit hole. As the comment states, setting entries to null rather than removing them is an optimization with a trade-off: if your key set is not of a finite, manageable range then this is perhaps not the way to go. As it stands it saves you some GC overhead. This optimization is not really open to you with LinkedHashMap, where the values and their order are managed as one.
ArrayDeque is a better performer than any other queue for all the reasons discussed in this StackOverflow discussion, which boil down to:
- backed by a ring buffer (yes, like the Disruptor! you clever monkeys)
- it uses a power-of-2 sized backing array, which allows it to replace modulo (%) with a bitwise AND (&); this works because x % some-power-of-2 is the same as x & (some-power-of-2 - 1), as the snippet after this list demonstrates
- adding and removing elements is all about changing the head/tail counters, no copies, no garbage (until you hit capacity).
- iterating through an array involves no pointer chasing, unlike a linked list.
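As a quick illustration of the power-of-2 trick from the list above (just the arithmetic, not ArrayDeque's actual code):

```java
public class PowerOfTwoMask {
    public static void main(String[] args) {
        int capacity = 16;        // a power of 2, like ArrayDeque's backing array
        int mask = capacity - 1;  // 0b1111
        for (int i = 0; i < 100; i++) {
            // For non-negative i, i & (capacity - 1) equals i % capacity,
            // but compiles down to a single AND instead of a division.
            if ((i & mask) != (i % capacity)) {
                throw new AssertionError("mask trick broken at " + i);
            }
        }
        System.out.println("e.g. 21 % 16 = " + (21 % 16) + ", 21 & 15 = " + (21 & 15));
    }
}
```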
I like the way you walk, I like the way you talk, Susie Q!
I'm using a micro-benchmarking framework which is both awesome and secret, so sadly the benchmark code is not entirely peer-reviewable. I will put the benchmark on GitHub when the framework makers give me the go-ahead, which should be soon enough. Here are the benchmarks:
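Since the real harness can't be shown yet, here is a crude hand-rolled stand-in, purely to illustrate what the offer-1/poll-1 measurement does; the class name, loop shape and iteration count are mine, not the benchmark's:

```java
// Illustrative only - NOT the actual benchmark, which uses a proper
// micro-benchmarking framework to handle warmup, dead code elimination, etc.
public class MergingQueueBenchSketch {
    public static void main(String[] args) {
        MergingQueue<Integer, Object> q = new ArrayDequeMergingQueue<>();
        Object value = new Object();
        long iterations = 100_000_000L;
        long blackhole = 0; // consume results so the loop isn't optimized away
        long start = System.nanoTime();
        for (long i = 0; i < iterations; i++) {
            q.offer((int) (i & 1023), value); // bounded key range, as merging assumes
            blackhole += (q.poll() == null) ? 0 : 1;
        }
        long nsPerPair = (System.nanoTime() - start) / iterations;
        System.out.println("~" + nsPerPair + " ns per offer+poll pair (" + blackhole + ")");
    }
}
```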
The results (averaged over multiple runs) are as follows:

Experiment, Throughput (ops/msec), Cost (ns/op)
array.measureOffer, 100881.077, 10
array.measureOffer1Poll1, 41679.299, 24
array.measureOffer2Poll1, 30217.424, 33
array.measureOffer2Poll2, 21365.283, 47
array.measureOffer1000PollUntilEmpty, 102.232, 9804
linked.measureOffer, 103403.692, 10
linked.measureOffer1Poll1, 24970.200, 40
linked.measureOffer2Poll1, 16228.638, 62
linked.measureOffer2Poll2, 12874.235, 78
linked.measureOffer1000PollUntilEmpty, 92.328, 10830
--------
Interpretation:
- The offer cost is quite similar for both implementations at roughly 10ns, with the linked implementation perhaps marginally faster.
- The poll cost is roughly 14ns for the ArrayDeque-based implementation and 30ns for the linked implementation. Further profiling has also shown that while the deque implementation generates no garbage, the linked implementation has some garbage overhead.
- For my idea of a real-world load (the 1000-offer drain case) the ArrayDeque-based implementation is roughly 10% faster.
Finally, a small real-world gem I hit while writing this post. When benchmarking the 1k offer/queue-drain case for the linked implementation I hit this JVM bug - "command line length affects performance". The way it manifested was bad performance (~50 ops/ms) when running with one set of parameters, and much better performance when using some extra parameters to profile GC, which I'd have expected to slow things down if anything. It had me banging my head against the wall for a bit; I wrote a second benchmark to validate what I considered the representative performance, and so on. Eventually I talked to Mr. Shipilev, who pointed me at this ticket. I was not suffering the same issue with the other benchmarks, or with the same benchmark for the other implementation, which goes to show what a slippery sucker this is. The life lesson here is to trust only what you measure. I can discard the benchmark result if I like, but if you change your command line arguments in a production environment and hit a kink like that, you will have a real problem.
Many thanks to Doug Lawrie, with whom I had a discussion about his implementation of a merging event queue (a merging queue stuck on the end of a Disruptor), which drove me to write this post.
Update 08/03/2013: Just realized I forgot to include a link to the code. Here it is.
--------
Just a month ago my colleague implemented something very similar to a concurrent "merging queue" (in your terms; we call it a "throttling executor" because of our use case) -- multithreaded put, single/multithreaded consume. We had many interesting discussions about how to design it. A few ideas from those discussions are written up here (my colleague also writes his blog in Russian, so you'll need Google Translate :).
See reply below...
The concurrent use case is interesting. I think Google Translate butchered your friend's post a bit, but I would be interested to see the code if you can share it. As the single-threaded merge operation is very fast, you may find that using a Disruptor (like Doug did) gives you better performance at the cost of buffering. Only one way to find out though :).
Needs to be benchmarked, but I doubt it. Roughly, the current design uses an open-addressing map, and on the fast path there is only 1 CAS to update the value on .put(). Implementing the same thing with a Disruptor as a multiplexer (there exists a more efficient design for a multi-threaded multiplexer than the Disruptor, but that's another talk) you'll have the same 1 CAS for claim().
But in the Disruptor case it'll be a much more contended CAS, since all put()-er threads will contend on a single claiming cursor, while in the open-addressing map .put() threads can contend only on a per-cell basis, which is 1/size() as frequent. (In many use cases contention is even impossible, since there is some domain-specific key affinity, i.e. a specific key usually comes from the same thread all the time.) So, in the best case there is only contention between the producer and the consumer.
I'll ask Vladimir about open-sourcing his code in some way.
I definitely have to translate it and enrich it with the details and measurement results.
The thing is, it is not a good idea to use the Disruptor directly. I used lots of its ideas - like applying a waiting strategy (busy spin on card marking + a wait method if no changes have been made by the producer). There are also lots of binary operations, as the array length is 2^k and it is possible to use shifts instead of multiplies or divisions, and AND masks instead of MOD, etc.
But there is no requirement to wait for a vacant array item (as the Disruptor does).
We've open sourced it now:
https://bitbucket.org/vladimir.dolzhenko/collections/
Hi Nitsan,
Just curious, do you have an implementation of the merging queue which uses your single-producer single-consumer queue?
Thanks
Nope... but there's an LMAX collection that does exactly that: https://github.com/LMAX-Exchange/LMAXCollections/tree/master/CoalescingRingBuffer