RFR [11] 8197564: HTTP Client implementation - JEP 321

James Roper james at lightbend.com
Thu Mar 29 04:04:28 UTC 2018

On 28 March 2018 at 18:50, Wenbo Zhu <wenboz at google.com> wrote:

>>> Any useful RS support will have to involve some application-level
>>> protocols.
>> I completely disagree with this statement. As someone who has been
>> involved in/led the implementation of multiple WebSocket clients and
>> servers (including Play Framework, Lagom Framework and Akka HTTP), I can
>> tell you from my experience that after the initial WebSocket handshake,
>> WebSockets can be offered to application developers as just streams of
>> messages using Reactive Streams, without any additional WebSocket protocol
>> exposure, and still be useful for 99% of use cases. It certainly does not
>> have to involve exposing the protocol to the application; most applications
>> care nothing about message fragmentation, they care nothing about close
>> codes or reasons beyond whether the connection was terminated with a
>> success or error, and they care nothing about pings/pongs. Everything that
>> they do care about, Reactive Streams gives a way of expressing.
> I don't see the role of RS if the WS wire protocol doesn't support any
> signaling for the receiver to notify the sender to "slow down", except to
> respect the basic TCP flow-control.
> I am reasonably familiar with the IETF WS spec (
> https://tools.ietf.org/html/rfc6455) ... and it would be useful if you
> could point out how exactly flow-control is supposed to work with RS on top
> of Websockets.

I'm not sure how familiar you are with Reactive Streams, so let me first
start by explaining its backpressure mechanism.

RS backpressure works by a Subscriber holding a Subscription object.
Subscription provides a method request(n: Long). A Publisher may only
invoke a Subscriber's onNext method as many times as the total that has
been supplied to request. So, if request(8) has been invoked, then the
Publisher is allowed to invoke onNext 8 times with 8 elements. The
outstanding demand is cumulative, so if request(8) is invoked and then
request(4) is invoked, altogether the Publisher is allowed to invoke
onNext 12 times. So backpressure is implemented in RS by virtue of a
Subscriber letting the outstanding demand reach 0, and not invoking
request again until it's ready to receive more elements.
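
To make the demand accounting concrete, here is a minimal sketch using the
java.util.concurrent.Flow interfaces. ListPublisher, CollectingSubscriber
and DemandDemo are hypothetical names made up for this illustration, and
the publisher is deliberately simplified (single-threaded, synchronous, no
overflow handling), so treat it as a sketch rather than a spec-compliant
implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDemo {

    /** Hypothetical publisher: emits list items, never beyond outstanding demand. */
    static final class ListPublisher implements Flow.Publisher<Integer> {
        private final List<Integer> items;

        ListPublisher(List<Integer> items) { this.items = items; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            Iterator<Integer> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;      // outstanding demand, cumulative
                private boolean emitting; // guards against re-entrant request()
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    demand += n; // overflow is ignored in this sketch
                    if (emitting) return;
                    emitting = true;
                    // Emit only while there is demand; stop as soon as it hits zero.
                    while (!done && demand > 0 && it.hasNext()) {
                        demand--;
                        subscriber.onNext(it.next());
                    }
                    emitting = false;
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Hypothetical subscriber: records elements, requests nothing by itself. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Returns the number of elements received after each step. */
    static List<Integer> run() {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 20; i++) source.add(i);

        CollectingSubscriber sub = new CollectingSubscriber();
        new ListPublisher(source).subscribe(sub);

        List<Integer> counts = new ArrayList<>();
        counts.add(sub.received.size()); // nothing requested yet
        sub.subscription.request(8);
        counts.add(sub.received.size());
        sub.subscription.request(4);
        counts.add(sub.received.size());
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0, 8, 12]
    }
}
```

Although the source holds 20 elements, only 12 are delivered, because that
is the total that was requested. The remaining 8 stay with the publisher
until the subscriber calls request again.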

When a Publisher reaches this state, where outstanding demand is zero, it
will stop consuming data from its source of data. In the case of
WebSockets, this will be a TCP socket. Assuming an NIO-based
implementation, this will in practice mean that it will remove read
interest from that channel's selection key. Now at this point it's worth
pointing out that there will be buffers involved along the way: there may
be partially decoded WebSocket frames sitting in a buffer, and there may
be fully decoded WebSocket frames that are ready to be emitted but can't
be because demand is zero. So, because of this buffering, deregistering
interest in reading doesn't result in immediate backpressure propagation.
But eventually, the OS receive buffer will fill up, and the OS will then
send an ACK advertising a receive window of zero. This will prevent the
remote system from sending any more data.
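
As a rough sketch of the NIO side of this, the following toggles OP_READ on
a SelectionKey according to outstanding demand. ReadGate and
ReadInterestDemo are hypothetical names for this illustration, not part of
any existing client, and a java.nio.channels.Pipe stands in for the TCP
socket so that the example is self-contained. No frame decoding or actual
reading is shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class ReadInterestDemo {

    /** Hypothetical glue between RS demand and a channel's read interest. */
    static final class ReadGate {
        private final SelectionKey key;
        private long demand;

        ReadGate(SelectionKey key) { this.key = key; }

        /** Called when the downstream Subscriber signals request(n). */
        void onRequest(long n) {
            demand += n;
            update();
        }

        /** Called after one decoded message has been passed to onNext. */
        void onEmitted() {
            demand--;
            update();
        }

        /** Reads are only selected for while there is outstanding demand. */
        private void update() {
            int ops = key.interestOps();
            if (demand > 0) {
                key.interestOps(ops | SelectionKey.OP_READ);
            } else {
                key.interestOps(ops & ~SelectionKey.OP_READ);
            }
        }

        boolean readingEnabled() {
            return (key.interestOps() & SelectionKey.OP_READ) != 0;
        }
    }

    /** Returns whether read interest was set after each step. */
    static boolean[] run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                // Register with an empty interest set: no demand yet.
                SelectionKey key = source.register(selector, 0);
                ReadGate gate = new ReadGate(key);

                boolean initially = gate.readingEnabled();
                gate.onRequest(2);
                boolean afterRequest = gate.readingEnabled();
                gate.onEmitted();
                boolean oneLeft = gate.readingEnabled();
                gate.onEmitted();
                boolean afterDrain = gate.readingEnabled();
                return new boolean[] {initially, afterRequest, oneLeft, afterDrain};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (boolean b : run()) System.out.println(b);
    }
}
```

Once read interest is cleared, the selector no longer reports the channel
as readable, so unread bytes accumulate in the OS receive buffer, which is
what eventually closes the TCP window.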

Now, how that then impacts the WebSocket endpoint on the remote system
depends on that endpoint's implementation. If it's using blocking IO,
then, once the remote OS's send buffer is full, any calls to send new
messages will block. Thus, backpressure will have been propagated to the
remote WebSocket endpoint. If it's using non-blocking IO with a
backpressure mechanism, for example, if it's using Reactive Streams, then
the subscriber is going to stop requesting demand. Demand will reach zero,
causing the publisher to be unable to send more data. The publisher will
then translate this to whatever backpressure mechanism its source of data
supports; perhaps it's receiving messages from a message queue, in which
case it's going to stop consuming those messages. Thus, backpressure will
have been propagated. Another case is that it might be using non-blocking
IO with no backpressure mechanism - this is unfortunately the case for
many WebSocket implementations, including web browsers, and the Java EE
WebSocket API. In that case, the WebSocket implementation there has three
options: buffer, drop, or fail, or a combination of two of those. If it
buffers without limit, it will eventually run out of memory. Probably the
most common implementation is to buffer up to a point, and then fail. In
such a case, backpressure has been propagated, though not gracefully: the
result is a failure. But the important thing to realise here is that the
side that is receiving messages has been able to use backpressure to
protect itself from running out of memory. Without a backpressure
mechanism, there would have been no TCP backpressure, and it would have
had to accept data at the rate produced by the remote end, potentially
buffering and running out of memory, or having to fail.
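
The "buffer up to a point, and then fail" strategy can be sketched as
follows. OutboundQueue and BufferThenFailDemo are hypothetical names, the
limit of 3 is arbitrary, and no real socket is involved: drain merely
simulates the socket becoming writable again. This is an illustration of
the idea, not how any particular WebSocket implementation does it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class BufferThenFailDemo {

    /** Hypothetical outbound queue for a sender with no backpressure mechanism. */
    static final class OutboundQueue {
        private final Queue<String> pending = new ArrayDeque<>();
        private final int limit;
        private boolean failed;

        OutboundQueue(int limit) { this.limit = limit; }

        /**
         * Accepts a message for later writing. Once the queue is full, the
         * connection is treated as failed and everything queued is discarded.
         */
        boolean offer(String message) {
            if (failed) return false;
            if (pending.size() >= limit) {
                failed = true;
                pending.clear();
                return false;
            }
            pending.add(message);
            return true;
        }

        /** Simulates the socket becoming writable: removes up to max messages. */
        int drain(int max) {
            int written = 0;
            while (written < max && pending.poll() != null) written++;
            return written;
        }

        boolean isFailed() { return failed; }
    }

    /** Offers five messages to a queue of size 3 that is never drained. */
    static List<Boolean> run() {
        OutboundQueue queue = new OutboundQueue(3);
        List<Boolean> accepted = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            accepted.add(queue.offer("message-" + i));
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [true, true, true, false, false]
    }
}
```

The sender only learns about the slow receiver when offer starts returning
false, which is the ungraceful propagation described above.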

So, as you can see, WebSockets don't need an explicit backpressure
mechanism in the protocol; they can piggyback off TCP. And Reactive
Streams can use this to ensure a consumer of a WebSocket stream on one end
can push back on the producer at the other end, and have that pushback
propagated all the way through, albeit with a certain amount of buffering
delaying the pushback from happening immediately.

>>>> with the understanding that such a helper will limit the capabilities
>>>> and performance when it's used - e.g. the range of options for closing,
>>>> the range of options for how errors are handled, and perhaps introduce
>>>> some small performance penalties such as additional allocations and/or
>>>> buffer copies. That is a whole set of compromises and decisions that
>>>> would need to be considered.
>>>> > The use case here is how well this API will interact with other APIs.
>>>> > WebSockets are rarely used in isolation from other technologies. Most
>>>> > non-trivial usages of WebSockets will involve plumbing the WebSocket
>>>> > API to some other source or sink of streaming data - for example, a
>>>> > message broker, a database, perhaps a gRPC stream to another service.
>>>> I could well imagine a set of vertical-market specific libraries being
>>>> built on top of the WebSocket API, and that is how it should be. In
>>>> this specific area, providing the lowest level API is an enabler for
>>>> others.
>>>> > The question is, how difficult will it be to connect those two APIs?
>>>> I imagine some work may be required, but certainly much less than
>>>> without the WebSocket API.
>>>> > Will their backpressure mechanisms map to each other easily?
>>>> I see no reason that it should not. Are you aware of any?
>>>> > Will their failure handling and error handling semantics map to each
>>>> > other easily?
>>>> I see no reason that they should not. In fact, most higher-level APIs
>>>> provide more coarse-grained error handling.
>>>> > How many lines of boilerplate will it take to hook them up?
>>>> I guess that depends on the particular library's functionality: will it
>>>> support/handle/expose partial message delivery, buffering, etc.? I
>>>> don't see this as boilerplate though.
>>>> > If the WebSocket provides an option to use Reactive Streams, then for
>>>> > any other sources/sinks of streamed data that support Reactive
>>>> > Streams, application developers will have an option where the answers
>>>> > are guaranteed to be yes without the developer having to do any
>>>> > mapping themselves, and the lines of code to do that will be one.
>>>> Sure, if one has a higher-level library that works with Reactive
>>>> Streams, and one wants to use WebSocket as a transport, then it would
>>>> be little work if Java SE provided a Reactive Streams interface to
>>>> WebSocket. There are also vertical markets using WebSocket, but not
>>>> using Reactive Streams.
>>>> At this point in JEP 321, I do not want to increase the scope of the
>>>> project to include a Reactive Streams interface for WebSocket. This is
>>>> something that can, and should, be considered separate to JEP 321. It
>>>> could follow in a future release if deemed appropriate, or not, but
>>>> adding it at this point will add risk to JEP 321, which I am not
>>>> willing to do.
>>>> -Chris.
