SubmissionPublisher - Subscriber#onComplete() not invoked when publisher is closed

Michael McMahon michael.x.mcmahon at oracle.com
Tue Feb 21 12:04:11 UTC 2017


Maybe the spec could be tighter around this, but it's not unreasonable
that there is a delay in receiving the onComplete() notification,
because its delivery is subject to subscriber-controlled flow control.
Notifying onError() is not subject to flow control, so you might expect
it to be triggered immediately.
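
As a minimal sketch of the flow-control point above, assuming the
subscriber from the original example is changed so that it requests
items in onSubscribe(): with demand outstanding, the buffered item can
be consumed and onComplete() can follow it. The class name, the run()
helper and the latch are hypothetical and only there to make the sketch
self-contained.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original report.
class RequestingSubscriberDemo {

    /** Publishes one item, closes the publisher, and returns the signals seen. */
    static List<String> run() throws InterruptedException {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                // Assumption: the subscriber signals demand here. Without a
                // request, the submitted item stays buffered and onComplete()
                // cannot be delivered after it.
                subscription.request(Long.MAX_VALUE);
            }

            @Override public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override public void onError(Throwable throwable) {
                signals.add("onError()");
                done.countDown();
            }

            @Override public void onComplete() {
                signals.add("onComplete()");
                done.countDown();
            }
        });

        publisher.submit("item");
        publisher.close();

        // Signals are delivered asynchronously, so wait for a terminal one.
        done.await(10, TimeUnit.SECONDS);
        return signals;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

With the request in place, the subscriber should see the item first and
then onComplete(), since close() is delivered in order after anything
already submitted.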

Michael.

On 21/02/2017, 11:32, Pavel Bucek wrote:
> SubmissionPublisher#closeExceptionally does trigger
> Subscriber#onError, but based on the javadoc, I cannot really be sure
> that it will be called, since it contains exactly the same wording as
> SubmissionPublisher#close:
>
> /**
>  * Unless already closed, issues {@link
>  * Flow.Subscriber#onError(Throwable) onError} signals to current
>  * subscribers with the given error, and disallows subsequent
>  * attempts to publish. Future subscribers also receive the given
>  * error. Upon return, this method does <em>NOT</em> guarantee
>  * that all subscribers have yet completed.
>  *
>  * @param error the {@code onError} argument sent to subscribers
>  * @throws NullPointerException if error is null
>  */
>
> So, Pavel, if that is not a bug, how can the SubmissionPublisher be 
> closed in a way that subscribers are notified?
>
> Thanks for the link to the other mailing list - do I understand it 
> correctly that I should move this thread there?
>
> Thanks and regards,
> Pavel
>
>
> On 21/02/2017 12:15, Pavel Rappo wrote:
>> I believe the most appropriate place for concurrency-related
>> questions is
>>
>>      http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>> As for the question itself, I don't think this behaviour is a bug.
>> SubmissionPublisher.close() seems to be a graceful way of shutting
>> down (in contrast with SubmissionPublisher.closeExceptionally()),
>> akin to putting an EOF on an input stream.
>>
>> My reading of the javadoc is that after SubmissionPublisher.close()
>> has been invoked, the publisher will no longer accept any attempts
>> to publish items and will call Subscriber.onComplete() *eventually*.
>>
>>> On 21 Feb 2017, at 09:24, Pavel Bucek <pavel.bucek at oracle.com> wrote:
>>>
>>> There is a formatting issue in the code snippet: publisher.close()
>>> should be on a new line:
>>>
>>> {
>>>     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>>>     publisher.subscribe(new Flow.Subscriber<String>() {
>>>         @Override public void onSubscribe(Flow.Subscription subscription) { }
>>>
>>>         @Override public void onNext(String item) { }
>>>
>>>         @Override public void onError(Throwable throwable) {
>>>             System.out.println("onError()");
>>>         }
>>>
>>>         @Override public void onComplete() {
>>>             System.out.println("onComplete()");
>>>         }
>>>     });
>>>     publisher.submit("item"); // if this is commented out, #onComplete is invoked.
>>>
>>>     publisher.close();
>>> }
>>>
>>>
>>> On 21/02/2017 10:16, Pavel Bucek wrote:
>>>> Hi all,
>>>>
>>>> firstly - please let me know if this is the wrong place to send
>>>> this; I wasn't able to find a list specific to concurrency.
>>>>
>>>> Consider the following example:
>>>>
>>>> {
>>>>     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>>>>     publisher.subscribe(new Flow.Subscriber<String>() {
>>>>         @Override public void onSubscribe(Flow.Subscription subscription) { }
>>>>
>>>>         @Override public void onNext(String item) { }
>>>>
>>>>         @Override public void onError(Throwable throwable) {
>>>>             System.out.println("onError()");
>>>>         }
>>>>
>>>>         @Override public void onComplete() {
>>>>             System.out.println("onComplete()");
>>>>         }
>>>>     });
>>>>     publisher.submit("item");// if this is commented out, 
>>>> #onComplete is invoked. publisher.close();
>>>> }
>>>>
>>>> I'd expect that Subscriber#onComplete is invoked after calling 
>>>> publisher.close(), but it is not happening. Curiously, when I 
>>>> comment out 'publisher.submit("item")', Subscriber#onComplete is 
>>>> indeed invoked.
>>>>
>>>> SubmissionPublisher#close() javadoc says:
>>>>
>>>> /**
>>>>  * Unless already closed, issues {@link
>>>>  * Flow.Subscriber#onComplete() onComplete} signals to current
>>>>  * subscribers, and disallows subsequent attempts to publish.
>>>>  * Upon return, this method does <em>NOT</em> guarantee that all
>>>>  * subscribers have yet completed.
>>>>  */
>>>>
>>>> So it seems like it will be invoked in a different thread or
>>>> something like that, but it is never invoked (or, more precisely,
>>>> not within 10 seconds after the publisher is closed; there is
>>>> nothing else running on that particular JVM instance).
>>>>
>>>> Also, publisher#isClosed() returns true and 
>>>> publisher#getNumberOfSubscribers() returns 0.
>>>>
>>>> I'm using Java(TM) SE Runtime Environment (build 
>>>> 9-ea+157-jigsaw-nightly-h6115-20170219)
>>>>
>>>> What am I doing wrong?
>>>>
>>>> Thanks and regards, Pavel
>>>>
>

