[concurrency-interest] Numerical Stream code
Howard Lovatt
howard.lovatt at gmail.com
Thu Feb 21 01:39:21 PST 2013
Hi,
I have taken another look at the Numerical Stream code example previously
posted, this time for the serial case only (hence no cross-posting to the
concurrency-interest group). I tried a number of different programming
styles; the timings are below and the code is at the end:
CLike: time = 2329 ms, result = 99.99581170383331
SerialStream: time = 20441 ms, result = 99.99581170383331
SerialStreamMock: time = 5870 ms, result = 99.99581170383331
SerialStreamManualAverage: time = 18296 ms, result = 99.99581170383331
SerialStreamRange: time = 21775 ms, result = 99.99581170383331
SerialStreamInLine: time = 21775 ms, result = 99.99581170383331
SerialStreamCollect: time = 21754 ms, result = 99.99581170383331
1. CLike is not using streams at all and is the base case.
2. SerialStream uses the Stream API in a 'natural' way; 9 times slower than base.
3. SerialStreamMock uses a quick-and-dirty stream implementation; 3 times slower than base but 3 times quicker than the Stream API.
4. SerialStreamManualAverage is as SerialStream but, instead of calling average(), uses the calculation from CLike; little difference.
5. SerialStreamRange uses Streams.intRange instead of Arrays.indices; little difference.
6. SerialStreamInLine inlines the function calls (as suggested by Brian Goetz); little difference.
7. SerialStreamCollect replaces average() with a collect call (as suggested by Brian Goetz); little difference.
So I am left with the overhead of the Stream API being larger than I
expected; I was expecting something like the overhead of SerialStreamMock,
not 3 times worse again. It is probably me doing something wrong, so any
suggestions would be greatly appreciated.
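For a minimal, self-contained way to see the gap without the full listing
below, here is a sketch of the two styles (assuming a build where
IntStream.range is available; the array size, kernel constant, and
repetition count are illustrative choices for the sketch, not taken from
the runs above):

import java.util.Arrays;
import java.util.stream.IntStream;

public final class StreamOverheadSketch {
  private static final int N = 1_000_000;
  private static final double[] src = new double[N];
  private static final double[] dst = new double[N];

  static double cLike() { // Indexed loops, no streams
    for (int i = 1; i < N - 1; i++) { dst[i] = 0.5 * (src[i - 1] + src[i + 1]); }
    double sum = 0;
    for (final double v : dst) { sum += v; }
    return sum / N;
  }

  static double streamed() { // The 'natural' stream equivalent
    IntStream.range(1, N - 1).forEach(i -> dst[i] = 0.5 * (src[i - 1] + src[i + 1]));
    return Arrays.stream(dst).average().getAsDouble();
  }

  public static void main(final String... notUsed) {
    for (int rep = 0; rep < 5; rep++) { // Crude warm-up plus timing
      final long t0 = System.currentTimeMillis();
      final double a = cLike();
      final long t1 = System.currentTimeMillis();
      final double b = streamed();
      final long t2 = System.currentTimeMillis();
      System.out.println("cLike " + (t1 - t0) + " ms (" + a + "), stream " + (t2 - t1) + " ms (" + b + ")");
    }
  }
}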
Thanks in advance:
-- Howard.
=======================================================================
package fdm;
import static java.lang.System.*;
import java.util.Arrays;
import java.util.OptionalDouble;
import java.util.function.BinaryOperator;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.IntUnaryOperator;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Streams;
/**
 * Numerical solution to alpha d^2u/dx^2 = du/dt for alpha = 1, u(t,0) = 100,
 * u(t,1) = 100, u(0,x) = 0; see Hornbeck, "Numerical Methods", section 11.1,
 * p. 283. Applies an explicit-finite-difference-method kernel as a specific
 * example of applying a kernel. Applying a kernel to a vector/matrix of data
 * is a common numerical operation.
 *
 * @author Howard Lovatt
 */
public final class FDMSerial {
// Problem constants
private static final double alpha = 1;
private static final double uT0 = 100;
private static final double uT1 = 100;
private static final double u0X = 0;
private static final double maxT = 1;
private static final double deltaX = 0.001; // <= 1
// 0.75 is a typical constant used; it must be > 0 and < 1
private static final double deltaT = 0.75 * deltaX * deltaX / 2 / alpha;
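// Stability note: the explicit scheme requires alpha * deltaT / (deltaX *
// deltaX) <= 0.5 (the von Neumann criterion); the choice above gives
// 0.75 / 2 = 0.375, safely inside the limit.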
private static final double k1 = alpha * deltaT / (deltaX * deltaX); // Constant in front of deltaT / 2
private static final double k2 = 1 - 2 * k1; // 1 - constant in front of deltaT
private static final int numXs = (int) (1 / deltaX) + 1;
private static final int numTs = (int) (maxT / deltaT) + 1;
@FunctionalInterface public interface Kernel1D {
  double u00(double uM1M1, double uM10, double uM1P1);
}
private static final Kernel1D explicitFDM =
    (uM1M1, uM10, uM1P1) -> k1 * (uM1M1 + uM1P1) + k2 * uM10;
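// The kernel is the standard forward-in-time, central-in-space update:
// u(t, x) = k1 * (u(t-dt, x-dx) + u(t-dt, x+dx)) + k2 * u(t-dt, x).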
@FunctionalInterface private interface Style { double solve(); }
private enum Styles implements Style {
CLike {
@Override public double solve() {
uM1[0] = uT0; // t = 0
for (int xi = 1; xi < numXs - 1; xi++) { uM1[xi] = u0X; }
uM1[numXs - 1] = uT1;
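// March forward in time; the for-update swaps the two buffers so that the
// values just written (u0) become the previous time step (uM1).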
for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
u0[0] = uT0; // x = 0
for (int xi = 1; xi < numXs - 1; xi++) { u0[xi] = explicitFDM.u00(uM1[xi - 1], uM1[xi], uM1[xi + 1]); } // 0 < x < 1
u0[numXs - 1] = uT1; // x = 1
}
double sum = 0; // Calculate average of last us
for (final double u : uM1) { sum += u; }
return sum / numXs;
}
},
SerialStream {
@Override public double solve() {
Arrays.indices(uM1).forEach(this::t0);
for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
Arrays.indices(uM1).forEach(this::tg0);
}
return Arrays.stream(uM1).average().getAsDouble(); // Really slow!
}
},
SerialStreamMock {
@Override public double solve() {
IntStreamMock.indices(uM1).forEach(this::t0);
for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
IntStreamMock.indices(uM1).forEach(this::tg0);
}
return Arrays.stream(uM1).average().getAsDouble();
}
},
SerialStreamManualAverage {
@Override public double solve() {
Arrays.indices(uM1).forEach(this::t0);
for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
Arrays.indices(uM1).forEach(this::tg0);
}
double sum = 0;
for (final double u : uM1) { sum += u; }
return sum / numXs;
}
},
SerialStreamRange {
@Override public double solve() {
Streams.intRange(0, numXs).forEach(this::t0);
for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
Streams.intRange(0, numXs).forEach(this::tg0);
}
return Arrays.stream(uM1).average().getAsDouble(); // Really slow!
}
},
SerialStreamInLine {
@Override public double solve() {
Arrays.indices(uM1).forEach( (xi) -> {
if (xi == 0) { uM1[0] = uT0; }
else if (xi == numXs - 1) { uM1[numXs - 1] = uT1; }
else { uM1[xi] = u0X; }
} );
for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
Arrays.indices(uM1).forEach( (xi) -> {
if (xi == 0) { u0[0] = uT0; }
else if (xi == numXs - 1) { u0[numXs - 1] = uT1; }
else { u0[xi] = explicitFDM.u00(uM1[xi - 1], uM1[xi], uM1[xi + 1]); }
} );
}
return Arrays.stream(uM1).average().getAsDouble(); // Really slow!
}
},
SerialStreamCollect {
@Override public double solve() {
Arrays.indices(uM1).forEach(this::t0);
for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
Arrays.indices(uM1).forEach(this::tg0);
}
final double[] sum = Arrays.stream(uM1).collectUnordered(new Collector.OfDouble<double[]>() {
  public Supplier<double[]> resultSupplier() { return () -> new double[1]; }
  public ObjDoubleConsumer<double[]> doubleAccumulator() { return (acc, value) -> { acc[0] += value; }; }
  public BinaryOperator<double[]> combiner() {
    return (sum1, sum2) -> {
      sum1[0] += sum2[0];
      return sum1;
    };
  }
}); // Also tried sum, reduce, & collect - all really slow, including collectUnordered
return sum[0] / numXs;
}
};
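// Two time levels of the solution plus a scratch reference for buffer swapping.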
double[] u0 = new double[numXs];
double[] uM1 = new double[numXs];
double[] uTemp;
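// t0 writes the t = 0 initial/boundary values; tg0 applies the kernel for t > 0.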
void t0(final int xi) {
if (xi == 0) { uM1[0] = uT0; }
else if (xi == numXs - 1) { uM1[numXs - 1] = uT1; }
else { uM1[xi] = u0X; }
}
void tg0(final int xi) {
if (xi == 0) { u0[0] = uT0; }
else if (xi == numXs - 1) { u0[numXs - 1] = uT1; }
else { u0[xi] = explicitFDM.u00(uM1[xi - 1], uM1[xi], uM1[xi + 1]); }
}
}
private static void debug(final int ti, final double[] us) { out.println(ti + ": " + Arrays.toString(us)); }
private static void time(final Style s) {
gc(); // Paranoid precaution
gc();
final long start = currentTimeMillis();
final double result = s.solve();
final long end = currentTimeMillis();
out.println(s + ": time = " + (end - start) + " ms, result = " + result);
}
/**
* Time implementations and then print the time and the result for each.
*
* @param notUsed the command line arguments are ignored
*/
public static void main(final String... notUsed) throws Exception {
// Arrays.stream(Styles.values()).forEach( (s) -> s.solve() ); // Warm up - doesn't seem to make any difference!
Arrays.stream(Styles.values()).forEach(FDMSerial::time);
}
/* Stuff for the stream mocks, which I have tried to make representative so
 * that a half-decent comparison with Stream can be made; to keep the size
 * down I have only provided a few methods. The mock streams have one
 * abstract method, next(), on top of which everything else is built. The
 * JVM kicked up a fuss when I tried to use interfaces with default methods,
 * therefore abstract classes are used instead, forgoing lambdas :(. */
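/* Example usage of the mocks (illustrative only, using the classes defined
 * below):
 *   IntStreamMock.indices(new double[3]).forEach(out::println);  // 0 1 2
 *   DoubleStreamMock.stream(new double[] {1, 3}).average();      // OptionalDouble[2.0]
 * BREAK and CONTINUE are preallocated so that terminating an iteration does
 * not pay for filling in a new stack trace on every loop exit. */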
private static final class Break extends Exception {
private static final long serialVersionUID = 201302211810L;
private Break() {}
}
private static final Break BREAK = new Break();
private static final class Continue extends Exception {
private static final long serialVersionUID = 201302211820L;
private Continue() {}
}
private static final Continue CONTINUE = new Continue();
private static abstract class IntStreamMock {
abstract int next() throws Break, Continue;
void forEach(final IntConsumer consumer) {
try {
for (;;) {
try { consumer.accept(next()); }
catch (final Continue c) {}
}
}
catch (final Break e) {}
}
IntStreamMock map(final IntUnaryOperator unary) {
return new IntStreamMock() {
@Override public int next() throws Break, Continue {
return unary.applyAsInt(IntStreamMock.this.next());
}
};
}
// ...
static IntStreamMock indices(final double[] array) { return indices(array.length); }
static IntStreamMock indices(final int size) {
return new IntStreamMock() {
int index = 0;
@Override public int next() throws Break {
if (index == size) { throw BREAK; }
return index++;
}
};
}
// ...
}
private static abstract class DoubleStreamMock {
abstract double next() throws Break, Continue;
void forEach(final DoubleConsumer consumer) {
try {
for (;;) {
try { consumer.accept(next()); }
catch (final Continue c) {}
}
}
catch (final Break e) {}
}
OptionalDouble average() {
final long[] size = new long[1];
final double[] sum = new double[1];
forEach( (v) -> { sum[0] += v; size[0]++; } );
if (size[0] == 0) { return OptionalDouble.empty(); }
return OptionalDouble.of(sum[0] / size[0]);
}
// ...
static DoubleStreamMock stream(final double[] array) {
return new DoubleStreamMock() {
final int size = array.length;
int index = 0;
@Override public double next() throws Break {
if (index == size) { throw BREAK; }
return array[index++];
}
};
}
// ...
}
}
On 16 February 2013 08:13, Brian Goetz <brian.goetz at oracle.com> wrote:
> Try computing average as:
>
>   int[] collected = stream.collect(() -> new int[2],
>       (arr, i) -> { arr[0] += i; arr[1]++; },
>       (a1, a2) -> { a1[0] += a2[0]; a1[1] += a2[1]; });
>   double avg = (collected[1] == 0)
>       ? 0
>       : (double) collected[0] / collected[1];
>
> since the current implementation of average() is doing way more work than
> you need.
>
>
> On 2/15/2013 3:26 PM, Howard Lovatt wrote:
>
>> Hi,
>>
>> Thanks for all the replies. This is largely a holding email. I am
>> travelling with work and don't have my laptop. When I get home I will
>> post some more code.
>>
>> @Jin: I did warm up the code, but I do agree that benchmarks are tricky.
>> As I said I was expecting some overhead but was surprised at how much.
>>
>> @Brian: The reason I factored t0 and tg0 out into methods is that they
>> are common between the serial and parallel versions and I thought the
>> code read better. I don't think it makes any difference, but I will check.
>>
>> @Others: To avoid writing over an old array I will have to allocate each
>> time round the t loop. I will give this a try and see if it helps. The
>> discussion about the parallel problems is interesting, but how come the
>> serial version is so slow? Could a problem with the Stream code in
>> general be the underlying problem with the parallel version?
>>
>> Sent from my iPad
>>
>> On 15/02/2013, at 3:48 AM, Stanimir Simeonoff <stanimir at riflexo.com> wrote:
>>
>>
>>> > Do element sizes matter (byte vs. short vs. int vs. long)?
>>>
>>> I don't think so. All of this assumes that the proper instruction
>>> is used. For example, if 2 threads are writing to adjacent bytes,
>>> then the "mov" instruction has to only write the byte. If the
>>> compiler decides to read 32 bits, mask in the 8 bits and write
>>> 32 bits then the data will be corrupted.
>>>
>>> JLS mandates no corruption for neighbor writes.
>>>
>>> I believe that HotSpot will only generate the write byte mov
>>> instruction.
>>>
>>> That would be the correct one. The case affects only
>>> boolean[]/byte[]/short[]/char[] as simple primitive fields are always
>>> at least 32 bits.
>>>
>>> Stanimir
>>>
>>>
>>> Nathan Reynolds
>>> <http://psr.us.oracle.com/wiki/index.php/User:Nathan_Reynolds> |
>>> Architect | 602.333.9091
>>> Oracle PSR Engineering <http://psr.us.oracle.com/> | Server Technology
>>>
>>> On 2/14/2013 8:56 AM, Peter Levart wrote:
>>>
>>>> On 02/14/2013 03:45 PM, Brian Goetz wrote:
>>>>
>>>>> The parallel version is almost certainly suffering false cache line
>>>>> sharing when adjacent tasks are writing to the shared arrays u0, etc.
>>>>> Nothing to do with streams, just a standard parallelism gotcha.
>>>>>
>>>>> Cure: don't write to shared arrays from parallel tasks.
>>>>
>>>> Hi,
>>>>
>>>> I would like to discuss this a little bit (hence the cc:
>>>> concurrency-interest - the conversation can continue on this list
>>>> only).
>>>>
>>>> Is it really important to avoid writing to shared arrays from
>>>> multiple threads (of course without synchronization, not even
>>>> volatile writes/reads) when indexes are not shared (each thread
>>>> writes/reads its own disjoint subset)?
>>>>
>>>> Do element sizes matter (byte vs. short vs. int vs. long)?
>>>>
>>>> I had a (false?) feeling that cache lines are not invalidated
>>>> when writes are performed without fences.
>>>>
>>>> Also I don't know how short (byte, char) writes are combined into
>>>> memory words on the hardware when they come from different cores
>>>> and whether this is connected to any performance issues.
>>>>
>>>> Thanks,
>>>>
>>>> Peter
>>>>
--
-- Howard.