SubmissionPublisher - Subscriber#onComplete() not invoked when publisher is closed

Pavel Bucek pavel.bucek at oracle.com
Tue Feb 21 11:32:55 UTC 2017


SubmissionPublisher#closeExceptionally does trigger Subscriber#onError, 
but based on the javadoc I cannot really be sure that it will be called, 
since it contains exactly the same wording as SubmissionPublisher#close:

/**
 * Unless already closed, issues {@link
 * Flow.Subscriber#onError(Throwable) onError} signals to current
 * subscribers with the given error, and disallows subsequent
 * attempts to publish. Future subscribers also receive the given
 * error. Upon return, this method does <em>NOT</em> guarantee
 * that all subscribers have yet completed.
 *
 * @param error the {@code onError} argument sent to subscribers
 * @throws NullPointerException if error is null
 */

So, Pavel, if that is not a bug, how can the SubmissionPublisher be 
closed in a way that subscribers are notified?
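
For reference, a minimal sketch of one way the example from this thread
could be changed so that onComplete() is delivered. It rests on an
assumption that is not stated anywhere above: that SubmissionPublisher
delivers onComplete() only after the items it has buffered are consumed,
and that a subscriber which never calls Subscription.request(n) never
consumes the submitted item. The class name CloseDemo, the run(boolean)
helper, the latch and the 2-second wait are hypothetical and exist only
for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class CloseDemo {

    // Hypothetical helper: repeats the example from this thread and records
    // the signals the subscriber receives. When requestItems is true, the
    // subscriber signals demand in onSubscribe; that is the only change.
    static List<String> run(boolean requestItems) {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                if (requestItems) {
                    // Assumption: without demand the submitted item stays buffered.
                    subscription.request(Long.MAX_VALUE);
                }
            }

            @Override public void onNext(String item) {
                events.add("onNext(" + item + ")");
            }

            @Override public void onError(Throwable throwable) {
                events.add("onError()");
                done.countDown();
            }

            @Override public void onComplete() {
                events.add("onComplete()");
                done.countDown();
            }
        });
        publisher.submit("item");
        publisher.close();

        // Signals are delivered asynchronously, so wait for a bounded time.
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println("with request():    " + run(true));
        System.out.println("without request(): " + run(false));
    }
}
```

If the assumption holds, the first call records onNext(item) followed by
onComplete(), while the second call reproduces the behaviour reported in
this thread.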

Thanks for the link to the other mailing list - do I understand it 
correctly that I should move this thread there?

Thanks and regards,
Pavel


On 21/02/2017 12:15, Pavel Rappo wrote:
> I believe, the most appropriate place for concurrency-related questions is
>
>      http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>
> As for the question itself. I don't think this behaviour is a bug.
> SubmissionPublisher.close() seems to be a graceful way of shutting down (in
> contrast with SubmissionPublisher.closeExceptionally()), akin to putting an EOF
> on an input stream.
>
> My reading of the javadoc is that after SubmissionPublisher.close has been
> invoked, the publisher will no longer accept any attempts to publish items and
> will call Subscriber.onComplete() *eventually*.
>
>> On 21 Feb 2017, at 09:24, Pavel Bucek <pavel.bucek at oracle.com> wrote:
>>
>> there is a formatting issue in the code snippet, publisher.close() should be on the new line:
>>
>> {
>>     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>>     publisher.subscribe(new Flow.Subscriber<String>() {
>>         @Override public void onSubscribe(Flow.Subscription subscription) { }
>>
>>         @Override public void onNext(String item) { }
>>
>>         @Override public void onError(Throwable throwable) {
>>             System.out.println("onError()");
>>         }
>>
>>         @Override public void onComplete() {
>>             System.out.println("onComplete()");
>>         }
>>     });
>>     publisher.submit("item");// if this is commented out, #onComplete is invoked.
>>
>>     publisher.close();
>> }
>>
>>
>> On 21/02/2017 10:16, Pavel Bucek wrote:
>>> Hi all,
>>>
>>> firstly - please let me know if this is the wrong place to send this; I wasn't able to find a list specific to concurrency.
>>>
>>> Consider the following example:
>>>
>>> {
>>>     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>>>     publisher.subscribe(new Flow.Subscriber<String>() {
>>>         @Override public void onSubscribe(Flow.Subscription subscription) { }
>>>
>>>         @Override public void onNext(String item) { }
>>>
>>>         @Override public void onError(Throwable throwable) {
>>>             System.out.println("onError()");
>>>         }
>>>
>>>         @Override public void onComplete() {
>>>             System.out.println("onComplete()");
>>>         }
>>>     });
>>>     publisher.submit("item");// if this is commented out, #onComplete is invoked. publisher.close();
>>> }
>>>
>>> I'd expect that Subscriber#onComplete is invoked after calling publisher.close(), but it is not happening. Curiously, when I comment out 'publisher.submit("item")', Subscriber#onComplete is indeed invoked.
>>>
>>> SubmissionPublisher#close() javadoc says:
>>>
>>> /**
>>>  * Unless already closed, issues {@link
>>>  * Flow.Subscriber#onComplete() onComplete} signals to current
>>>  * subscribers, and disallows subsequent attempts to publish.
>>>  * Upon return, this method does <em>NOT</em> guarantee that all
>>>  * subscribers have yet completed.
>>>  */
>>>
>>> So it seems that it should be invoked on a different thread or something like that, but it is never invoked (or, more precisely, not within 10 seconds after the publisher is closed; nothing else is running on that particular JVM instance).
>>>
>>> Also, publisher#isClosed() returns true and publisher#getNumberOfSubscribers() returns 0.
>>>
>>> I'm using Java(TM) SE Runtime Environment (build 9-ea+157-jigsaw-nightly-h6115-20170219)
>>>
>>> What am I doing wrong?
>>>
>>> Thanks and regards, Pavel
>>>


