Demo for Parallel Core Collection API
Paul Sandoz
paul.sandoz at oracle.com
Mon Oct 14 08:27:49 PDT 2013
Hi,
Some high-level points first:
- try to use static imports where possible.
- suggest that all streams are sequential. Sequential and parallel streams are currently mixed inconsistently, and in some cases one is embedded within another stream usage.
- "ParallelCore" is not a very descriptive name. Suggest "streams".
- suggest moving the data classes and XML parsing code to a separate package.
- Unfortunately the name of the Supplier data class clashes with the Supplier functional interface in j.u.function. Not sure much could be done about that.
More details below. I am not really commenting on the specific use-case, just the usages of the API itself, plus for brevity i have removed comments.
Conversion
--
A more compact form of mostTProductsByCategory (without comments) is:
public static <T extends Comparable<T>> Product[] mostTProductsByCategory(
        Function<Product, T> func) {
    Map<String, Optional<Product>> m = products.stream().
        collect(groupingBy(Product::getCategory, maxBy(comparing(func))));
    return m.values().stream().
        map(Optional::get).
        toArray(Product[]::new);
}
i.e. show the Map rather than the Collector.
A DRYer form:
public static Collection<String> countries(boolean ordered) {
    Stream<String> countries = customers.stream().map(Customer::getCountry);
    if (ordered) {
        return countries.distinct().collect(toCollection(LinkedList::new));
    } else {
        return countries.collect(Collectors.<String>toSet());
    }
}
Shame that the type witness is required. For sequential streams there is probably no advantage here in providing a linked list over a list, except for API educational value.
Elements
--
Simpler form. The function to apply the terminal op obfuscates:
public static Optional<Supplier> suppliersInCountry(boolean findAny,
        String country) {
    Stream<Supplier> s = suppliers.stream().
        //filter suppliers located in the given country
        filter(supplier -> country.equals(supplier.getCountry()));
    return findAny ? s.findAny() : s.findFirst();
}
The use of the collector is complicating matters. Off the top of my head, what you require is a reducer that reduces to the first product whose units in stock are > 0:
public static Map<String, Product> inStockProductsByCategory(
        boolean findAny) {
    BinaryOperator<Product> reduceToFirstMatch = (l, r) -> (l != null)
        ? l : (r != null && r.getUnitsInStock() > 0)
        ? r : null;
    return products.stream().collect(
        groupingBy(Product::getCategory,
            reducing(null, reduceToFirstMatch)));
}
The above relies on the associativity of the binary operator. There is no need to collect into a filtered list and then stream with findAny/findFirst, since we can reduce to the result as each element is received. That reduceToFirstMatch can easily be abstracted into a higher-order function taking a predicate.
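A minimal sketch of that abstraction, assuming a cut-down Product class; FirstMatchSketch and firstMatching are hypothetical names chosen for illustration and are not part of the reviewed demo:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative sketch only: FirstMatchSketch, firstMatching and this
// cut-down Product are hypothetical, not part of the reviewed demo.
final class FirstMatchSketch {

    // Minimal stand-in for the demo's Product data class (assumed shape).
    static final class Product {
        private final String name;
        private final String category;
        private final int unitsInStock;

        Product(String name, String category, int unitsInStock) {
            this.name = name;
            this.category = category;
            this.unitsInStock = unitsInStock;
        }

        String getName() { return name; }
        String getCategory() { return category; }
        int getUnitsInStock() { return unitsInStock; }
    }

    // Returns a reducer that keeps the first non-null element matching the
    // predicate; null acts as the "nothing found yet" identity.
    static <T> BinaryOperator<T> firstMatching(Predicate<? super T> predicate) {
        return (l, r) -> (l != null)
                ? l : (r != null && predicate.test(r))
                ? r : null;
    }

    // Groups by category and reduces each group to its first in-stock
    // product. A category with no in-stock product maps to null.
    static Map<String, Product> inStockProductsByCategory(List<Product> products) {
        BinaryOperator<Product> firstInStock =
                firstMatching(p -> p.getUnitsInStock() > 0);
        return products.stream().collect(
                Collectors.groupingBy(Product::getCategory,
                        Collectors.reducing(null, firstInStock)));
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
                new Product("A", "x", 0),
                new Product("B", "x", 5),
                new Product("D", "y", 0));
        inStockProductsByCategory(products).forEach((category, p) ->
                System.out.println(category + " -> "
                        + (p == null ? "none" : p.getName())));
    }
}
```

Note that, as with the reducer above, a category without any matching product still appears in the map with a null value, so callers would need to allow for that.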
Grouping
--
public static Map<String, List<Order>> groupOrdersByCustomer() {
    //a collector that builds an order list from customers
    Collector<Customer, List<Order>, List<Order>> collectOrder = Collector.
        of(ArrayList<Order>::new,
            (orders, customer) -> { orders.addAll(customer.getOrders()); },
            (left, right) -> { left.addAll(right); return left; });
    return customers.parallelStream().
        //convert customers to a Map whose key is the company name and
        //whose value is that customer's orders
        collect(Collectors.
            groupingBy(Customer::getCompanyName,
                collectOrder));
}
It is not clear to me whether there can be multiple customers with the same name. If not, the catamorphic collect is not necessary and toMap can be used instead. Perhaps there is another way to show this collect usage?
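A rough sketch of the toMap alternative, under the assumption that company names are unique. OrdersByCustomerSketch and the cut-down Customer are hypothetical, and orders are represented as plain strings purely for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only: class names and the String-based orders are
// assumptions, not part of the reviewed demo.
final class OrdersByCustomerSketch {

    // Minimal stand-in for the demo's Customer data class (assumed shape).
    static final class Customer {
        private final String companyName;
        private final List<String> orders;

        Customer(String companyName, List<String> orders) {
            this.companyName = companyName;
            this.orders = orders;
        }

        String getCompanyName() { return companyName; }
        List<String> getOrders() { return orders; }
    }

    // Maps each company name directly to that customer's own order list.
    // The two-argument toMap throws IllegalStateException on a duplicate
    // key, so this only works if company names are unique.
    static Map<String, List<String>> groupOrdersByCustomer(List<Customer> customers) {
        return customers.stream().collect(
                Collectors.toMap(Customer::getCompanyName, Customer::getOrders));
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("Acme", Arrays.asList("o1", "o2")),
                new Customer("Globex", Arrays.asList("o3")));
        System.out.println(groupOrdersByCustomer(customers));
    }
}
```

If duplicate names can occur, the three-argument toMap with a merge function that concatenates the lists would be one option, at which point the groupingBy form is arguably just as clear.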
Can simplify:
public static <S, T extends Comparable<T>> Map<S, Optional<Product>>
        productMaxByTGroupByS(boolean maxBy, Function<Product, S> groupFunc,
        Function<Product, T> compFunc) {
    //Comparator of Product that compares on T using the given function
    Comparator<Product> comp = Comparator.comparing(compFunc);
    return products.stream().
        //collect products into a Map whose key is S (from groupFunc) and
        //whose value is the max or min product according to compFunc
        collect(groupingBy(
            groupFunc,
            maxBy ? maxBy(comp) : minBy(comp)));
}
Simplify:
public static <R, T> Map<R, Map<T, List<Order>>> ordersByRThenT(
        Function<Order, R> func1, Function<Order, T> func2) {
    return customers.stream().
        //map every customer to its orders
        flatMap(customer -> customer.getOrders().stream()).
        //group orders into nested Maps keyed by the two given functions
        collect(groupingBy(func1, groupingBy(func2)));
}
Selection
--
public static Map<Customer, Double> bigOrderCustomers(double bigValue) {
    Function<Customer, Double> orderTotal =
        c -> c.getOrders().stream().
            //map order to order price
            mapToDouble(Order::getTotal).
            //calculate total price
            sum();
    //a predicate that keeps customers whose total order price is at least
    //bigValue
    Predicate<Customer> bigOrder = c -> orderTotal.apply(c) >= bigValue;
    return customers.parallelStream().
        //keep customers whose total order price is at least bigValue
        filter(bigOrder).
        //collect customers into a Map whose key is the customer and whose
        //value is the total order price
        collect(Collectors.toMap(Function.identity(),
            orderTotal));
}
This is essentially performing the sum twice on each element that passes through the filter.
Unfortunately we don't have tuples yet and that is what i think you really need to retain the calculation for use later on.
It might be simpler just to create a Map<Customer, Double> then iterate over entries and remove ones that do not match.
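One way the map-then-remove approach might look, so the sum is computed once per customer. This is only a sketch: BigOrderSketch, the cut-down Customer, and representing orders by their totals are all assumptions made for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch only: class names and the List<Double> of order
// totals are assumptions, not part of the reviewed demo.
final class BigOrderSketch {

    // Minimal stand-in for the demo's Customer data class (assumed shape).
    static final class Customer {
        private final String name;
        private final List<Double> orderTotals;

        Customer(String name, List<Double> orderTotals) {
            this.name = name;
            this.orderTotals = orderTotals;
        }

        String getName() { return name; }
        List<Double> getOrderTotals() { return orderTotals; }
    }

    // Sums the totals of all orders of one customer.
    static double orderTotal(Customer c) {
        return c.getOrderTotals().stream().mapToDouble(Double::doubleValue).sum();
    }

    // Computes every customer's total exactly once, then drops the entries
    // below bigValue. HashMap::new is passed explicitly so the result is
    // known to be mutable; the merge function is never expected to run
    // because each Customer instance is a distinct key.
    static Map<Customer, Double> bigOrderCustomers(List<Customer> customers,
                                                   double bigValue) {
        Map<Customer, Double> totals = customers.stream().collect(
                Collectors.toMap(Function.identity(), BigOrderSketch::orderTotal,
                        (a, b) -> a, HashMap::new));
        totals.values().removeIf(total -> total < bigValue);
        return totals;
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("a", Arrays.asList(100.0, 50.0)),
                new Customer("b", Arrays.asList(10.0)));
        bigOrderCustomers(customers, 100.0).forEach((c, total) ->
                System.out.println(c.getName() + " -> " + total));
    }
}
```

The trade-off is that totals for all customers are held in the map before the small ones are removed, which is probably acceptable for a demo-sized data set.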
Subset
--
public static List<Order> firstNOrdersFromState(int number,
        String state, Consumer<Order> action) {
    return customers.parallelStream().
        //only take customers from a particular state
        filter(c -> state.equals(c.getRegion())).
        //get orders of those customers
        flatMap(c -> c.getOrders().stream()).
        //get first number of orders
        substream(0, number).
        //apply the supplied action
        peek(action).
        //collect to a list
        collect(Collectors.toList());
}
Replace substream(0, number) with limit(number).
The use of peek here sets a bad precedent. Suggest either changing the method to return void and using forEach, or removing the action. The same applies to topNCustomer.
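A sketch of the void-returning variant with limit and forEach, on a sequential stream. FirstNOrdersSketch, forFirstNOrdersFromState, and the cut-down Customer with string orders are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch only: class and method names and the String-based
// orders are assumptions, not part of the reviewed demo.
final class FirstNOrdersSketch {

    // Minimal stand-in for the demo's Customer data class (assumed shape).
    static final class Customer {
        private final String region;
        private final List<String> orders;

        Customer(String region, List<String> orders) {
            this.region = region;
            this.orders = orders;
        }

        String getRegion() { return region; }
        List<String> getOrders() { return orders; }
    }

    // Applies the action to the first `number` orders of customers in the
    // given state. The action is the terminal operation, so no peek is needed.
    static void forFirstNOrdersFromState(List<Customer> customers, int number,
                                         String state,
                                         Consumer<? super String> action) {
        customers.stream()
                .filter(c -> state.equals(c.getRegion()))
                .flatMap(c -> c.getOrders().stream())
                .limit(number)
                .forEach(action);
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("WA", Arrays.asList("o1", "o2")),
                new Customer("OR", Arrays.asList("o3")),
                new Customer("WA", Arrays.asList("o4")));
        List<String> seen = new ArrayList<>();
        forFirstNOrdersFromState(customers, 2, "WA", seen::add);
        System.out.println(seen);
    }
}
```

Callers that still want a list can pass a list's add method as the action, as in main above, which keeps the side effect explicit at the call site.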
Tabulate
--
This is another example of where tuples or MapStream (a stream for a tuple of 2 elements) could be useful.
It would be good to remove the Pair class if at all possible. If not, it is best contained within Tabulate, as this is the only place that uses it.
For sameCountryCustomerAndSuppliers, off the top of my head, it is possible to reformulate using two Maps and we do not need Pair:
Map<String, List<Supplier>> countryToSuppliers = suppliers.stream().collect(
    // group suppliers by their country
    Collectors.groupingBy(Supplier::getCountry));
Map<Customer, List<Supplier>> s = customers.stream().collect(
    // toMap rejects null values, so default to an empty list
    Collectors.toMap(Function.identity(),
        c -> countryToSuppliers.getOrDefault(c.getCountry(),
            Collections.emptyList())));
Simplify:
public static Set<String> bothCustomerAndSupplier() {
    Set<String> suppliersName = suppliers.stream().
        map(Supplier::getSupplierName).
        collect(toSet());
    return customers.stream().
        map(Customer::getCompanyName).
        flatMap(c -> suppliersName.contains(c) ? Stream.of(c) : Stream.empty()).
        collect(toSet());
}
Hth,
Paul.
On Oct 14, 2013, at 5:27 AM, Tristan Yan <tristan.yan at oracle.com> wrote:
> Hi all
>
> Could you please help to review the demo code for parallel core collection API?
>
> http://cr.openjdk.java.net/~pzhang/Tristan/8023555/webrev/
>
> Thank you very much
>
> Tristan Yan(Haibo Yan)