RFR 8005311: Add Scalable Updatable Variables, DoubleAccumulator, DoubleAdder, LongAccumulator, LongAdder
Peter Levart
peter.levart at gmail.com
Tue Jan 8 15:01:31 UTC 2013
On 01/05/2013 07:10 PM, Chris Hegarty wrote:
> As part of JEP 155 we are proposing to add the following public
> classes to support Scalable Updatable Variables, DoubleAccumulator,
> DoubleAdder, LongAccumulator and LongAdder.
>
> These have been written by Doug Lea, with assistance from members of
> the former JCP JSR-166 Expert Group.
>
Hello Doug,
These are very interesting and useful classes already as they are. They
might also be used for other, unintended purposes, such as scalable random
generators or unique identifier generators, if their API exposed
some more information. For example, I did an experiment and patched the code
paths used in LongAccumulator so that:
- accumulate(long x) returns the post-modification value of the modified
cell or base (the returned value of the function that was used to update
the state)
- the accumulator function is always called for initial allocations of
cells (with identity value as 1st argument, like when accumulating on
the base) - the original code optimizes this situation and just installs
the parameter x into the cell.
With this modified API and a little copy-and-paste, I could quickly
assemble the following MultiThreadedRandom:
/**
 * A {@link Random} whose 48-bit linear congruential state is kept in a
 * {@link LongAccumulator} instead of a single shared {@code AtomicLong}.
 *
 * <p>This class relies on the patched {@code LongAccumulator} described in
 * the surrounding message: {@code accumulate(long)} must return the value
 * produced by the accumulator function, and the function must also be
 * applied (with the identity as first argument) when a new cell is created.
 * It does not compile against an unpatched {@code LongAccumulator}.
 */
public class MultiThreadedRandom extends Random {
    // Same LCG parameters as used for the 48-bit generator step below.
    private static final long multiplier = 0x5DEECE66DL;
    private static final long addend = 0xBL;
    private static final long mask = (1L << 48) - 1;

    private static final AtomicLong seedUniquifier
        = new AtomicLong(8682522807148012L);

    /** Default seed source: advances {@link #seedUniquifier} with a CAS loop. */
    private static final LongSupplier defaultSeedSupplier = () -> {
        // L'Ecuyer, "Tables of Linear Congruential Generators of
        // Different Sizes and Good Lattice Structure", 1999
        for (;;) {
            long current = seedUniquifier.get();
            long next = current * 181783497276652981L;
            if (seedUniquifier.compareAndSet(current, next)) {
                return next;
            }
        }
    };

    /** Holds the generator state; the identity value 0 means "not yet seeded". */
    private final LongAccumulator rnd;

    /** Creates a generator seeded from the default seed supplier. */
    public MultiThreadedRandom() {
        this(defaultSeedSupplier);
    }

    /**
     * Creates a generator that obtains an initial seed from
     * {@code seedSupplier} whenever the accumulator function sees a state
     * of 0.
     *
     * @param seedSupplier source of initial seeds; must not be null
     * @throws NullPointerException if {@code seedSupplier} is null
     */
    public MultiThreadedRandom(final LongSupplier seedSupplier) {
        super(0L); // ignored
        Objects.requireNonNull(seedSupplier);
        this.rnd = new LongAccumulator(
            (seed, ignored) -> {
                // A state of 0 is the accumulator identity, i.e. unseeded:
                // pull a fresh seed and scramble it before the first step.
                if (seed == 0L) {
                    seed = (seedSupplier.getAsLong() ^ multiplier) & mask;
                }
                return (seed * multiplier + addend) & mask;
            },
            0L
        );
    }

