Friday, 21 June 2013

Java Concurrent Counters By Numbers

{UPDATE 03/09/14: If you come here looking for JMH related content start at the new and improved JMH Resources Page and branch out from there!}
It is common practice to use AtomicLong as a concurrent counter for reporting such metrics as number of messages received, number of bytes processed etc. Turns out it's a bad idea in busy multi-threaded environments... I explore some available/up-and-coming alternatives and present a proof of concept for another alternative of my own.
When monitoring an application we often expose certain counters via periodical logging, JMX or some other mechanism. These counters need to be incremented and read concurrently, and we rely on the values they present for evaluating load, performance or similar aspects of our systems. In such systems there are often business threads working concurrently on the core application logic, and one or more reporting/observing threads which periodically sample the event counters incremented by the business threads.
Given the above use case, lets explore our options (or if you are the impatient kind skip to the summary and work your way backwards).

Atomic Mother: too much love

We can use AtomicLong, and for certain things it's a damn fine choice. Here's what happens when you count an event using AtomicLong (as of JDK7, see comments for JDK8 related discussion):
This is a typical CAS retry loop. To elaborate:

  1. Volatile read current value
  2. Increment
  3. Compare and swap current with new
  4. If we managed to swap, joy! else go back to 1
Now imagine lots of threads hitting this code, how will we suffer?
  • We could loop forever (in theory)!  
  • We are bound to experience the evil step sister of false sharing, which is real sharing. All writer threads are going to be contending on this memory location and thus one cache line and digging their heels until they get what they want. This will result is much cache coherency traffic.
  • We are using CAS (translates to lock cmpxchg on intel, read more here) which is a rather expensive instruction (see the wonderful Agner reference tables). This is much like a volatile write in essence.
But the above code also has advantages:
  • As it says on the tin, this is an atomic update of the underlying value. That value will go through each sequential value and will only continue execution once we ticked that value.
  • It follows that values returned by this method are unique.
  • The code is readable, the complex underside neatly cared for by the JMM(Java Memory Model) and a CAS.
There's nothing wrong with AtomicLong, but it seems like it gives us more than we asked for in this use case, and thus delivers less in the way of writer performance. This is your usual tradeoff of specialisation/generalisation vs performance. To gain performance we can focus our requirements a bit:
  1. Writer performance is more important than reader performance: We have many logic threads incrementing, they need to suffer as little as possible for the added reporting requirement.
  2. We don't require a read for each write, in fact we're happy with just incrementing the counters: We can live without the atomic joining of the inc and the get.
  3. We're willing to sacrifice the somewhat theoretical accuracy of the read value if it makes things better: I say theoretical because at the end of the day if you are incrementing this value millions of times per second you are not going to report on each value and if you try and capture the value non-atomically then the value reported is inherently inaccurate.
The above use case is in fact benchmarked in the Asymmetric example bundled with JMH. Running a variation with N writers and one reader we get the following (code is here):
AtomicCounterBenchmark.rw:get(N=1)  - 4.149 nsec/op
AtomicCounterBenchmark.rw:inc(N=1)  - 105.439 nsec/op

AtomicCounterBenchmark.rw:get(N=11) - 43.421 nsec/op
AtomicCounterBenchmark.rw:inc(N=11) - 748.520 nsec/op
AtomicCounterBenchmark.rw:get(N=23) - 115.268 nsec/op
AtomicCounterBenchmark.rw:inc(N=23) - 2122.446 nsec/op

Cry beloved writers! the reader pays nothing while our important business threads thrash about! If you are the sort of person who finds joy in visualising concurrent behaviour think of 11 top paid executives frantically fighting over a pen to fill in an expenses report to be read by some standards committee.

WARNING: Hacks & Benchmarks ahead!

