[concurrency-interest] Numerical Stream code

Howard Lovatt howard.lovatt at gmail.com
Thu Feb 21 01:39:21 PST 2013


Hi,

I have taken a look at the Numerical Stream code example previously posted,
for the serial case only, hence no cross-posting to the concurrency-interest
group. I tried a number of different programming styles; the timings are
below and the code is at the end:

CLike: time = 2329 ms, result = 99.99581170383331
SerialStream: time = 20441 ms, result = 99.99581170383331
SerialStreamMock: time = 5870 ms, result = 99.99581170383331
SerialStreamManualAverage: time = 18296 ms, result = 99.99581170383331
SerialStreamRange: time = 21775 ms, result = 99.99581170383331
SerialStreamInLine: time = 21775 ms, result = 99.99581170383331
SerialStreamCollect: time = 21754 ms, result = 99.99581170383331

1. CLike does not use streams at all and is the base case
2. SerialStream uses the Stream API in a 'natural' way; 9 times slower than
base
3. SerialStreamMock uses a quick-and-dirty stream implementation; 3 times
slower than base but 3 times quicker than the Stream API
4. SerialStreamManualAverage is the same as SerialStream except that the
manual averaging loop from CLike replaces the call to average(); little
difference
5. SerialStreamRange uses Streams.intRange instead of Arrays.indices; little
difference
6. SerialStreamInLine inlines the function calls (as suggested by Brian
Goetz); little difference
7. SerialStreamCollect replaces average with a collect call (as suggested by
Brian Goetz); little difference (see the sketch after the next paragraph)

So I am left with the overhead of the Stream API being larger than I
expected; I was expecting overhead closer to that of SerialStreamMock, not 3
times worse again. It is probably something I am doing wrong, so any
suggestions would be greatly appreciated.
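
For reference, here is roughly what SerialStreamCollect boils down to,
adapted from Brian's int example (quoted at the bottom) to doubles. This is
a sketch only: the three-argument collect(supplier, accumulator, combiner)
overload for double streams is an assumption about the build in use.

  final double[] acc = Arrays.stream(uM1).collect(
      () -> new double[2],                              // [0] = sum, [1] = count
      (a, v) -> { a[0] += v; a[1]++; },                 // accumulate one element
      (a1, a2) -> { a1[0] += a2[0]; a1[1] += a2[1]; }); // merge partial results
  final double average = (acc[1] == 0) ? 0 : acc[0] / acc[1];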

Thanks in advance,

 -- Howard.

=======================================================================

package fdm;

import static java.lang.System.*;
import java.util.Arrays;
import java.util.OptionalDouble;
import java.util.function.BinaryOperator;
import java.util.function.DoubleConsumer;
import java.util.function.IntConsumer;
import java.util.function.IntUnaryOperator;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Streams;

/**
 * Numerical solution to: alpha d^2u/dx^2 = du/dt for alpha = 1, u(t,0) = 100,
 * u(t,1) = 100, u(0,x) = 0; see Hornbeck, "Numerical Methods", section 11.1,
 * p. 283. Applies an explicit-finite-difference-method kernel as a specific
 * example of applying a kernel. Applying a kernel to a vector/matrix of data
 * is a common numerical operation.
 *
 * @author Howard Lovatt
 */
public final class FDMSerial {
  // Problem constants
  private static final double alpha = 1;
  private static final double uT0 = 100;
  private static final double uT1 = 100;
  private static final double u0X = 0;
  private static final double maxT = 1;
  private static final double deltaX = 0.001; // <= 1
  private static final double deltaT = 0.75 * deltaX * deltaX / 2 / alpha; // 0.75 is a typical constant used, must be < 1 and > 0
  private static final double k1 = alpha * deltaT / (deltaX * deltaX); // Constant in front of deltaT / 2
  private static final double k2 = 1 - 2 * k1; // 1 - constant in front of deltaT
  private static final int numXs = (int) (1 / deltaX) + 1;
  private static final int numTs = (int) (maxT / deltaT) + 1;

  @FunctionalInterface public interface Kernel1D {
    double u00(double uM1M1, double uM10, double uM1P1);
  }

  private static final Kernel1D explicitFDM =
      (uM1M1, uM10, uM1P1) -> k1 * (uM1M1 + uM1P1) + k2 * uM10;
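
  // With r = alpha * deltaT / (deltaX * deltaX) (= k1 above) this is the standard
  // explicit (forward-time, centred-space) update r * (uM1M1 + uM1P1) + (1 - 2 * r) * uM10;
  // the 0.75 factor in deltaT keeps r = 0.375, inside the r <= 1/2 stability limit.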

  @FunctionalInterface private interface Style { double solve(); }

  private enum Styles implements Style {
    CLike {
      @Override public double solve() {
        uM1[0] = uT0; // t = 0
        for (int xi = 1; xi < numXs - 1; xi++) { uM1[xi] = u0X; }
        uM1[numXs - 1] = uT1;
        for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
          u0[0] = uT0; // x = 0
          for (int xi = 1; xi < numXs - 1; xi++) {
            u0[xi] = explicitFDM.u00(uM1[xi - 1], uM1[xi], uM1[xi + 1]); // 0 < x < 1
          }
          u0[numXs - 1] = uT1; // x = 1
        }
        double sum = 0; // Calculate average of last us
        for (final double u : uM1) { sum += u; }
        return sum / numXs;
      }
    },

    SerialStream {
      @Override public double solve() {
        Arrays.indices(uM1).forEach(this::t0);
        for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
          Arrays.indices(uM1).forEach(this::tg0);
        }
        return Arrays.stream(uM1).average().getAsDouble(); // Really slow!
      }
    },

    SerialStreamMock {
      @Override public double solve() {
        IntStreamMock.indices(uM1).forEach(this::t0);
        for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
          IntStreamMock.indices(uM1).forEach(this::tg0);
        }
        return Arrays.stream(uM1).average().getAsDouble();
      }
    },

    SerialStreamManualAverage {
      @Override public double solve() {
        Arrays.indices(uM1).forEach(this::t0);
        for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
          Arrays.indices(uM1).forEach(this::tg0);
        }
        double sum = 0;
        for (final double u : uM1) { sum += u; }
        return sum / numXs;
      }
    },

    SerialStreamRange {
      @Override public double solve() {
        Streams.intRange(0, numXs).forEach(this::t0);
        for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
          Streams.intRange(0, numXs).forEach(this::tg0);
        }
        return Arrays.stream(uM1).average().getAsDouble(); // Really slow!
      }
    },

    SerialStreamInLine {
      @Override public double solve() {
        Arrays.indices(uM1).forEach( (xi) -> {
          if (xi == 0) { uM1[0] = uT0; }
          else if (xi == numXs - 1) { uM1[numXs - 1] = uT1; }
          else { uM1[xi] = u0X; }
        } );
        for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
          Arrays.indices(uM1).forEach( (xi) -> {
            if (xi == 0) { u0[0] = uT0; }
            else if (xi == numXs - 1) { u0[numXs - 1] = uT1; }
            else { u0[xi] = explicitFDM.u00(uM1[xi - 1], uM1[xi], uM1[xi + 1]); }
          } );
        }
        return Arrays.stream(uM1).average().getAsDouble(); // Really slow!
      }
    },

    SerialStreamCollect {
      @Override public double solve() {
        Arrays.indices(uM1).forEach(this::t0);
        for (int ti = 1; ti < numTs; ti++, uTemp = uM1, uM1 = u0, u0 = uTemp) { // t > 0
          Arrays.indices(uM1).forEach(this::tg0);
        }
        final double[] sum = Arrays.stream(uM1).collectUnordered( new Collector.OfDouble<double[]>() {
          public Supplier<double[]> resultSupplier() { return () -> new double[1]; }
          public ObjDoubleConsumer<double[]> doubleAccumulator() { return (sum, value) -> { sum[0] += value; }; }
          public BinaryOperator<double[]> combiner() {
            return (sum1, sum2) -> {
              sum1[0] += sum2[0];
              return sum1;
            };
          }
        } ); // Also tried sum, reduce, & collect - all really slow including collectUnordered
        return sum[0] / numXs;
      }
    };

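    // Double buffering: uM1 holds the previous time step and u0 the one being
    // computed; the for-loop update clause swaps them via uTemp after each step.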
    double[] u0 = new double[numXs];
    double[] uM1 = new double[numXs];
    double[] uTemp;

    void t0(final int xi) {
      if (xi == 0) { uM1[0] = uT0; }
      else if (xi == numXs - 1) { uM1[numXs - 1] = uT1; }
      else { uM1[xi] = u0X; }
    }

    void tg0(final int xi) {
      if (xi == 0) { u0[0] = uT0; }
      else if (xi == numXs - 1) { u0[numXs - 1] = uT1; }
      else { u0[xi] = explicitFDM.u00(uM1[xi - 1], uM1[xi], uM1[xi + 1]); }
    }
  }

  private static void debug(final int ti, final double[] us) { out.println(ti + ": " + Arrays.toString(us)); }

  private static void time(final Style s) {
    gc(); // Paranoid precaution
    gc();
    final long start = currentTimeMillis();
    final double result = s.solve();
    final long end = currentTimeMillis();
    out.println(s + ": time = " + (end - start) + " ms, result = " + result);
  }

  /**
   * Time implementations and then print the time and the result for each.
   *
   * @param notUsed the command line arguments are ignored
   */
  public static void main(final String... notUsed) throws Exception {
//    Arrays.stream(Styles.values()).forEach( (s) -> s.solve() ); // Warm up - doesn't seem to make any difference!
    Arrays.stream(Styles.values()).forEach(FDMSerial::time);
  }

  /* Stuff for the stream mocks, which I have tried to make representative so that a
   * half-decent comparison with Stream can be made; to keep the size down I have only
   * provided a few methods. The mock streams have one abstract method, next(), on top
   * of which everything else is built. The JVM kicked up a fuss when I tried to use
   * interfaces with default methods, therefore abstract classes are used instead,
   * forgoing lambdas :(. */
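  // Break and Continue below are preallocated singletons used purely for control
  // flow, so no new exception object (or stack-trace fill-in) is created per element.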
  private static final class Break extends Exception {
    private static final long serialVersionUID = 201302211810L;
    private Break() {}
  }

  private static final Break BREAK = new Break();

  private static final class Continue extends Exception {
    private static final long serialVersionUID = 201302211820L;
    private Continue() {}
  }

  private static final Continue CONTINUE = new Continue();

  private static abstract class IntStreamMock {
    abstract int next() throws Break, Continue;
    void forEach(final IntConsumer consumer) {
      try {
        for (;;) {
          try { consumer.accept(next()); }
          catch (final Continue c) {}
        }
      }
      catch (final Break e) {}
    }
    IntStreamMock map(final IntUnaryOperator unary) {
      return new IntStreamMock() {
        @Override public int next() throws Break, Continue {
          return unary.applyAsInt(IntStreamMock.this.next());
        }
      };
    }
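    // e.g. indices(3).map(i -> i * i).forEach(out::println) prints 0, 1 and 4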
    // ...
    static IntStreamMock indices(final double[] array) { return indices(array.length); }
    static IntStreamMock indices(final int size) {
      return new IntStreamMock() {
        int index = 0;
        @Override public int next() throws Break {
          if (index == size) { throw BREAK; }
          return index++;
        }
      };
    }
    // ...
  }

  private static abstract class DoubleStreamMock {
    abstract double next() throws Break, Continue;
    void forEach(final DoubleConsumer consumer) {
      try {
        for (;;) {
          try { consumer.accept(next()); }
          catch (final Continue c) {}
        }
      }
      catch (final Break e) {}
    }
    OptionalDouble average() {
      final long[] size = { 0 };
      final double[] sum = { 0 };
      forEach( (v) -> { sum[0] += v; size[0]++; } );
      if (size[0] == 0) { return OptionalDouble.empty(); }
      return OptionalDouble.of(sum[0] / size[0]);
    }
    // ...
    static DoubleStreamMock stream(final double[] array) {
      return new DoubleStreamMock() {
        final int size = array.length;
        int index = 0;
        @Override public double next() throws Break {
          if (index == size) { throw BREAK; }
          return array[index++];
        }
      };
    }
    // ...
  }
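
  /* Sketch of the "don't write to shared arrays from parallel tasks" cure Brian
   * suggests in the quoted thread below: each task fills a task-private chunk and
   * the chunks are only stitched together after the terminal operation returns.
   * The parallel IntStream.range used here is an assumption about what the build
   * provides (the code above spells ranges Streams.intRange). */
  static double[] parallelStep(final double[] prev, final int chunks) {
    final double[][] parts = new double[chunks][];
    final int n = prev.length;
    java.util.stream.IntStream.range(0, chunks).parallel().forEach( (c) -> {
      final int from = c * n / chunks;
      final int to = (c + 1) * n / chunks;
      final double[] local = new double[to - from]; // Task-private output
      for (int xi = from; xi < to; xi++) {
        if (xi == 0) { local[0] = uT0; }
        else if (xi == n - 1) { local[xi - from] = uT1; }
        else { local[xi - from] = explicitFDM.u00(prev[xi - 1], prev[xi], prev[xi + 1]); }
      }
      parts[c] = local; // Disjoint index per task; published by the terminal operation
    } );
    final double[] next = new double[n];
    int offset = 0;
    for (final double[] p : parts) {
      arraycopy(p, 0, next, offset, p.length);
      offset += p.length;
    }
    return next;
  }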
}



On 16 February 2013 08:13, Brian Goetz <brian.goetz at oracle.com> wrote:

> Try computing average as:
>
>   int[] collected = stream.collect(() -> new int[2],
>            (arr, i) -> { arr[0] += i; arr[1]++; },
>            (a1, a2) -> { a1[0] += a2[0]; a1[1] += a2[1]; });
>   double avg = (collected[1] == 0)
>              ? 0
>              : (double) collected[0]/collected[1];
>
> since the current implementation of average() is doing way more work than
> you need.
>
>
> On 2/15/2013 3:26 PM, Howard Lovatt wrote:
>
>> Hi,
>>
>> Thanks for all the replies. This is largely a holding email. I am
>> travelling for work and don't have my laptop. When I get home I will post
>> some more code.
>>
>> @Jin: I did warm up the code, but I do agree that benchmarks are tricky.
>> As I said I was expecting some overhead but was surprised at how much.
>>
>> @Brian: The reason I factored t0 and tg0 out into methods is that they
>> are common between the serial and parallel versions and I thought the
>> code read better. I don't think it makes any difference, but I will check.
>>
>> @Others: To avoid writing over an old array I will have to allocate each
>> time round the t loop. I will give this a try and see if it helps. The
>> discussion about the parallel problems is interesting, but how come the
>> serial version is so slow? Could a problem with the Stream code in
>> general be the underlying problem with the parallel version?
>>
>> Sent from my iPad
>>
>> On 15/02/2013, at 3:48 AM, Stanimir Simeonoff <stanimir at riflexo.com
>> <mailto:stanimir at riflexo.com>> wrote:
>>
>>
>>>     > Do element sizes matter (byte vs. short vs. int  vs. long)?
>>>
>>>     I don't think so.  All of this assumes that the proper instruction
>>>     is used.  For example, if 2 threads are writing to adjacent bytes,
>>>     then the "mov" instruction has to only write the byte.  If the
>>>     compiler decides to read 32 bits, mask in the 8 bits and write
>>>     32-bits then the data will be corrupted.
>>>
>>> JLS mandates no corruption for neighbor writes.
>>>
>>>     I believe that HotSpot will only generate the write byte mov
>>>     instruction.
>>>
>>> That would be the correct one. The case affects only
>>> boolean[]/byte[]/short[]/char[] as simple primitive fields are always
>>> at least 32 bits.
>>>
>>> Stanimir
>>>
>>>
>>>     Nathan Reynolds
>>>     <http://psr.us.oracle.com/wiki/index.php/User:Nathan_Reynolds> |
>>>     Architect | 602.333.9091
>>>     Oracle PSR Engineering <http://psr.us.oracle.com/> | Server Technology
>>>
>>>     On 2/14/2013 8:56 AM, Peter Levart wrote:
>>>
>>>>     On 02/14/2013 03:45 PM, Brian Goetz wrote:
>>>>
>>>>>     The parallel version is almost certainly suffering false cache line
>>>>>     sharing when adjacent tasks are writing to the shared arrays u0, etc.
>>>>>     Nothing to do with streams, just a standard parallelism gotcha.
>>>>>
>>>>>     Cure: don't write to shared arrays from parallel tasks.
>>>>
>>>>     Hi,
>>>>
>>>>     I would like to discuss this a little bit (hence the cc:
>>>>     concurrency-interest - the conversation can continue on this list
>>>>     only).
>>>>
>>>>     Is it really important to avoid writing to shared arrays from
>>>>     multiple threads (of course without synchronization, not even
>>>>     volatile writes/reads) when indexes are not shared (each thread
>>>>     writes/reads its own disjoint subset)?
>>>>
>>>>     Do element sizes matter (byte vs. short vs. int  vs. long)?
>>>>
>>>>     I had a (false?) feeling that cache lines are not invalidated
>>>>     when writes are performed without fences.
>>>>
>>>>     Also I don't know how short (byte, char) writes are combined into
>>>>     memory words on the hardware when they come from different cores
>>>>     and whether this is connected to any performance issues.
>>>>
>>>>     Thanks,
>>>>
>>>>     Peter
>>>>
>>


-- 
  -- Howard.

