Re: Demo for Parallel Core Collection API
Hi,

Some high-level points first:

- Try and use static imports where possible.
- Suggest that all streams are sequential. There is an inconsistency in the use, and in some cases it is embedded in other stream usages.
- "ParallelCore" is not a very descriptive name. Suggest "streams".
- Suggest moving the data classes and XML parsing code to a separate package.
- Unfortunately Supplier is overloaded with the functional interface in j.u.function. Not sure much can be done about that.

More details below. I am not really commenting on the specific use-case, just the usages of the API itself; for brevity I have removed comments.

Conversion
--

A more compact form of mostTProductsByCategory (without comments) is:

    public static <T extends Comparable<T>> Product[] mostTProductsByCategory(
            Function<Product, T> func) {
        Map<String, Optional<Product>> m = products.stream().
                collect(groupingBy(Product::getCategory, maxBy(comparing(func))));
        return m.values().stream().
                map(Optional::get).
                toArray(Product[]::new);
    }

i.e. show the Map rather than the Collector.

A DRYer form:

    public static Collection<String> countries(boolean ordered) {
        Stream<String> countries = customers.stream().map(Customer::getCountry);
        if (ordered) {
            return countries.distinct().collect(toCollection(LinkedList::new));
        } else {
            return countries.collect(Collectors.<String>toSet());
        }
    }

Shame that the type witness is required. For sequential streams there is probably no advantage here to providing a linked list over a list, except for API educational value.

Elements
--

Simpler form; the function used to apply the terminal op obfuscates:

    public static Optional<Supplier> suppliersInCountry(boolean findAny,
            String country) {
        Stream<Supplier> s = suppliers.stream().
                // filter suppliers in the given country
                filter(supplier -> country.equals(supplier.getCountry()));
        return findAny ? s.findAny() : s.findFirst();
    }

The use of the collector is complicating matters.
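The findAny/findFirst choice above can be exercised with a self-contained toy. PartsSupplier and the sample data here are hypothetical stand-ins for the demo's data model (and sidestep the j.u.function.Supplier name clash noted above):

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public class SuppliersDemo {
    // Stand-in for the demo's Supplier data class (hypothetical names).
    record PartsSupplier(String name, String country) {}

    static final List<PartsSupplier> SUPPLIERS = List.of(
            new PartsSupplier("Exotic Liquids", "UK"),
            new PartsSupplier("Tokyo Traders", "Japan"),
            new PartsSupplier("Mayumi's", "Japan"));

    // Build the pipeline once, then choose the terminal op, as suggested above.
    static Optional<PartsSupplier> supplierInCountry(boolean findAny, String country) {
        Stream<PartsSupplier> s = SUPPLIERS.stream()
                .filter(sup -> country.equals(sup.country()));
        return findAny ? s.findAny() : s.findFirst();
    }

    public static void main(String[] args) {
        // On a sequential stream, findFirst returns the first match in
        // encounter order.
        System.out.println(supplierInCountry(false, "Japan").get().name());
    }
}
```

On a sequential stream findAny behaves like findFirst in practice; the distinction matters once the stream goes parallel.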
Off the top of my head, what you require is a reducer that reduces to the first product whose stock units are > 0:

    public static Map<String, Product> inStockProductsByCategory(
            boolean findAny) {
        BinaryOperator<Product> reduceToFirstMatch = (l, r) -> (l != null)
                ? l
                : (r != null && r.getUnitsInStock() > 0) ? r : null;
        return products.stream().collect(
                groupingBy(Product::getCategory,
                        reducing(null, reduceToFirstMatch)));
    }

The above relies on the associativity of the binary operator. There is no need to collect into a filtered list and then stream with findAny/findFirst, since we can reduce to the result as each element is received. That reduceToFirstMatch can easily be abstracted into a higher-order function taking a predicate.

Grouping
--

    public static Map<String, List<Order>> groupOrdersByCustomer() {
        // a collector that generates an order list from a customer
        Collector<Customer, List<Order>, List<Order>> collectOrder = Collector.
                of(ArrayList<Order>::new,
                   (orders, customer) -> { orders.addAll(customer.getOrders()); },
                   (left, right) -> { left.addAll(right); return left; });
        return customers.parallelStream().
                // convert customers to a Map whose key is the company name and
                // whose value is its orders
                collect(Collectors.groupingBy(Customer::getCompanyName, collectOrder));
    }

Not clear to me if there are multiple customers with the same name, so the catamorphic collect may not be necessary and toMap can be used instead. Perhaps there is another way to show this collect usage?

Can simplify:

    public static <S, T extends Comparable<T>> Map<S, Optional<Product>>
            productMaxByTGroupByS(boolean maxBy, Function<Product, S> groupFunc,
                                  Function<Product, T> compFunc) {
        // Comparator of Product that compares on T by the given function
        Comparator<Product> comp = Comparator.comparing(compFunc);
        return products.stream().
                // collect products into a Map keyed by groupFunc, whose value
                // is the max or min by compFunc
                collect(groupingBy(groupFunc, maxBy ? maxBy(comp) : minBy(comp)));
    }

Simplify:

    public static <R, T> Map<R, Map<T, List<Order>>> ordersByRThenT(
            Function<Order, R> func1, Function<Order, T> func2) {
        return customers.stream().
                // map every customer to orders
                flatMap(customer -> customer.getOrders().stream()).
                // group orders into a two-level Map by the given two functions
                collect(groupingBy(func1, groupingBy(func2)));
    }

Selection
--

    public static Map<Customer, Double> bigOrderCustomers(double bigValue) {
        Function<Customer, Double> orderTotal = c -> c.getOrders().stream().
                // map order to order price
                mapToDouble(Order::getTotal).
                // calculate total price
                sum();
        // a predicate matching customers whose total order price is at least
        // bigValue
        Predicate<Customer> bigOrder = c -> orderTotal.apply(c) >= bigValue;
        return customers.parallelStream().
                filter(bigOrder).
                // collect into a Map whose key is the customer and whose value
                // is the total order price
                collect(Collectors.toMap(Function.identity(), orderTotal));
    }

This is essentially performing the sum twice on each element that passes through the filter. Unfortunately we don't have tuples yet, and that is what I think you really need here to retain the calculation for use later on. It might be simpler just to create a Map<Customer, Double> and then iterate over the entries, removing the ones that do not match.

Subset
--

    public static List<Order> firstNOrdersFromState(int number, String state,
            Consumer<Order> action) {
        return customers.parallelStream().
                // only take customers from a particular state
                filter(c -> state.equals(c.getRegion())).
                // get the orders of those customers
                flatMap(c -> c.getOrders().stream()).
                // get the first `number` orders
                substream(0, number).
                // apply the supplied action
                peek(action).
                // collect to a list
                collect(Collectors.toList());
    }

Replace substream(0, number) with limit(number). The use of peek here is setting a bad precedent; suggest changing to return void and using forEach, or removing it.
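Going back to the Elements section: the higher-order abstraction hinted at for reduceToFirstMatch, a function from a predicate to a first-match reducer, might look like the following sketch (Item and the sample data are hypothetical, standing in for the demo's Product):

```java
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FirstMatchDemo {
    // Reduce to the first element, in encounter order, satisfying the
    // predicate. Null is the identity; once the left side is non-null it is
    // kept, which is what makes the operator usable with reducing().
    static <T> BinaryOperator<T> firstMatch(Predicate<T> p) {
        return (l, r) -> l != null ? l : (r != null && p.test(r) ? r : null);
    }

    // Hypothetical stand-in for the demo's Product.
    record Item(String category, int unitsInStock) {}

    public static void main(String[] args) {
        List<Item> items = List.of(
                new Item("beverages", 0),
                new Item("beverages", 17),
                new Item("produce", 4));
        // Per category, the first item with stock > 0.
        Map<String, Item> m = items.stream().collect(
                Collectors.groupingBy(Item::category,
                        Collectors.reducing(null,
                                firstMatch(i -> i.unitsInStock() > 0))));
        System.out.println(m.get("beverages").unitsInStock());
    }
}
```

A category with no matching element maps to null, so callers would need to treat null as "absent".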
Same applies for topNCustomer.

Tabulate
--

This is another example of where tuples or MapStream (a stream of 2-element tuples) could be useful. It would be good to remove the Pair class if at all possible; if not, it is best contained within Tabulate, as this is the only place that uses it.

For sameCountryCustomerAndSuppliers, off the top of my head, it is possible to reformulate using two Maps, and we do not need Pair:

    Map<String, List<Supplier>> countryToSuppliers = suppliers.stream().collect(
            // group suppliers by their country
            Collectors.groupingBy(Supplier::getCountry));

    Map<Customer, List<Supplier>> s = customers.stream().collect(
            Collectors.toMap(Function.identity(),
                    c -> countryToSuppliers.getOrDefault(c.getCountry(), null)));

Simplify:

    public static Set<String> bothCustomerAndSupplier() {
        Set<String> suppliersName = suppliers.stream().
                map(Supplier::getSupplierName).
                collect(toSet());
        return customers.stream().
                map(Customer::getCompanyName).
                flatMap(c -> suppliersName.contains(c) ? Stream.of(c) : Stream.empty()).
                collect(toSet());
    }

Hth,
Paul.

On Oct 14, 2013, at 5:27 AM, Tristan Yan <tristan.yan@oracle.com> wrote:
Hi all
Could you please help to review the demo code for parallel core collection API?
http://cr.openjdk.java.net/~pzhang/Tristan/8023555/webrev/
Thank you very much
Tristan Yan(Haibo Yan)
Office : 8610-61066212
Fax : 8610-61065441
Cell : 86-18610696822
2F, Building No. 24, Zhongguancun Software Park
Haidian District, Beijing, 100193
Thank you Paul

There is one minor question: I couldn't compile one piece of your code, and then I realized that's because the groupingBy signature is

    public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(
            Function<? super T, ? extends K> classifier,
            Collector<? super T, A, D> downstream)

which makes me wonder why we need the type variable A here rather than a wildcard. Why is the method signature not

    public static <T, K, D> Collector<T, ?, Map<K, D>> groupingBy(
            Function<? super T, ? extends K> classifier,
            Collector<? super T, ?, D> downstream)

I do remember there was a discussion about this, but I couldn't find the answer.

The code that doesn't compile:

    return products.stream().
            // collect products into a Map keyed by groupFunc, whose value is
            // the max or min by compFunc
            collect(groupingBy(groupFunc, maxBy ? maxBy(comp) : minBy(comp)));
On Oct 15, 2013, at 4:20 PM, Tristan Yan <tristan.yan@oracle.com> wrote:
The code that doesn't compile:

    return products.stream().
            collect(groupingBy(groupFunc, maxBy ? maxBy(comp) : minBy(comp)));
Drat, I just assumed that since the IDE did not show squiggly red lines it would compile. Downstream-receiving collectors refer to the intermediate accumulation type, so we should abstract over what it is, be it a ? or something else of the downstream collector. Unfortunately, in this case it looks like the compiler is thrown off the scent by the ternary operator. If you remove the operator and just express, say, "minBy(comp)", it should compile. I don't know whether this is something that can be fixed in the compiler or not.

Paul.
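One workaround, sketched here with a hypothetical Product record rather than the demo's class, is to pull the conditional out of the argument position into a local variable with an explicit target type, so the compiler no longer has to infer A through the ternary:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collector;
import static java.util.stream.Collectors.*;

public class TernaryWorkaround {
    // Hypothetical stand-in for the demo's Product.
    record Product(String category, double price) {}

    static Map<String, Optional<Product>> extremeByCategory(boolean useMax,
                                                            List<Product> products) {
        Comparator<Product> comp = Comparator.comparing(Product::price);
        // Give the conditional an explicit target type; each branch is then
        // checked against it, and capture conversion handles the wildcard
        // when the collector is passed to groupingBy.
        Collector<Product, ?, Optional<Product>> downstream =
                useMax ? maxBy(comp) : minBy(comp);
        return products.stream().collect(groupingBy(Product::category, downstream));
    }

    public static void main(String[] args) {
        List<Product> ps = List.of(
                new Product("tea", 3.5),
                new Product("tea", 2.0),
                new Product("coffee", 4.0));
        System.out.println(extremeByCategory(true, ps).get("tea").get().price());
    }
}
```

A plain if/else assigning the same local variable works just as well; the point is to give the two branches a shared target type.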
Hi Paul

You commented: "suggest that all streams are sequential. There is an inconsistency in the use and in some cases it is embedded in other stream usages."

We do not really understand exactly what is meant; could you elaborate a little? Is it because we want to show people that we should use stream more than parallelStream?

Thank you
On Oct 15, 2013, at 4:35 PM, Tristan Yan <tristan.yan@oracle.com> wrote:
Hi Paul

You commented: "suggest that all streams are sequential. There is an inconsistency in the use and in some cases it is embedded in other stream usages."

We do not really understand exactly what is meant; could you elaborate a little? Is it because we want to show people that we should use stream more than parallelStream?
Going parallel is easy to do, but not always the right thing to do. Going parallel almost always requires more work, with the expectation that the work will complete sooner than the work required to get the same result sequentially.

There are a number of factors that affect whether parallel is faster than sequential. Two of those factors are N, the size of the data, and Q, the cost of processing an element in the pipeline. N * Q is a simple cost model: the larger that product, the better the chances of a parallel speed-up. N is easy to know; Q is not so easy, but can often be intuitively guessed. (Note that there are other factors, such as the properties of the stream source and operations, that Brian and I talked about in our JavaOne presentation.)

Demo code that just makes everything (or most streams) parallel is sending out the wrong message. So I think the demo code should present two general things:

1) various stream functionality, as you have done;
2) parallel vs. sequential for various cases where it is known that parallel is faster on a multi-core system.

For 2) I strongly recommend measuring using jmh [1]. The data sets you have may or may not be amenable to parallel processing; it's worth investigating though.

I have ideas for other parallel demos. One is creating probable primes (now that SecureRandom is replaced with ThreadLocalRandom); creating a probable prime that is a BigInteger is a relatively expensive operation, so Q should be high. Another, more advanced, demo is a Monte-Carlo calculation of pi using SplittableRandom and a special Spliterator; in this case N should be largish. But there are other, simpler demonstrations, like sum of squares etc., to get across that N should be large. Another demo could be the calculation of a Mandelbrot set, which is embarrassingly parallel over an area in the complex plane.

So while you should try and fit some parallel vs. sequential execution into your existing demos, I do think it worth having a separate set of demos that get across the simple cost model of N * Q. So feel free to use some of those ideas previously mentioned; I find those ideas fun, so perhaps others will too :-)

Paul.

[1] http://openjdk.java.net/projects/code-tools/jmh/

On Oct 15, 2013, at 4:37 PM, Tristan Yan <tristan.yan@oracle.com> wrote:
Also there is one more question I missed
You suggested ""ParallelCore" is not a very descriptive name. Suggest "streams"." 1) yes we agree this demo is not for parallel computation per se 2) but we do not have a clear demo for parallel computation 3) if we are to rename this, we need to develop another one, do you have a scenario for that?
Hi Paul and everyone

Sorry for getting back late. I took Paul's suggestion and have written two other demos which present the usage of parallel computation. One uses a Monte-Carlo method to calculate the value of pi; the other finds a big prime of a given length. Please review it.

http://cr.openjdk.java.net/~tyan/sample/webrev.00/

There is another demo presenting the Mandelbrot set, designed by Alexander Kouznetsov, which is already in review; it is not part of my code review request.

Thank you very much
Tristan

On 10/15/2013 11:20 PM, Paul Sandoz wrote:
On Oct 15, 2013, at 4:35 PM, Tristan Yan <tristan.yan@oracle.com <mailto:tristan.yan@oracle.com>> wrote:
Hi Paul you have comments "suggest that all streams are sequential. There is an inconsistency in the use and in some cases it is embedded in other stream usages."
We do not really understand what exactly is meant, could you elaborate a little bit. Is it because we want to show ppl that we should use stream more than parallelStream?
Going parallel is easy to do but not always the right thing to do. Going parallel almost always requires more work with the expectation that work will complete sooner than the work required to get the same result sequentially. There are a number of factors that affect whether parallel is faster than sequential. Two of those factors are N, the size of the data, and Q the cost of processing an element in the pipeline. N * Q is a simple cost model, the large that product the better the chances of parallel speed up. N is easy to know, Q not so easy but can often be intuitively guessed. (Note that there are other factors such as the properties of the stream source and operations that Brian and I talked about in our J1 presentation.)
Demo code that just makes everything (or most streams) parallel is sending out the wrong message.
So i think the demo code should present two general things:
1) various stream functionality, as you have done;
2) parallel vs. sequential for various cases where it is known that parallel is faster on a multi-core system.
For 2) i strongly recommend measuring using jmh [1]. The data sets you have may or may not be amenable to parallel processing, it's worth investigating though.
I have ideas for other parallel demos. One is creating probably primes (now that SecureRandom is replaced with ThreadLocalRandom), creating a probably prime that is a BigInteger is an relatively expensive operation so Q should be high. Another more advanced demo is a Monte-Carlo calculation of PI using SplittableRandom and a special Spliterator, in this case N should be largish. But there are other simpler demonstrations like sum of squares etc to get across that N should be large. Another demo could be calculation of a mandelbrot set, which is embarrassingly parallel over an area in the complex plane.
So while you should try and fit some parallel vs. sequential execution into your existing demos i do think it worth having a separate set of demos that get across the the simple cost model of N * Q. So feel free to use some of those ideas previously mentioned, i find those ideas fun so perhaps others will too :-)
Paul.
[1] http://openjdk.java.net/projects/code-tools/jmh/
On Oct 15, 2013, at 4:37 PM, Tristan Yan <tristan.yan@oracle.com <mailto:tristan.yan@oracle.com>> wrote:
Also there is one more question I missed.
You suggested: '"ParallelCore" is not a very descriptive name. Suggest "streams".' 1) Yes, we agree this demo is not for parallel computation per se. 2) But we do not have a clear demo for parallel computation. 3) If we are to rename this, we need to develop another one; do you have a scenario for that?
Hi Tristan,

Thanks, I need to look at this in more detail, but here are some quick comments.

- recommend you try and avoid using limit with parallel ops; for example the Pi example can be reformulated as:

long M = LongStream.range(0, N).parallel().filter(sr -> {
    double x = ThreadLocalRandom.current().nextDouble(-1, 1);
    double y = ThreadLocalRandom.current().nextDouble(-1, 1);
    return x * x + y * y < R * R; // no need to use sqrt
}).count();
double pi = ((double) M / N) * 4.0;

the Primes example could be reformulated as:

LongStream.range(0, limit).parallel().map(/* odd values */).filter(RandomPrimeNumber::isPrime).findAny();

you don't need to declare unordered() since findAny implicitly makes the stream unordered by definition. The key message here is that range has better decomposition characteristics than generate or iterate.

More later, probably after the break,
Paul.

On Dec 19, 2013, at 1:16 PM, Tristan Yan <tristan.yan@oracle.com> wrote:
Hi Paul and everyone,

Sorry for getting back late. I took Paul's suggestion and have written two other demos that present the usage of parallel computation. One uses the Monte Carlo method to calculate the value of PI; the other finds a big prime of a given length. Please review it:

http://cr.openjdk.java.net/~tyan/sample/webrev.00/

There is another demo that presents the Mandelbrot set, designed by Alexander Kouznetsov, which is already in review; it is not part of my code review request.

Thank you very much
Tristan
Hi Tristan,

See below for a patch. The location of the code seems a little out of place with the other code. I would have expected a structure such as: stream/parallel/*.java, where the source code is in the default no-package.

I am not yet convinced of the value of the RandomPrimeNumber example. In your original code you had a parallel stream invoking another parallel stream in the filter (the simple division to return a prime); this has the effect of actually slowing down the computation due to each calculation fighting for F/J resources and threads. Remove the additional parallelism and it is not clear when eyeballing the execution time that parallel is faster than sequential (it probably is, but it is handy to have obvious examples). A simpler example is to generate a list of probable primes of a certain bit length.

IMHO a better example in the category of "slightly more complex" is the rendering of the Mandelbrot set, parallelized along the real or imaginary axis. One can easily generate some nice ASCII-like art :-)

Paul.

On Dec 20, 2013, at 6:25 PM, Paul Sandoz <Paul.Sandoz@oracle.com> wrote:
Thanks, I need to look at this in more detail, but here are some quick comments.
- recommend you try and avoid using limit with parallel ops; for example the Pi example can be reformulated as:
long M = LongStream.range(0, N).parallel().filter(sr -> {
    double x = ThreadLocalRandom.current().nextDouble(-1, 1);
    double y = ThreadLocalRandom.current().nextDouble(-1, 1);
    return x * x + y * y < R * R; // no need to use sqrt
}).count();
double pi = ((double) M / N) * 4.0;
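Assembled into a runnable form (a sketch; the constants and class name are illustrative), the reformulated estimator looks like this — note the cast before dividing, since plain long division of M by N would truncate to zero:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;

public class MonteCarloPi {

    // Sample n points uniformly in the square [-1, 1) x [-1, 1); the fraction
    // landing inside the unit circle tends to pi/4.
    static double pi(long n) {
        long m = LongStream.range(0, n).parallel().filter(i -> {
            double x = ThreadLocalRandom.current().nextDouble(-1, 1);
            double y = ThreadLocalRandom.current().nextDouble(-1, 1);
            return x * x + y * y < 1.0; // no need for sqrt
        }).count();
        return ((double) m / n) * 4.0; // cast first: long division would yield 0
    }

    public static void main(String[] args) {
        System.out.println(pi(10_000_000));
    }
}
```

Because range(0, n) is sized and splits its index space in half at each fork, it decomposes far better than generate(...).limit(n).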
the Primes example could be reformulated as:
LongStream.range(0, limit).parallel().map(/* odd values */).filter(RandomPrimeNumber::isPrime).findAny();
you don't need to declare unordered() since findAny implicitly makes the stream unordered by definition.
The key message here is that range has better decomposition characteristics than generate or iterate.
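A hedged sketch of that point (isPrime here is a trial-division stand-in for the demo's predicate): range(0, limit) knows its size and splits its index space in half at each fork, whereas iterate(start, l -> l + 2).limit(k) must produce elements one after another, because each depends on the previous, so it decomposes poorly:

```java
import java.util.OptionalLong;
import java.util.stream.LongStream;

public class OddPrimes {

    // Stand-in trial-division primality test for the sketch.
    static boolean isPrime(long n) {
        return n > 1 && LongStream.rangeClosed(2, (long) Math.sqrt(n))
                .noneMatch(d -> n % d == 0);
    }

    // range splits cleanly for parallel execution; map turns the indices into
    // the odd values 1, 3, 5, ... without any sequential dependency.
    static OptionalLong anyOddPrime(long limit) {
        return LongStream.range(0, limit)
                .parallel()
                .map(n -> 2 * n + 1)
                .filter(OddPrimes::isPrime)
                .findAny();
    }

    public static void main(String[] args) {
        System.out.println(anyOddPrime(1_000));
    }
}
```

As noted above, findAny already makes the terminal operation unordered, so no explicit unordered() call is needed.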
More later, probably after the break, Paul.
diff -r d56cd0872ec4 src/share/sample/demo/parallel/MonteCarloPI.java
--- a/src/share/sample/demo/parallel/MonteCarloPI.java	Tue Jan 14 13:34:22 2014 +0100
+++ b/src/share/sample/demo/parallel/MonteCarloPI.java	Tue Jan 14 14:15:22 2014 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014 Oracle and/or its affiliates. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -36,16 +36,14 @@
  * input validation and proper error handling, might not be present in
  * this sample code.
  */
-package demo.parallel;

 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.LongStream;

 /**
- * This demo shows how to use the parallel mode and the Monte Carlo method to
- * calculate the value of PI
+ * This demo shows how to use parallel streams to approximately calculate the
+ * value of π using the Monte Carlo method.
  *
- * @author tyan
  */
 public class MonteCarloPI {
@@ -62,39 +60,32 @@
     }

     /**
-     * Use the Monte Carlo method to calculate the value of PI. basic algorithm
+     * Use the Monte Carlo method to calculate the value of π. basic algorithm
      * is: 1. Draw a square on the ground, then inscribe a circle within it. 2.
      * Scatter some objects of uniform size (grains of rice or sand) over the
      * square. 3. Count the total number of objects inside the circle and the
-     * total number of objects overall. 4. The ratio of the two total is an
-     * estimate of the ratio of the two areas, which is PI/4. Multiply the
-     * result by 4 to estimate PI.
+     * total number of objects overall. 4. The ratio of the two totals is an
+     * estimate of the ratio of the two areas, which is pi/4. Multiply the
+     * result by 4 to estimate π.
      *
-     * @param x how many times randomly selected a point
-     * @return value of π by x times calculation
+     * @param n how many times to randomly select a point
+     * @return the approximate value of π
      */
-    private static double pi(long x) {
-        return LongStream.generate(() -> hit()).
-                // using parallel mode
-                parallel().
-                // select only x elements
-                limit(x).sum()
-                // perform division before multiplication to reduce ovefflow
-                // risk
-                / (double) x * 4;
+    private static double pi(long n) {
+        long m = LongStream.range(0, n).parallel().filter(i -> hit()).count();
+        return (m / (double) n) * 4.0;
     }

     /**
-     * Use ThreadLocalRandom to simulate that whether a point is inside the
-     * circle or outside the circle
+     * Use ThreadLocalRandom to simulate whether a point is inside a
+     * circle, of radius 1, or outside.
      *
-     * @return 1 randomly selected point is inside the circle 0 randomly
-     *         selected point is outside the circle
+     * @return true if within the circle, otherwise false
      */
-    private static long hit() {
+    private static boolean hit() {
         ThreadLocalRandom lr = ThreadLocalRandom.current();
-        double x = lr.nextDouble(1.0);
-        double y = lr.nextDouble(1.0);
-        return Math.sqrt(y * y + x * x) <= 1.0 ? 1 : 0;
+        double x = lr.nextDouble(-1.0, 1.0);
+        double y = lr.nextDouble(-1.0, 1.0);
+        return x * x + y * y <= 1.0;
     }
 }
diff -r d56cd0872ec4 src/share/sample/demo/parallel/RandomPrimeNumber.java
--- a/src/share/sample/demo/parallel/RandomPrimeNumber.java	Tue Jan 14 13:34:22 2014 +0100
+++ b/src/share/sample/demo/parallel/RandomPrimeNumber.java	Tue Jan 14 14:15:22 2014 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014 Oracle and/or its affiliates. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -36,19 +36,19 @@
  * input validation and proper error handling, might not be present in
  * this sample code.
  */
-package demo.parallel;

 import java.math.BigInteger;
 import static java.math.BigInteger.ONE;
 import java.util.BitSet;
 import java.util.OptionalLong;
 import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.LongStream;

 /**
- * This demo shows how to use the parallel mode to find an unknown length's
- * prime number. The process includes dividing the computation area to an m * n
- * segment and then randomly selecting mth and nth segments to try to find a
+ * This demo shows how to use parallel streams to find a prime number.
+ * The process includes dividing the computation area to an m * n
+ * segments and then randomly selecting mth and nth segments to try to find a
  * prime number in the computation area. If there is no prime number in this
  * segment, then this process is repeated until the entire segment has been
  * examined. Either there will be no prime number in this area, or the first
@@ -91,7 +91,7 @@
                     + "prime number");
             return;
         }
-        Random random = new Random(System.currentTimeMillis());
+        Random random = ThreadLocalRandom.current();
         /*
          * Prime number can not be a even number; it must be odd number.
          */
@@ -140,23 +140,15 @@
         long start = first_number.longValue()
                 + randomHighSeg * LOW_BIT_SEGMENT * SEGMENT_SIZE
                 + randomLowSeg * SEGMENT_SIZE;
-        OptionalLong anyPrime = LongStream.iterate(start, l -> l + 2).
-                // You must first specify the limit to guarantee that
-                // the length of the number is an acceptable length.
-                limit(limit).
-                // From this point, use parallel().
-                // You have to find only one prime number, so the
-                // order is not important. Also, an unordered stream
-                // is more efficient than an ordered stream.
+
+        OptionalLong anyPrime = LongStream.range(0, limit).
                 parallel().
-                // we only need find one prime number, so keeping
-                // order is not needed. Also unordered stream is
-                // more efficient than ordered stream.
-                unordered().
+                // Generate a sequence of limit elements as follows:
+                // start, start + 2, start + 4, start + 6
+                map(n -> n * 2 + start).
                 // Filter only the prime number
                 filter(RandomPrimeNumber::isPrime).
-                //Stop the process as soon as you find a prime
-                //number.
+                // Stop the process as soon as you find a prime number.
                 findAny();
         if (anyPrime.isPresent()) {
             System.out.println(String.format(
@@ -176,13 +168,12 @@
     /**
      * Decide if a BigInteger is a prime number
      *
-     * @param integer a number
+     * @param number a number
      * @return true if integer is a prime number false if integer is not a prime
      *         number
      */
     private static boolean isPrime(long number) {
-        //This is a parall version that checks if a number is a prime number
-        return !LongStream.range(2L, Math.round(Math.sqrt(number))).parallel().
+        return !LongStream.range(2L, Math.round(Math.sqrt(number))).
                 anyMatch(divisor -> number % divisor == 0);
     }
Hi Paul,

I know this may be a little bit late, but I am still asking you to review this:

http://cr.openjdk.java.net/~tyan/JDK-8033358/webrev.01/

This is the whole demo code, including the stream demo code you reviewed before, and also the parallel part. There is one other parallel demo that Paul has already pushed. Could you please review it again?

Thank you so much
Tristan

-----Original Message-----
From: Paul Sandoz
Sent: Tuesday, January 14, 2014 9:27 PM
To: core-libs-dev@openjdk.java.net
Cc: Tristan Yan
Subject: Re: Demo for Parallel Core Collection API
On Feb 4, 2014, at 2:58 AM, Tristan Yan <tristan.yan@oracle.com> wrote:
Hi Paul I know this may be a little bit late.
Yes, likely too late... (see below).
But I am still asking you review this.
http://cr.openjdk.java.net/~tyan/JDK-8033358/webrev.01/
This is the whole demo code, including the stream demo code you reviewed before, and also the parallel part. There is one other parallel demo that Paul has already pushed. Could you please review it again?
Fibonacci
- There is no parallel execution; one can write as follows for better splitting:

return IntStream.range(0, n).mapToObj(i -> matrix).
        parallel().
        reduce(Fibonacci::times).
        get();

- While the approach is interesting (elegant in terms of the reduction), one is not likely to observe any speed-up (did you measure?) since the long value will overflow on the 93rd number [1]. You would need to use BigInteger and a larger number of iterations.

ImageTransform
- In rotate/shift/zoom/invert the inner range (over height) should not be parallel; it is just going to fight for resources with the outer range (over width).
- It's not entirely clear to me if BufferedImage is thread-safe for concurrently setting the pixels.

MonteCarloPI
- The hit() method is no longer used.

RandomPrimeNumber
- Better to use ThreadLocalRandom or SplittableRandom, as both of those produce higher quality sequences of random numbers.
- I don't think this example has much value, as there are better ways to determine primes. As I have said before, if you want to show a simple example it is better to show the parallel generation of probable primes using BigInteger.

--

I am sorry to have to say this, I know you have worked on this for a while, but I think this is now too late to go through another round of reviews.

Paul.

[1] Think rabbits! http://www.maths.surrey.ac.uk/hosted-sites/R.Knott/Fibonacci/fibtable.html
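The overflow-free variant hinted at above might look like this sketch (class and helper names are assumptions): 2x2 matrices of BigInteger, reduced in parallel, relying on the associativity of matrix multiplication:

```java
import java.math.BigInteger;
import java.util.stream.IntStream;
import static java.math.BigInteger.ONE;
import static java.math.BigInteger.ZERO;

public class Fibonacci {

    // 2x2 matrix {a, b, c, d} = [[a, b], [c, d]]; the matrix product is
    // associative, which is what makes the parallel reduction valid.
    static BigInteger[] times(BigInteger[] x, BigInteger[] y) {
        return new BigInteger[] {
            x[0].multiply(y[0]).add(x[1].multiply(y[2])),
            x[0].multiply(y[1]).add(x[1].multiply(y[3])),
            x[2].multiply(y[0]).add(x[3].multiply(y[2])),
            x[2].multiply(y[1]).add(x[3].multiply(y[3]))
        };
    }

    // [[1, 1], [1, 0]]^n = [[F(n+1), F(n)], [F(n), F(n-1)]], so the
    // off-diagonal entry of the reduced product is F(n), for n >= 1.
    static BigInteger fib(int n) {
        BigInteger[] m = { ONE, ONE, ONE, ZERO };
        return IntStream.range(0, n)
                .mapToObj(i -> m)
                .parallel()
                .reduce(Fibonacci::times)
                .get()[1];
    }

    public static void main(String[] args) {
        System.out.println(fib(93)); // already past the range of long
    }
}
```

With BigInteger arithmetic each multiply grows Q as n grows, so the parallel reduction has a realistic chance of beating the sequential one for large n.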
Thank you, Paul. I was told we may target this for 8u20 if we can't catch 8. Let's do the open review first:

http://cr.openjdk.java.net/~tyan/JDK-8033358/webrev.02/

Fibonacci: updated as you suggested. Tests show the parallel version is faster than the sequential version when the input is greater than 100,000.
ImageTransform: the inner parallelism was eliminated. Also, BufferedImage.set is a thread-safe method.
MonteCarloPI: the unused hit method was removed.
RandomPrimeNumber: changed into ProbablePrimeNumber, following your suggestion to use the BigInteger.probablePrime method.

Thank you
Tristan

-----Original Message-----
From: Paul Sandoz
Sent: Tuesday, February 04, 2014 5:34 PM
To: Tristan Yan
Cc: core-libs-dev@openjdk.java.net; Aleksandre Iline; Mikhail Kondratyev
Subject: Re: Demo for Parallel Core Collection API
Also there is one more question I missed. You suggested: '"ParallelCore" is not a very descriptive name. Suggest "streams".' 1) Yes, we agree this demo is not for parallel computation per se. 2) But we do not have a clear demo for parallel computation. 3) If we are to rename this, we need to develop another one; do you have a scenario for that?

Thank you

-----Original Message-----
From: Paul Sandoz
Sent: Monday, October 14, 2013 11:28 PM
To: Tristan Yan
Cc: core-libs-dev@openjdk.java.net; lambda-dev@openjdk.java.net; Taras Ledkov; Andrey Nazarov; Aleksandre Iline
Subject: Re: Demo for Parallel Core Collection API

Hi,

Some high-level points first:
- try and use static import where possible.
- suggest that all streams are sequential. There is an inconsistency in the use, and in some cases it is embedded in other stream usages.
- "ParallelCore" is not a very descriptive name. Suggest "streams".
- suggest moving the data classes and XML parsing code to a separate package.
- Unfortunately Supplier is overloaded with the functional interface in j.u.function. Not sure much could be done about that.

More details below. I am not really commenting on the specific use-case, just the usages of the API itself; plus, for brevity, I have removed comments.

Conversion
--

A more compact form of mostTProductsByCategory (without comments) is:

public static <T extends Comparable<T>> Product[] mostTProductsByCategory(
        Function<Product, T> func) {
    Map<String, Optional<Product>> m = products.stream().
            collect(groupingBy(Product::getCategory, maxBy(comparing(func))));
    return m.values().stream().
            map(Optional::get).
            toArray(Product[]::new);
}

i.e. show the Map rather than the Collector.
A DRYer form:

public static Collection<String> countries(boolean ordered) {
    Stream<String> countries = customers.stream().map(Customer::getCountry);
    if (ordered) {
        return countries.distinct().collect(toCollection(LinkedList::new));
    } else {
        return countries.collect(Collectors.<String>toSet());
    }
}

Shame that the type witness is required. For sequential streams there is probably no advantage here to providing a linked list over a list, except for API educational value.

Elements
--

Simpler form. The function to apply the terminal op obfuscates:

public static Optional<Supplier> suppliersInCountry(boolean findAny, String country) {
    Stream<Supplier> s = suppliers.stream().
            // filter suppliers that are in the given country
            filter(supplier -> country.equals(supplier.getCountry()));
    return findAny ? s.findAny() : s.findFirst();
}

The use of the collector is complicating matters. Off the top of my head, what you require is a reducer that reduces to the first product whose stock units is > 0:

public static Map<String, Product> inStockProductsByCategory(boolean findAny) {
    BinaryOperator<Product> reduceToFirstMatch = (l, r) ->
            (l != null) ? l : (r != null && r.getUnitsInStock() > 0) ? r : null;
    return products.stream().collect(
            groupingBy(Product::getCategory, reducing(null, reduceToFirstMatch)));
}

The above relies on the associativity of the binary operator. There is no need to collect into a filtered list then stream with findAny/findFirst, since we can reduce to the result as each element is received. That reduceToFirstMatch can easily be abstracted into a higher-order function taking a predicate.

Grouping
--

public static Map<String, List<Order>> groupOrdersByCustomer() {
    // a collector that generates an order list from a customer
    Collector<Customer, List<Order>, List<Order>> collectOrder = Collector.of(
            ArrayList<Order>::new,
            (orders, customer) -> orders.addAll(customer.getOrders()),
            (left, right) -> { left.addAll(right); return left; });
    return customers.parallelStream().
            // convert customers to a Map whose key is the name and whose
            // value is its orders
            collect(Collectors.groupingBy(Customer::getCompanyName, collectOrder));
}

Not clear to me if there are multiple customers with the same name, so the catamorphic collect may not be necessary and toMap can be used instead. Perhaps there is another way to show this collect usage?

Can simplify:

public static <S, T extends Comparable<T>> Map<S, Optional<Product>> productMaxByTGroupByS(
        boolean maxBy, Function<Product, S> groupFunc, Function<Product, T> compFunc) {
    // Comparator of Product which will compare on T by the given function
    Comparator<Product> comp = Comparator.comparing(compFunc);
    return products.stream().
            // collect products into a Map whose key is S, given by groupFunc,
            // and whose value is the max or min by compFunc
            collect(groupingBy(groupFunc, maxBy ? maxBy(comp) : minBy(comp)));
}

Simplify:

public static <R, T> Map<R, Map<T, List<Order>>> ordersByRThenT(
        Function<Order, R> func1, Function<Order, T> func2) {
    return customers.stream().
            // map every customer to orders
            flatMap(customer -> customer.getOrders().stream()).
            // group orders into Map<R, T> by the given two functions
            collect(groupingBy(func1, groupingBy(func2)));
}

Selection
--

public static Map<Customer, Double> bigOrderCustomers(double bigValue) {
    Function<Customer, Double> orderTotal = c -> c.getOrders().stream().
            // map order to order price
            mapToDouble(Order::getTotal).
            // calculate total price
            sum();
    // a predicate that filters customers whose total order price is greater
    // than bigValue
    Predicate<Customer> bigOrder = c -> orderTotal.apply(c) >= bigValue;
    return customers.parallelStream().
            // filter customers whose total order price is greater than bigValue
            filter(bigOrder).
            // collect customers into a Map whose key is the customer and whose
            // value is the total order price
            collect(Collectors.toMap(Function.identity(), orderTotal));
}

This is essentially performing the sum twice on each element that passes through the filter. Unfortunately we don't have tuples yet, and that is what I think you really need to retain the calculation for use later on. It might be simpler just to create a Map<Customer, Double> then iterate over the entries and remove the ones that do not match.

Subset
--

public static List<Order> firstNOrdersFromState(int number, String state,
        Consumer<Order> action) {
    return customers.parallelStream().
            // only take customers from a particular state
            filter(c -> state.equals(c.getRegion())).
            // get the orders of those customers
            flatMap(c -> c.getOrders().stream()).
            // get the first number of orders
            substream(0, number).
            // apply the supplied action
            peek(action).
            // collect to a list
            collect(Collectors.toList());
}

Replace substream(0, number) with limit(number). The use of peek here is setting a bad precedent; suggest changing to return void and using forEach, or removing it. The same applies for topNCustomer.

Tabulate
--

This is another example of where tuples or MapStream (a stream for a tuple of 2 elements) could be useful. It would be good to remove the Pair class if at all possible; if not, it is best contained within Tabulate, as this is the only place that uses it.

For sameCountryCustomerAndSuppliers, off the top of my head, it is possible to reformulate using two Maps and we do not need Pair:

Map<String, List<Supplier>> countryToSuppliers = suppliers.stream().collect(
        // group suppliers by their country
        Collectors.groupingBy(Supplier::getCountry));
Map<Customer, List<Supplier>> s = customers.stream().collect(
        Collectors.toMap(Function.identity(),
                c -> countryToSuppliers.getOrDefault(c.getCountry(), null)));

Simplify:

public static Set<String> bothCustomerAndSupplier() {
    Set<String> suppliersName = suppliers.stream().
            map(Supplier::getSupplierName).
            collect(toSet());
    return customers.stream().
            map(Customer::getCompanyName).
            flatMap(c -> suppliersName.contains(c) ? Stream.of(c) : Stream.empty()).
            collect(toSet());
}

Hth,
Paul.
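The reduce-to-first-match collector described above can be exercised in isolation; this is a self-contained sketch (the Product class here is a minimal stand-in for the demo's data model) that keeps, per category, the first product with stock:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;

public class InStock {

    // Minimal stand-in for the demo's Product data class.
    static final class Product {
        final String category;
        final int unitsInStock;
        Product(String category, int unitsInStock) {
            this.category = category;
            this.unitsInStock = unitsInStock;
        }
        String getCategory() { return category; }
        int getUnitsInStock() { return unitsInStock; }
        @Override public String toString() { return category + ":" + unitsInStock; }
    }

    // Associative reducer: once a match is found on the left it sticks;
    // otherwise adopt the right value if it has stock.
    static Map<String, Product> firstInStockByCategory(List<Product> products) {
        BinaryOperator<Product> reduceToFirstMatch = (l, r) ->
                (l != null) ? l : (r != null && r.getUnitsInStock() > 0) ? r : null;
        return products.stream().collect(
                groupingBy(Product::getCategory, reducing(null, reduceToFirstMatch)));
    }

    public static void main(String[] args) {
        List<Product> ps = Arrays.asList(
                new Product("tea", 0), new Product("tea", 3), new Product("coffee", 5));
        System.out.println(firstInStockByCategory(ps));
    }
}
```

Non-matching elements behave like the null identity, which is what makes the single-pass reduction equivalent to filter-then-findFirst.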
On Oct 14, 2013, at 5:27 AM, Tristan Yan <tristan.yan@oracle.com> wrote:
Hi all
Could you please help to review the demo code for parallel core collection API?
http://cr.openjdk.java.net/~pzhang/Tristan/8023555/webrev/
Thank you very much
Tristan Yan(Haibo Yan)
Office : 8610-61066212
Fax : 8610-61065441
Cell : 86-18610696822
2F, Building No. 24, Zhongguancun Software Park
Haidian District, Beijing 100193
Hi,

I notice in some grouping by cases that it appears as if you want to stream downstream. Here is a simple example (not very interesting but it gets the point across):

/**
 * A downstream collector that streams elements to a further downstream
 * collector.
 */
public static <T, U, A, R> Collector<T, ?, R> streaming(
        Function<? super T, ? extends Stream<? extends U>> mapper,
        Collector<? super U, A, R> downstream) {
    BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
    return Collector.of(downstream.supplier(),
            (r, t) -> mapper.apply(t).sequential().
                    forEach(u -> downstreamAccumulator.accept(r, u)),
            downstream.combiner(),
            downstream.finisher(),
            downstream.characteristics().stream().
                    toArray(Collector.Characteristics[]::new));
}

public static void main(String[] args) throws Exception {
    Pattern dot = Pattern.compile("\\.");
    {
        Map<String, Long> m = System.getProperties().keySet().stream().
                map(Object::toString).
                collect(groupingBy(s -> s,
                        streaming(dot::splitAsStream, counting())));
        System.out.println(m);
    }
    {
        Map<String, List<String>> m = System.getProperties().keySet().stream().
                map(Object::toString).
                collect(groupingBy(s -> s,
                        streaming(dot::splitAsStream, toList())));
        System.out.println(m);
    }
}

It behaves like Stream.flatMap. If that collector is useful for you, then perhaps consider adding it to your demo as an advanced use of Collector. Unfortunately it is probably too late to add such a collector to the API.

Paul.

On Oct 14, 2013, at 5:27 PM, Paul Sandoz <Paul.Sandoz@oracle.com> wrote:
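As a quick self-contained check of the streaming collector above, here is a sketch that runs it over a fixed word list instead of system properties, so the result is deterministic (the class and method names here are invented for illustration):

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.stream.Collectors.*;

public class StreamingCollectorDemo {

    // Same shape as the streaming(...) collector in the message above.
    public static <T, U, A, R> Collector<T, ?, R> streaming(
            Function<? super T, ? extends Stream<? extends U>> mapper,
            Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> acc = downstream.accumulator();
        return Collector.of(downstream.supplier(),
                (r, t) -> mapper.apply(t).sequential().forEach(u -> acc.accept(r, u)),
                downstream.combiner(),
                downstream.finisher(),
                downstream.characteristics().toArray(new Collector.Characteristics[0]));
    }

    // Group lines by length, flattening each line into its words downstream.
    public static Map<Integer, List<String>> wordsByLineLength(List<String> lines) {
        return lines.stream().collect(
                groupingBy(String::length,
                        streaming((String s) -> Stream.of(s.split(" ")), toList())));
    }

    public static void main(String[] args) {
        System.out.println(wordsByLineLength(Arrays.asList("a b", "cd ef")));
    }
}
```

An equivalent collector was eventually standardized as Collectors.flatMapping in Java 9.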
Hi,
Some high-level points first:
- try to use static import where possible.
- suggest making all streams sequential. There is an inconsistency in their use, and in some cases a parallel stream is embedded in other stream usages.
- "ParallelCore" is not a very descriptive name. Suggest "streams".
- suggest moving the data classes and XML parsing code to a separate package.
- Unfortunately Supplier is overloaded with the functional interface in j.u.function. Not sure much could be done about that.
More details below. I am not really commenting on the specific use-case, just the usages of the API itself; for brevity I have removed comments.
Conversion --
A more compact form of mostTProductsByCategory (without comments) is:
public static <T extends Comparable<T>> Product[] mostTProductsByCategory(
        Function<Product, T> func) {
    Map<String, Optional<Product>> m = products.stream().
            collect(groupingBy(Product::getCategory, maxBy(comparing(func))));
    return m.values().stream().
            map(Optional::get).
            toArray(Product[]::new);
}
i.e. show the Map rather than the Collector.
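For comparison, a hypothetical sketch of the same group-then-max-then-unwrap shape using plain strings (longest word per initial letter) rather than the demo's Product type; the class and method names here are invented:

```java
import java.util.*;
import java.util.stream.*;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.*;

public class MaxPerGroupDemo {
    // Group words by first letter, take the longest per group, then unwrap
    // the Optionals into an array -- the same shape as mostTProductsByCategory.
    public static String[] longestPerInitial(List<String> words) {
        Map<Character, Optional<String>> m = words.stream().collect(
                groupingBy(w -> w.charAt(0), maxBy(comparing(String::length))));
        return m.values().stream().map(Optional::get).toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
                longestPerInitial(Arrays.asList("ant", "apple", "bee"))));
    }
}
```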
A DRYer form:
public static Collection<String> countries(boolean ordered) {
    Stream<String> countries = customers.stream().map(Customer::getCountry);
    if (ordered) {
        return countries.distinct().collect(toCollection(LinkedList::new));
    } else {
        return countries.collect(Collectors.<String>toSet());
    }
}
Shame that the type witness is required. For sequential streams there is probably no advantage here to providing a linked list over a list, except for API educational value.
Elements --
Simpler form; the function passed in to apply the terminal op obfuscates things:
public static Optional<Supplier> suppliersInCountry(boolean findAny, String country) {
    Stream<Supplier> s = suppliers.stream().
            // filter suppliers in the given country
            filter(supplier -> country.equals(supplier.getCountry()));
    return findAny ? s.findAny() : s.findFirst();
}
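The same shape can be sketched generically with strings (names here are invented); note that only findFirst is deterministic, so this assumes the caller does not rely on which matching element findAny returns:

```java
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class FindDemo {
    // Build the filtered stream once, then choose the terminal op.
    // findFirst is deterministic; findAny is free to return any match.
    public static Optional<String> find(List<String> xs, Predicate<String> p, boolean any) {
        Stream<String> s = xs.stream().filter(p);
        return any ? s.findAny() : s.findFirst();
    }

    public static void main(String[] args) {
        System.out.println(find(Arrays.asList("a", "bb", "cc"),
                x -> x.length() == 2, false));
    }
}
```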
The use of the collector is complicating matters. Off the top of my head, what you require is a reducer that reduces to the first product whose units in stock are > 0:
public static Map<String, Product> inStockProductsByCategory(boolean findAny) {
    BinaryOperator<Product> reduceToFirstMatch =
            (l, r) -> (l != null) ? l
                    : (r != null && r.getUnitsInStock() > 0) ? r : null;
    return products.stream().collect(
            groupingBy(Product::getCategory,
                    reducing(null, reduceToFirstMatch)));
}
The above relies on the associativity of the binary operator. There is no need to collect into a filtered list and then stream with findAny/findFirst, since we can reduce to the result as each element is received. That reduceToFirstMatch can easily be abstracted into a higher-order function taking a predicate.
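That abstraction might look like the following sketch; the class and method names are invented, and Integer stands in for the demo's Product to keep it self-contained:

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.stream.Collectors.*;

public class FirstMatchReducer {
    // Given a predicate, build the associative reduce-to-first-match operator.
    // The left argument is always null or an already-matching element (the
    // identity is null and elements only ever arrive on the right), so only
    // the right argument needs testing.
    public static <T> BinaryOperator<T> firstMatch(Predicate<T> p) {
        return (l, r) -> (l != null) ? l : (r != null && p.test(r)) ? r : null;
    }

    // Example: the first positive number in each parity group.
    public static Map<Boolean, Integer> firstPositiveByParity(List<Integer> xs) {
        BinaryOperator<Integer> firstPositive = firstMatch(x -> x > 0);
        return xs.stream().collect(
                groupingBy(x -> x % 2 == 0, reducing(null, firstPositive)));
    }

    public static void main(String[] args) {
        System.out.println(firstPositiveByParity(Arrays.asList(-2, -3, 4, 5, 6)));
    }
}
```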
Grouping --
public static Map<String, List<Order>> groupOrdersByCustomer() {
    // a collector that builds an order list from customers
    Collector<Customer, List<Order>, List<Order>> collectOrder = Collector.of(
            ArrayList<Order>::new,
            (orders, customer) -> orders.addAll(customer.getOrders()),
            (left, right) -> { left.addAll(right); return left; });
    return customers.parallelStream().
            // convert customers to a Map whose key is the company name and
            // whose value is its orders
            collect(Collectors.groupingBy(Customer::getCompanyName, collectOrder));
}
It is not clear to me whether there can be multiple customers with the same name; if not, the catamorphic collect may not be necessary and toMap could be used instead. Perhaps there is another way to show this collect usage?
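A hypothetical toMap version could look like the following sketch, with minimal stand-in data classes since the demo's own classes are not shown here; the merge function also covers the duplicate-name case by concatenating the two order lists:

```java
import java.util.*;
import java.util.stream.*;
import static java.util.stream.Collectors.*;

public class OrdersByCustomerDemo {
    // Minimal stand-ins for the demo's data classes (names are hypothetical).
    public static class Order {
        public final int id;
        public Order(int id) { this.id = id; }
    }
    public static class Customer {
        public final String name;
        public final List<Order> orders;
        public Customer(String name, List<Order> orders) {
            this.name = name;
            this.orders = orders;
        }
    }

    // toMap instead of a hand-rolled Collector; on a name collision the
    // merge function concatenates the two order lists.
    public static Map<String, List<Order>> ordersByCustomer(List<Customer> customers) {
        return customers.stream().collect(
                toMap(c -> c.name,
                      c -> new ArrayList<>(c.orders),
                      (l, r) -> { l.addAll(r); return l; }));
    }

    public static void main(String[] args) {
        Map<String, List<Order>> m = ordersByCustomer(Arrays.asList(
                new Customer("Acme", Arrays.asList(new Order(1))),
                new Customer("Acme", Arrays.asList(new Order(2)))));
        System.out.println(m.get("Acme").size());
    }
}
```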
Can simplify:
public static <S, T extends Comparable<T>> Map<S, Optional<Product>> productMaxByTGroupByS(
        boolean maxBy, Function<Product, S> groupFunc, Function<Product, T> compFunc) {
    // a Comparator of Product that compares on T via the given function
    Comparator<Product> comp = Comparator.comparing(compFunc);
    return products.stream().
            // collect products into a Map whose key is given by groupFunc and
            // whose value is the max or min product by comp
            collect(groupingBy(groupFunc, maxBy ? maxBy(comp) : minBy(comp)));
}
Simplify:
public static <R, T> Map<R, Map<T, List<Order>>> ordersByRThenT(
        Function<Order, R> func1, Function<Order, T> func2) {
    return customers.stream().
            // map every customer to its orders
            flatMap(customer -> customer.getOrders().stream()).
            // group the orders by the two given functions
            collect(groupingBy(func1, groupingBy(func2)));
}
Selection --
public static Map<Customer, Double> bigOrderCustomers(double bigValue) {
    Function<Customer, Double> orderTotal = c -> c.getOrders().stream().
            // map each order to its price
            mapToDouble(Order::getTotal).
            // calculate the total price
            sum();
    // a predicate matching customers whose total order price is at least bigValue
    Predicate<Customer> bigOrder = c -> orderTotal.apply(c) >= bigValue;
    return customers.parallelStream().
            // keep customers whose total order price is at least bigValue
            filter(bigOrder).
            // collect to a Map whose key is the customer and whose value is
            // the total order price
            collect(Collectors.toMap(Function.identity(), orderTotal));
}
This is essentially performing the sum twice on each element that passes through the filter.
Unfortunately we don't have tuples yet, and that is what I think you really need to retain the calculation for use later on.
It might be simpler just to create a Map<Customer, Double> then iterate over the entries and remove the ones that do not match.
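The compute-once-then-filter idea could be sketched generically like this (class and method names are invented; a HashMap is requested explicitly so the resulting map is safely mutable for removeIf):

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.stream.Collectors.*;

public class BigTotalsDemo {
    // Compute each key's total exactly once into a mutable map, then drop
    // the entries below the threshold, instead of summing twice per element.
    public static <K> Map<K, Double> totalsAtLeast(
            Collection<K> keys, ToDoubleFunction<K> total, double min) {
        Map<K, Double> m = keys.stream().collect(
                toMap(Function.identity(),
                      k -> total.applyAsDouble(k),
                      (a, b) -> a,          // keys are assumed distinct
                      HashMap::new));       // guarantee a mutable map
        m.values().removeIf(v -> v < min);
        return m;
    }

    public static void main(String[] args) {
        System.out.println(totalsAtLeast(
                Arrays.asList("ab", "abcd"), String::length, 3.0));
    }
}
```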
Subset --
public static List<Order> firstNOrdersFromState(int number, String state,
        Consumer<Order> action) {
    return customers.parallelStream().
            // only take customers from a particular state
            filter(c -> state.equals(c.getRegion())).
            // get the orders of those customers
            flatMap(c -> c.getOrders().stream()).
            // take the first 'number' orders
            substream(0, number).
            // apply the supplied action
            peek(action).
            // collect to a list
            collect(Collectors.toList());
}
Replace substream(0, number) with limit(number).
The use of peek here is setting a bad precedent; suggest changing the method to return void and using forEach, or removing it. The same applies to topNCustomer.
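Combining the two suggestions, a small generic sketch (invented names, plain integers in place of orders):

```java
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.*;
import static java.util.stream.Collectors.*;

public class FirstNDemo {
    // limit(n) in place of the removed substream(0, n); no peek -- a caller
    // wanting a side effect can forEach over the returned list instead.
    public static <T> List<T> firstNMatching(Collection<T> src, Predicate<? super T> p, int n) {
        return src.stream().filter(p).limit(n).collect(toList());
    }

    public static void main(String[] args) {
        System.out.println(firstNMatching(
                Arrays.asList(1, 2, 3, 4, 5), x -> x % 2 == 1, 2));
    }
}
```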
Tabulate --
This is another example of where tuples or MapStream (a stream for a tuple of 2 elements) could be useful.
It would be good to remove the Pair class if at all possible; if not, it is best contained within Tabulate, as this is the only place that uses it.
For sameCountryCustomerAndSuppliers, off the top of my head, it is possible to reformulate using two Maps and we do not need Pair:
Map<String, List<Supplier>> countryToSuppliers = suppliers.stream().collect(
        // group suppliers by their country
        Collectors.groupingBy(Supplier::getCountry));

Map<Customer, List<Supplier>> s = customers.stream().collect(
        Collectors.toMap(Function.identity(),
                c -> countryToSuppliers.getOrDefault(c.getCountry(),
                        Collections.emptyList())));
Simplify:
public static Set<String> bothCustomerAndSupplier() {
    Set<String> suppliersName = suppliers.stream().
            map(Supplier::getSupplierName).
            collect(toSet());
    return customers.stream().
            map(Customer::getCompanyName).
            flatMap(c -> suppliersName.contains(c) ? Stream.of(c) : Stream.empty()).
            collect(toSet());
}
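The flatMap-based membership test above can also be phrased as a plain filter; a small generic sketch of the same intersection (names invented):

```java
import java.util.*;
import java.util.stream.*;
import static java.util.stream.Collectors.*;

public class IntersectionDemo {
    // filter(set::contains) in place of the flatMap that emits either a
    // one-element or an empty stream; the result is the same intersection.
    public static Set<String> inBoth(Collection<String> a, Collection<String> b) {
        Set<String> bSet = new HashSet<>(b);
        return a.stream().filter(bSet::contains).collect(toSet());
    }

    public static void main(String[] args) {
        System.out.println(inBoth(Arrays.asList("x", "y"), Arrays.asList("y", "z")));
    }
}
```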
Hth, Paul.
On Oct 14, 2013, at 5:27 AM, Tristan Yan <tristan.yan@oracle.com> wrote:
Hi all
Could you please help to review the demo code for parallel core collection API?
http://cr.openjdk.java.net/~pzhang/Tristan/8023555/webrev/
Thank you very much
participants (2)
- Paul Sandoz
- Tristan Yan