D. Vyukov is an awesome lock-free dude, and I often refer to his instructive and invaluable website 1024cores.net in my posts. On his site he covers lock-free queue implementations and in particular a wait-free MPSC linked node queue. This is really rather special when you consider that normally MP would imply lock-free rather than wait-free guarantees. I've ported his algorithm to Java (and so have many others: Netty/Akka/RxJava etc.), and had to tweak it to match the Queue interface. In this post I'd like to explain the algorithm, its translation to Java, and the implications of making it a j.u.Queue.
Lock free vs. Wait free
Let's review the definitions:
- Wait-free: thread progress is guaranteed, all operations finish in a bounded number of steps.
- Lock-free: global progress is guaranteed, though a particular thread may be starved and make no progress.
An example of a transition from lock free to wait free is available with JDK8 changes to AtomicReference::getAndSet(). The change was made by utilizing the newly available Unsafe::getAndSetObject intrinsic which translates directly to XCHG (on x86). So where we used to have for AtomicReference:
T getAndSet(T newValue) {
    T currentValue;
    do {
        currentValue = val; // val is a volatile field
    } while (!Unsafe.compareAndSwapObject(this, VAL_FIELD_OFFSET, currentValue, newValue));
    return currentValue;
}
Now we have:
T getAndSet(T newValue) {
    return Unsafe.getAndSetObject(this, VAL_FIELD_OFFSET, newValue);
}
I discussed a similar change to AtomicLong.getAndAdd in a previous post, replacing the CAS loop with LOCK XADD.
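To make the contrast concrete for the getAndAdd case, here is a minimal sketch using the public AtomicLong API rather than Unsafe. The class and method names are made up for illustration, and whether the second method really compiles down to a single LOCK XADD depends on the JVM and platform (x86 with JDK 8+ is the assumption here).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper class, for illustration only.
class GetAndAddSketch {
    // Lock-free: some thread always wins the CAS, but this particular thread
    // may keep losing and retrying for an unbounded number of iterations.
    static long getAndAddCasLoop(AtomicLong counter, long delta) {
        long current;
        do {
            current = counter.get();
        } while (!counter.compareAndSet(current, current + delta));
        return current;
    }

    // A single atomic step from the caller's point of view. Assumption: the JIT
    // intrinsifies this to LOCK XADD on x86, which makes it wait-free there.
    static long getAndAddSingleStep(AtomicLong counter, long delta) {
        return counter.getAndAdd(delta);
    }
}
```

Both methods return the value seen before the addition; only the progress guarantee differs.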
The Vyukov Wait Free MPSC queue
This is a LinkedList type structure and the interesting methods are offer and poll, here's the original (I did some formatting):
struct mpscq_node_t {
    mpscq_node_t* volatile next;
    void* state;
};
struct mpscq_t {
    mpscq_node_t* volatile head;
    mpscq_node_t* tail;
};
void mpscq_create(mpscq_t* self, mpscq_node_t* stub) {
    stub->next = 0;
    self->head = stub;
    self->tail = stub;
}
void mpscq_offer(mpscq_t* self, mpscq_node_t* n) {
    n->next = NULL;
    mpscq_node_t* prev = XCHG(&self->head, n); // serialization-point wrt producers, acquire-release
    prev->next = n; // serialization-point wrt consumer, release
}
mpscq_node_t* mpscq_poll(mpscq_t* self) {
    mpscq_node_t* tail = self->tail;
    mpscq_node_t* next = tail->next; // serialization-point wrt producers, acquire
    if (next != NULL) {
        self->tail = next;
        tail->state = next->state;
        return tail;
    }
    return NULL;
}
But how do we manage the same memory barriers in Java? We can be nice about it and use AtomicReferenceFieldUpdater, or be more brutal and use Unsafe. I find you get better performance with Unsafe, but you should consider how appropriate this is for you. In any case, here's what we end up with:
class MpscQueue<T> {
    // This song and dance is required to get field offset which we need for getAndSet
    private static final long HEAD_FIELD_OFFSET;
    static {
        try {
            HEAD_FIELD_OFFSET = UNSAFE.objectFieldOffset(MpscQueue.class.getDeclaredField("head"));
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
    static class Node<T> {
        T val;
        // C 'volatile' is not the same as Java 'volatile', but in this case they happen to
        // apply to the same fields.
        volatile Node<T> next;
    }
    volatile Node<T> head;
    Node<T> tail;
    MpscQueue() {
        head = tail = new Node<T>();
    }
    public boolean offer(T val) {
        if (val == null) throw new IllegalArgumentException("You bastard!!!");
        Node<T> n = new Node<T>();
        n.val = val;
        // getAndSet acts as an effective StoreLoad[0]
        // see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
        // The LOCK XCHG mechanics create the ordering between producers, hence Vyukov comment on
        // serialization.
        Node<T> prev = (Node<T>) UNSAFE.getAndSetObject(this, HEAD_FIELD_OFFSET, n);
        // StoreLoad[1] (StoreStore would do we can weaken this write to a lazySet/putOrdered)
        prev.next = n;
        return true;
    }
    public T poll() {
        // LoadLoad matching StoreLoad[1]
        Node<T> next = tail.next;
        // this establishes the HB relationship between producer and consumer
        if (next != null) {
            T val = next.val;
            next.val = null;
            tail = next;
            return val;
        }
        return null;
    }
}
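For anyone preferring the nicer route, here is a rough sketch of the same offer/poll pair built on AtomicReferenceFieldUpdater instead of Unsafe. The class name MpscQueueWithUpdater is made up for this example, the null check throws NullPointerException as the j.u.Queue Javadoc suggests (a deviation from the listing above), and I make no performance claims about it.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical variant of the queue above, for illustration only.
class MpscQueueWithUpdater<T> {
    static final class Node<T> {
        T val;
        volatile Node<T> next;
    }

    // The updater needs raw types because class literals cannot carry generics.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<MpscQueueWithUpdater, Node> HEAD =
        AtomicReferenceFieldUpdater.newUpdater(MpscQueueWithUpdater.class, Node.class, "head");

    private volatile Node<T> head; // must be volatile for the updater to accept it
    private Node<T> tail;          // only ever touched by the single consumer

    MpscQueueWithUpdater() {
        head = tail = new Node<T>();
    }

    @SuppressWarnings("unchecked")
    public boolean offer(T val) {
        if (val == null) throw new NullPointerException();
        Node<T> n = new Node<T>();
        n.val = val;
        // Atomic swap of head: this orders the producers among themselves.
        Node<T> prev = (Node<T>) HEAD.getAndSet(this, n);
        // Volatile store: publishes the new node to the consumer.
        prev.next = n;
        return true;
    }

    public T poll() {
        Node<T> next = tail.next; // volatile load
        if (next != null) {
            T val = next.val;
            next.val = null;
            tail = next;
            return val;
        }
        return null;
    }
}
```

The structure is identical to the Unsafe version; only the mechanism for the atomic swap on head changes.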
// to save us some effort we extend AbstractQueue, this requires we implement
// 3 more methods.
class MpscQueue<T> extends AbstractQueue<T> {
    @Override
    public T peek() {
        Node<T> next = tail.next;
        if (next != null) {
            return next.val;
        }
        return null;
    }
    @Override
    public int size() {
        int size = 0;
        Node<T> chaser = tail;
        while ((chaser = chaser.next) != null) {
            size++;
        }
        return size;
    }
    @Override
    public Iterator<T> iterator() {
        return null; // fuck it...
    }
}
Vyukov::poll() ain't Queue::poll!
I've discussed this particular annoyance in a previous post which has some overlap with this one. The names are the same, but as it turns out the guarantees are quite different. While Vyukov is doing a fine job implementing a queue, not every queue is a j.u.Queue. In particular for this case, the poll() method has a different contract:
- j.u.Queue: Retrieves and removes the head of this queue, or returns null if this queue is empty.
- Vyukov: Retrieves and removes the head of this queue, or returns null if next element is not available.
To see how the next element can be unavailable while the queue is not empty, imagine 2 producers calling offer() that we step through in a debugger:
- Producer 1: we step 1 line, so head has just been replaced with node n. The thread is suspended before executing prev.next = n.
- Producer 2: we let the program continue. We've replaced head and linked it to the previous head.
The state of the queue is now:
- head = Node[2], this is the node created by Producer 2. We also know that Node[1].next = Node[2], because we let offer run its course on Producer 2.
- tail = Node[0], the node we allocated in the constructor. This is the node head was before the first producer came along. This is what prev is equal to for Producer 1, but because we suspended that thread it never set its next value. Node[0].next is still null!
If a consumer came along now they would get a null from poll(), indicating the queue is empty. But the queue is obviously not empty!
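The interleaving above can be replayed deterministically on a single thread by executing each producer's steps by hand. This is only a sketch: StalledProducerDemo and replay() are names invented for the example, and plain local variables stand in for the queue's head and tail fields.

```java
// Hypothetical single-threaded replay of the two-producer interleaving.
class StalledProducerDemo {
    static final class Node {
        final String val;
        Node next;
        Node(String val) { this.val = val; }
    }

    // Returns { tail.next == null, head == tail, chain complete after resume }.
    static boolean[] replay() {
        Node stub = new Node(null);  // Node[0], allocated by the constructor
        Node head = stub;
        Node tail = stub;

        // Producer 1: swaps head, then is suspended before prev.next = n.
        Node n1 = new Node("one");   // Node[1]
        Node prev1 = head;
        head = n1;

        // Producer 2: runs offer() to completion.
        Node n2 = new Node("two");   // Node[2]
        Node prev2 = head;
        head = n2;
        prev2.next = n2;             // Node[1].next = Node[2]

        // What the consumer can observe at this point.
        boolean pollSeesNull = tail.next == null; // Vyukov poll() would return null
        boolean headEqualsTail = head == tail;    // yet head has clearly moved on

        // Producer 1 resumes and finally links its node.
        prev1.next = n1;
        boolean chainComplete = tail.next == n1 && n1.next == n2;

        return new boolean[] { pollSeesNull, headEqualsTail, chainComplete };
    }

    public static void main(String[] args) {
        boolean[] r = replay();
        System.out.println("tail.next == null while suspended: " + r[0]);
        System.out.println("head == tail while suspended:      " + r[1]);
        System.out.println("chain complete after resume:       " + r[2]);
    }
}
```

While Producer 1 is suspended, tail.next is null even though two elements have been offered; comparing head with tail is what tells the two situations apart.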
So it seems we cannot deduce that the queue is empty from looking at tail.next. Here are 2 valid indicators that the queue is empty:
- head == tail : this is the starting point set in the constructor and where the consumer ends up after consuming the last element
- head.val == null : head can only have a value of null if it is tail
@Override
public boolean isEmpty() {
    return head == tail;
}
@Override
public T poll() {
    Node<T> t = tail; // avoid reloading after volatile load
    Node<T> next = t.next; // volatile load
    if (next != null) {
        return consumeNode(next);
    }
    else if (t != head) { // same as isEmpty, but no reason to reload tail
        // spin wait for next to show up
        while ((next = t.next) == null);
        return consumeNode(next);
    }
    return null;
}
@Override
public T peek() {
    Node<T> t = tail;
    Node<T> next = t.next;
    if (next != null) {
        return next.val;
    }
    else if (t != head) {
        // spin wait for next to show up
        while ((next = t.next) == null);
        return next.val;
    }
    return null;
}
private T consumeNode(Node<T> next) {
    T val = next.val;
    next.val = null;
    tail = next;
    return val;
}
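As a sanity check of the spin-waiting poll(), here is a small self-contained harness with several producers and a single consumer. It is a sketch under assumptions: SpinningMpscQueue and drain() are invented names, and an AtomicReference stands in for the Unsafe-managed head field to keep the example portable.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, trimmed-down queue plus harness, for illustration only.
class SpinningMpscQueue<T> {
    static final class Node<T> {
        T val;
        volatile Node<T> next;
    }

    // Assumption: AtomicReference replaces the Unsafe getAndSet on a volatile field.
    private final AtomicReference<Node<T>> head;
    private Node<T> tail; // consumer-only

    SpinningMpscQueue() {
        tail = new Node<T>();
        head = new AtomicReference<Node<T>>(tail);
    }

    boolean offer(T val) {
        Node<T> n = new Node<T>();
        n.val = val;
        Node<T> prev = head.getAndSet(n);
        prev.next = n;
        return true;
    }

    T poll() {
        Node<T> t = tail;
        Node<T> next = t.next;
        if (next == null) {
            if (t == head.get()) return null;  // genuinely empty
            while ((next = t.next) == null);   // a producer is mid-offer, wait for the link
        }
        T val = next.val;
        next.val = null;
        tail = next;
        return val;
    }

    // Starts the producers, consumes on the calling thread, returns the sum consumed.
    static long drain(int producers, int perProducer) {
        final SpinningMpscQueue<Integer> q = new SpinningMpscQueue<Integer>();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 1; i <= perProducer; i++) q.offer(i);
            });
            threads[p].start();
        }
        long sum = 0;
        int remaining = producers * perProducer;
        while (remaining > 0) {
            Integer v = q.poll();
            if (v != null) { sum += v; remaining--; }
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```

With 4 producers each offering 1..1000, drain(4, 1000) should return 4 * 500500 = 2002000 once every element has been consumed exactly once. Note that poll() here trades the wait-free guarantee on the consumer side for the j.u.Queue contract, since the consumer may now spin on a stalled producer.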
This pitfall is tricky enough that not only did I fall for it (on another queue algorithm, but same mistake), it also took by surprise Mr. Manes writing an interesting variant on this queue for his high performance cache implementation (I filed an issue which he promptly fixed), and struck the notorious Dave Dice when considering Vyukov's MPMC queue (see comments for discussion).

Almost... size() is still broken.
A Good Size
It's not really surprising that size is broken given the terminating condition for the size loop was relying on next == null to terminate the count. Size was also broken in 2 other subtle ways:
- The interface for size dictates that it returns a non-negative int. But given that the queue is unbounded it is possible (though very unlikely) for it to have more than 2^31 elements. This would require that the linked list consume over 64GB (16 bytes + 8 bytes + 8 bytes = 32 bytes per element, refs are 8 bytes since this requires more than 32GB heap so no compressed oops). Unlikely, but not impossible. This edge condition is handled for ConcurrentLinkedQueue (same as here), and for LinkedBlockingQueue (by bounding its size to Integer.MAX_VALUE, so it's not really unbounded after all) but not for LinkedList (size is maintained in an int and is blindly incremented).
- Size is chasing a moving target and as such can in theory never terminate as producers keep adding nodes. We should limit the scope of the measurement to the size of the queue at the time of the call. This is not done for CLQ and should perhaps be considered.
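The 64GB figure in the first point is easy to check. The sketch below assumes the node layout quoted there (16-byte header plus two 8-byte references); NodeFootprint and its methods are made-up names, and clampToInt mirrors the saturating behaviour that ConcurrentLinkedQueue.size() documents.

```java
// Hypothetical helper for checking the arithmetic, for illustration only.
class NodeFootprint {
    // Assumed layout: 16-byte object header + 8-byte val ref + 8-byte next ref.
    static final long BYTES_PER_NODE = 16 + 8 + 8;

    // Heap needed for the given number of nodes, in bytes.
    static long bytesFor(long nodes) {
        return nodes * BYTES_PER_NODE;
    }

    // Saturate a long element count at Integer.MAX_VALUE instead of overflowing.
    static int clampToInt(long count) {
        return count > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) count;
    }

    public static void main(String[] args) {
        long nodes = 1L << 31; // one more than Integer.MAX_VALUE
        System.out.println(bytesFor(nodes) / (1L << 30) + " GiB"); // 2^36 / 2^30 = 64
        System.out.println(clampToInt(nodes) == Integer.MAX_VALUE);
    }
}
```

2^31 nodes at 32 bytes each come to 2^36 bytes, which is where the 64GB comes from.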
@Override
public int size() {
    Node<T> chaserNode = tail;
    final Node<T> currentHead = head;
    int size = 0;
    while (chaserNode != currentHead && size < Integer.MAX_VALUE) {
        Node<T> next;
        while((next = chaserNode.next) == null);
        chaserNode = next;
        size++;
    }
    return size;
}
Special thanks to Doug and Ben for reviewing!