Demo for Parallel Core Collection API

Paul Sandoz paul.sandoz at oracle.com
Thu Oct 17 03:57:23 PDT 2013


Hi,

I notice that in some of the groupingBy cases it looks as if you want to stream elements to the downstream collector.

Here is a simple example (not very interesting but it gets the point across):

    /**
     * A downstream collector that streams elements to a further downstream
     * collector.
     */
    public static <T, U, A, R>
    Collector<T, ?, R> streaming(Function<? super T, ? extends Stream<? extends U>> mapper,
                                 Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return Collector.of(downstream.supplier(),
                            (r, t) -> mapper.apply(t).sequential().forEach(u -> downstreamAccumulator.accept(r, u)),
                            downstream.combiner(),
                            downstream.finisher(),
                            downstream.characteristics().stream().toArray(Collector.Characteristics[]::new));
    }

    public static void main(String[] args) throws Exception {
        Pattern dot = Pattern.compile("\\.");

        {
            Map<String, Long> m = System.getProperties().keySet().stream()
                    .map(Object::toString)
                    .collect(groupingBy(s -> s,
                                        streaming(dot::splitAsStream,
                                                  counting())));
            System.out.println(m);
        }


        {
            Map<String, List<String>> m = System.getProperties().keySet().stream()
                    .map(Object::toString)
                    .collect(groupingBy(s -> s,
                                        streaming(dot::splitAsStream,
                                                  toList())));
            System.out.println(m);
        }
    }

It behaves like Stream.flatMap.
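
A minimal self-contained sketch of that equivalence, assuming made-up property names as input (the class name StreamingDemo and the helper method names are only for illustration). Without a grouping, collecting through streaming(...) should give the same list as calling flatMap first; the collector form is what allows the split to happen downstream of groupingBy:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamingDemo {
    static final Pattern DOT = Pattern.compile("\\.");

    // Same collector as above, repeated so that this sketch compiles on its own.
    static <T, U, A, R> Collector<T, ?, R> streaming(
            Function<? super T, ? extends Stream<? extends U>> mapper,
            Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> accumulator = downstream.accumulator();
        return Collector.of(downstream.supplier(),
                (r, t) -> mapper.apply(t).sequential().forEach(u -> accumulator.accept(r, u)),
                downstream.combiner(),
                downstream.finisher(),
                downstream.characteristics().toArray(new Collector.Characteristics[0]));
    }

    // Flatten first, then collect.
    static List<String> viaFlatMap(List<String> names) {
        return names.stream().flatMap(DOT::splitAsStream).collect(Collectors.toList());
    }

    // Flatten inside the collector.
    static List<String> viaCollector(List<String> names) {
        Collector<String, ?, List<String>> parts = streaming(DOT::splitAsStream, Collectors.toList());
        return names.stream().collect(parts);
    }

    // Group by the length of the whole name, collecting the dot-separated parts of each group.
    static Map<Integer, List<String>> partsByLength(List<String> names) {
        Collector<String, ?, List<String>> parts = streaming(DOT::splitAsStream, Collectors.toList());
        return names.stream().collect(Collectors.groupingBy(String::length, parts));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("java.home", "java.vm.name", "os.name");
        System.out.println(viaFlatMap(names).equals(viaCollector(names)));
        System.out.println(partsByLength(names));
    }
}
```

With groupingBy the key is still computed from the unsplit element, which a flatMap placed ahead of the collect would lose.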

If that collector is useful for you, then perhaps consider adding it to your demo as an advanced use of Collector. Unfortunately it is probably too late to add such a collector to the API.

Paul.

On Oct 14, 2013, at 5:27 PM, Paul Sandoz <Paul.Sandoz at oracle.com> wrote:

> Hi,
> 
> Some high-level points first:
> 
> - try and use static import where possible.
> 
> - suggest that all streams are sequential. The use of parallel streams is inconsistent, and in some cases a parallel stream is embedded in another stream usage.
> 
> - "ParallelCore" is not a very descriptive name. Suggest "streams".
> 
> - suggest moving the data classes and XML parsing code to a separate package.
> 
> - Unfortunately the Supplier data class shares its name with the functional interface java.util.function.Supplier. Not sure much can be done about that.
> 
> 
> More details below. I am not really commenting on the specific use-case, just the usages of the API itself; for brevity I have removed comments.
> 
> 
> Conversion
> --
> 
> A more compact form of mostTProductsByCategory (without comments) is:
> 
>    public static <T extends Comparable<T>> Product[] mostTProductsByCategory(
>            Function<Product, T> func){
>        Map<String, Optional<Product>> m =  products.stream().
>                collect(groupingBy(Product::getCategory, maxBy(comparing(func))));
> 
>        return m.values().stream().
>                map(Optional::get).
>                toArray(Product[]::new);
>    }
> 
> i.e. show the Map rather than the Collector.
> 
> 
> A DRYer form:
> 
>    public static Collection<String> countries(boolean ordered) {
>        Stream<String> countries = customers.stream().map(Customer::getCountry);
> 
>        if (ordered) {
>            return countries.distinct().collect(toCollection(LinkedList::new));
>        }
>        else {
>            return countries.collect(Collectors.<String>toSet());
> 
>        }
>    }
> 
> Shame that the type witness is required. For sequential streams there is probably no advantage here in providing a linked list over a list, except for API educational value.
> 
> 
> Elements
> --
> 
> 
> Simpler form. The function to apply the terminal op obfuscates:
> 
>    public static Optional<Supplier> suppliersInCountry(boolean findAny,
>            String country) {
>        Stream<Supplier> s = suppliers.stream().
>                //filter suppliers whose country is the same as the given country
>                        filter(supplier -> country.
>                        equals(supplier.getCountry()));
> 
>        return findAny ? s.findAny() : s.findFirst();
>    } 
> 
> 
> The use of the collector is complicating matters. Off the top of my head, what you require is a reducer that reduces to the first product whose units in stock are > 0:
> 
>    public static Map<String, Product> inStockProductsByCategory(
>            boolean findAny) {
>        BinaryOperator<Product> reduceToFirstMatch = (l, r) -> (l != null)
>                                                            ? l : (r != null && r.getUnitsInStock() > 0)
>                                                              ? r : null;
>        return products.stream().collect(
>                groupingBy(Product::getCategory,
>                           reducing(null, reduceToFirstMatch)));
> 
>    }
> 
> The above relies on the associativity of the binary operator. There is no need to collect into a filtered list and then stream with findAny/findFirst, since we can reduce to the result as each element is received. That reduceToFirstMatch can easily be abstracted into a higher-order function taking a predicate.
> 
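One possible shape for that higher-order function, as a sketch only: the Product class below is a hypothetical stand-in for the demo's own class, and the names firstMatching and FirstMatchDemo are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FirstMatchDemo {
    // Hypothetical stand-in for the demo's Product class.
    static final class Product {
        final String name;
        final String category;
        final int unitsInStock;

        Product(String name, String category, int unitsInStock) {
            this.name = name;
            this.category = category;
            this.unitsInStock = unitsInStock;
        }
    }

    // Keeps the left result once there is one; otherwise accepts the right
    // element only if it is non-null and matches the predicate.
    static <T> BinaryOperator<T> firstMatching(Predicate<? super T> predicate) {
        return (l, r) -> (l != null) ? l : (r != null && predicate.test(r)) ? r : null;
    }

    static Map<String, Product> inStockProductsByCategory(List<Product> products) {
        BinaryOperator<Product> firstInStock = firstMatching(p -> p.unitsInStock > 0);
        return products.stream().collect(
                Collectors.groupingBy(p -> p.category,
                                      Collectors.reducing(null, firstInStock)));
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
                new Product("apple", "fruit", 0),
                new Product("pear", "fruit", 3),
                new Product("nail", "hardware", 0));
        Product fruit = inStockProductsByCategory(products).get("fruit");
        System.out.println(fruit == null ? "none" : fruit.name);
    }
}
```

As with the original operator, only the right-hand element is tested, so this relies on the left operand always being the identity (null) or an earlier match. A category with no matching product ends up mapped to null.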
> 
> Grouping
> --
> 
>    public static Map<String, List<Order>> groupOrdersByCustomer() {
>        //a collector that generates an order list from a customer
>        Collector<Customer, List<Order>, List<Order>> collectOrder = Collector.
>                of(ArrayList<Order>::new, 
>                (orders, customer) -> { orders.addAll(customer.getOrders()); }, 
>                (left, right) -> { left.addAll(right); return left; });
>        return customers.parallelStream().
>                //convert customers to a Map whose key is the company name and
>                //whose value is its orders
>                collect(Collectors.
>                groupingBy(Customer::getCompanyName, 
>                        collectOrder));      
>    }
> 
> It is not clear to me whether there are multiple customers with the same name; if not, the catamorphic collect may not be necessary and toMap can be used instead. Perhaps there is another way to show this collect usage?
> 
> 
> Can simplify:
> 
>    public static <S, T extends Comparable<T>> Map<S, Optional<Product>> 
>        productMaxByTGroupByS(boolean maxBy, Function<Product, S> groupFunc,
>                Function<Product, T> compFunc){
>        //Comparator of Product which will compare on T by given function
>        Comparator<Product> comp = Comparator.comparing(compFunc);
> 
>        return products.stream().
>                //collect products into a Map whose key is S (from groupFunc)
>                //and whose value is the max or min product as compared by
>                //compFunc
>                collect(groupingBy(
>                        groupFunc,
>                        maxBy ? maxBy(comp) : minBy(comp)));
>    }
> 
> 
> Simplify:
> 
>    public static <R, T> Map<R, Map<T, List<Order>>> ordersByRThenT(
>        Function<Order, R> func1,Function<Order, T> func2){
>        return customers.stream().
>                //map every customer to orders
>                flatMap(customer -> customer.getOrders().stream()).
>                //group orders into a Map<R, Map<T, List<Order>>> by the two given functions
>                collect(groupingBy(func1, groupingBy(func2)));
>    }
> 
> 
> Selection
> --
> 
>    public static Map<Customer,Double> bigOrderCustomers(double bigValue) {
>        Function<Customer, Double> orderTotal = 
>                c -> c.getOrders().stream().
>                        //map order to order price
>                        mapToDouble(Order::getTotal).
>                        //calculate total price
>                        sum();
>        //A predicate that selects customers whose total order price is at
>        //least bigValue
>        Predicate<Customer> bigOrder = c -> orderTotal.apply(c) >= bigValue;
>        return customers.parallelStream().
>                //keep customers whose total order price is at least bigValue
>                filter(bigOrder).
>                //collect customers into a Map whose key is the customer and
>                //whose value is the total order price
>                collect( Collectors.toMap(Function.identity(), 
>                        orderTotal));
>    }
> 
> This is essentially performing the sum twice on each element that passes through the filter.
> 
> Unfortunately we don't have tuples yet, and that is what I think you really need to retain the calculation for use later on.
> 
> It might be simpler just to create a Map<Customer, Double> then iterate over entries and remove ones that do not match.
> 
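A sketch of that alternative, under stated assumptions: the Customer class here is a hypothetical stand-in that just carries its order totals, and BigOrdersDemo is a made-up name. Each total is computed once while building the map, and the entries below the threshold are removed afterwards.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class BigOrdersDemo {
    // Hypothetical stand-in for the demo's Customer class: a name plus the
    // totals of its orders.
    static final class Customer {
        final String name;
        final double[] orderTotals;

        Customer(String name, double... orderTotals) {
            this.name = name;
            this.orderTotals = orderTotals;
        }
    }

    static Map<Customer, Double> bigOrderCustomers(List<Customer> customers, double bigValue) {
        // Sum each customer's orders exactly once.
        Map<Customer, Double> totals = customers.stream().collect(Collectors.toMap(
                Function.identity(),
                c -> Arrays.stream(c.orderTotals).sum(),
                (a, b) -> a,      // only used if the same customer appears twice
                HashMap::new));   // ask for a HashMap so that removal is supported
        // Drop the customers whose total is below the threshold.
        totals.values().removeIf(total -> total < bigValue);
        return totals;
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("A", 10.0, 20.0),
                new Customer("B", 5.0));
        bigOrderCustomers(customers, 20.0)
                .forEach((c, total) -> System.out.println(c.name + " " + total));
    }
}
```

The map supplier is passed explicitly because the shorter toMap overloads make no promise that the returned map can be modified.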
> 
> Subset
> --
> 
>    public static List<Order>  firstNOrdersFromState(int number,
>            String state, Consumer<Order> action){
>        return customers.parallelStream().
>                //only take customers from a particular state
>                filter(c -> state.equals(c.getRegion())).
>                //get orders of those customers
>                flatMap(c -> c.getOrders().stream()).
>                //get first number of orders
>                substream(0, number).
>                //apply the supplied action
>                peek(action).
>                //collect to a list
>                collect(Collectors.toList());
>    }
> 
> Replace substream(0, number) with limit(number).
> 
> The use of peek here is setting a bad precedent, suggest changing to return void and using forEach or removing it. Same applies for topNCustomer.
> 
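Roughly what the void-returning form could look like, as a sketch: the Customer class is a hypothetical stand-in whose orders are plain ids, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class FirstNOrdersDemo {
    // Hypothetical stand-in for the demo's Customer class; orders are plain ids here.
    static final class Customer {
        final String region;
        final List<String> orderIds;

        Customer(String region, String... orderIds) {
            this.region = region;
            this.orderIds = Arrays.asList(orderIds);
        }
    }

    static void forFirstNOrdersFromState(List<Customer> customers, int number,
                                         String state, Consumer<String> action) {
        customers.stream()
                .filter(c -> state.equals(c.region))   // customers from the given state
                .flatMap(c -> c.orderIds.stream())     // their orders
                .limit(number)                         // instead of substream(0, number)
                .forEach(action);                      // the terminal op applies the action, no peek
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer("WA", "o1", "o2"),
                new Customer("OR", "o3"),
                new Customer("WA", "o4"));
        List<String> seen = new ArrayList<>();
        forFirstNOrdersFromState(customers, 3, "WA", seen::add);
        System.out.println(seen);
    }
}
```

If the stream were parallel, forEachOrdered would be needed to apply the action in encounter order.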
> 
> Tabulate
> --
> 
> This is another example of where tuples or MapStream (a stream for a tuple of 2 elements) could be useful.
> 
> It would be good to remove the Pair class if at all possible; if not, it is best contained within Tabulate, as this is the only place that uses it.
> 
> 
> For sameCountryCustomerAndSuppliers, off the top of my head, it is possible to reformulate using two Maps and we do not need Pair:
> 
>        Map<String, List<Supplier>> countryToSuppliers = suppliers.stream().collect(
>                // group suppliers by their country
>                Collectors.groupingBy(Supplier::getCountry));
> 
>        Map<Customer, List<Supplier>> s = customers.stream().collect(
>                // toMap rejects null values, so default to an empty list
>                Collectors.toMap(Function.identity(),
>                                 c -> countryToSuppliers.getOrDefault(c.getCountry(), Collections.emptyList())));
> 
> 
> Simplify:
> 
>    public static Set<String> bothCustomerAndSupplier() {
>        Set<String> suppliersName = suppliers.stream().
>                map(Supplier::getSupplierName).
>                collect(toSet());
> 
>        return customers.stream().
>                map(Customer::getCompanyName).
>                flatMap(c -> suppliersName.contains(c) ? Stream.of(c) : Stream.empty()).
>                collect(toSet());
>    }
> 
> 
> 
> Hth,
> Paul.
> 
> On Oct 14, 2013, at 5:27 AM, Tristan Yan <tristan.yan at oracle.com> wrote:
> 
>> Hi all
>> 
>> 
>> 
>> Could you please help to review the demo code for parallel core collection API?
>> 
>> http://cr.openjdk.java.net/~pzhang/Tristan/8023555/webrev/
>> 
>> 
>> 
>> Thank you very much
>> 
>> 
>> 
>> 
>> 
>> Tristan Yan(Haibo Yan)
>> 
>> Office : 8610-61066212
>> 
>> Fax  : 8610-61065441
>> 
>> Cell  : 86-18610696822
>> 
>> 
>> 
>> 2F, Building No. 24, Zhongguancun Software Park
>> 
>> Haidian District, Beijing, 100193
>> 
>> oracle
>> 
>> 
>> 
> 
> 



More information about the lambda-dev mailing list