RFR: 8299338: AssertionError in ResponseSubscribers$HttpResponseInputStream::onSubscribe [v2]

Daniel Fuchs dfuchs at openjdk.org
Tue Feb 21 16:36:28 UTC 2023


On Tue, 21 Feb 2023 16:28:52 GMT, Daniel Fuchs <dfuchs at openjdk.org> wrote:

>> This fix revisits an assertion that has been observed failing in ResponseSubscribers.HttpResponseInputStream.
>> 
>> The HttpResponseInputStream has the logic to wait until a buffer has been taken out of the queue before requesting a new one. Therefore there should be at most one byte buffer in the queue, except in the case of error (or asynchronous close), where a LAST_LIST sentinel is inserted in the queue to unblock the consumer of the input stream, which might be blocked in queue.take().
>> 
>> The HttpResponseInputStream thus asserts, when processing a subscription, that the remaining capacity of the queue is greater than 1 (unless already closed), to ensure that there will be room for the LAST_LIST sentinel. However, in case of asynchronous shutdown of the executor, it's possible that the subscriber will be marked failed and the LAST_LIST sentinel inserted into the queue before, or at the same time as, the subscription is processed.
>> 
>> This fix proposes to relax the assertion so that it only fires if closed == false and failed == null and capacity <= 1 when processing the subscription.
>
> Daniel Fuchs has updated the pull request incrementally with one additional commit since the last revision:
> 
>   Call tryRegister() before markSubscribed()
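
For readers following along, the relaxed condition from the quoted summary can be sketched roughly as below. This is an illustration only, not the actual `ResponseSubscribers` code: the class name, the `invariantHolds` helper, the `Object` sentinel and the queue capacity of 2 are all assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RelaxedAssertionSketch {
    // Hypothetical stand-in for the LAST_LIST sentinel.
    static final Object LAST_LIST = new Object();

    // The relaxed check: a violation is reported only when the stream is
    // not closed, has not failed, and the queue has no spare slot left.
    static boolean invariantHolds(boolean closed, Throwable failed,
                                  BlockingQueue<?> queue) {
        return closed || failed != null || queue.remainingCapacity() > 1;
    }

    public static void main(String[] args) {
        // Capacity 2 is an assumption: one buffer plus room for the sentinel.
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);

        // Normal case: empty queue when the subscription is processed.
        System.out.println(invariantHolds(false, null, queue));

        // Asynchronous failure: the sentinel is already in the queue,
        // but `failed` is set, so the relaxed check still passes.
        queue.offer(LAST_LIST);
        Throwable failed = new IllegalStateException("executor shut down");
        System.out.println(invariantHolds(false, failed, queue));

        // Same queue state without a recorded failure would still be flagged.
        System.out.println(invariantHolds(false, null, queue));
    }
}
```

The point of the sketch is only that a full-looking queue is acceptable once `failed` is set or the stream is closed, since the extra element is then the sentinel rather than an unexpected buffer.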

@jaikiran made me realise that there is a code path where `register()` might end up in a call to `HttpSubscriberWrapper.onError()` after the subscriber has been marked subscribed (`markSubscribed()`) but before `onSubscribe()` has been called on the wrapped subscriber. This results in `onError()` being called on the wrapped subscriber before `onSubscribe()`, which is the very thing that the `HttpSubscriberWrapper` class is trying to prevent.

I have slightly modified `HttpSubscriberWrapper::onSubscribe` to cater for this, by calling `tryRegister` before calling `markSubscribed`, but still within the `subscriptionLock`. Now if `register()` ends up calling `onError`, the subscriber will not be marked subscribed yet, causing `propagateError` to behave correctly. The real subscription will then be cancelled, which is what we want.
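
To make the ordering issue concrete, here is a heavily simplified sketch. It is not the actual `HttpSubscriberWrapper` code: the `Wrapper` class, its fields, the event strings and the always-failing `tryRegister()` are hypothetical, and the error path assumes that an unsubscribed wrapper hands the wrapped subscriber a no-op subscription before forwarding the error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class WrapperOrderingSketch {

    // Hypothetical, simplified wrapper; records what the wrapped
    // subscriber would observe instead of calling a real subscriber.
    static final class Wrapper {
        private final ReentrantLock subscriptionLock = new ReentrantLock();
        private final boolean registerFirst;
        private boolean subscribed;
        private boolean completed;
        final List<String> events = new ArrayList<>();

        Wrapper(boolean registerFirst) { this.registerFirst = registerFirst; }

        void onSubscribe() {
            subscriptionLock.lock();
            try {
                boolean registered;
                if (registerFirst) {
                    // New ordering: register, then mark subscribed.
                    registered = tryRegister();
                    if (registered) subscribed = true;
                } else {
                    // Old ordering: mark subscribed, then register.
                    subscribed = true;
                    registered = tryRegister();
                }
                if (!registered) {
                    events.add("cancel");      // cancel the real subscription
                    return;
                }
                events.add("onSubscribe");     // forward to wrapped subscriber
            } finally {
                subscriptionLock.unlock();
            }
        }

        // Simulates registration failing (for example after a shutdown)
        // and reporting the failure through onError().
        private boolean tryRegister() {
            onError(new IllegalStateException("shut down"));
            return false;
        }

        void onError(Throwable t) {
            subscriptionLock.lock();           // reentrant: may already be held
            try {
                if (completed) return;
                completed = true;
                if (!subscribed) {
                    // Not subscribed yet: supply a no-op subscription first.
                    events.add("onSubscribe");
                }
                events.add("onError");
            } finally {
                subscriptionLock.unlock();
            }
        }
    }

    static List<String> run(boolean registerFirst) {
        Wrapper w = new Wrapper(registerFirst);
        w.onSubscribe();
        return w.events;
    }

    public static void main(String[] args) {
        System.out.println("mark then register: " + run(false));
        System.out.println("register then mark: " + run(true));
    }
}
```

In this sketch the old ordering records `onError` with no preceding `onSubscribe`, while the new ordering records `onSubscribe` first, which is the property the wrapper is meant to guarantee.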

Thanks for the good analysis @jaikiran!

-------------

PR: https://git.openjdk.org/jdk/pull/12677

