D. Vyukov is an awesome lock-free dude, and I often refer to his instructive and invaluable website 1024cores.net in my posts. On his site he covers lock free queue implementations and in particular a wait-free MPSC linked node queue. This is really rather special when you consider that normally MP would imply lock-free rather than wait-free guarantees. I've ported his algorithm to Java (and so have many others: Netty/Akka/RxJava etc.), and had to tweak it to match the Queue interface. In this post I'd like to explain the algorithm, it's translation to Java, and the implications of making it a j.u.Queue.
Lock free vs. Wait free
Let's review the definitions:- Wait-free: thread progress is guaranteed, all operations finish in a determined number of steps.
- Lock-free: global progress is guaranteed, though a particular thread may be blocked.
An example of a transition from lock free to wait free is available with JDK8 changes to AtomicReference::getAndSet(). The change was made by utilizing the newly available Unsafe::getAndSetObject intrinsic which translates directly to XCHG (on x86). So where we used to have for AtomicReference:
T getAndSet(T newVal) {
T currentValue;
do {
currentValue = val; // val is a volatile field
} while (!Unsafe.compareAndSwapObject(this, VAL_FIELD_OFFSET, currentValue, newValue));
return currentValue;
}
Now we have:
T getAndSet(T newVal) {
return Unsafe.getAndSetObject(this, VAL_FIELD_OFFSET, newValue);
}
I discussed a similar change to AtomicLong.getAndAdd in a previous post, replacing the CAS loop with LOCK XADD.
The Vyukov Wait Free MPSC queue
This is a LinkedList type structure and the interesting methods are offer and poll, here's the original (I did some formatting):Awesome in it's simplicity, deceptive in it's genius. Be aware that head/tail meaning is the other way around than what most people are used to. I personally go for producer/consumerNode (head = producer side, tail = consumer side in the above snippet) in my code, but for consistency I'll stick with Mr. Vs notation for the porting exercise.
But how do we manage the same memory barriers in Java? We can be nice about it and use the AtomicFieldUpdater or more brutal and use Unsafe. I find you get better performance with Unsafe, but you should consider how appropriate this is for you. In any case, here's what we end up with:
The code is pretty similar. Now if we wanted to complete the j.u.Queue we could extend AbstractQueue and implement only size()/peek() and we're done (I'm not going to bother with the iterator()):
There we go, seems reasonable don't it? off to the pub to celebrate some kick ass lock free programming!
Vyukov::poll() ain't Queue::poll!
I've discussed this particular annoyance in a previous post which has some overlap with this one. The names are the same, but as it turns out the guarantees are quite different. While Vyukov is doing a fine job implementing a queue, not any queue is a j.u.Queue. In particular for this case, the poll() method has a different contract:- j.u.Queue: Retrieves and removes the head of this queue, or returns null if this queue is empty.
- Vyukov: Retrieves and removes the head of this queue, or returns null if next element is not available.
- Producer 1: we step 1 line, we just replaced head with node n. We are suspended before executing line 35.
- Producer 2: we let the program continue. We've replaced head and linked it to the previous head.
- head = Node[2], this is the node created by Producer 2. We also know that Node[1].next = Node[2], because we let offer run it's course on Producer 2.
- tail = Node[0] the node we allocated in the constructor. This is the node head was before the first producer came along. This is what prev is equal to for Producer 1, but because we suspended that thread it never set it's next value. Node[0].next is still null!
If a consumer came along now they would get a null from poll(), indicating the queue is empty. But the queue is obviously not empty!
So it seems we cannot deduce the queue is empty from looking at tail.next here's 2 valid indicators that the queue is empty:
- head == tail : this is the starting point set in the constructor and where the consumer ends up after consuming the last element
- head.val == null : head can only have a value of null if it is tail
This is a bit annoying because now we have a wait-free offer(), but poll() and peek() are lock-free(only block one thread, producers can make progress).
This pitfall is tricky enough that not only did I fall for it (on another queue algorithm, but same mistake), it also took by surprise Mr. Manes writing an interesting variant on this queue for his high performance cache implementation (I filed an issue which he promptly fixed), and struck the notorious Dave Dice when considering Vyukov's MPMC queue (see comments for discussion).
So is this it? Are we done?
Almost... size() is still broken.
A Good Size
It's not really surprising that size is broken given the terminating condition for the size loop was relying on next == null to terminate the count. Size was also broken in 2 other subtle ways:
- The interface for size dictates that it returns a positive int. But given that the queue is unbounded it is possible (though very unlikely) for it to have more than 2^31 elements. This would require that the linked list consume over 64GB (16b + 8b + 8b=32b per element, refs are 8b since this requires more than 32GB heap so no compressed oops). Unlikely, but not impossible. This edge condition is handled for ConcurrentLinkedQueue (same as here), and for LinkedBlockingQueue (by bounding it's size to MAX_INT, so it's not really unbounded after all) but not for LinkedList (size is maintained in an int and is blindly incremented).
- Size is chasing a moving target and as such can in theory never terminate as producers keep adding nodes. We should limit the scope of the measurement to the size of the queue at the time of the call. This is not done for CLQ and should perhaps be considered.
Special thanks to Doug and Ben for reviewing!
Hi Nitsan, thanks -- I enjoyed the posting.
ReplyDeleteAs an aside, it's interesting to note the duality between queues and mutual exclusion locks. It's usually trivial to convert a lock-free MPSC queue -- multiple-producer-single-consumer -- into a lock. Threads in lock() arrive and enqueue an "acquire" request upon which they wait. Only the owner can dequeue at unlock()-time, thus the "SC" constraint. Conversely, you can take a queue-based lock such as CLH and deconstruct it to yield an MPSC queue such as Vyukov's MPSC queue.
Regarding size: I have seen Doug Lea saying this somewhere, that value of size in concurrent queue is pretty meaningless so you might just as well return 0 if it is empty and 42 otherwise. Might have been a joke but I can't say its not tempting :)
ReplyDeleteThank for the post, has anybody run benchmarks comparing CLH and this implementation?
ReplyDeleteI haven't, you can extract the queue from AbstractQueuedSynchronizer and see where you get.
DeleteGreat work, Nitsan!
ReplyDeleteI played with D.Vyukov MPSC about 3 years ago by porting it in Scala: https://github.com/plokhotnyuk/actors/commit/f852ba9882ce8200b60ef37316e06a54e98d239e
Than it was grown to bounded and unbounded MPMC versions of Akka mailbox:
https://github.com/plokhotnyuk/actors/blob/6eb4d8c98a1226777293169f8148716ad3d515bc/src/test/scala/akka/dispatch/Mailboxes.scala
And MPSC versions with functional batch handler for Scalaz and The Minimalist actors accordingly:
https://github.com/scalaz/scalaz/blob/dce8a804cfb79e5904cace2e6655dbc933c1d756/concurrent/src/main/scala/scalaz/concurrent/Actor.scala
https://github.com/plokhotnyuk/actors/blob/6eb4d8c98a1226777293169f8148716ad3d515bc/src/test/scala/com/github/gist/viktorklang/Actor.scala
This is an interesting study in interpretations :)
DeleteThe offer implementations are all similar (except the bounded version which blocks like BlockingQueue.put). The poll versions have slight variations but are also blocking (except for Mailbox one which may eventually return null).
The Mailbox solution to O(n) size is nice.
I contributed some minor fixes in this area: https://github.com/akka/akka/pull/15596
Hi, Nitsan!
DeleteI counted iterations of this spin loop and found that for some workload it shows 100K or even 1M iterations.
Here is how I try to mitigate that CPU wasting by limiting number of spin iterations and then resheduling current thread and consumer:
https://github.com/plokhotnyuk/actors/blob/c5bac2e1bf70bab518102c13269838ea9b9e90e6/src/test/scala/com/github/gist/viktorklang/Actor.scala#L52
A bit of a side note, back in 1987, Mellor Crummey (the MC in MCS lock) already had a queue using getAndSet() (it was called fetch-and-store back then) with the same enqueueing mechanism as Dmitry's. The dequeuing is more complex because MC was aiming for full linearizability, while Dmitry's is MPSC and has "serializability" for the enqueues, i.e. Dmitry's queue may return an empty when it isn't.
ReplyDeleteHere is the link to the paper in case you're interested:
https://www.cs.rice.edu/~johnmc/papers/cqueues-mellor-crummey-TR229-1987.pdf
pages 4 and 5 have the relevant code.
Cool post!