JEP 321: HTTP Client - Use of Flow.Subscriber and Flow.Publisher
James Roper
james at lightbend.com
Mon Dec 11 00:47:22 UTC 2017
Hi Chris,
This looks like a straight forward way to solve the problem with minimal
disruption from the existing API. Can I make a few suggestions though?
We could add a contentLength parameter to fromPublisher, to allow
Flow.Publishers where the content length is known to be easily converted to
BodyPublisher:
static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
int contentLength) {
...
}
This would mean if you were receiving a servlet request body and publishing
it to another location, then you could do something like this (this uses a
reactive streams implementation on top of the servlet API that I wrote):
HttpServletRequest request = ...
long contentLength = -1;
if (request.getHeader("Content-Length") != null) {
contentLength = Long.parseLong(request.getHeader("Content-Length"));
}
Publisher<ByteBuffer> publisher = new RequestPublisher(request.startAsync(),
8192);
HttpRequest clientRequest = HttpRequest.newBuilder(target)
.POST(BodyPublisher.fromPublisher(publisher, contentLength))
.build()
Perhaps the method could be overloaded for both supplying and not supplying
a content length.
Similarly, I think a fromSubscriber API that accepted a CompletionStage<T>
would be a little more fluent than having to supply it externally:
public static <T> BodyHandler<T> fromSubscriber(Subscriber<? super
List<ByteBuffer>> subscriber, CompletionStage<T> bodyFuture) {
...
}
Then you could have something like this:
TextSubscriber subscriber = ...; // accumulates bytes and transforms them
into a CompletionStage<String>.
CompletionStage<String> result = subscriber.getTextResult();
CompletableFuture<String> cf = client
.sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
String text = cf.join();
Likewise, this could be an overload of fromSubscriber if we want the option
of not specifying a body future.
One thing I think needs to be carefully specified is, if the method doesn't
accept a CompletionStage, when/how the CompletionStage returned from send
is redeemed.
Regards,
James
On 9 December 2017 at 04:31, Chris Hegarty <chris.hegarty at oracle.com> wrote:
> James,
>
> Thanks for taking the time to look at this, and sending your thoughts.
>
> On 08/12/17 00:30, James Roper wrote:
> > Hi all,
> >
> > I wanted to start a discussion about the use of Flow.Subscriber and
> > Flow.Publisher in JEP 321 (HTTP Client API).
> >
> > It seems that users are required to implement their own publishers and
> > subscribers, that is, they can't take a Flow.Publisher or
> > Flow.Subscriber provided by another reactive streams implementation, and
> > pass it on to the HttpClient API. The reason for this is that the
> > HttpClient API doesn't accept Flow.Publisher/Flow.Subscriber, rather it
> > extends them in HttpRequest.BodyPublisher and
> > HttpResponse.BodySubscriber, and then requires the user to return
> > instances of those sub interfaces from their BodyHandlers. ...
>
> Great point. I think we can address this with straight forward adapters.
> For example:
>
> public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {
>
> /**
> * Returns a request body publisher whose body is retrieved from the
> * given {@code Flow.Publisher}. The returned request body publisher
> * has an unknown content length.
> *
> * @apiNote This method can be used as an adapter between {@code
> * BodyPublisher} and {@code Flow.Publisher}.
> *
> * @param publisher the publisher responsible for publishing the body
> * @return a BodyPublisher
> */
> static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer>
> publisher) {
> ...
> }
>
> ...
>
> public BodySubscriber<T> apply(int statusCode, HttpHeaders
> responseHeaders);
>
> /**
> * Returns a response body handler that returns a {@link
> BodySubscriber
> * BodySubscriber}{@code <Void>} obtained from {@link
> * BodySubscriber#fromSubscriber(Subscriber)}.
> *
> * @apiNote This method can be used as an adapter between {@code
> * BodySubscriber} and {@code Flow.Subscriber}.
> *
> * <p> For example:
> * <pre> {@code
> * TextSubscriber subscriber = ...; // accumulates bytes and
> transforms them into a String.
> * Supplier<String> result = subscriber::getTextResult;
> *
> * CompletableFuture<String> cf = client
> * .sendAsync(request, BodyHandler.fromSubscriber(sub
> scriber))
> * .thenApply((response -> result.get()));
> * String text = cf.join();
> * }</pre>
> *
> * @param subscriber the subscriber
> * @return a response body handler
> */
> public static BodyHandler<Void> fromSubscriber(Subscriber<? super
> List<ByteBuffer>> subscriber) {
> ...
> }
>
> // Add an equivalent BodySubscriber ...
>
>
> This would allow the API to retain its Flow specific types ( that add
> additional HTTP specific and API behavior ), while interacting, without
> much fuss, with regular Publishers and Subscribers.
>
> -Chris.
>
--
*James Roper*
*Senior Octonaut*
Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.openjdk.java.net/pipermail/net-dev/attachments/20171211/225f7298/attachment.html>
More information about the net-dev
mailing list