RFR: 8350451: HttpClient should not wait until the request body has been fully sent before reading the response headers
Daniel Fuchs
dfuchs at openjdk.org
Fri Feb 21 13:05:57 UTC 2025
On Fri, 21 Feb 2025 11:20:09 GMT, Volkan Yazici <vyazici at openjdk.org> wrote:
>> Hi - Please find here a PR that improves streaming strategy in the HttpClient.
>>
>> The HttpClient currently waits until the full request body has been sent before starting to listen for a response. This is not optimal, in particular in cases where the server sends back e.g. 500 without even reading the body. It also prevents the client from starting to stream the response body before it has sent the full request body, which in turn prevents the server from streaming the request body back to the client without reading it fully first.
>>
>> While writing a test to verify the fix, I also noticed a few places where additional tasks needed to be performed asynchronously (= delegated to the executor) to support this.
>
> src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java line 428:
>
>> 426: var sendBody = exchImpl.sendBodyAsync();
>> 427: CompletableFuture<Response> cf = exchImpl.getResponseAsync(executor);
>> 428: sendBody.exceptionally((t) -> {
>
> Why don't we mirror this as in `cf.exceptionally(t -> ...)`? That is, what if receive-response-body fails while send-request-body is in flight? (Thinking of push promises or alike early responses that server delivers without fully consuming the request.)
If we get an error while receiving the response headers or the response body, the exchange will be cancelled (with cancelImpl), which will stop/cancel the sending of the request body.
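To make that relationship concrete, here is a minimal sketch using plain CompletableFuture, not the actual Exchange code. The class EarlyResponseSketch and the method linkCancellation are hypothetical names, and CompletableFuture.cancel is only a stand-in for what cancelImpl does in the real exchange.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class EarlyResponseSketch {

    // Hypothetical helper: if the response side fails, cancel the
    // still-pending send side. In the real code this role is played by
    // cancelImpl on the exchange; cancel(true) is only a stand-in here.
    static void linkCancellation(CompletableFuture<Void> sendBody,
                                 CompletableFuture<String> response) {
        response.whenComplete((result, error) -> {
            if (error != null) {
                sendBody.cancel(true);
            }
        });
    }

    public static void main(String[] args) {
        // Both sides are started up front: the response future exists
        // before the request body has been fully sent.
        CompletableFuture<Void> sendBody = new CompletableFuture<>();
        CompletableFuture<String> response = new CompletableFuture<>();
        linkCancellation(sendBody, response);

        // Simulate a failure while reading the response.
        response.completeExceptionally(new IOException("connection reset"));

        System.out.println("send cancelled: " + sendBody.isCancelled());
    }
}
```

In this sketch a single handler on the response side is enough to stop the send, which is why no mirrored handler on the send future is shown.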
> src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java line 406:
>
>> 404: .thenAccept(this::cancelIfFailed)
>> 405: .thenAcceptAsync((s) -> requestMoreBody(),
>> 406: exchange.executor().safeDelegate());
>
> Shall we document the need to switch to this particular executor? (I can guess it from the context of the PR, yet it is not apparent to me in isolation.)
If we don't spawn a new task for executing requestMoreBody() here, the PostFromGet test will get wedged. The regular executor attempts to execute as many tasks as it can in the current thread to minimize context switches. This usually works well, but we're doing something a bit unusual in the PostFromGet test. We're using InputStream for reading and posting, which means we risk blocking within a reactive stream pipeline. Spawning a new task here ensures we don't block in the pipeline.
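The difference between running a task inline and handing it to another thread can be shown with plain java.util.concurrent executors. This is only an illustration: it does not use the HttpClient's internal DelegatingExecutor, and the class InlineVsDelegated and the method ranOn are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InlineVsDelegated {

    // Submits a task to the given executor and returns the name of the
    // thread that actually ran it.
    static String ranOn(Executor executor) {
        CompletableFuture<String> threadName = new CompletableFuture<>();
        executor.execute(() ->
                threadName.complete(Thread.currentThread().getName()));
        return threadName.join();
    }

    public static void main(String[] args) {
        // An inline executor runs the task in the caller's thread, so a
        // blocking task would block the caller (here, the pipeline).
        Executor inline = Runnable::run;

        // A delegating executor always hands the task to another thread.
        ExecutorService delegate =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "delegate"));
        try {
            String caller = Thread.currentThread().getName();
            System.out.println("inline ran on caller: "
                    + ranOn(inline).equals(caller));
            System.out.println("delegated ran on: " + ranOn(delegate));
        } finally {
            delegate.shutdown();
        }
    }
}
```

With the inline executor, anything the task blocks on (such as a read from an InputStream) stalls the calling thread; with the delegating one, only the spawned task waits.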
> src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java line 703:
>
>> 701: bodySentCF.completeAsync(() -> this, exec);
>> 702: } else {
>> 703: exec.ensureExecutedAsync(this::requestMoreBody);
>
> This means that _"the immediate run shortcut for non-selector threads"_ facilitated by `DelegatingExecutor::execute` will be skipped. Why?
Same answer as above. The subscribers/publishers used in the PostFromGet test are based on InputStream, which is blocking. If we call request in the current thread, it risks running the synchronous scheduler loop in the current thread, which might block due to the nature of the subscribers/publishers. What we're doing here is just ensuring that we are not going to block from within the pipeline.
> src/java.net.http/share/classes/jdk/internal/net/http/Stream.java line 1113:
>
>> 1111: // decremented on the HttpClient.
>> 1112: cancelImpl(ex);
>> 1113: }
>
> Curious: This _fix_ is not specific to this PR, that is, this should have been fixed even in the absence of the feature this PR is delivering, right?
Kind of. Now we're calling request in a separate thread, so if it throws (which it must not, but still might, since it's potentially implemented by user code), then the exception won't get caught by the caller. So the fact that we execute request() in a separate thread means we need to protect here against any stray exception it might throw.
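As a rough illustration of why the guard has to live inside the task, here is a small sketch with a plain executor. The names StrayExceptionSketch, misbehavingRequest and runGuarded are hypothetical, and completing a future with the exception is only a stand-in for calling cancelImpl(ex).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StrayExceptionSketch {

    // Hypothetical stand-in for a user-supplied Subscription.request
    // implementation that throws even though it must not.
    static void misbehavingRequest() {
        throw new IllegalStateException("thrown by user code");
    }

    // Runs the request on the given executor and returns the exception
    // captured inside the task, or null if none was thrown.
    static Throwable runGuarded(Executor executor) {
        CompletableFuture<Throwable> captured = new CompletableFuture<>();
        // A try/catch around execute() in the calling thread would not see
        // an exception thrown later by the task on another thread, so the
        // guard is placed inside the task itself.
        executor.execute(() -> {
            try {
                misbehavingRequest();
                captured.complete(null);
            } catch (Throwable t) {
                captured.complete(t); // stand-in for cancelImpl(t)
            }
        });
        return captured.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("captured: " + runGuarded(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```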
> src/java.net.http/share/classes/jdk/internal/net/http/Stream.java line 1124:
>
>> 1122: if (debug.on())
>> 1123: debug.log("RequestSubscriber: onSubscribe, request 1");
>> 1124: exchange.executor().safeDelegate().execute(this::tryRequestMore);
>
> I'd appreciate it if we can document the need to execute this asynchronously using this particular executor – while it is not ultimate measure, nevertheless, it is not apparent to me. 🙈
PostFromGet will/might get wedged if we execute inline at this point. See my previous explanations.
-------------
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965417642
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965430555
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965435443
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965439082
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965441720