Example of a Flow-based JDBC API
Dávid Karnok
akarnokd at gmail.com
Fri Oct 13 15:30:38 UTC 2017
The JdbcExample demonstrates how one can use an external fluent library to
talk to the proposed API. The interfaces I proposed don't use any non-Java
SE dependencies.
In addition, the package the example interfaces reside in are by no means
an indication where such interfaces would live in the Java-SE.
2017-10-13 17:03 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
> First observation, your sample code imports and uses extensively a library
> that does not exist in Java SE. This is contrary to one of the
> non-negotiable goals of this project, to integrate with Java SE.
>
> Douglas
>
>
> > On Oct 13, 2017, at 5:05 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
> >
> > Hi again,
> >
> > I've tried to come up with an API for a reactive JDBC driver ended up
> with
> > these of interfaces:
> >
> > https://github.com/akarnokd/akarnokd-misc-java9/tree/
> master/src/main/java/hu/akarnokd/java9/jdbc
> >
> > The entry point is JdbcConnectionSource which contains the connection
> > information. Its connect() method returns a
> Flow.Publisher<JdbcConnection>
> > where the JdbcConnection becomes available once the async connection was
> > successful.
> >
> > The JdbcConnection allows creating the connection specific JdbcOperations
> > and there is only one way to present them to the connection: execute()
> > which takes a Flow.Publisher of JdbcOperations and returns a
> Flow.Publisher
> > of JdbcStatements representing the execution of those operations. One
> > Flow.Publisher<JdbcOperation> is considered as a group. The various
> > execution modes (sequential, parallel, in transaction) are to be
> expressed
> > by specific JdbcOperation instances included in the operations
> > Flow.Publisher. Too many calls to execute() may be rejected by the
> driver.
> > Any other non-grouped or direct operation submission can be implemented
> on
> > top of execute(). I'm not certain the complication of operation (group)
> > graphs should be part of the API - such thing could be built on top of
> just
> > execute by controlling the consumption of the
> Flow.Publisher<JdbcOperation>
> > and intercepting events through the resulting
> > Flow.Publisher<JdbcStatement>s.
> >
> > Building a JdbcOperation should be straightforward, but there are two
> > additional properties:
> >
> > - a JdbcOperation could be marked deferred so its execution doesn't begin
> > unless there is a consumer for its result.
> > - some properties may be set from a Flow of bytes, either in the form of
> > byte arrays of in the form of ByteBuffers that are NIO friendly.
> >
> > An open question is how to create multi-row operations, such as batch
> > inserts represented by a single JdbcOperation. If individual
> JdbcOperations
> > are cheap to create, the driver could batch up subsequent JdbcOperations
> > with the same structure on its own. Alternatively, a
> > JdbcOperation.parameters(Flow.Publisher<T> items, BiConsumer<T,
> > JdbcOperations.Builder>>) method could consume a sequence of Ts and have
> > each of them set parameters on a per-item provided JdbcOperation.Builder.
> >
> > For each valid JdbcOperation, a JdbcStatement is created. The only way to
> > consume any result from an operation is through an user-provided row to T
> > transformer: Flow.Publisher<T> results(Function<JdbcRow, T>).
> >
> > The function is called when each row of data is ready to be consumed
> > non-blockingly. POJO or ORM-based mapping could be implemented on top of
> > this.
> >
> > Again, there could be BLOB-like columns which may or may not be
> reasonable
> > to consume via get() can be consumed by getBytes() as byte arrays,
> > getBuffer() as a sequence of ByteBuffers and one overload where the user
> > can specify a supplier for custom ByteBuffers. These are the dual of the
> > JdbcOperation.parameterBuffer() methods which should also help playing
> nice
> > with NIO.
> >
> > Here is an example program that would consume such API via the help of
> the
> > Reactive4JavaFlow library that supports the Flow API directly:
> >
> > import hu.akarnokd.reactive4javaflow.*;
> >
> > public class JdbcExample {
> >
> > static JdbcConnectionSource connectionSource() {
> > return null;
> > }
> >
> > public static void main(String[] args) {
> > Esetleg.fromPublisher(connectionSource().connect())
> > .observeOn(SchedulerServices.computation())
> > .flatMapPublisher(conn -> {
> > return Folyam.fromPublisher(conn.execute(
> > Esetleg.fromCallable(() -> {
> > return conn.query()
> > .query("SELECT :v FROM DUAL")
> > .parameter("v", JdbcDataType.INT,
> 1)
> > .build();
> > })
> > ))
> > .flatMap(stmt -> {
> > return Folyam.fromPublisher(
> > stmt.results(row -> row.get("1",
> > Integer.TYPE))
> > ).observeOn(SchedulerServices.single());
> > })
> > .concatWith(Folyam.fromPublisher(conn.close()).map(v
> -> 0));
> > })
> > .blockingSubscribe(System.out::println,
> > Throwable::printStackTrace);
> > }
> > }
> >
> >
> > An open question is where execute() should consume data and where
> results()
> > should perform the row mapping (probably not on the NIO dispatch thread).
> > There are a couple of possibilities, i.e., the driver uses a thread pool
> > for them or asks the user to provide one. The latter one may give an
> > opportunity to avoid one thread hop and utilize one of the user's
> existing
> > thread pool.
> >
> > --
> > Best regards,
> > David Karnok
>
>
--
Best regards,
David Karnok
More information about the jdbc-spec-discuss
mailing list