Example of a Flow-based JDBC API

Dávid Karnok akarnokd at gmail.com
Fri Oct 13 15:30:38 UTC 2017


The JdbcExample demonstrates how one can use an external fluent library to
talk to the proposed API. The interfaces I proposed don't use any non-Java
SE dependencies.

In addition, the package the example interfaces reside in is by no means
an indication of where such interfaces would live in Java SE.

2017-10-13 17:03 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:

> First observation, your sample code imports and uses extensively a library
> that does not exist in Java SE. This is contrary to one of the
> non-negotiable goals of this project, to integrate with Java SE.
>
> Douglas
>
>
> > On Oct 13, 2017, at 5:05 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
> >
> > Hi again,
> >
> > I've tried to come up with an API for a reactive JDBC driver and ended up
> > with this set of interfaces:
> >
> > https://github.com/akarnokd/akarnokd-misc-java9/tree/master/src/main/java/hu/akarnokd/java9/jdbc
> >
> > The entry point is JdbcConnectionSource, which contains the connection
> > information. Its connect() method returns a Flow.Publisher<JdbcConnection>
> > where the JdbcConnection becomes available once the async connection has
> > succeeded.
> >
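
A minimal sketch of how a single-item Flow.Publisher, such as the one connect()
is described as returning, could be consumed with Java SE classes only. The
FirstItem class and its first() helper are hypothetical names, not part of the
proposed interfaces, and a SubmissionPublisher<String> stands in for the
connection publisher.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FirstItem {

    /** Subscribes to the source and completes the future with its first item. */
    static <T> CompletableFuture<T> first(Flow.Publisher<T> source) {
        CompletableFuture<T> future = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription upstream;

            @Override public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                s.request(1); // only one item (the connection) is expected
            }

            @Override public void onNext(T item) {
                future.complete(item);
                upstream.cancel();
            }

            @Override public void onError(Throwable t) {
                future.completeExceptionally(t);
            }

            @Override public void onComplete() {
                // No effect if an item already completed the future.
                future.completeExceptionally(new NoSuchElementException("no item"));
            }
        });
        return future;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for connectionSource().connect(); a String plays the JdbcConnection.
        SubmissionPublisher<String> connect = new SubmissionPublisher<>();
        CompletableFuture<String> connection = first(connect);
        connect.submit("connection-1");
        System.out.println(connection.get(5, TimeUnit.SECONDS));
        connect.close();
    }
}
```

The future-based bridge is only one option; a caller could equally keep
everything as Flow.Subscribers and never block.
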
> > The JdbcConnection allows creating the connection-specific JdbcOperations
> > and there is only one way to present them to the connection: execute(),
> > which takes a Flow.Publisher of JdbcOperations and returns a Flow.Publisher
> > of JdbcStatements representing the execution of those operations. One
> > Flow.Publisher<JdbcOperation> is considered as a group. The various
> > execution modes (sequential, parallel, in transaction) are to be expressed
> > by specific JdbcOperation instances included in the operations
> > Flow.Publisher. Too many calls to execute() may be rejected by the driver.
> > Any other non-grouped or direct operation submission can be implemented on
> > top of execute(). I'm not certain the complication of operation (group)
> > graphs should be part of the API - such a thing could be built on top of
> > just execute() by controlling the consumption of the
> > Flow.Publisher<JdbcOperation> and intercepting events through the resulting
> > Flow.Publisher<JdbcStatement>s.
> >
> > Building a JdbcOperation should be straightforward, but there are two
> > additional properties:
> >
> > - a JdbcOperation could be marked deferred so its execution doesn't begin
> > unless there is a consumer for its result.
> > - some properties may be set from a Flow of bytes, either in the form of
> > byte arrays or in the form of ByteBuffers that are NIO friendly.
> >
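
The "deferred" property above can be sketched with a cold publisher that does
no work until a subscriber actually requests an item. DeferredPublisher is a
hypothetical name, and the Supplier stands in for whatever would start the
operation in a real driver; this is an illustration of the idea under those
assumptions, not the proposed API.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class DeferredPublisher<T> implements Flow.Publisher<T> {

    private final Supplier<T> work; // stand-in for starting the operation

    DeferredPublisher(Supplier<T> work) {
        this.work = work;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        AtomicBoolean done = new AtomicBoolean();
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                if (!done.compareAndSet(false, true)) {
                    return; // already ran or was cancelled
                }
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("n > 0 required"));
                    return;
                }
                T value;
                try {
                    // The work starts here, on first demand, not at construction.
                    value = Objects.requireNonNull(work.get());
                } catch (Throwable t) {
                    subscriber.onError(t);
                    return;
                }
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override public void cancel() {
                done.set(true);
            }
        });
    }

    public static void main(String[] args) {
        DeferredPublisher<String> op = new DeferredPublisher<>(() -> {
            System.out.println("executing");
            return "result";
        });
        System.out.println("created, nothing executed yet");
        op.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { System.out.println(item); }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onComplete() { System.out.println("done"); }
        });
    }
}
```
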
> > An open question is how to create multi-row operations, such as batch
> > inserts represented by a single JdbcOperation. If individual JdbcOperations
> > are cheap to create, the driver could batch up subsequent JdbcOperations
> > with the same structure on its own. Alternatively, a
> > JdbcOperation.parameters(Flow.Publisher<T> items, BiConsumer<T,
> > JdbcOperation.Builder>) method could consume a sequence of Ts and have
> > each of them set parameters on a per-item provided JdbcOperation.Builder.
> >
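
One way to picture the parameters(items, binder) alternative is sketched below.
The nested Builder class is a hypothetical stand-in for JdbcOperation.Builder
that merely records named parameters, and collect() is an invented helper; the
sketch assumes each item gets a fresh builder and that the resulting parameter
sets form the batch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.BiConsumer;

final class BatchParameters {

    /** Hypothetical stand-in for JdbcOperation.Builder: records named parameters. */
    static final class Builder {
        final Map<String, Object> params = new LinkedHashMap<>();

        Builder parameter(String name, Object value) {
            params.put(name, value);
            return this;
        }
    }

    /** Feeds the items through a Flow publisher and binds each one to its own Builder. */
    static <T> List<Map<String, Object>> collect(List<T> items, BiConsumer<T, Builder> binder) {
        List<Map<String, Object>> batch = new CopyOnWriteArrayList<>();
        SubmissionPublisher<T> source = new SubmissionPublisher<>();
        CompletableFuture<Void> done = source.consume(item -> {
            Builder builder = new Builder(); // per-item builder, as described above
            binder.accept(item, builder);
            batch.add(builder.params);
        });
        items.forEach(source::submit);
        source.close();
        done.join(); // wait until every item has been bound
        return batch;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch =
                collect(List.of("Ann", "Bob"), (name, b) -> b.parameter("name", name));
        System.out.println(batch);
    }
}
```
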
> > For each valid JdbcOperation, a JdbcStatement is created. The only way to
> > consume any result from an operation is through a user-provided row-to-T
> > transformer: Flow.Publisher<T> results(Function<JdbcRow, T>).
> >
> > The function is called when each row of data is ready to be consumed
> > non-blockingly. POJO or ORM-based mapping could be implemented on top of
> > this.
> >
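
A rough sketch of row mapping in the spirit of results(Function<JdbcRow, T>),
using only java.util.concurrent.Flow. RowMapper is a hypothetical name, and a
Map<String, Object> stands in for JdbcRow; the processor follows the
transform-processor pattern from the SubmissionPublisher documentation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

final class RowMapper<R, T> extends SubmissionPublisher<T> implements Flow.Processor<R, T> {

    private final Function<? super R, ? extends T> mapper;
    private Flow.Subscription upstream;

    RowMapper(Function<? super R, ? extends T> mapper) {
        this.mapper = mapper;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(1); // pull rows one at a time
    }

    @Override public void onNext(R row) {
        submit(mapper.apply(row)); // apply the user-provided transformer
        upstream.request(1);
    }

    @Override public void onError(Throwable t) {
        closeExceptionally(t);
    }

    @Override public void onComplete() {
        close();
    }

    /** Maps two stand-in rows to Integers and returns them in arrival order. */
    static List<Integer> demo() {
        List<Integer> out = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Map<String, Object>> rows = new SubmissionPublisher<>();
        RowMapper<Map<String, Object>, Integer> mapped =
                new RowMapper<>(row -> (Integer) row.get("1"));
        CompletableFuture<Void> done = mapped.consume(out::add);
        rows.subscribe(mapped);
        rows.submit(Map.of("1", 1));
        rows.submit(Map.of("1", 2));
        rows.close();
        done.join(); // completes once the mapped stream has signalled onComplete
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```
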
> > Again, there could be BLOB-like columns that may or may not be reasonable
> > to consume via get(); these can be consumed by getBytes() as byte arrays,
> > by getBuffer() as a sequence of ByteBuffers, or by an overload where the
> > user can specify a supplier for custom ByteBuffers. These are the dual of
> > the JdbcOperation.parameterBuffer() methods, which should also help with
> > playing nice with NIO.
> >
> > Here is an example program that would consume such an API with the help of
> > the Reactive4JavaFlow library, which supports the Flow API directly:
> >
> > import hu.akarnokd.reactive4javaflow.*;
> >
> > public class JdbcExample {
> >
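> >    // Placeholder: a real driver would supply the JdbcConnectionSource here;
> >    // as written, main() fails with a NullPointerException.
> >    static JdbcConnectionSource connectionSource() {
> >        return null;
> >    }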
> >
> >    public static void main(String[] args) {
> >        Esetleg.fromPublisher(connectionSource().connect())
> >                .observeOn(SchedulerServices.computation())
> >                .flatMapPublisher(conn -> {
> >                    return Folyam.fromPublisher(conn.execute(
> >                            Esetleg.fromCallable(() -> {
> >                                return conn.query()
> >                                        .query("SELECT :v FROM DUAL")
> >                                        .parameter("v", JdbcDataType.INT, 1)
> >                                        .build();
> >                            })
> >                    ))
> >                    .flatMap(stmt -> {
> >                        return Folyam.fromPublisher(
> >                                    stmt.results(row -> row.get("1", Integer.TYPE))
> >                                ).observeOn(SchedulerServices.single());
> >                    })
> >                    .concatWith(Folyam.fromPublisher(conn.close()).map(v -> 0));
> >                })
> >                .blockingSubscribe(System.out::println, Throwable::printStackTrace);
> >    }
> > }
> >
> >
> > An open question is where execute() should consume data and where results()
> > should perform the row mapping (probably not on the NIO dispatch thread).
> > There are a couple of possibilities: the driver uses a thread pool for them
> > or asks the user to provide one. The latter may give an opportunity to
> > avoid one thread hop and utilize one of the user's existing thread pools.
> >
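
The "user provides the thread pool" option can be illustrated with
SubmissionPublisher, whose constructor accepts an Executor for delivering
items to subscribers. UserExecutorDemo and the "user-pool" thread name are
made up for this sketch, and the publisher merely stands in for a driver's
result stream.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class UserExecutorDemo {

    /** Returns the name of the thread on which the consumer callback ran. */
    static String deliveryThreadName() {
        // The application's own pool, handed to the publisher instead of a driver pool.
        ExecutorService userPool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "user-pool");
            t.setDaemon(true);
            return t;
        });
        try {
            SubmissionPublisher<Integer> results =
                    new SubmissionPublisher<>(userPool, Flow.defaultBufferSize());
            CompletableFuture<String> seenOn = new CompletableFuture<>();
            CompletableFuture<Void> done =
                    results.consume(row -> seenOn.complete(Thread.currentThread().getName()));
            results.submit(1);
            results.close();
            done.join(); // wait for onComplete
            return seenOn.join();
        } finally {
            userPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveryThreadName());
    }
}
```

Since the callback already runs on the application's pool, no extra
observeOn-style hop is needed afterwards.
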
> > --
> > Best regards,
> > David Karnok
>
>


-- 
Best regards,
David Karnok


More information about the jdbc-spec-discuss mailing list