websockets
Pavel Rappo
pavel.rappo at oracle.com
Mon Feb 12 16:33:13 UTC 2018
Hello James,
Thanks for the comprehensive reply to Chuck's email. Now regarding your own email.
> On 10 Feb 2018, at 07:38, James Roper <james at lightbend.com> wrote:
>
> <snip>
>
> But that brings me to a problem that I'd like to give as feedback to the implementers - this API is not Reactive Streams, and so therefore can't take advantage of Reactive Streams implementations, and more problematic, can't interop with other Reactive Streams sinks/sources. If I wanted to stream a WebSocket into a message broker that supports Reactive Streams, I can't. I would definitely hope that Reactive Streams support could be added to this API, at a minimum as a wrapper, so that application developers can easily focus on their business problems, plumbing and transforming messages from one place to another, rather than having to deal with implementing concurrent code to pass messages.
>
I totally understand your concern over the lack of a Reactive Streams interface
to this low-level API. All I can say is that it's not that we haven't tried. As
usual the devil is in the detail. There were at least a couple of major issues
we couldn't find a satisfactory solution to.
The first one is how to communicate errors back to the user's code that
publishes messages to WebSocket. This issue has been extensively discussed here [1].
The only signal the publisher can receive from a subscriber in the case of a
failure is a cancellation signal. After this signal the subscription is
considered cancelled. Now, there are different solutions to this problem, but
none of the ones we looked at seemed comprehensive. For example, such errors
could be propagated by the WebSocket to the user's subscriber that receives
messages. In that case, however, WebSocket input and output would become tied
together, and the WebSocket client would seem to require both the publisher and
the subscriber to be present at the same time.
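To make the first issue concrete, here is a minimal sketch using
java.util.concurrent.Flow. RecordingPublisher and FailingSink are hypothetical
names made up for illustration; FailingSink merely stands in for a
WebSocket-side subscriber whose send fails. It is not how the implementation
works. The point is that the publisher observes cancel() and nothing else, so
the cause of the failure never reaches it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class CancelOnlySignal {

    /** Hypothetical publisher that records every signal it receives from downstream. */
    static final class RecordingPublisher implements Flow.Publisher<String> {
        final List<String> upstreamSignals = new ArrayList<>();

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean cancelled;

                @Override
                public void request(long n) {
                    upstreamSignals.add("request(" + n + ")");
                    if (!cancelled) {
                        subscriber.onNext("hello");
                    }
                }

                @Override
                public void cancel() {
                    // No Throwable parameter: the reason for cancelling cannot be passed.
                    cancelled = true;
                    upstreamSignals.add("cancel()");
                }
            });
        }
    }

    /** Hypothetical stand-in for a subscriber that fails while sending a message. */
    static final class FailingSink implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;
        Throwable lostError; // known only to the subscriber side

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(String item) {
            lostError = new IOException("connection reset"); // simulated failure
            subscription.cancel(); // the only upstream signal available
        }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() { }
    }

    static List<String> run() {
        RecordingPublisher publisher = new RecordingPublisher();
        publisher.subscribe(new FailingSink());
        return publisher.upstreamSignals;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [request(1), cancel()]
    }
}
```

The IOException stays in FailingSink.lostError; the publishing side would need
some other channel to learn about it.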
The second issue is how to communicate completion signals from processing
individual messages. That would be handy if the implementation or the
application decides to recycle the buffers it uses. With a naive RS interface to
WebSocket all the user's publisher can receive from the WebSocket's subscriber
is a request for a number of extra messages. Using only this number the
publisher cannot deduce which of the previously published messages have been
actually sent. This problem also has a number of solutions. One of them would
be to use more streams: an extra publisher that WebSocket would use to emit the
outcomes of sending messages, and an extra subscriber that WebSocket would use
to receive signals from the user once a message has been received. To be honest,
it looks cumbersome.
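For comparison, here is a rough sketch of what a per-message completion signal
buys you when recycling buffers. FakeTransport and Pool are hypothetical classes
invented for this example; FakeTransport only imitates a send operation that
returns a CompletableFuture per message and does no I/O. A buffer goes back to
the pool exactly when the future for its own message completes, which is the
information a bare request(n) does not carry.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

class BufferRecycling {

    /** Hypothetical transport: each send gets its own completion signal. */
    static final class FakeTransport {
        private final Deque<CompletableFuture<Void>> inFlight = new ArrayDeque<>();

        CompletableFuture<Void> send(ByteBuffer data) {
            CompletableFuture<Void> sent = new CompletableFuture<>();
            inFlight.add(sent);
            return sent;
        }

        /** Simulates the oldest in-flight message having been written out. */
        void completeOldest() {
            inFlight.remove().complete(null);
        }
    }

    /** Hypothetical fixed-size buffer pool. */
    static final class Pool {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        Pool(int size, int capacity) {
            for (int i = 0; i < size; i++) {
                free.add(ByteBuffer.allocate(capacity));
            }
        }

        ByteBuffer take() {
            return free.remove();
        }

        void give(ByteBuffer buffer) {
            buffer.clear();
            free.add(buffer);
        }

        int available() {
            return free.size();
        }
    }

    static int[] run() {
        Pool pool = new Pool(2, 16);
        FakeTransport transport = new FakeTransport();
        for (int i = 0; i < 2; i++) {
            ByteBuffer buffer = pool.take();
            buffer.put((byte) i);
            buffer.flip();
            // Recycle this particular buffer once this particular message is sent.
            transport.send(buffer).thenRun(() -> pool.give(buffer));
        }
        int beforeAnySent = pool.available();
        transport.completeOldest();
        int afterFirstSent = pool.available();
        transport.completeOldest();
        int afterSecondSent = pool.available();
        return new int[] { beforeAnySent, afterFirstSent, afterSecondSent };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 0 1 2
    }
}
```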
There are also a dozen smaller issues that have to be resolved before
creating a well-defined RS interface to WebSocket.
> It may well require wrapping messages in a high level object - text, binary, ping, pong, etc, to differentiate between the message types.
>
We went to great lengths to support different kinds of requirements in this API.
One such requirement, which arose from a number of discussions on this
mailing list, was not to force unnecessary allocations, garbage creation and
copying. To this end we utilised completion stages for send/receive operations [2].
We abandoned types for messages and used a method-naming scheme instead (I
believe with RS the equivalent would be to provide a stream per message type).
Even WebSocket.Listener was designed such that one instance could (if required)
service many WebSocket instances. That's the reason each method in Listener has
a WebSocket argument.
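A small sketch of the last point, written against the java.net.http package
naming (an assumption of this example). CountingListener is a hypothetical
listener, and stub() builds placeholder WebSocket objects through a dynamic
proxy purely so the example runs without a network connection. Because every
callback receives the WebSocket it concerns, one listener instance can keep
per-connection state.

```java
import java.lang.reflect.Proxy;
import java.net.http.WebSocket;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

class SharedListenerDemo {

    /** Hypothetical listener shared by several WebSocket instances. */
    static final class CountingListener implements WebSocket.Listener {
        final Map<WebSocket, Integer> completedTexts = new ConcurrentHashMap<>();

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            if (last) {
                // The WebSocket argument tells us which connection the message belongs to.
                completedTexts.merge(webSocket, 1, Integer::sum);
            }
            webSocket.request(1); // ask for the next message
            return null;          // data may be reclaimed immediately
        }
    }

    /** Placeholder WebSocket for the demo only: every operation is a no-op. */
    static WebSocket stub(String name) {
        return (WebSocket) Proxy.newProxyInstance(
                WebSocket.class.getClassLoader(),
                new Class<?>[] { WebSocket.class },
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "hashCode": return System.identityHashCode(proxy);
                        case "equals":   return proxy == args[0];
                        case "toString": return name;
                        default:         return null; // e.g. request(long)
                    }
                });
    }

    static int[] run() {
        CountingListener listener = new CountingListener();
        WebSocket a = stub("a");
        WebSocket b = stub("b");
        listener.onText(a, "first", true);
        listener.onText(b, "second", true);
        listener.onText(a, "third", true);
        return new int[] { listener.completedTexts.get(a), listener.completedTexts.get(b) };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " " + counts[1]); // prints 2 1
    }
}
```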
> <snip>
>
> https://developer.lightbend.com/blog/2018-02-06-reactive-streams-ee4j/index.html
>
Thanks for the link.
That said, I think once we have satisfactory answers to the questions above,
we might provide the RS adapter you were talking about for this low-level API.
Meanwhile, it is already possible to build one on top of the existing API:
even though it is not the same as RS, semantically they are very similar.
Thanks,
-Pavel
---------------------------------------------------
[1] https://github.com/reactive-streams/reactive-streams-jvm/issues/271
[2] At one point we were even considering using callbacks similar to
java.nio.channels.CompletionHandler instead of java.util.concurrent.CompletionStage
for both sending and receiving messages. All this is for the sake of not
creating extra objects when this is considered expensive. Who knows, we
might come back to retrofit the API and add this capability later.