Example of a Flow-based JDBC API
Douglas Surber
douglas.surber at oracle.com
Fri Oct 13 15:03:00 UTC 2017
First observation, your sample code imports and uses extensively a library that does not exist in Java SE. This is contrary to one of the non-negotiable goals of this project, to integrate with Java SE.
Douglas
> On Oct 13, 2017, at 5:05 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
>
> Hi again,
>
> I've tried to come up with an API for a reactive JDBC driver ended up with
> these of interfaces:
>
> https://github.com/akarnokd/akarnokd-misc-java9/tree/master/src/main/java/hu/akarnokd/java9/jdbc
>
> The entry point is JdbcConnectionSource which contains the connection
> information. Its connect() method returns a Flow.Publisher<JdbcConnection>
> where the JdbcConnection becomes available once the async connection was
> successful.
>
> The JdbcConnection allows creating the connection specific JdbcOperations
> and there is only one way to present them to the connection: execute()
> which takes a Flow.Publisher of JdbcOperations and returns a Flow.Publisher
> of JdbcStatements representing the execution of those operations. One
> Flow.Publisher<JdbcOperation> is considered as a group. The various
> execution modes (sequential, parallel, in transaction) are to be expressed
> by specific JdbcOperation instances included in the operations
> Flow.Publisher. Too many calls to execute() may be rejected by the driver.
> Any other non-grouped or direct operation submission can be implemented on
> top of execute(). I'm not certain the complication of operation (group)
> graphs should be part of the API - such thing could be built on top of just
> execute by controlling the consumption of the Flow.Publisher<JdbcOperation>
> and intercepting events through the resulting
> Flow.Publisher<JdbcStatement>s.
>
> Building a JdbcOperation should be straightforward, but there are two
> additional properties:
>
> - a JdbcOperation could be marked deferred so its execution doesn't begin
> unless there is a consumer for its result.
> - some properties may be set from a Flow of bytes, either in the form of
> byte arrays of in the form of ByteBuffers that are NIO friendly.
>
> An open question is how to create multi-row operations, such as batch
> inserts represented by a single JdbcOperation. If individual JdbcOperations
> are cheap to create, the driver could batch up subsequent JdbcOperations
> with the same structure on its own. Alternatively, a
> JdbcOperation.parameters(Flow.Publisher<T> items, BiConsumer<T,
> JdbcOperations.Builder>>) method could consume a sequence of Ts and have
> each of them set parameters on a per-item provided JdbcOperation.Builder.
>
> For each valid JdbcOperation, a JdbcStatement is created. The only way to
> consume any result from an operation is through an user-provided row to T
> transformer: Flow.Publisher<T> results(Function<JdbcRow, T>).
>
> The function is called when each row of data is ready to be consumed
> non-blockingly. POJO or ORM-based mapping could be implemented on top of
> this.
>
> Again, there could be BLOB-like columns which may or may not be reasonable
> to consume via get() can be consumed by getBytes() as byte arrays,
> getBuffer() as a sequence of ByteBuffers and one overload where the user
> can specify a supplier for custom ByteBuffers. These are the dual of the
> JdbcOperation.parameterBuffer() methods which should also help playing nice
> with NIO.
>
> Here is an example program that would consume such API via the help of the
> Reactive4JavaFlow library that supports the Flow API directly:
>
> import hu.akarnokd.reactive4javaflow.*;
>
> public class JdbcExample {
>
> static JdbcConnectionSource connectionSource() {
> return null;
> }
>
> public static void main(String[] args) {
> Esetleg.fromPublisher(connectionSource().connect())
> .observeOn(SchedulerServices.computation())
> .flatMapPublisher(conn -> {
> return Folyam.fromPublisher(conn.execute(
> Esetleg.fromCallable(() -> {
> return conn.query()
> .query("SELECT :v FROM DUAL")
> .parameter("v", JdbcDataType.INT, 1)
> .build();
> })
> ))
> .flatMap(stmt -> {
> return Folyam.fromPublisher(
> stmt.results(row -> row.get("1",
> Integer.TYPE))
> ).observeOn(SchedulerServices.single());
> })
> .concatWith(Folyam.fromPublisher(conn.close()).map(v -> 0));
> })
> .blockingSubscribe(System.out::println,
> Throwable::printStackTrace);
> }
> }
>
>
> An open question is where execute() should consume data and where results()
> should perform the row mapping (probably not on the NIO dispatch thread).
> There are a couple of possibilities, i.e., the driver uses a thread pool
> for them or asks the user to provide one. The latter one may give an
> opportunity to avoid one thread hop and utilize one of the user's existing
> thread pool.
>
> --
> Best regards,
> David Karnok
More information about the jdbc-spec-discuss
mailing list