Reactive Streams utility API

James Roper james at lightbend.com
Tue Sep 25 04:33:58 UTC 2018


Hi Pavel,

MicroProfile doesn't do implementations, or even have a concept of a
reference implementation, which is why that executor isn't there. It is
however in one of the implementations of the spec that we've created at
Lightbend:

https://github.com/lightbend/microprofile-reactive-streams/blob/master/zerodep/src/main/java/com/lightbend/microprofile/reactive/streams/zerodep/MutexExecutor.java

This implementation is being called a "zero dependency" implementation,
since it doesn't bring in any other dependencies, unlike the implementation
that we are going to use ourselves, which uses Akka Streams. The zerodep
implementation isn't production ready, our main reason for providing it is
as a candidate starting point for a reference implementation in the JDK.
But I also don't think it's that bad an implementation, it's just that
unlike the Akka Streams based implementation (or any of the other
implementations, eg RedHat's one built on RxJava and Vert.x), it has not
been proven through any real world use.

As for the MutexExecutor itself, that was mostly written by Viktor Klang,
and I believe he wrote it based on his experience implementing similar
constructs for Akka mailboxes. There is one major problem with it that I'm
aware of - it's not fair on the underlying executor. If you submit tasks at
an equal or higher rate than can be processed by a single thread, the
executor will never return the thread it uses to the underlying executor. I
don't think that's hard to fix - we could limit the number of sequential
tasks it does on a thread before resubmitting to the underlying executor.

A slightly different incarnation of this problem is when each task invoked
resubmits another task, for example, using the current reactive streams
API, if I did ReactiveStreams.generate(() ->
"foo").forEach(System.out::println), that is by design effectively an
infinite loop that prints out foo, but being an asynchronous API it
shouldn't actually be an infinite loop, it should return the thread back to
the underlying executor at least periodically to allow that thread to be
used for other tasks queued on the executor, but it doesn't do that.

But this issue (and some similar issues that may exist) we haven't begun to
consider addressing, primarily because fixing it requires selecting some
magic numbers for limits on work to do, and they can't be selected without
some realistic benchmarks being created to tune them to, and we're just not
ready to take this implementation to that level, it could change very
significantly which would change all the assumptions before it's ready to
be used.

Cheers,

James

On Fri, 21 Sep 2018 at 21:16, Pavel Rappo <pavel.rappo at oracle.com> wrote:

> Hi James,
>
> It sounds like the project is being very active. Glad to hear that! No
> matter
> how good a mailing list is, in practice, it cannot be the best mailing
> list for
> every topic there is. core-libs-dev is no exception. It's good to see
> you've
> found another way to make progress on the project.
>
> The question I was going to ask you was on the implementation internals
>
>     org.reactivestreams.utils.impl.MutexExecutor
>
> However, after examining the new repository, I found that not only has the
> code
> in question disappeared, but the structure of the project has changed as
> well.
>
> Will the API have the default implementation (ReactiveStreamsEngine)?
>
> -Pavel
>
>

-- 
*James Roper*
*Senior Developer, Office of the CTO*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>


More information about the core-libs-dev mailing list