In order to make the JDK8 classes used below run on JDK7 and in order to support some of my own code used below I had to run the benchmarks with my own Thread class. This led to the code included being a hack of the original JDK8 code. A further hack was required to make JMH load my thread factory instead of the one included in JMH. The JMH jar included is my hacked version and the changed class is included in the project as well. All in the name of experimentation and education...
The benchmarks were all run on an intel Xeon dual socket, 24 logical core machine. Because of the way JMH groups work (you have to specify the exact number of threads for asymmetric tests) I benchmarked on the above setup and did not bother going further(1/11/23 writers to 1 reader). I did not benchmark in situations were the number of threads exceeds the number of cores. Your mileage may wildly vary etc, etc, etc. The code is on GitHub as well as the results summary and notes on how the benchmarks were run.

The Long Adder

If ever there was a knight in shining armour for all things java concurrency it is Doug Lea, and Doug has a solution to the above problem coming to you in JDK 8 (or sooner if you want to grab it and use it now) in the form of LongAdder (extends Striped64). LongAdder serves the use case illustrated above by replacing the above increment logic with something a bit more involved:

Well... alot more involved really. 'Whatup Doug?' you must be asking yourself (I assume, maybe it makes instant sense to you, in which case shut up smart asses). Here's a quick break down:
Look! No contention!
  • LongAdder has a field called cells which is an array of Cells whose length is a power of 2, each Cell is a padded against false sharing volatile long (padding is counting on field order, they should probably have used inheritance to enforce that, like JMH's BlackHole). Kudos to Doug for the padded cells pun!
  • LongAdder has a field called base which is a volatile long (no padding here), 
  • if ((as = cells) != null || !casBase(b = base, b + x)) - Load the volatile cells field into a local variable. If it's not null keep going. If it is null try and CAS the locally read value of base with the incremented value. In other words, if we got no cells we give the old AtomicLong method a go. If there's no contention this is where it ends. If there is contention, the plot thickens...
  • So we've either hit contention now or before, our next step is to find our thread's hash, to be used to find our index in the cells array.
  • if (as == null || (n = as.length) < 1 || (a = as[(n - 1) & h]) == null || !(uncontended = a.cas(v = a.value, v + x))) - a big boolean expression, innit? let's elaborate:
    • as == null - as is the local cells copy, if it's null we ain't got no cells.
    • (m = as.length - 1) < 0 - we have a cells array, but it's empty. Compute mask while we're here.
    • (a = as[getProbe() & m]) == null - get the cell at our thread index (using the old mask trick to avoid modulo), check if it's null.
    • !(uncontended = a.cas(v = a.value, v + x)) - we got a cell and it's not null, try and CAS it's value to the new value.
  • So... we either have no cell for this thread, or we failed to increment the value for this thread, lets call longAccumulate(x, null, uncontended) where the plot thickens
If you thought the above was complex, longAccumulate will completely fry your noodle. Before I embark on explaining it, let us stop and admire the optimisations which go into each line of code above, in particular note the careful effort made to avoid redundant reads of any data external to the function and the need to cache locally all values of volatile reads.
So... what does longAccumulate do? The code explains itself:

Got it? Let's step through (I'm not going to do line by line for it, no time):
  • Probe is a new field on Thread, put there to support classes which require Thread id hashing. It's initialised/accessed via  ThreadLocalRandom and used by ForkJoin Pool and ConcurrentHashMap (This is JDK8, none of it happens yet in JDK7 land), getProbe is stealing it directly from Thread using Unsafe and field offset, cheeky! in my version I added a field to my CounterThread to support this.
  • If there's no array we create it. The cells array creation/replacement is a contention point shared between all threads, but should not happen often. We use cellsBusy as a spin lock. The cells array is sort of a copy on write array if you will, but the write spin lock is not the array, it's cellsBusy.
  • If there's no cell at our index we create one. We use cellsBusy to lock for writing to the table.
  • If we contend on the cell we expand the cells array, the array is only expanded if it's smaller than the number of available CPUs.
  • If CAS on the cell we have fails we modify our probe value in hope of hitting an empty/uncontended slot.
