JEP 321: HTTP Client - Use of Flow.Subscriber and Flow.Publisher
Chris Hegarty
chris.hegarty at oracle.com
Tue Dec 12 11:13:02 UTC 2017
I filed the following JIRA issue to track this discussion
and proposal.
https://bugs.openjdk.java.net/browse/JDK-8193365
I'll start the process of bringing it into 10, unless there
are any final comments. FTR, I'm happy where we ended up on
this.
-Chris.
On 11/12/17 15:48, Chris Hegarty wrote:
> James,
>
> On 11/12/17 00:47, James Roper wrote:
> > Hi Chris,
> >
> > This looks like a straight forward way to solve the problem with minimal
> > disruption from the existing API. Can I make a few suggestions though?
>
> Of course, your input here is much appreciated.
>
> > We could add a contentLength parameter to fromPublisher, to allow
> > Flow.Publishers where the content length is known to be easily converted
> > to BodyPublisher:
> >
> > static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
> > int contentLength) {
> > ...
> > }
>
> Good idea. Added ( as can be seen below ).
>
> > This would mean if you were receiving a servlet request body and
> > publishing it to another location, then you could do something like this
> > (this uses a reactive streams implementation on top of the servlet API
> > that I wrote):
> >
> > HttpServletRequest request = ...
> > long contentLength = -1;
> > if (request.getHeader("Content-Length") != null) {
> > contentLength = Long.parseLong(request.getHeader("Content-Length"));
> > }
> > Publisher<ByteBuffer> publisher = new
> > RequestPublisher(request.startAsync(), 8192);
> >
> > HttpRequest clientRequest = HttpRequest.newBuilder(target)
> > .POST(BodyPublisher.fromPublisher(publisher, contentLength))
> > .build()
>
> Nice.
>
> > Perhaps the method could be overloaded for both supplying and not
> > supplying a content length.
>
> I think an overload is justified here. Added.
>
> > Similarly, I think a fromSubscriber API that accepted a
> > CompletionStage<T> would be a little more fluent than having to supply
> > it externally:
>
> Daniel and I discussed this too, but I opted to leave it out initially
> for simplicity. I think if we have two overloads, then the simple case
> can still be supported with little ceremony, while allowing a more
> powerful variant.
>
> > public static <T> BodyHandler<T> fromSubscriber(Subscriber<? super
> > List<ByteBuffer>> subscriber, CompletionStage<T> bodyFuture) {
> > ...
> > }
> >
> > Then you could have something like this:
> >
> > TextSubscriber subscriber = ...; // accumulates bytes and transforms
> > them into a CompletionStage<String>.
> > CompletionStage<String> result = subscriber.getTextResult();
> >
> > CompletableFuture<String> cf = client
> > .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
> > String text = cf.join();
> >
> > Likewise, this could be an overload of fromSubscriber if we want the
> > option of not specifying a body future.
> >
> > One thing I think needs to be carefully specified is, if the method
> > doesn't accept a CompletionStage, when/how the CompletionStage returned
> > from send is redeemed.
>
> Hmmm... this could be tricky. I think we can avoid the scenario
> completely by accepting a finishing function that can generate/return
> the value to use for completion, rather than the CF itself.
>
> Here is an outline of all of the above:
>
> public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {
>
> /**
> * Returns a request body publisher whose body is retrieved from the
> * given {@code Flow.Publisher}. The returned request body publisher
> * has an unknown content length.
> *
> * @apiNote This method can be used as an adapter between {@code
> * BodyPublisher} and {@code Flow.Publisher}, where the amount of
> * request body that the publisher will publish is unknown.
> *
> * @param publisher the publisher responsible for publishing the body
> * @return a BodyPublisher
> */
> static BodyPublisher fromPublisher(Flow.Publisher<? extends
> ByteBuffer> publisher) { ... }
>
> /**
> * Returns a request body publisher whose body is retrieved from the
> * given {@code Flow.Publisher}. The returned request body publisher
> * has the given content length.
> *
> * <p> The given {@code contentLength} is a positive number, that
> * represents the exact amount of bytes the {@code publisher} must
> * publish.
> *
> * @apiNote This method can be used as an adapter between {@code
> * BodyPublisher} and {@code Flow.Publisher}, where the amount of
> * request body that the publisher will publish is known.
> *
> * @param publisher the publisher responsible for publishing the body
> * @param contentLength a positive number representing the exact
> * amount of bytes the publisher will publish
> * @throws IllegalArgumentException if the content length is
> * non-positive
> * @return a BodyPublisher
> */
> static BodyPublisher fromPublisher(Flow.Publisher<? extends
> ByteBuffer> publisher,
> long contentLength) { ... }
>
>
> public interface BodyHandler<T> {
>
> /**
> * Returns a response body handler that returns a {@link
> BodySubscriber
> * BodySubscriber}{@code <Void>} obtained from {@linkplain
> * BodySubscriber#fromSubscriber(Subscriber)}, with the given
> * {@code subscriber}.
> *
> * <p> The response body is not available through this, or the {@code
> * HttpResponse} API, but instead all response body is forwarded to
> the
> * given {@code subscriber}, which should make it available, if
> * appropriate, through some other mechanism, e.g. an entry in a
> * database, etc.
> *
> * @apiNote This method can be used as an adapter between {@code
> * BodySubscriber} and {@code Flow.Subscriber}.
> *
> * <p> For example:
> * <pre> {@code
> * TextSubscriber subscriber = new TextSubscriber();
> * HttpResponse<Void> response = client.sendAsync(request,
> * BodyHandler.fromSubscriber(subscriber)).join();
> * System.out.println(response.statusCode());
> * }</pre>
> *
> * @param subscriber the subscriber
> * @return a response body handler
> */
> public static BodyHandler<Void>
> fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) {
> ... }
>
> /**
> * Returns a response body handler that returns a {@link
> BodySubscriber
> * BodySubscriber}{@code <S,T>} obtained from {@link
> * BodySubscriber#fromSubscriber(Subscriber, Function)}, with the
> * given {@code subscriber} and {@code finisher} function.
> *
> * <p> The given {@code finisher} function is applied after the given
> * subscriber's {@code onComplete} has been invoked. The {@code
> finisher}
> * function is invoked with the given subscriber, and returns a value
> * that is set as the response's body.
> *
> * @apiNote This method can be used as an adapter between {@code
> * BodySubscriber} and {@code Flow.Subscriber}.
> *
> * <p> For example:
> * <pre> {@code
> * TextSubscriber subscriber = ...; // accumulates bytes and
> transforms them into a String.*
> * HttpResponse<String> response = client.sendAsync(request,
> * BodyHandler.fromSubscriber(subscriber,
> TextSubscriber::getTextResult)).join();
> * String text = response.body();
> * }</pre>
> *
> * @param <S> the type of the Subscriber
> * @param <T> the type of the response body
> * @param subscriber the subscriber
> * @param finisher a function to be applied after the subscriber
> has completed
> * @return a response body handler
> */
> public static <S extends Subscriber<? super List<ByteBuffer>>,T>
> BodyHandler<T>
> fromSubscriber(S subscriber, Function<S,T> finisher) { ... }
>
>
> // And a similar pair of BodySubscriber methods, omitted for brevity.
>
>
> -Chris.
More information about the net-dev
mailing list