SubmissionPublisher - Subscriber#onComplete() not invoked when publisher is closed

Pavel Rappo pavel.rappo at oracle.com
Tue Feb 21 11:15:15 UTC 2017


I believe the most appropriate place for concurrency-related questions is

    http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest

As for the question itself, I don't think this behaviour is a bug.
SubmissionPublisher.close() seems to be a graceful way of shutting down (in
contrast with SubmissionPublisher.closeExceptionally()), akin to putting an EOF
on an input stream.

My reading of the javadoc is that after SubmissionPublisher.close() has been
invoked, the publisher will no longer accept any attempts to publish items and
will call Subscriber.onComplete() *eventually*.
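
To make "eventually" concrete, here is a minimal sketch of a subscriber that
does observe the completion signal. It rests on an assumption that is not
confirmed anywhere in this thread: that the quoted snippet never sees
onComplete() because its onSubscribe() never calls Subscription.request(), so
the submitted item stays buffered and the completion signal stays queued
behind it. The class name CloseDemo, the run() helper and the event strings
are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CloseDemo {

    // Hypothetical helper: publishes one item, closes the publisher and
    // returns the signals the subscriber received, in order.
    static List<String> run() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                // Assumption: without this request the buffered item is never
                // delivered, and onComplete() is held back behind it.
                subscription.request(Long.MAX_VALUE);
            }

            @Override public void onNext(String item) {
                events.add("onNext(" + item + ")");
            }

            @Override public void onError(Throwable throwable) {
                events.add("onError()");
                done.countDown();
            }

            @Override public void onComplete() {
                events.add("onComplete()");
                done.countDown();
            }
        });

        publisher.submit("item");
        publisher.close(); // returns immediately; signals arrive asynchronously

        // close() does not wait for subscribers, so block until a terminal
        // signal has been delivered (or give up after 10 seconds).
        done.await(10, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

The latch is there only because close() gives no guarantee that subscribers
have completed by the time it returns; without some form of waiting, main()
could exit before the signals are delivered.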

> On 21 Feb 2017, at 09:24, Pavel Bucek <pavel.bucek at oracle.com> wrote:
> 
> there is a formatting issue in the code snippet; publisher.close() should be on a new line:
> 
> {
>    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>    publisher.subscribe(new Flow.Subscriber<String>() {
>        @Override public void onSubscribe(Flow.Subscription subscription) { }
> 
>        @Override public void onNext(String item) { }
> 
>        @Override public void onError(Throwable throwable) {
>            System.out.println("onError()");
>        }
> 
>        @Override public void onComplete() {
>            System.out.println("onComplete()");
>        }
>    });
>    publisher.submit("item"); // if this is commented out, #onComplete is invoked.
> 
>    publisher.close();
> }
> 
> 
> On 21/02/2017 10:16, Pavel Bucek wrote:
>> Hi all,
>> 
>> firstly - please let me know if this is the wrong place to send this; I wasn't able to find a list specific to concurrency.
>> 
>> Consider the following example:
>> 
>> {
>>    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>>    publisher.subscribe(new Flow.Subscriber<String>() {
>>        @Override public void onSubscribe(Flow.Subscription subscription) { }
>> 
>>        @Override public void onNext(String item) { }
>> 
>>        @Override public void onError(Throwable throwable) {
>>            System.out.println("onError()");
>>        }
>> 
>>        @Override public void onComplete() {
>>            System.out.println("onComplete()");
>>        }
>>    });
>>    publisher.submit("item");// if this is commented out, #onComplete is invoked. publisher.close();
>> }
>> 
>> I'd expect that Subscriber#onComplete is invoked after calling publisher.close(), but it is not happening. Curiously, when I comment out 'publisher.submit("item")', Subscriber#onComplete is indeed invoked.
>> 
>> SubmissionPublisher#close() javadoc says:
>> 
>> /**
>>  * Unless already closed, issues {@link
>>  * Flow.Subscriber#onComplete() onComplete} signals to current
>>  * subscribers, and disallows subsequent attempts to publish.
>>  * Upon return, this method does <em>NOT</em> guarantee that all
>>  * subscribers have yet completed.
>>  */
>> 
>> So it seems like it will be invoked in a different thread or something like that, but it is never invoked (or more precisely, not within 10 seconds after the publisher is closed; nothing else is running on that particular JVM instance).
>> 
>> Also, publisher#isClosed() returns true and publisher#getNumberOfSubscribers() returns 0.
>> 
>> I'm using Java(TM) SE Runtime Environment (build 9-ea+157-jigsaw-nightly-h6115-20170219)
>> 
>> What am I doing wrong?
>> 
>> Thanks and regards, Pavel
>> 
> 



More information about the core-libs-dev mailing list