The above design (as I read it) is about compromise between contention, CPU cycles and memory. We could for instance simplify it by allocating all the cells upfront and set the number of cells upfront to the number of CPUs (nearest power of 2 >=  NCPU to be exact). But imagine this counter on a 128 CPU beast, we've just allocated 128 * (56 * 2 + 8 + change) bytes -> 14k for a counter that may or may not be contended. We pay for our frugal approach every time we hit contention between threads. On the plus side, every time we grow the cells array the likelihood of collision goes down. To further help things settle, threads will change cells to eventually find less contended cells if their probe value is an issue.
Here's a discussion between Doug and Nathan Raynolds on the merits of having per CPU counters which may help shed light on the design further. I got the feeling from reading it that the cells are an abstraction of the CPU counters, and given no facility in the language to discover processor ID on the fly it seems like a good approach. Doug also mentions @Contended as a means to replace the manual padding, which is probably on it's way.
I can think of 2 ways to improve the above slightly:
  1. Use an array of longs instead of cells, padding by leaving slots of the array empty. This is essentially the approach taken by Cliff Click here: ConcurrentAutoTable (One reviewer pointed out the CAT vs Doug pun...). This should reduce the space requirement (only need half the padding) and will inline the values in the array (one less pointer to chase). If we had value type/structs array support in Java we would need this. The even stride in memory should also improve the read performance.
  2. Under the assumption of cells correctly reflecting the CPU affinity of threads we could co-host counters in the same cell. Of-course if the same cell is hit from several processors we've just re-introduced false sharing :(... maybe hold off on that one until we have processor ID.
Assuming all threads settle on top of their respective CPUs and no contention happens, this implementation is still bound by doing volatile writes and the logistics surrounding the cell acquisition.
How does it perform (benchmark code is here)?
LongAdderCounterBenchmark.rw:get(N=1)  - 10.600 nsec/op
LongAdderCounterBenchmark.rw:inc(N=1)  - 249.703 nsec/op
LongAdderCounterBenchmark.rw:get(N=11) - 752.561 nsec/op
LongAdderCounterBenchmark.rw:inc(N=11) - 44.696 nsec/op
LongAdderCounterBenchmark.rw:get(N=23) - 1380.936 nsec/op
LongAdderCounterBenchmark.rw:inc(N=23) - 38.209 nsec/op

Nice! low cost updates and high (but acceptable) read cost. This better reflects our use case, and if your are implementing performance counters in your application you should definitely consider using a back ported LongAdder. This is the approach taken in in Yammer Metrics project. Their Counter holds a back ported LongAdder whose performance is (code here):
LongAdderBackPortedCounterBenchmark.rw:get(N=1)  - 5.854 nsec/op
LongAdderBackPortedCounterBenchmark.rw:inc(N=1)  - 192.414 nsec/op
LongAdderBackPortedCounterBenchmark.rw:get(N=11) - 704.011 nsec/op
LongAdderBackPortedCounterBenchmark.rw:inc(N=11) - 40.886 nsec/op
LongAdderBackPortedCounterBenchmark.rw:get(N=23) - 1188.589 nsec/op
LongAdderBackPortedCounterBenchmark.rw:inc(N=23) - 38.309 nsec/op

Perhaps because of the changes between the 2 versions, or because LongAdder was designed to run on a JDK8 VM the results favour the backport over the JDK8 version when running on a JDK7 install. Note that the 1 writer/reader case performs badly, far worse than AtomicLong. On the high contention path this reverts to expected behaviour.

For completeness I tested how Cliff's CAT (I took the code from the link provided earlier) performs (here's the benchmark). CAT has a nice _sum_cache field which is used to avoid reading the table if it has not been invalidated. This saves the reader alot of work at theoretically little cost for the writers. The results I got running the benchmark varied wildly with get() looking great when inc() wasn't and get() performing very badly when inc() performs superbly. I suspect the shortcut above to be at the heart of this instability, but have not had time to get to the root of it. I exclude the results as they varied too wildly from run to run to be helpful.

Why Should You Care?

