RFR: Add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream [v10]
Rémi Forax
forax at openjdk.org
Mon Sep 4 17:20:17 UTC 2023
> This is a minimal patch that adds a new subclass of StructuredTaskScope named Streamable (better name needed) that pushes failed/succeeded subtasks into a Stream.
>
> This subclass aims to:
> - make it easier for users to use STS without having to override handleCompleted, which is called concurrently and is therefore hard to get right, at the price of being a little less efficient
> - ease the implementation of short-circuiting stream semantics, such as getting the first two values or the first value greater than a threshold, by automatically shutting down the STS once the condition is true
>
> The Streamable STS adds two new methods, joinWhile/joinUntilWhile(function), that take a function that takes a Stream and returns a value
>
> public <U> U joinWhile(Function<? super Stream<Subtask<T>>, ? extends U> mapper) throws InterruptedException {
>
> When this method is called, each finished subtask (with state SUCCESS or FAILED) is pushed into the Stream until there are no more subtasks, the stream has finished (has been short-circuited), the scope has been shut down or interrupted, or the deadline expires. If some subtasks are still pending because the stream has been short-circuited, they are shut down.
>
> Here are two examples:
> - get a list of all the values that succeed
>
> try (var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>     streamable.fork(() -> {
>         Thread.sleep(200);
>         return 12;
>     });
>     streamable.fork(() -> {
>         Thread.sleep(100);
>         return 17;
>     });
>     List<Integer> list = streamable.joinWhile(stream -> stream
>         .filter(task -> task.state() == State.SUCCESS)
>         .map(Subtask::get)
>         .toList());
>     System.out.println(list); // [17, 12]
> }
>
> - find the first subtask to finish (whether it succeeds or fails)
>
> try (var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>     streamable.fork(() -> {
>         Thread.sleep(1_000);
>         return 12;
>     });
>     streamable.fork(() -> {
>         Thread.sleep(100);
>         return 17;
>     });
>     Optional<Subtask<Integer>> first = streamable.joinWhile(Stream::findFirst);
>     System.out.println(first); // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
> }
>
> Internally, handleCompleted posts each subtask into a queue, which is read by the Stream spliterator inside joinWhile.
>
> The current implementation uses thread flock methods ThreadFlock.awaitAll()/ThreadFlock.w...
Rémi Forax has updated the pull request incrementally with one additional commit since the last revision:
Use a LinkedTransferQueue instead of the ThreadFlock
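
The queue-plus-spliterator idea described in the quoted summary can be sketched in plain Java. This is not the code from the pull request: the class QueueStreamSketch, the record Outcome, and the static joinWhile helper below are hypothetical names used only for illustration, and an ordinary ExecutorService stands in for the scope. Each task posts its outcome to a LinkedTransferQueue when it completes, and a spliterator blocks on that queue until every task has reported or the stream stops pulling.

```java
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class QueueStreamSketch {
    // Hypothetical stand-in for Subtask: either a value or a failure.
    record Outcome<T>(T value, Throwable failure) {}

    // Hypothetical helper, not the API proposed in the PR.
    static <T, U> U joinWhile(List<Callable<T>> tasks,
                              Function<? super Stream<Outcome<T>>, ? extends U> mapper) {
        var queue = new LinkedTransferQueue<Outcome<T>>();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (Callable<T> task : tasks) {
                executor.execute(() -> {
                    // Post the outcome in completion order, success or failure.
                    try {
                        queue.add(new Outcome<>(task.call(), null));
                    } catch (Exception e) {
                        queue.add(new Outcome<>(null, e));
                    }
                });
            }
            var spliterator = new Spliterators.AbstractSpliterator<Outcome<T>>(
                    tasks.size(), Spliterator.ORDERED | Spliterator.NONNULL) {
                private int remaining = tasks.size();

                @Override
                public boolean tryAdvance(Consumer<? super Outcome<T>> action) {
                    if (remaining == 0) {
                        return false; // every task has already been delivered
                    }
                    try {
                        action.accept(queue.take()); // block until the next completion
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    remaining--;
                    return true;
                }
            };
            return mapper.apply(StreamSupport.stream(spliterator, false));
        } finally {
            // Interrupt any task still running, e.g. after a short-circuit.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = List.of(
                () -> { Thread.sleep(300); return 12; },
                () -> { Thread.sleep(50); return 17; });
        Optional<Outcome<Integer>> first = joinWhile(tasks, Stream::findFirst);
        System.out.println(first);
    }
}
```

With Stream::findFirst the spliterator is advanced only once, so the helper returns as soon as the first outcome arrives and the finally block interrupts the slower task, which mirrors the short-circuit behaviour described in the summary.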
-------------
Changes:
- all: https://git.openjdk.org/loom/pull/202/files
- new: https://git.openjdk.org/loom/pull/202/files/47baa324..1f04c988
Webrevs:
- full: https://webrevs.openjdk.org/?repo=loom&pr=202&range=09
- incr: https://webrevs.openjdk.org/?repo=loom&pr=202&range=08-09
Stats: 72 lines in 1 file changed: 28 ins; 19 del; 25 mod
Patch: https://git.openjdk.org/loom/pull/202.diff
Fetch: git fetch https://git.openjdk.org/loom.git pull/202/head:pull/202
PR: https://git.openjdk.org/loom/pull/202