RFR: 8294916: Cancelling a request must eventually cause its response body subscriber to be unregistered

Roy duke at openjdk.org
Tue Oct 11 20:30:03 UTC 2022


On Tue, 11 Oct 2022 15:49:14 GMT, Daniel Fuchs <dfuchs at openjdk.org> wrote:

> When [JDK-8277969](https://bugs.openjdk.org/browse/JDK-8277969) was implemented, a list of outstanding response subscribers was added to `HttpClientImpl`. A body subscriber is added to the list after being created and is removed from the list when it is completed, either successfully or exceptionally.
> 
> It appears that in the case where the subscription is cancelled before the subscriber is completed, the subscriber might remain registered in the list forever, or at least until the HttpClient gets garbage collected. This can be easily reproduced using streaming subscribers, such as BodySubscriber::ofInputStream. In the case where the input stream is closed without having read all the bytes, Subscription::cancel will be called. Whether the subscriber gets unregistered or not at that point becomes racy.
> 
> Indeed, the reactive stream specification doesn't guarantee whether onComplete or onError will be called or not after a subscriber cancels its subscription. Any cleanup that would have been performed by onComplete/onError might therefore need to be performed when the subscription is cancelled too.

@dfuch where is the part that removes the subscriber in case `HttpResponse.BodyHandlers#ofInputStream` is used and the `inputStream` closed before the response body is received?

i.e. I expect that even if I use `HttpResponse.BodyHandlers#ofInputStream` with `HttpClient#send` in `while(true)` loop it will retain only 1 subscriber at all times.

Example code for clarification (not complete code):

private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();

public static void main(String[] args) throws IOException, InterruptedException {
	final URI uri = URI.create("https://www.oref.org.il/WarningMessages/alert/alerts.json");
	final int minRedAlertEventContentLength = """
			{"cat":"1","data":[],"desc":"","id":0,"title":""}""".getBytes(StandardCharsets.UTF_8).length;

	while (true) {
		final HttpResponse<InputStream> httpResponse = HTTP_CLIENT.send(
				HttpRequest.newBuilder(uri)
						.header("Accept", "application/json")
						.header("X-Requested-With", "XMLHttpRequest")
						.header("Referer", "https://www.oref.org.il/12481-he/Pakar.aspx")
						.timeout(Duration.ofSeconds(10))
						.build(),
				HttpResponse.BodyHandlers.ofInputStream()
		);

		try (InputStream httpResponseBody = httpResponse.body()) {
			if (httpResponse.statusCode() != HttpURLConnection.HTTP_OK) {
				sleep();
				continue;
			}
			if (httpResponse.headers().firstValueAsLong("Content-Length").orElse(-1) > minRedAlertEventContentLength)
				System.out.println(JSON_MAPPER.readValue(
						httpResponseBody,
						RedAlertEvent.class
				));
		}
	}
}

private record RedAlertEvent(
		int cat,
		List<String> data,
		String desc,
		long id,
		String title
) {
}

-------------

PR: https://git.openjdk.org/jdk/pull/10659


More information about the net-dev mailing list