- An array backed queue (circular array/ring buffer etc.)
- A linked list queue
The trade-off in the Java world seems to be that array-backed queues offer better throughput, but are always bounded and allocated upfront, while linked queues offer a smaller footprint when empty, but worse throughput and a higher per-element overhead when full. Linked queues are also often unbounded.
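For a point of reference outside JCTools, here's a minimal sketch of that duality using two JDK queues (my choice of examples, not from JCTools; both are MPMC rather than SPSC, so they only illustrate the memory/bounding trade-off):

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDuality {
    public static void main(String[] args) {
        // array backed: bounded, the full backing array is allocated up front
        Queue<String> arrayBacked = new ArrayBlockingQueue<>(1024);
        // linked: unbounded, tiny when empty, allocates a node per element
        Queue<String> linked = new ConcurrentLinkedQueue<>();

        arrayBacked.offer("a"); // returns false once all 1024 slots are taken
        linked.offer("a");      // always true, grows a node at a time
        System.out.println(arrayBacked.poll() + " " + linked.poll());
    }
}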
In JCTools we have the above duality, but later versions introduced a hybrid queue which attempts to offer a middle ground between the two extremes. This hybrid is the linked array queue:
- Queue is backed by one or more arrays of references.
- As long as the queue can fit in a single array no further arrays are allocated.
- When empty the queue can naturally shrink to a single backing array.
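To make the layout concrete before diving into the code, here's a toy sketch of the linked-chunk structure (class, field and method names are mine, purely illustrative): each backing array reserves its last slot for the link to the next array, and producer and consumer each keep their own view of the current buffer.

// Toy sketch only, not the real implementation: a chunk of N usable slots is an
// Object[N + 1], with the extra last slot reserved for the link to the next chunk.
final class LinkedChunkSketch {
    Object[] producerBuffer; // chunk the producer is currently writing into
    Object[] consumerBuffer; // chunk the consumer is currently reading from (may trail behind)

    LinkedChunkSketch(int chunkSize) {
        producerBuffer = consumerBuffer = new Object[chunkSize + 1];
    }

    // when the producer runs out of room it allocates a new chunk and links it in
    void linkNewChunk(int chunkSize) {
        Object[] next = new Object[chunkSize + 1];
        producerBuffer[producerBuffer.length - 1] = next; // last slot = link to next chunk
        producerBuffer = next;
    }
}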
In implementing these queues I have relied heavily on the feedback and support of @akarnokd, @pcholakov and others who contributed fixes, filed bugs, and so forth. Thanks guys!
3 variations on linked array queues have been explored in JCTools:
- Chunked: Each backing array is a fixed chunk size. The queue is bounded.
- Unbounded: Each backing array is a fixed chunk size. The queue is unbounded.
- Growable: The chunk size doubles every time until the full-blown backing array is used. The queue is bounded.
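For completeness, here is roughly how the three variants are constructed and used (constructor shapes are from memory and may differ between JCTools versions, so treat this as a sketch rather than gospel):

import org.jctools.queues.SpscChunkedArrayQueue;
import org.jctools.queues.SpscGrowableArrayQueue;
import org.jctools.queues.SpscUnboundedArrayQueue;

public class LinkedArrayQueueFlavours {
    public static void main(String[] args) {
        // Chunked: fixed chunk of 64, hard bound of 64K elements
        SpscChunkedArrayQueue<String> chunked = new SpscChunkedArrayQueue<>(64, 64 * 1024);
        // Unbounded: fixed chunk of 64, no bound
        SpscUnboundedArrayQueue<String> unbounded = new SpscUnboundedArrayQueue<>(64);
        // Growable: starts at 64, doubles the chunk until it reaches the 64K bound
        SpscGrowableArrayQueue<String> growable = new SpscGrowableArrayQueue<>(64, 64 * 1024);

        chunked.offer("a");
        unbounded.offer("b");
        growable.offer("c");
        System.out.println(chunked.poll() + unbounded.poll() + growable.poll());
    }
}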
Hot Path offer/poll
The queues all share the same polling logic and on the fast path share the same offer logic as well:
@Override
public boolean offer(final E e) {
    Objects.requireNonNull(e);
    // local load of fields to avoid repeated loads after volatile reads
    final E[] buffer = producerBuffer;
    final long index = producerIndex;
    final long mask = producerMask;
    final long offset = calcElementOffset(index, mask);
    // expected hot path
    if (index < producerBufferLimit) {
        writeToQueue(buffer, e, index, offset);
        return true;
    }
    return offerColdPath(buffer, mask, e, index, offset);
}

protected final void writeToQueue(final E[] buffer, final E e, final long index, final long offset) {
    soElement(buffer, offset, e); // ordered: buffer[index & mask] = e;
    soProducerIndex(index + 1);   // ordered: producerIndex = index + 1;
}

@Override
public E poll() {
    // local load of fields to avoid repeated loads after volatile reads
    final E[] buffer = consumerBuffer;
    final long index = consumerIndex;
    final long mask = consumerMask;
    final long offset = calcElementOffset(index, mask);
    final Object e = lvElement(buffer, offset); // volatile: e = buffer[index & mask];
    final boolean jumpToNextBuffer = (e == JUMP);
    if (null != e && !jumpToNextBuffer) {
        soConsumerIndex(index + 1);      // ordered: consumerIndex = index + 1;
        soElement(buffer, offset, null); // ordered: buffer[index & mask] = null;
        return (E) e;
    }
    else if (jumpToNextBuffer) {
        return newBufferPoll(buffer, index);
    }
    return null;
}
If you've not read JCTools code before, or maybe you've forgotten, here's the convention:
- sv/lvXXX: Store/Load Volatile, same as a volatile field write/read
- sp/lpXXX: Store/Load Plain, same as a normal field write/read
- soXXX: Store Ordered, same as an AtomicXXX.lazySet
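If the shorthand is still opaque, here's a rough mapping of those access modes onto java.lang.invoke.VarHandle (JCTools itself uses Unsafe rather than VarHandle, and the class, field and handle names below are purely illustrative):

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

final class AccessModeSketch {
    long index; // accessed only through the VarHandle below
    static final VarHandle INDEX;
    static {
        try {
            INDEX = MethodHandles.lookup().findVarHandle(AccessModeSketch.class, "index", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    long lvIndex()       { return (long) INDEX.getVolatile(this); } // Load Volatile
    long lpIndex()       { return (long) INDEX.get(this); }         // Load Plain
    void svIndex(long v) { INDEX.setVolatile(this, v); }            // Store Volatile
    void spIndex(long v) { INDEX.set(this, v); }                    // Store Plain
    void soIndex(long v) { INDEX.setRelease(this, v); }             // Store Ordered, i.e. lazySet
}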
So what's the deal:
- As long as we are not past the producerBufferLimit, keep writing.
- If we have passed it, go to the slow path (where the money be).
- As long as there's stuff in the queue, read it.
- Unless it's JUMP, in which case read through to next array.
- offer/poll code is small enough to inline when hot.
- offerColdPath/newBufferPoll are set up to either not inline or, when inlined, be out-of-band code blocks. This should keep the hot path small and help the compiler focus on more important things.
- offer/poll should perform similar to the SpscArrayQueue in the common case.
- One invariant to keep in mind: !queue.isEmpty() => queue.poll() != null
NOTE: In some early versions the new array itself was stored in the slot instead of the JUMP constant marker. This required an instanceof check for each element loaded, and a compromise of either not allowing Object[] to be an element of the queue or introducing some wrapper class. Comparing to a constant turned out to be much simpler and faster.
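The sentinel trick itself is tiny. A self-contained toy version of the idea (the real JUMP is a private constant inside the JCTools queue classes; buffer sizes and indices here are arbitrary):

public final class JumpSketch {
    // a dedicated private instance can never be handed in by a user, so identity
    // comparison is enough and no instanceof check is needed per element
    private static final Object JUMP = new Object();

    public static void main(String[] args) {
        Object[] oldBuffer = new Object[4];
        Object[] newBuffer = new Object[4];
        newBuffer[1] = "element";                    // producer writes the element into the new buffer
        oldBuffer[oldBuffer.length - 1] = newBuffer; // links old to new via the last slot
        oldBuffer[1] = JUMP;                         // and marks the slot it skipped

        Object e = oldBuffer[1];
        if (e == JUMP) { // consumer sees the marker and follows the link
            Object[] next = (Object[]) oldBuffer[oldBuffer.length - 1];
            System.out.println("jumped, found: " + next[1]);
        }
    }
}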
Cold Path poll
The consumer has hit a JUMP, which indicates the producer has linked a new array to this one. The reference to the new array is stored in the last element of the current array. We go to newBufferPoll:
private E newBufferPoll(E[] buffer, final long index) {
    final E[] newBuffer = lvNextArrayAndUnlink(buffer);
    final long newMask = newBuffer.length - 2; // because the buffer's last element is for linking
    // update consumer view on buffer
    consumerBuffer = newBuffer;
    consumerMask = newMask;
    // poll element, must be visible in new buffer
    final long offsetInNew = calcElementOffset(index, newMask);
    final E n = lvElement(newBuffer, offsetInNew); // volatile: n = newBuffer[index & newMask];
    if (null == n) {
        throw new IllegalStateException("new buffer must have at least one element");
    }
    soConsumerIndex(index + 1);              // ordered: consumerIndex = index + 1;
    soElement(newBuffer, offsetInNew, null); // ordered: newBuffer[index & newMask] = null;
    return n;
}

protected final E[] lvNextArrayAndUnlink(E[] curr) {
    final long nextArrayOffset = nextArrayOffset(curr);
    final E[] nextBuffer = (E[]) lvElement(curr, nextArrayOffset);
    // prevent GC nepotism
    soElement(curr, nextArrayOffset, null);
    return nextBuffer;
}
The consumer therefore has to:
- Load new array from the last element of old array.
- Null out reference from old to new (prevent nepotism).
- Adjust consumer view on buffer and mask.
- Poll (remove element and progress counter) from new buffer.
Cold Path offer: Unbounded
This method is implemented differently for each of the use cases, unbounded is the simplest:
@Override
protected boolean offerColdPath(E[] buffer, long mask, E e, long pIndex, long offset) {
    // use a fixed lookahead step based on buffer capacity
    final long lookAheadStep = (mask + 1) / 4;
    long pBufferLimit = pIndex + lookAheadStep;
    // go around the buffer or add a new buffer
    if (null == lvElement(buffer, calcElementOffset(pBufferLimit, mask))) {
        // the element lookAheadStep away is empty, so we can use up to the element before it
        producerBufferLimit = pBufferLimit - 1;
        writeToQueue(buffer, e, pIndex, offset);
    }
    else if (null == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) {
        // the next element is null, so we can write to the current element
        writeToQueue(buffer, e, pIndex, offset);
    }
    else {
        // we got one slot left to write into => need to link a new buffer
        final E[] newBuffer = (E[]) new Object[buffer.length];
        producerBuffer = newBuffer;
        producerBufferLimit = pIndex + mask - 1;
        linkOldToNew(pIndex, buffer, offset, newBuffer, offset, e);
    }
    return true;
}

void linkOldToNew(long currIndex,
                  E[] oldBuffer, long offsetInOld,
                  E[] newBuffer, long offsetInNew,
                  E e) {
    soElement(newBuffer, offsetInNew, e);    // ordered: newBuffer[index & newMask] = e;
    soNext(oldBuffer, newBuffer);            // ordered: oldBuffer[oldBuffer.length - 1] = newBuffer;
    soElement(oldBuffer, offsetInOld, JUMP); // ordered: oldBuffer[index & oldMask] = JUMP;
    soProducerIndex(currIndex + 1);          // ordered: producerIndex = currIndex + 1;
}
In the unbounded case we care only about our ability to make progress inside the current buffer:
- Probe inside the buffer to see if we have 'many' elements to go before it is full. If the buffer is mostly empty (this is the case for most applications most of the time), then we have successfully saved ourselves loading elements from the queue before writing in. A successful probe will update the producer limit and write to the queue.
- Probe failed: we check if we can write a single element. Remember we always need one free slot to JUMP with, so we look at the slot following the one we are on. If the next slot is empty we write to the current one, but we don't update the limit.
- A single slot remains in the buffer. We allocate a new buffer and link to it.
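To put numbers on the probe arithmetic, assume a chunk with 8 usable slots (mask = 7, so the fixed step is (mask + 1) / 4 = 2). The values below are mine, but the logic mirrors offerColdPath above:

public final class LookAheadArithmetic {
    public static void main(String[] args) {
        long mask = 7;                               // 8 usable slots in the chunk
        long lookAheadStep = (mask + 1) / 4;         // = 2
        long pIndex = 4;                             // producer's current logical index
        long pBufferLimit = pIndex + lookAheadStep;  // = 6

        // probe the slot the limit maps to; the consumer nulls slots in order, so if this
        // slot is null every slot between pIndex and it must be null (free) as well
        long probeSlot = pBufferLimit & mask;        // = 6
        System.out.println("probe slot = " + probeSlot);

        // on a successful probe the hot path keeps writing on a plain comparison,
        // producerIndex < producerBufferLimit, with no further element loads
        long producerBufferLimit = pBufferLimit - 1; // = 5
        System.out.println("new producerBufferLimit = " + producerBufferLimit);
    }
}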
Cold Path offer: Chunked
With chunked linked array queues we have a fixed chunk size and an overall bound on the capacity. This complicates matters, because on top of the buffer-level limits put on the producer we must also consider the queue-level limit. In particular, there might be space available in the current producer buffer while the queue is in fact full. Here's the implementation:
@Override
protected final boolean offerColdPath(E[] buffer, long mask, E e, long pIndex, long offset) {
    // use a fixed lookahead step based on buffer capacity
    final long lookAheadStep = (mask + 1) / 4;
    long pBufferLimit = pIndex + lookAheadStep;
    long pQueueLimit = producerQueueLimit;

    // out of known space
    if (pIndex >= pQueueLimit) {
        // we tested against a potentially out of date queue limit, refresh it
        long cIndex = lvConsumerIndex();
        producerQueueLimit = pQueueLimit = cIndex + maxQueueCapacity;
        // if we're full we're full
        if (pIndex >= pQueueLimit) {
            return false;
        }
    }
    // if the buffer limit is after the queue limit we use the queue limit. We need to handle
    // overflow so cannot use Math.min
    if (pBufferLimit - pQueueLimit > 0) {
        pBufferLimit = pQueueLimit;
    }
    // go around the buffer or add a new buffer
    if (pBufferLimit > pIndex + 1 && // there's sufficient room in buffer/queue to use pBufferLimit
        null == lvElement(buffer, calcElementOffset(pBufferLimit, mask))) {
        producerBufferLimit = pBufferLimit - 1; // joy, there's plenty of room
        writeToQueue(buffer, e, pIndex, offset);
    }
    else if (null == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) { // buffer is not full
        writeToQueue(buffer, e, pIndex, offset);
    }
    else {
        // we got one slot left to write into, and we are not full. Need to link a new buffer.
        // allocate new buffer of same length
        final E[] newBuffer = allocate((int) (mask + 2));
        producerBuffer = newBuffer;
        linkOldToNew(pIndex, buffer, offset, newBuffer, offset, e);
    }
    return true;
}
Similar to the above, but the difference lies in negotiating the buffer limit vs. the queue limit.
Cold Path offer: Growable
The growable queue is similar in spirit to an ArrayList, as it doubles its backing array capacity when a buffer is full. This adds an interesting bit of information to the game, since:
- If we are not on the last buffer, we've not hit the queue limit.
- If we're on the last buffer, and so is the consumer, we can revert to just checking for null in the buffer.
- If we're on the last buffer, and the consumer isn't, we need to hang tight and let it pass. It's a temporary state.
protected final boolean offerColdPath(final E[] buffer, final long mask, final E e,
                                      final long index, final long offset) {
    final long lookAheadStep = this.lookAheadStep;
    // normal case, go around the buffer or resize if full (unless we hit max capacity)
    if (lookAheadStep > 0) {
        long lookAheadElementOffset = calcElementOffset(index + lookAheadStep, mask);
        // try and look ahead a number of elements so we don't have to do this all the time
        if (null == lvElement(buffer, lookAheadElementOffset)) {
            producerBufferLimit = index + lookAheadStep - 1; // joy, there's plenty of room
            writeToQueue(buffer, e, index, offset);
            return true;
        }
        // if we're at max capacity we can use up the last element
        final int maxCapacity = maxQueueCapacity;
        if (mask + 1 == maxCapacity) {
            if (null == lvElement(buffer, offset)) {
                writeToQueue(buffer, e, index, offset);
                return true;
            }
            // we're full and can't grow
            return false;
        }
        // not at max capacity, so must allow extra slot for JUMP
        if (null == lvElement(buffer, calcElementOffset(index + 1, mask))) { // buffer is not full
            writeToQueue(buffer, e, index, offset);
        }
        else {
            // allocate a new buffer of double the current capacity (plus the link slot)
            final E[] newBuffer = allocate((int) (2 * (mask + 1) + 1));
            producerBuffer = newBuffer;
            producerMask = newBuffer.length - 2;

            final long offsetInNew = calcElementOffset(index, producerMask);
            linkOldToNew(index, buffer, offset, newBuffer, offsetInNew, e);

            int newCapacity = (int) (producerMask + 1);
            if (newCapacity == maxCapacity) {
                long currConsumerIndex = lvConsumerIndex();
                // use lookAheadStep to store the consumer distance from the final buffer
                this.lookAheadStep = -(index - currConsumerIndex);
                producerBufferLimit = currConsumerIndex + maxCapacity - 1;
            }
            else {
                producerBufferLimit = index + producerMask - 1;
                adjustLookAheadStep(newCapacity);
            }
        }
        return true;
    }
    // the step is negative (or zero) in the period between allocating the max sized buffer and the
    // consumer starting on it
    else {
        final long prevElementsInOtherBuffers = -lookAheadStep;
        // until the consumer starts using the current buffer we need to check the consumer index to
        // verify size
        long currConsumerIndex = lvConsumerIndex();
        int size = (int) (index - currConsumerIndex);
        int maxCapacity = (int) mask + 1; // we're on max capacity or we wouldn't be here
        if (size == maxCapacity) {
            // consumer index has not changed since adjusting the lookAhead index, we're full
            return false;
        }
        // if the consumer index progressed enough, the current size indicates it is on the same buffer
        long firstIndexInCurrentBuffer = producerBufferLimit - maxCapacity + prevElementsInOtherBuffers;
        if (currConsumerIndex >= firstIndexInCurrentBuffer) {
            // job done, we've now settled into our final state
            adjustLookAheadStep(maxCapacity);
        }
        // consumer is still on some other buffer
        else {
            // how many elements out of buffer?
            this.lookAheadStep = (int) (currConsumerIndex - firstIndexInCurrentBuffer);
        }
        producerBufferLimit = currConsumerIndex + maxCapacity;
        writeToQueue(buffer, e, index, offset);
        return true;
    }
}

private void adjustLookAheadStep(int capacity) {
    lookAheadStep = Math.min(capacity / 4, SpscArrayQueue.MAX_LOOK_AHEAD_STEP);
}
The lookAheadStep is dynamically adjusted as the buffer grows, and also acts as an indicator for the transition period in which the producer is on the last buffer and the consumer is trailing. It's a mess to look at, but sadly it performs better than a neater alternative which builds on the Chunked variant... General idea:
- lookAheadStep is positive => either we are not on the last buffer, or both consumer and producer are on it => it is enough to consider the elements in the producer buffer to determine if the queue is full. In particular, if the buffer is full then we must resize, unless we are on the last buffer, in which case we are full. Note that we allow using the last buffer up to the last element, since we don't need a JUMP anymore.
- lookAheadStep is negative => we are waiting for the consumer to get to the last buffer. We use the lookAheadStep to indicate how far behind the consumer is.
It's not complex, just messy, and if you've got an elegant representation please ping me with your suggestions.
Performance?
GODDAMN it! This is the time-consuming part! I've benchmarked on a few setups, but not kept track or clear records. I'll need to do it all again, might as well be another post since nobody reads this far into these things anyhow. La la la la la, performance is great, la la la la la, if I look now I might be disappointed, la la la la la, this is better than benchmarking, la la la la la.