SubmissionPublisher - Subscriber#onComplete() not invoked when publisher is closed

Pavel Bucek pavel.bucek at oracle.com
Tue Feb 21 09:24:32 UTC 2017


there is a formatting issue in the code snippet, publisher.close() 
should be on the new line:

{
     SubmissionPublisher<String> publisher =new SubmissionPublisher<>();
     publisher.subscribe(new Flow.Subscriber<String>() {
         @Override public void onSubscribe(Flow.Subscription 
subscription) { }

         @Override public void onNext(String item) { }

         @Override public void onError(Throwable throwable) {
             System.out.println("onError()");
         }

         @Override public void onComplete() {
             System.out.println("onComplete()");
         }
     });
     publisher.submit("item");// if this is commented out, #onComplete 
is invoked.

     publisher.close();
}


On 21/02/2017 10:16, Pavel Bucek wrote:
> Hi all,
>
> firstly - please let me know if this is is a wrong place to send this; 
> I wasn't able to find list specific to concurrency.
>
> Consider following example:
>
> {
>     SubmissionPublisher<String> publisher =new SubmissionPublisher<>();
>     publisher.subscribe(new Flow.Subscriber<String>() {
>         @Override public void onSubscribe(Flow.Subscription 
> subscription) { }
>
>         @Override public void onNext(String item) { }
>
>         @Override public void onError(Throwable throwable) {
>             System.out.println("onError()");
>         }
>
>         @Override public void onComplete() {
>             System.out.println("onComplete()");
>         }
>     });
>     publisher.submit("item");// if this is commented out, #onComplete 
> is invoked. publisher.close();
> }
>
> I'd expect that Subscriber#onComplete is invoked after calling 
> publisher.close(), but it is not happening. Curiously, when I comment 
> out 'publisher.submit("item")', Subscriber#onComplete is indeed invoked.
>
> SubmissionPublisher#close() javadoc says:
>
> /** * Unless already closed, issues {@link * 
> Flow.Subscriber#onComplete() onComplete} signals to current * 
> subscribers, and disallows subsequent attempts to publish. * Upon 
> return, this method does <em>NOT</em>guarantee that all * subscribers 
> have yet completed. */
>
> So it seems like it will be invoked in different thread or something 
> like that, but it is not invoked ever (or more precisely - not during 
> 10 second after the publisher is closed. There is nothing else running 
> on that particular jvm instance).
>
> Also, publisher#isClosed() returns true and 
> publisher#getNumberOfSubscribers() returns 0.
>
> I'm using Java(TM) SE Runtime Environment (build 
> 9-ea+157-jigsaw-nightly-h6115-20170219)
>
> What am I doing wrong?
>
> Thanks and regards, Pavel
>



More information about the core-libs-dev mailing list