SubmissionPublisher - Subscriber#onComplete() not invoked when publisher is closed
Michael McMahon
michael.x.mcmahon at oracle.com
Tue Feb 21 12:04:11 UTC 2017
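Maybe the spec could be tighter around this, but it's not unreasonable
that there is a delay in receiving the onComplete() notification because
of the subscriber-controlled flow control. In the example, onSubscribe()
never calls request(), so the submitted item is never consumed and
onComplete() cannot be delivered ahead of it.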
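
To illustrate the flow-control point, here is a minimal sketch (not part
of the original report) in which the subscriber signals demand in
onSubscribe(). The class name RequestingSubscriberDemo, the latch and the
list of recorded signals are illustrative assumptions; with demand
signalled, onComplete() should arrive once the buffered item has been
consumed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and helpers are not from the thread.
class RequestingSubscriberDemo {

    /** Submits one item, closes the publisher, and returns the signals seen. */
    static List<String> run() throws InterruptedException {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                // Signal demand; without this call the submitted item stays
                // buffered and onComplete() is held back behind it.
                subscription.request(Long.MAX_VALUE);
            }

            @Override public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override public void onError(Throwable throwable) {
                signals.add("onError()");
                done.countDown();
            }

            @Override public void onComplete() {
                signals.add("onComplete()");
                done.countDown();
            }
        });
        publisher.submit("item");
        publisher.close();

        // Signals are delivered asynchronously, so wait for a terminal one.
        done.await(10, TimeUnit.SECONDS);
        return List.copyOf(signals);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Requesting Long.MAX_VALUE effectively turns flow control off; a
subscriber that wants back-pressure would request smaller amounts from
onNext() instead.
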
Delivery of onError() is not subject to flow control, so you might expect
it to be triggered immediately.
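
As a companion sketch under the same caveats (the class name
CloseExceptionallyDemo and the exception message are made up for
illustration), a subscriber that never requests anything should still
see onError() after closeExceptionally(), even though the submitted item
is never delivered.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and helpers are not from the thread.
class CloseExceptionallyDemo {

    /** Submits one item, closes exceptionally, and returns the signals seen. */
    static List<String> run() throws InterruptedException {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                // Deliberately no request(): the subscriber signals no demand.
            }

            @Override public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override public void onError(Throwable throwable) {
                signals.add("onError(" + throwable.getMessage() + ")");
                done.countDown();
            }

            @Override public void onComplete() {
                signals.add("onComplete()");
                done.countDown();
            }
        });
        publisher.submit("item");
        publisher.closeExceptionally(new IllegalStateException("shutting down"));

        // The error signal is delivered asynchronously, so wait for it.
        done.await(10, TimeUnit.SECONDS);
        return List.copyOf(signals);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```
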
Michael.
On 21/02/2017, 11:32, Pavel Bucek wrote:
> SubmissionPublisher#closeExceptionally does trigger
> Subscriber#onError, but based on the javadoc I cannot really be sure
> that it will be called, since it contains exactly the same wording as
> SubmissionPublisher#close:
>
> /**
>  * Unless already closed, issues {@link
>  * Flow.Subscriber#onError(Throwable) onError} signals to current
>  * subscribers with the given error, and disallows subsequent
>  * attempts to publish. Future subscribers also receive the given
>  * error. Upon return, this method does <em>NOT</em> guarantee
>  * that all subscribers have yet completed.
>  *
>  * @param error the {@code onError} argument sent to subscribers
>  * @throws NullPointerException if error is null
>  */
>
> So, Pavel, if that is not a bug, how can the SubmissionPublisher be
> closed in a way that subscribers are notified?
>
> Thanks for the link to the other mailing list - do I understand it
> correctly that I should move this thread there?
>
> Thanks and regards,
> Pavel
>
>
> On 21/02/2017 12:15, Pavel Rappo wrote:
>> I believe the most appropriate place for concurrency-related
>> questions is
>>
>> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>> As for the question itself, I don't think this behaviour is a bug.
>> SubmissionPublisher.close() seems to be a graceful way of shutting
>> down (in contrast with SubmissionPublisher.closeExceptionally()),
>> akin to putting an EOF on an input stream.
>>
>> My reading of the javadoc is that after SubmissionPublisher.close()
>> has been invoked, the publisher will no longer accept any attempts to
>> publish items and will call Subscriber.onComplete() *eventually*.
>>
>>> On 21 Feb 2017, at 09:24, Pavel Bucek <pavel.bucek at oracle.com> wrote:
>>>
>>> there is a formatting issue in the code snippet; publisher.close()
>>> should be on a new line:
>>>
>>> {
>>>     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>>>     publisher.subscribe(new Flow.Subscriber<String>() {
>>>         @Override public void onSubscribe(Flow.Subscription subscription) { }
>>>
>>>         @Override public void onNext(String item) { }
>>>
>>>         @Override public void onError(Throwable throwable) {
>>>             System.out.println("onError()");
>>>         }
>>>
>>>         @Override public void onComplete() {
>>>             System.out.println("onComplete()");
>>>         }
>>>     });
>>>     publisher.submit("item"); // if this is commented out, #onComplete is invoked.
>>>
>>>     publisher.close();
>>> }
>>>
>>>
>>> On 21/02/2017 10:16, Pavel Bucek wrote:
>>>> Hi all,
>>>>
>>>> firstly - please let me know if this is the wrong place to send
>>>> this; I wasn't able to find a list specific to concurrency.
>>>>
>>>> Consider the following example:
>>>>
>>>> {
>>>>     SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
>>>>     publisher.subscribe(new Flow.Subscriber<String>() {
>>>>         @Override public void onSubscribe(Flow.Subscription subscription) { }
>>>>
>>>>         @Override public void onNext(String item) { }
>>>>
>>>>         @Override public void onError(Throwable throwable) {
>>>>             System.out.println("onError()");
>>>>         }
>>>>
>>>>         @Override public void onComplete() {
>>>>             System.out.println("onComplete()");
>>>>         }
>>>>     });
>>>>     publisher.submit("item"); // if this is commented out, #onComplete is invoked. publisher.close();
>>>> }
>>>>
>>>> I'd expect that Subscriber#onComplete is invoked after calling
>>>> publisher.close(), but it is not happening. Curiously, when I
>>>> comment out 'publisher.submit("item")', Subscriber#onComplete is
>>>> indeed invoked.
>>>>
>>>> SubmissionPublisher#close() javadoc says:
>>>>
>>>> /**
>>>>  * Unless already closed, issues {@link
>>>>  * Flow.Subscriber#onComplete() onComplete} signals to current
>>>>  * subscribers, and disallows subsequent attempts to publish.
>>>>  * Upon return, this method does <em>NOT</em> guarantee that all
>>>>  * subscribers have yet completed.
>>>>  */
>>>>
>>>> So it seems like it will be invoked in a different thread or
>>>> something like that, but it is never invoked (or, more precisely,
>>>> not within 10 seconds after the publisher is closed; nothing else
>>>> is running on that particular JVM instance).
>>>>
>>>> Also, publisher#isClosed() returns true and
>>>> publisher#getNumberOfSubscribers() returns 0.
>>>>
>>>> I'm using Java(TM) SE Runtime Environment (build
>>>> 9-ea+157-jigsaw-nightly-h6115-20170219)
>>>>
>>>> What am I doing wrong?
>>>>
>>>> Thanks and regards, Pavel
>>>>
>
More information about the core-libs-dev
mailing list