Why indeed? Why are Doug and Cliff worried? if you've watched the Future Of The VM talk featuring those very same dudes, you'd have noticed the future has many many many CPUs in it. And if we don't want to hit a wall with our data structures which worked so well back when we had only 4 CPUs to worry about then we need to get cracking on writing some mighty parallel code :-)
And if it turns out your performance counters are in fact the very reason you can't perform, wouldn't that be embracing? We need to be able to track our systems vital stats without crippling these systems, here is a paper from Mr. Dave Dice (follow his blog, great stuff) and friends looking at scalable performance counters. Let me summarise for my more lazy readers:
  • "commonly-used naive concurrent implementations quickly become problematic, especially as thread counts grow, and as they are used in increasingly Non-Uniform Memory Access (NUMA) systems." - The introduction goes on to explore AtomicLong type counters and non-threadsafe hacks which lose updates. Both are unacceptable. Using LOCK ADD  will not save you - "Experiments confirm our intuition that they do not adequately addresses the shortcomings of the commonly used naive counters. "
  • Section 2.1 "Minimal Space Overhead" while interesting is not really an option for a Java hacker as it requires us to know which NUMA node we are on. In future, when processor ID is available to us we may want to go back to that one. Further more conflicts are resolved by letting updaters wait.
  • Section 2.2 "Using A little More Space" outlines something similar to LongAdder above, but again using NUMA node IDs to find the slot and contend locally on it. It then boils down to these implementations - "suffer under high contention levels because of contention between threads on the same node using a single component." - not surprising given that a NUMA node can have 12 logical cores or more hitting that same component, CASing away.
  • "won't be nothing you can't measure anymore"
  • Section 3 onward are damn fascinating, but are well beyond the scope of this humble post. Dice and friends go on to explore statistical counters which using fancy maths reduce the update rate while maintaining accuracy. If you find 'fancy maths' unsatisfying, JUST READ IT!!! The end approach is a combination of statistical counters and the original LongAdder style counters. It is balancing CPU cycles vs. the cost of contention for the sake of memory efficiency and finding the right balance.
The reason I mention this article is to highlight the validity of the use case. Writes MUST be cheap, reads are less of a worry. The more cores are involved, the more critical this issue becomes. The future, and in many ways the present on the server side, is massively parallel environments. We still need counters in these environments, and they must perform. Can we get better performance if we were to give up some of the features offered by LongAdder?

TLC: Love does not stand sharing

What the LongAdder is trying to avoid, and the above article is side stepping as a viable option, is having a counter copy per thread. If we could have ProcessorLocal counters then that would probably be as far as we need to go, but in the absence of that option should we not consider per thread counters? The down side as one would expect is memory consumption, but if we were to off-balance the padding against the duplicates then there are use cases where per thread counters may prove more memory efficient. The up side is that we need not worry about contention, and can replace the costly CAS with the cheap lazySet (see previous post on the topic). Even if we pad, there are ways to minimize the padding such that all counters share a given thread's padding. If you work in an environment where you typically have fixed sized thread pools executing particular tasks, and performance is critical to what you do then this approach might be a good fit.
The simplest approach is to use ThreadLocal as a lookup for the current thread view of the shared counter and have get() sum up the different thread copies. This ThreadLocalCounter (with padding) scales surprisingly well:
ThreadLocalCounterBenchmark.rw:get(N=1)  - 61.563 nsec/op
ThreadLocalCounterBenchmark.rw:inc(N=1)  - 15.138 nsec/op
ThreadLocalCounterBenchmark.rw:get(N=11) - 725.532 nsec/op
ThreadLocalCounterBenchmark.rw:inc(N=11) - 12.898 nsec/op
ThreadLocalCounterBenchmark.rw:get(N=23) - 1584.209 nsec/op
ThreadLocalCounterBenchmark.rw:inc(N=23) - 16.384 nsec/op

