Reactive streams and aggregate reads and writes

Douglas Surber douglas.surber at oracle.com
Wed Jan 16 16:55:41 UTC 2019


Inspired by Viktor's comments and the code David provided.

I'm very unsure whether subsequent Operations should change the sequence of bytes published.

Douglas

  /**
   * Return an Operation the result of which will publish the bytes of this Blob
   * in order from the position at the time the Operation is completed.
   * Subsequent Operations on this Blob will not change the sequence of bytes
   * published. Any bytes still between the position and the limit when a call
   * to onNext returns are not redelivered in the ByteBuffer passed to the next
   * call. The Publisher obtains the ByteBuffers
   * to pass to Subscriber.onNext from {@code source}.
   *
   * @param source the supplier of ByteBuffers to pass to onNext.
   * @return an Operation whose result is a Publisher for the bytes of this Blob
   */
  public Operation<Flow.Publisher<ByteBuffer>> publisherOperation(Supplier<ByteBuffer> source);

  /**
   * Return an Operation the result of which is a Subscriber which will insert
   * the bytes published to it into this Blob at the position at the time the
   * Operation is completed. Subsequent Operations on this Blob will not change
   * the position at which subsequent published bytes are inserted. After the
   * bytes have been read from the ByteBuffer, that buffer is passed to
   * {@code releaser} and no longer retained.
   *
   * @param releaser called when the ByteBuffers passed to onNext are no longer
   * needed
   * @return an Operation whose result is a Subscriber that inserts bytes into
   * this Blob
   */
  public Operation<Flow.Subscriber<ByteBuffer>> subscriberOperation(Consumer<ByteBuffer> releaser);

  /**
   * Return the default ByteBuffer supplier for the Publishers of this Blob's
   * bytes. If {@code strict} is true, the defaultByteBufferSupplier and
   * defaultByteBufferReleaser of a single Blob must be used as a pair and only
   * for reading/writing bytes from/to that Blob. Failure to do so may lead to
   * unexpected behavior including corrupt data and RuntimeExceptions. Even if
   * strict is false it is best practice to conform to the strict requirement
   * though failure to do so is not erroneous.
   * 
   * Multiple Publishers of bytes to/from this Blob may use the same supplier.
   *
   * @param strict if true the caller must conform to the strict use requirements
   * described above.
   * @return a source of ByteBuffers
   */
  public Supplier<ByteBuffer> defaultByteBufferSupplier(boolean strict);

  /**
   * Return the default releaser for ByteBuffers for Subscribers of this Blob's
   * bytes. If this Blob's defaultByteBufferSupplier(true) was called, the
   * consumer created by calling this method must be used strictly to release
   * ByteBuffers passed to onNext by a publisher Operation created by this
   * Blob. Failure to do so may lead to unexpected behavior including corrupt
   * data and RuntimeExceptions. Even if the supplier was obtained with strict
   * set to false it is best practice to conform to the strict requirement
   * though failure to do so is not erroneous.
   * 
   * Multiple Subscribers to bytes from/to this Blob may use the same releaser.
   *
   * @return a releaser for ByteBuffers
   */
  public Consumer<ByteBuffer> defaultByteBufferReleaser();

  /**
   * Convenience method that creates a Publisher of the bytes of this Blob. If
   * {@code strict} is true, a Subscriber to the created Publisher must pass all
   * ByteBuffers passed to onNext to the defaultByteBufferReleaser. Failure to
   * do so may lead to unexpected behavior including corrupt data and
   * RuntimeExceptions. Even if strict is false it is best practice to conform to
   * the strict requirement though failure to do so is not erroneous.
   *
   * @param strict if true the caller must conform to the strict use requirements
   * described above.
   * @return a Publisher of the bytes of this Blob
   */
  public default CompletionStage<Flow.Publisher<ByteBuffer>> publisher(boolean strict) {
    return publisherOperation(defaultByteBufferSupplier(strict)).submit().getCompletionStage();
  }

  /**
   * Convenience method that creates a Publisher of the bytes of this Blob.
   * 
   * @return a Publisher of the bytes of this Blob
   */
  public default CompletionStage<Flow.Publisher<ByteBuffer>> publisher() {
    return publisherOperation(defaultByteBufferSupplier(false)).submit().getCompletionStage();
  }

  /**
   * Convenience method that creates a Subscriber for bytes in this Blob.
   *
   * @return a Subscriber for inserting bytes into this Blob.
   */
  public default CompletionStage<Flow.Subscriber<ByteBuffer>> subscriber() {
    return subscriberOperation(defaultByteBufferReleaser()).submit().getCompletionStage();
  }
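
To make the buffer hand-off described in the Javadoc above concrete, here is a minimal sketch using only java.util.concurrent.Flow. It is not the proposed API: BufferPool, BytesPublisher, CollectingSubscriber and BlobStreamSketch are hypothetical names, a byte array stands in for the Blob, and everything runs on one thread. The pool's take and give methods play the roles of a supplier and releaser used as a pair, which is roughly what the strict mode would require.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical driver: streams ten bytes through four-byte pooled buffers. */
final class BlobStreamSketch {
  public static void main(String[] args) {
    byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
    BufferPool pool = new BufferPool(4);
    CollectingSubscriber sub = new CollectingSubscriber(pool::give);
    new BytesPublisher(data, pool::take).subscribe(sub);
    System.out.println(new String(sub.sink.toByteArray(), StandardCharsets.US_ASCII));
  }
}

/** Hypothetical pool; take/give act as a matched supplier/releaser pair. */
final class BufferPool {
  private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
  private final int capacity; // assumed to be greater than zero
  int allocated = 0;          // how many buffers were ever created

  BufferPool(int capacity) { this.capacity = capacity; }

  synchronized ByteBuffer take() {
    ByteBuffer b = free.poll();
    if (b == null) { allocated++; b = ByteBuffer.allocate(capacity); }
    return b;
  }

  synchronized void give(ByteBuffer b) { b.clear(); free.push(b); }
}

/** Stand-in for a Blob's Publisher: emits data in order in buffers from source. */
final class BytesPublisher implements Flow.Publisher<ByteBuffer> {
  private final byte[] data;
  private final Supplier<ByteBuffer> source;

  BytesPublisher(byte[] data, Supplier<ByteBuffer> source) {
    this.data = data;
    this.source = source;
  }

  @Override
  public void subscribe(Flow.Subscriber<? super ByteBuffer> s) {
    s.onSubscribe(new Flow.Subscription() {
      private int position = 0;
      private long demand = 0;
      private boolean emitting = false;
      private boolean done = false;

      @Override
      public void request(long n) {
        if (done) return;
        if (n <= 0) { done = true; s.onError(new IllegalArgumentException("n <= 0")); return; }
        demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
        if (emitting) return; // re-entrant call from onNext; the loop below continues
        emitting = true;
        while (demand > 0 && !done && position < data.length) {
          ByteBuffer buf = source.get();
          int len = Math.min(buf.remaining(), data.length - position);
          buf.put(data, position, len).flip(); // fill, then make readable
          position += len;
          demand--;
          s.onNext(buf);
        }
        emitting = false;
        if (!done && position >= data.length) { done = true; s.onComplete(); }
      }

      @Override
      public void cancel() { done = true; }
    });
  }
}

/** Stand-in for the subscriber side: copies each buffer out, then releases it. */
final class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
  final ByteArrayOutputStream sink = new ByteArrayOutputStream();
  boolean completed = false;
  Throwable error = null;
  private final Consumer<ByteBuffer> releaser;
  private Flow.Subscription subscription;

  CollectingSubscriber(Consumer<ByteBuffer> releaser) { this.releaser = releaser; }

  @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

  @Override public void onNext(ByteBuffer buf) {
    byte[] chunk = new byte[buf.remaining()];
    buf.get(chunk);
    sink.write(chunk, 0, chunk.length);
    releaser.accept(buf);    // the buffer is not retained after this point
    subscription.request(1); // ask for the next buffer only after releasing
  }

  @Override public void onError(Throwable t) { error = t; }
  @Override public void onComplete() { completed = true; }
}
```

Because the subscriber releases each buffer before requesting the next one, the pool in this sketch only ever allocates a single buffer. A real implementation would also need to be thread-safe and to decide what happens to bytes the subscriber leaves unread.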


