EnumeratedStream

ІП-24 Олександр Ротань rotan.olexandr at gmail.com
Sun Apr 21 00:12:58 UTC 2024


I am sorry, but I feel like I am missing the point of your response and how
it relates to what I have said.

Regarding wrapping and unwrapping indexed pairs, one of the advantages of the
approach I have suggested is that EnumeratedStream is still a stream, so
all index-unaware ops can still be applied to it.
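
For comparison, the wrap/use/unwrap pattern from the quoted message can be
sketched roughly as follows. This is only an illustration under assumptions:
`Indexed`, `enumerate` and `filterIndexed` are hypothetical names, the sketch
is sequential only, and it uses a plain iterator instead of the Gatherer API
so that it does not depend on a particular JDK release.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IndexedSketch {
    // Hypothetical index-value pair; not an existing JDK type.
    record Indexed<T>(int index, T value) {}

    // Wrap: assign indexes in the order elements are pulled from upstream.
    // Sequential only; parallel sources are forced to sequential here.
    static <T> Stream<Indexed<T>> enumerate(Stream<T> source) {
        Iterator<T> upstream = source.sequential().iterator();
        Iterator<Indexed<T>> indexed = new Iterator<>() {
            private int counter = 0;

            @Override
            public boolean hasNext() {
                return upstream.hasNext();
            }

            @Override
            public Indexed<T> next() {
                return new Indexed<>(counter++, upstream.next());
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(indexed, Spliterator.ORDERED),
                false);
    }

    // Wrap, apply one index-aware operation, then unwrap immediately.
    static <T> Stream<T> filterIndexed(Stream<T> source,
                                       BiPredicate<Integer, T> filter) {
        return enumerate(source)
                .filter(p -> filter.test(p.index(), p.value()))
                .map(Indexed::value);
    }

    public static void main(String[] args) {
        List<String> kept = filterIndexed(Stream.of("a", "b", "c", "d"),
                        (i, v) -> i % 2 == 0)
                .map(String::toUpperCase) // index-unaware op after unwrapping
                .collect(Collectors.toList());
        System.out.println(kept); // prints [A, C]
    }
}
```

After unwrapping, the result is an ordinary Stream<T> again, so the index is
gone for any later step and would have to be assigned a second time.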

On Sun, Apr 21, 2024, 03:08 - <liangchenblue at gmail.com> wrote:

> We must convert index-processing operations to a `gather(Gatherers.scan(/*
> index gathering */))` immediately before the operation that uses the index,
> and immediately unwrap the indices afterwards.
>
> Syntactically writing such 3 lines for every index-aware operation would
> be weird; I actually wonder if we can find a way to convert a
> Gatherer<Indexed<T>, ?, R> into a Gatherer<T, ?, R>; the nested gatherer
> itself will receive the enumerated values. I don't know if such a way to
> compose Gatherer has been studied, but this would allow us to define all
> indexed operations as gatherers instead of having to make new
> BiConsumers for indices. This would be interesting to look into. So the
> ideal form would be:
>
> BiPredicate<Integer, T> filter = ...
> // push the value downstream only if the (index, value) pair passes
> Gatherer<Indexed<T>, ?, T> filterGatherer = Gatherer.of((_, indexed, sink)
> -> !filter.test(indexed.index(), indexed.value()) ||
> sink.push(indexed.value()));
> // IndexedGatherer.wrap: a (proposed) factory method performing wrapping
> Gatherer<T, ?, T> wrapped = IndexedGatherer.wrap(filterGatherer);
> return stream.gather(wrapped) // and more
>
> On Sat, Apr 20, 2024 at 6:47 PM ІП-24 Олександр Ротань <
> rotan.olexandr at gmail.com> wrote:
>
>> I would like to correct myself a bit: indexes should be assigned at the
>> moment an element is received from upstream, not when it is passed to
>> processing. I am not sure whether this would actually change the order of
>> execution in a real implementation, but putting it this way is more precise.
>>
>> On Sun, Apr 21, 2024 at 02:33, ІП-24 Олександр Ротань <
>> rotan.olexandr at gmail.com> wrote:
>>
>>> Hello again.
>>>
>>> I have imagined the implementation of enumerated streams in a slightly
>>> different way. I think enumerated streams should be a separate kind of
>>> stream, like parallel, sequential or primitive streams. This way
>>> enumeration could be deferred until the index is actually consumed.
>>> This would eliminate the issue with stream merging and also somewhat
>>> ease the issue with parallel streams, as the indexes will follow the
>>> order in which elements have been passed to processing.
>>>
>>> This could potentially lead to another issue with the consistency of
>>> indexes through the stream pipeline, but there could be a workaround, such
>>> as IdentityHashMaps received from previous enumerated streams.
>>>
>>> This way we could make streams more efficient and also introduce a more
>>> user-friendly API, which is essentially a win-win situation.
>>>
>>> Regarding concatenation specifically, we could either introduce separate
>>> methods for merging enumerated streams, where we define how conflicts are
>>> resolved, or just treat them as usual streams, so that concatenating
>>> enumerated streams produces a normal stream that has to be enumerated
>>> once again. Also, I don't really see how gathering elements into
>>> index-value pairs solves the issue with conflicting indexes.
>>>
>>> On Sun, Apr 21, 2024 at 02:06, - <liangchenblue at gmail.com> wrote:
>>>
>>>> Hi Oleksandr,
>>>> I fear that enumeration might not be applicable to all streams,
>>>> especially parallel ones. If we have a parallel stream, we might not always
>>>> process all elements in order, and even index generation can be unreliable.
>>>> In addition, stream merging will become a headache. I think
>>>> Gatherers.scan(() -> new Indexed<>(0, dummy), (indexed, value) -> new
>>>> Indexed<>(indexed.index() + 1, value)) can create a Stream<Indexed<V>>
>>>> which should serve your purpose.
>>>>
>>>> And for expanded operations for enumerated streams, there are similar
>>>> features for primitive streams already, where they have extra methods like
>>>> summaryStatistics() compared to object streams. We most likely will have
>>>> Gatherers that explicitly work on Stream<Indexed<V>> to support
>>>> type-specific operations, partly due to Java's type limits and because
>>>> the Stream hierarchy is effectively closed.
>>>>
>>>> Best,
>>>> Chen Liang
>>>>
>>>> On Sat, Apr 20, 2024 at 4:07 PM ІП-24 Олександр Ротань <
>>>> rotan.olexandr at gmail.com> wrote:
>>>>
>>>>> My proposal regarding findIndex brought up a topic that, as I see, has
>>>>> been raised here multiple times.
>>>>>
>>>>> My idea is to extend the existing Stream interface with a new
>>>>> EnumeratedStream interface. Such an approach has a number of advantages.
>>>>>
>>>>> 1. Some methods will be able to explicitly specify that they require
>>>>> an enumerated stream.
>>>>>
>>>>> 2. This would allow us to use an enumerated stream just as the default
>>>>> value stream, if the index is redundant at a certain processing step.
>>>>>
>>>>> 3. This could help introduce a more user-friendly interface: instead
>>>>> of passing pairs of index and value, they could be passed as separate
>>>>> variables, which will make code more concise.
>>>>>
>>>>> Consider the following example:
>>>>> List.of(1, 2, 3).stream()
>>>>>                 .enumerated()
>>>>>                 .map((idx, value) -> idx % 2 == 0 ? value : -value);
>>>>>
>>>>> looks much better than
>>>>> List.of(1, 2, 3).stream()
>>>>>                 .enumerated()
>>>>>                 .map(pair -> pair.idx % 2 == 0 ? pair.value :
>>>>> -pair.value);
>>>>>
>>>>> However, I see some caveats with this approach, which relate to
>>>>> parallel streams.
>>>>> When a stream is backed by a collection, you might expect the assigned
>>>>> indexes to represent the order of iteration through the collection,
>>>>> especially for sequential collections. Assigning indexes in such streams
>>>>> could be a difficult task in a parallel environment. There are two
>>>>> options: either assign indexes to the stream elements at the moment the
>>>>> stream becomes parallel, which is impossible if an already parallel
>>>>> stream is being enumerated and also won't work for infinite streams; or
>>>>> make the enumerated stream at least partially sequential by passing
>>>>> elements to processing in the order they were passed to the stream in
>>>>> the first place, which is also hard or maybe even impossible to achieve.
>>>>>
>>>>> Also, a side note: methods that already accept 2 parameters might become
>>>>> kind of verbose, like reduce, which would have to accept four params
>>>>> (idx1, val1, idx2, val2), but I think that is not a big deal.
>>>>>
>>>>>
>>>>>
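
As a narrow illustration of the parallel-stream caveat in the original
proposal: when the source is a random-access List, positions are known up
front, so indexes can come from the source rather than from the order of
processing. The sketch below assumes exactly that case; `Indexed` and
`enumerateParallel` are hypothetical names, and it does not address infinite
streams or streams that are already parallel without an indexable source.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelIndexSketch {
    // Hypothetical index-value pair; not an existing JDK type.
    record Indexed<T>(int index, T value) {}

    // Each index is derived from the list position, so it does not depend
    // on which thread processes the element or in what order.
    static <T> List<Indexed<T>> enumerateParallel(List<T> source) {
        return IntStream.range(0, source.size())
                .parallel()
                .mapToObj(i -> new Indexed<>(i, source.get(i)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Indexed<String>> pairs = enumerateParallel(List.of("a", "b", "c"));
        for (Indexed<String> p : pairs) {
            System.out.println(p.index() + " -> " + p.value());
        }
    }
}
```

The collected list keeps encounter order because the range stream is ordered,
even though the mapping itself may run on several threads.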