    /**
     * Re-seeding is not supported once this instance is constructed.
     *
     * <p>NOTE(review): the {@code rnd != null} guard makes a call that
     * arrives before {@code rnd} is assigned a silent no-op; presumably this
     * is for a {@code setSeed} call made by the superclass constructor.
     * Confirm against {@code java.util.Random}.
     *
     * @param seed ignored
     * @throws UnsupportedOperationException if called after construction
     */
    @Override
    public void setSeed(long seed) {
        if (rnd != null) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Advances the state held in {@link #rnd} by one step and returns the
     * top {@code bits} bits of the resulting 48-bit value.
     *
     * @param bits number of random bits requested
     * @return the high-order {@code bits} bits of the new state
     */
    @Override
    protected int next(int bits) {
        // The argument to accumulate is ignored by the accumulator function.
        long nextseed = rnd.accumulate(0L);
        return (int) (nextseed >>> (48 - bits));
    }
}
...which has great scalability, although it is about 3x slower than
ThreadLocalRandom.current().next()...
I'm also thinking about what would be possible to do with DoubleAdder if
it had a method like getAndAdd(double x) which returned the
pre-modification state of the cell/base. It might be possible to use two
DoubleAdders to devise a more accurate adder, for example:
/**
 * Sketch of an adder built from two {@link DoubleAdder}s: one receives the
 * values themselves, the other receives a per-update correction term.
 *
 * <p>Depends on a hypothetical {@code DoubleAdder.getAndAdd(double)} that
 * returns the pre-modification value of the updated cell or base; that
 * method is proposed in the surrounding message and is not an existing API.
 */
public class MyDoubleAdder {
    private final DoubleAdder primary = new DoubleAdder();
    private final DoubleAdder secondary = new DoubleAdder();

    /**
     * Adds {@code x} to the primary adder and records in the secondary
     * adder the part of {@code x} not reflected in {@code (before + x) - before}.
     *
     * @param x the value to add
     */
    public void add(double x) {
        final double before = primary.getAndAdd(x);
        final double after = before + x;
        // What the addition actually contributed, as seen from the old value.
        final double applied = after - before;
        final double correction = x - applied;
        secondary.add(correction);
    }

    /**
     * Returns the primary sum plus the accumulated corrections.
     *
     * @return {@code primary.sum() + secondary.sum()}
     */
    public double sum() {
        final double main = primary.sum();
        return main + secondary.sum();
    }
}
Regards, Peter
P.S. Here's a patch I used:
diff --git a/jdk/src/java/util/concurrent/atomic/LongAccumulator.java
b/jdk/src/java/util/concurrent/atomic/LongAccumulator.java
index 720722e..e84b9b3 100644
--- a/jdk/src/java/util/concurrent/atomic/LongAccumulator.java
+++ b/jdk/src/java/util/concurrent/atomic/LongAccumulator.java
@@ -86,8 +86,9 @@ public class LongAccumulator extends Striped64
implements Serializable {
* Updates with the given value.
*
* @param x the value
+ * @return the returned value of the accumulator function that was
used to update the state of this accumulator
*/
- public void accumulate(long x) {
+ public long accumulate(long x) {
Cell[] as; long b, v, r; CellHashCode hc; Cell a; int m;
if ((as = cells) != null ||
(r = function.applyAsLong(b = base, x)) != b &&
!casBase(b, r)) { // ## rename when JDK8 syncs with lambda, applyAsLong
@@ -98,8 +99,9 @@ public class LongAccumulator extends Striped64
implements Serializable {
!(uncontended =
(r = function.applyAsLong(v = a.value, x)) == v ||
// ## rename when JDK8 syncs with lambda, applyAsLong
a.cas(v, r)))
- longAccumulate(x, hc, function, uncontended);
+ r = longAccumulate(x, hc, identity, function, uncontended);
}
+ return r;
}
/**
diff --git a/jdk/src/java/util/concurrent/atomic/LongAdder.java
b/jdk/src/java/util/concurrent/atomic/LongAdder.java
index f459dcc..c57afc5 100644
--- a/jdk/src/java/util/concurrent/atomic/LongAdder.java
+++ b/jdk/src/java/util/concurrent/atomic/LongAdder.java
@@ -98,7 +98,7 @@ public class LongAdder extends Striped64 implements
Serializable {
as == null || (m = as.length - 1) < 0 ||
(a = as[m & hc.code]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
- longAccumulate(x, hc, null, uncontended);
+ longAccumulate(x, hc, 0L, null, uncontended);
}
}
diff --git a/jdk/src/java/util/concurrent/atomic/Striped64.java
b/jdk/src/java/util/concurrent/atomic/Striped64.java
index 6a87020..4adde3c 100644
--- a/jdk/src/java/util/concurrent/atomic/Striped64.java
+++ b/jdk/src/java/util/concurrent/atomic/Striped64.java
@@ -213,13 +213,15 @@ abstract class Striped64 extends Number {
*
* @param x the value
* @param hc the hash code holder
+ * @param identity the identity value used when {@code fn} is non-null
* @param fn the update function, or null for add (this convention
* avoids the need for an extra field or function in LongAdder).
* @param wasUncontended false if CAS failed before call
*/
- final void longAccumulate(long x, CellHashCode hc,
- LongBinaryOperator fn,
+ final long longAccumulate(long x, CellHashCode hc,
+ long identity, LongBinaryOperator fn,
boolean wasUncontended) {
+ long res;
int h;
if (hc == null) {
hc = new CellHashCode();
@@ -235,7 +237,8 @@ abstract class Striped64 extends Number {
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
- Cell r = new Cell(x); // Optimistically create
+ Cell r = new Cell(res = ((fn == null) ? x :
+ fn.applyAsLong(identity, x))); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
@@ -258,7 +261,7 @@ abstract class Striped64 extends Number {
}
else if (!wasUncontended) // CAS already known
to fail
wasUncontended = true; // Continue after rehash
- else if (a.cas(v = a.value, ((fn == null) ? v + x :
+ else if (a.cas(v = a.value, res = ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // ## rename when JDK8 syncs with lambda,
applyAsLong
break;
else if (n >= NCPU || cells != as)
@@ -284,25 +287,24 @@ abstract class Striped64 extends Number {
h ^= h << 5;
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
- boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
- rs[h & 1] = new Cell(x);
+ rs[h & 1] = new Cell(res = ((fn == null) ? x :
+ fn.applyAsLong(identity, x)));
cells = rs;
- init = true;
+ break;
}
} finally {
cellsBusy = 0;
}
- if (init)
- break;
}
- else if (casBase(v = base, ((fn == null) ? v + x :
+ else if (casBase(v = base, res = ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) //
## rename when JDK8 syncs with lambda, applyAsLong
break; // Fall back on using base
}
hc.code = h; // Record index for
next time
+ return res;
}
/**
More information about the core-libs-dev
mailing list