RFR 8184285: Buffer sizes of Flow based BodyProcessor API

Tobias Thierer tobiast at google.com
Fri Aug 18 17:45:19 UTC 2017

Hi Michael & Chris -

apologies for the slow follow-up. I couldn't get my IDE (IntelliJ) to
accept a locally built OpenJDK 9 into which I had patched your proposed
changes ("The selected directory is not a valid home for the JDK"). Not
your problem to solve, but it led me to procrastinate a little because I
had to compile and look at failures from the command line. Also, note that I
only patched in your initial change rather than Chris's later revision, but
from a quick inspection that doesn't seem to affect the comments below.

I've run into one limitation with the new List<ByteBuffer> based approach
that I wasn't aware of when I wrote my initial reaction. It's not
necessarily a deal-breaker (BufferingProcessor is still useful), but I
wanted to mention it. I also have an idea that would allow us to go back to
ByteBuffer (rather than List<ByteBuffer>) being the unit of data that's
passed through the subscription, without losing any flexibility/capability
of the API.

===== Limitation of the new API

The two goals that I expected your change to achieve are:

   1. Give an application control over the size of the data chunks that it
   has to process at a time, and
   2. Give an application control (lower/upper bound) on how many bytes, as
   opposed to how many ByteBuffers, are being held in memory.

I only realized today that your change actually only achieves the first
goal, but not the second. I also have an idea for how the first goal could be
achieved without changing the unit of data from ByteBuffer to
List<ByteBuffer> (see more below).

The issue with the second goal is that while the
new BodyProcessor.buffering() API gives the application control over the
size of the ByteBuffers delivered to it, it doesn't give it control over
the number of bytes buffered, because the application cannot know how many
ByteBuffers the List<ByteBuffer> passed to each onNext() call will contain.

Note that both before and after your proposed change, an application can
achieve the state where a request for more data is issued immediately when
the number of bytes buffered drops below a *lower* bound; it just can't
stop the system from giving it more data than it'd like (upper bound).

For example, I've adapted my PipedResponseStream example to the new
API. This is how I've implemented the lower bound:

   - Previously, it started with subscription.request(initialBuffersToRequest)
   and followed that up with subscription.request(1) each time a buffer was
   cleared from the queue.
   - I've now changed it to keep track of whether a request is currently
   outstanding (the time between subscription.request(1) and the corresponding
   onNext()). Every time there is (a) no request outstanding, and (b)
   buffers.size() < numByteBuffersToBuffer, a new subscription.request(1) is
   made. This can happen at three different times: 1.) during onSubscribe(),
   2.) when one request completes in onNext() but we discover that we still
   have too little data, and 3.) during take(), when a ByteBuffer is taken out
   of the internal buffer. Of course, if the latency between request(1) and
   onNext() is too high, the internal buffer could run out before we manage
   to refill it.
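Roughly, that bookkeeping might look like the sketch below. It is written against java.util.concurrent.Flow directly rather than the incubator BodyProcessor type, and the class name LowWaterMarkSubscriber is hypothetical; it is not the actual PipedResponseStream code. Note that onNext() has to append however many ByteBuffers the publisher put into the list, which is why only the lower bound can be enforced.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical sketch of the lower-bound strategy; names are illustrative,
// not the actual PipedResponseStream implementation.
class LowWaterMarkSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
    private final int numByteBuffersToBuffer;        // lower bound on queued ByteBuffers
    private final Deque<ByteBuffer> buffers = new ArrayDeque<>();
    private Flow.Subscription subscription;
    private boolean requestOutstanding;              // between request(1) and its onNext()
    private boolean completed;
    private Throwable error;

    LowWaterMarkSubscriber(int numByteBuffersToBuffer) {
        this.numByteBuffersToBuffer = numByteBuffersToBuffer;
    }

    // Issues request(1) only if (a) no request is outstanding and (b) we are below the lower bound.
    private void maybeRequest() {
        Flow.Subscription s;
        synchronized (this) {
            if (subscription == null || requestOutstanding || completed
                    || buffers.size() >= numByteBuffersToBuffer) {
                return;
            }
            requestOutstanding = true;
            s = subscription;
        }
        s.request(1);  // outside the lock: the publisher may call onNext() synchronously
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        synchronized (this) { this.subscription = subscription; }
        maybeRequest();                              // time 1: during onSubscribe()
    }

    @Override public void onNext(List<ByteBuffer> items) {
        synchronized (this) {
            buffers.addAll(items);                   // list length is chosen by the publisher
            requestOutstanding = false;
            notifyAll();
        }
        maybeRequest();                              // time 2: still too little data
    }

    @Override public synchronized void onError(Throwable t) {
        error = t;
        completed = true;
        notifyAll();
    }

    @Override public synchronized void onComplete() {
        completed = true;
        notifyAll();
    }

    /** Blocks until a buffer is available; returns null at end of stream. */
    public ByteBuffer take() throws IOException, InterruptedException {
        ByteBuffer b;
        synchronized (this) {
            while (buffers.isEmpty() && !completed) {
                wait();
            }
            if (buffers.isEmpty() && error != null) {
                throw new IOException(error);
            }
            b = buffers.poll();
        }
        maybeRequest();                              // time 3: a buffer was taken out
        return b;
    }
}
```

The request(1) call is deliberately made outside the lock so that a publisher which calls onNext() synchronously from inside request() doesn't deadlock or observe a half-updated state.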

I can ask Martin to upload the latest code to his workspace if you like,
but I suspect you get the idea.

===== How to change back to the old API without losing the new capability

I think it's not actually necessary to change from ByteBuffer to
List<ByteBuffer> in order to achieve goal (1.), i.e. the fixed ByteBuffer
size delivered by BufferingProcessor.

Here's how:

   - BufferingProcessor keeps a Queue<ByteBuffer> internally, instead of
   passing the whole List to the delegate BodyProcessor.
   - When the delegate calls request(n), BufferingProcessor calls
   onNext(ByteBuffer) n times, supplying ByteBuffers from its internal Queue.
   - When the internal queue size gets small, BufferingProcessor calls
   request(1) (or request() with some value > 1) on *its* subscription to
   get more data to feed into its Queue<ByteBuffer>.


   - Obviously, BufferingProcessor runs the risk of running out of
   internally buffered data if the delegate processor requests data too
   quickly. And obviously, BufferingProcessor will have a hard time deciding
   on the correct number *n* to pass to request(*n*) on its subscription. But,
   because BufferingProcessor is part of the HTTP Client implementation, it is
   in a much better position than the application to know about internal
   buffer sizes, to pick *n* using a heuristic based on the ratio between
   that internal buffer size and the chunk size requested by the delegate
   BodyProcessor, and potentially (as a later refinement) to measure
   throughput / latency to estimate how much more data it should request,
   and how early.
   - Also obviously, the application would have no control over how much
   extra data the BufferingProcessor will hold. But that's not too
   different from how the application currently has no control over how many
   extra ByteBuffers will be delivered by the system during
   onNext(List<ByteBuffer>). And again, BufferingProcessor is in a better
   position to deal with this than the application because it can hard-code
   more knowledge about the rest of the implementation.
   - By going back from List<ByteBuffer> to ByteBuffer, everyone who
   implements BodyProcessor is spared the onerous task of writing the "for
   (ByteBuffer item : items) { ... }" loop in onNext().
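A rough sketch of that queue-based design, assuming both the upstream unit and the delegate's unit are a single ByteBuffer. ChunkingProcessor, chunkSize and lowWater are hypothetical names, and the refill policy (at most one upstream request(1) outstanding, issued whenever fewer than lowWater chunks are queued) is just one simple stand-in for the heuristic described above, not a tuned one:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Flow;

// Hypothetical sketch, not the JDK's BufferingProcessor: one fixed-size chunk per onNext().
class ChunkingProcessor implements Flow.Subscriber<ByteBuffer>, Flow.Subscription {
    private final Flow.Subscriber<ByteBuffer> delegate;
    private final int chunkSize;   // size of every chunk except possibly the last
    private final int lowWater;    // assumed policy: refill upstream below this many queued chunks
    private final Deque<ByteBuffer> chunks = new ArrayDeque<>();   // the internal Queue<ByteBuffer>
    private ByteBuffer partial;    // chunk currently being filled
    private Flow.Subscription upstream;
    private Throwable error;
    private long demand;           // chunks requested by the delegate, not yet delivered
    private boolean upstreamPending, upstreamDone, cancelled, terminated, draining, again;

    ChunkingProcessor(Flow.Subscriber<ByteBuffer> delegate, int chunkSize, int lowWater) {
        if (chunkSize <= 0 || lowWater <= 0) throw new IllegalArgumentException();
        this.delegate = delegate;
        this.chunkSize = chunkSize;
        this.lowWater = lowWater;
    }
    @Override public void onSubscribe(Flow.Subscription s) {
        synchronized (this) { upstream = s; }
        delegate.onSubscribe(this);
        drain();
    }
    @Override public void onNext(ByteBuffer item) {
        synchronized (this) {
            while (item.hasRemaining()) {                 // copy item into fixed-size chunks
                if (partial == null) partial = ByteBuffer.allocate(chunkSize);
                int n = Math.min(item.remaining(), partial.remaining());
                partial.put(item.duplicate().limit(item.position() + n));
                item.position(item.position() + n);
                if (!partial.hasRemaining()) { partial.flip(); chunks.add(partial); partial = null; }
            }
            upstreamPending = false;                      // the outstanding request(1) is fulfilled
        }
        drain();
    }
    @Override public void onError(Throwable t) {
        synchronized (this) { error = t; upstreamDone = true; }
        drain();
    }
    @Override public void onComplete() {
        synchronized (this) {                             // flush the final, possibly smaller chunk
            upstreamDone = true;
            if (partial != null) { partial.flip(); chunks.add(partial); partial = null; }
        }
        drain();
    }
    @Override public void request(long n) {
        synchronized (this) {
            if (n <= 0) error = new IllegalArgumentException("request(" + n + ")");
            else demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
        }
        drain();
    }
    @Override public void cancel() {
        Flow.Subscription up;
        synchronized (this) { cancelled = true; up = upstream; }
        if (up != null) up.cancel();
    }

    // Serializes all signals to the delegate; re-entrant callers only set 'again'.
    private void drain() {
        synchronized (this) {
            if (draining) { again = true; return; }
            draining = true;
        }
        while (true) {
            ByteBuffer next = null;
            Throwable err = null;
            boolean complete = false;
            Flow.Subscription up = null;
            synchronized (this) {
                if (cancelled || terminated) { draining = false; return; }
                if (error != null) { terminated = true; err = error; }
                else if (demand > 0 && !chunks.isEmpty()) { demand--; next = chunks.poll(); }
                else if (chunks.isEmpty() && upstreamDone) { terminated = true; complete = true; }
                if (err == null && !upstreamDone && !upstreamPending && upstream != null
                        && chunks.size() < lowWater) {    // keep the internal queue topped up
                    upstreamPending = true;
                    up = upstream;
                }
                boolean idle = next == null && err == null && !complete && up == null;
                if (idle && !again) { draining = false; return; }
                again = false;
            }
            if (up != null) up.request(1);
            if (err != null) { delegate.onError(err); return; }
            if (complete) { delegate.onComplete(); return; }
            if (next != null) delegate.onNext(next);
        }
    }
}
```

With this shape, the delegate's onNext(ByteBuffer) always sees exactly one chunk of chunkSize bytes (only the last may be shorter) and never loops over a list. The drain loop is there so that a delegate calling request(n) from inside onNext(), or an upstream publisher delivering synchronously from inside request(1), doesn't cause unbounded recursion or concurrent signals to the delegate.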



On Thu, Aug 3, 2017 at 6:02 PM, Michael McMahon <michael.x.mcmahon at oracle.com> wrote:

> Hi,
> The HTTP client work is continuing in a new branch of the JDK 10 sandbox
> forest (http-client-branch), and here is the first of a number of changes
> we want to make.
> This one is to address the feedback we received that
> HttpResponse.BodyProcessors would be easier to implement if there was
> control over the size of the buffers being supplied.
> To that end we have added APIs for creating buffered response processors
> (and handlers).
> So, HttpResponse.BodyProcessor has a new static method with the following
> signature:
> public static <T> BodyProcessor<T> buffering(BodyProcessor<T> downstream, long buffersize) {}
> This returns a new processor which delivers data to the supplied
> downstream processor, buffered by the 'buffersize' parameter. It guarantees
> that all data is delivered in chunks of that size until the final chunk,
> which may be smaller.
> This should allow other BodyProcessor implementations that require
> buffering to wrap themselves in this way, be guaranteed that the data they
> receive is buffered, and then return that composite processor to their user.
> A similar method is added to HttpResponse.BodyHandler.
> Note also that we have changed HttpResponse.BodyProcessor from being a
> Flow.Subscriber<ByteBuffer> to a Flow.Subscriber<List<ByteBuffer>>. That
> change is technically orthogonal to this one, but is motivated by it.
> Transferring ByteBuffers in lists makes it easier to buffer them
> efficiently.
> The webrev is at: http://cr.openjdk.java.net/~michaelm/8184285/webrev.1/
> Thanks,
> Michael