A room of one's own
There are minor details worth considering here such as how ThreadLocal copies are added to the pool of counters and how to deal with dead threads. While joining is easily done via ThreadLocal.initialValue, there is sadly no cleanup callback to utilise on on Thread termination which means dead counters can collect in the pool. In an environment in which threads live short and careless lives only to be replaced by equally foolish threads this approach will not scale as counters are left stuck in the array. A simple solution here is to have the pool check for dead counters when new threads join. Counters may still be left in the pool long after the Thread died, but it will do a rudimentary check for leftovers on each thread added. Another solution is to do the check on each get(), but this will drive the cost of get() up. Yet another approach is to use WeakReferences to determine counters dropping off. Pick whichever suits you, you could even check on each get if it's seldom enough. This post is getting quite long as is so I'll skip the code, but here it is if you want to read it.
For all it's simplicity the above code delivers a triple the performance of LongAdder for writers at the cost of allocating counters per thread. With no contention LongAdder will deliver the cost of a single AtomicLong, but TLC will cost the same. The get() performance is similar to LongAdder, but note that contended or not the get() performance of the TLC is constant and gets worse with every thread added, where the LongAdder will only deteriorate with contention, and only up to the number of CPUs.

Become a member today!

A more specific solution requires us to add a counter field to our thread. This saves us the lookup cost, but may still leave us exposed to false sharing should our thread field be updated from other thread (read this concurrency-interest debate on making Thread @Contended). Here's how this one performs (benchmark is here):
ThreadFieldCounterBenchmark.rw:get(N=1)  - 25.264 nsec/op
ThreadFieldCounterBenchmark.rw:inc(N=1)  - 3.723 nsec/op
ThreadFieldCounterBenchmark.rw:get(N=11) - 483.723 nsec/op
ThreadFieldCounterBenchmark.rw:inc(N=11) - 3.177 nsec/op
ThreadFieldCounterBenchmark.rw:get(N=23) - 952.433 nsec/op
ThreadFieldCounterBenchmark.rw:inc(N=23) - 3.045 nsec/op

This is 10 times faster than LongAdder on the inc(), and 700 times faster than AtomicLong when the shit hits the fan. If you are truly concerned about performance to such an extreme this may be the way to go. In fact this is exactly why ThreadLocalRandom is implemented using a field on Thread in JDK8.
A cooking pun!
Now, cooking your own Thread classes is certainly not something you can rely on in every environment. Still it may prove an interesting option for those with more control over their execution environments. If you are at liberty to add one field, just bunch the lot with no padding between them on your thread and pad the tail to be safe. As they are all written to from the same thread there is no contention to worry about and you may end up using less space than the LongAdder.

Summary

For all counters results reflect 1/11/23 writers busily updating a counter while a single reader thread reads:

  • AtomicLong -> One counter for all threads -> fast get(4ns/43ns/115ns) but slow inc(105ns/748ns/2122ns). inc() is slower as contention grows. Least memory required. 
  • LongAdder -> A table of counters -> slow get(10ns/752ns/1380ns) but faster inc(105ns/40ns/38ns). get() is slower as contention grows, inc() is slow when no cells are present, but settles on a fast figure once it gets going. Memory consumption increases with contention, but little overhead for no contention. Cells are padded so increase has 122b overhead. 
  • ThreadLocalCounter -> Every thread has a counter in it's ThreadLocal map -> slow get(61ns/725ns/1584ns) but even faster inc(15ns/12ns/16ns). get() is slower as the number of threads grow, inc() is pretty contant and low cost. Memory consumption increases with every thread added, regardless of contention. Counters are padded so each has 122b overhead. It is suggested you use a shared counters class to amortise the padding cost.
  • ThreadFieldCounter -> Every thread has a counter field -> get(25ns/483ns/952ns) is slower than AtomicLong, but better than the rest. The fastest inc(4ns/3ns/3ns). get() is slower as the number of threads grow, inc() is contant and low cost. Memory consumption increases with every thread added, regardless of contention. Counters are not padded but you may want to pad them.

I believe that for certain environments the counter per thread approach may prove a good fit. Furthermore for highly contended counters, where LongAdder/similar will expand I believe that this approach can be extended to provide a more memory efficient alternative when the thread sets are fixed or limited and a group of counters is required rather then just one counter.

UPDATE (1/08/2013): If you are looking for more JMH related info, see Shipilev's slides from JVMLS.