Example of a Flow-based JDBC API

Dávid Karnok akarnokd at gmail.com
Fri Oct 13 12:05:39 UTC 2017


Hi again,

I've tried to come up with an API for a reactive JDBC driver and ended up
with this set of interfaces:

https://github.com/akarnokd/akarnokd-misc-java9/tree/master/src/main/java/hu/akarnokd/java9/jdbc

The entry point is JdbcConnectionSource, which contains the connection
information. Its connect() method returns a Flow.Publisher<JdbcConnection>
that emits the JdbcConnection once the asynchronous connection attempt
succeeds.
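
To make the shape of the entry point concrete, here is a minimal sketch
using only java.util.concurrent.Flow. The nested interfaces are simplified
stand-ins invented for illustration (the ones in the linked repository may
look different), and inMemorySource() and first() are hypothetical helpers,
not part of the proposal.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

public class ConnectSketch {

    // Simplified, hypothetical stand-ins for the interfaces described above.
    public interface JdbcConnection {
        String name();
    }

    public interface JdbcConnectionSource {
        Flow.Publisher<JdbcConnection> connect();
    }

    // Toy source: "connects" immediately and emits one connection on the first request.
    public static JdbcConnectionSource inMemorySource(String name) {
        return () -> subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                JdbcConnection connection = () -> name;
                subscriber.onNext(connection);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes, requests a single item and exposes it as a CompletableFuture.
    public static <T> CompletableFuture<T> first(Flow.Publisher<T> source) {
        CompletableFuture<T> future = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);
            }

            @Override
            public void onNext(T item) {
                future.complete(item);
            }

            @Override
            public void onError(Throwable error) {
                future.completeExceptionally(error);
            }

            @Override
            public void onComplete() {
                // No effect if an item already completed the future.
                future.completeExceptionally(new NoSuchElementException("empty source"));
            }
        });
        return future;
    }

    public static void main(String[] args) {
        JdbcConnection connection = first(inMemorySource("test-db").connect()).join();
        System.out.println("connected to " + connection.name());
    }
}
```

A real driver would complete the connection asynchronously; the subscriber
side would look the same.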

The JdbcConnection allows creating the connection-specific JdbcOperations,
and there is only one way to present them to the connection: execute(),
which takes a Flow.Publisher of JdbcOperations and returns a Flow.Publisher
of JdbcStatements representing the execution of those operations. One
Flow.Publisher<JdbcOperation> is considered as a group. The various
execution modes (sequential, parallel, in transaction) are to be expressed
by specific JdbcOperation instances included in the operations
Flow.Publisher. Too many calls to execute() may be rejected by the driver.
Any other non-grouped or direct operation submission can be implemented on
top of execute(). I'm not certain the complexity of operation (group)
graphs should be part of the API; such a thing could be built on top of
just execute() by controlling the consumption of the
Flow.Publisher<JdbcOperation> and intercepting events through the resulting
Flow.Publisher<JdbcStatement>s.
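
As a rough illustration of "direct submission on top of execute()", the
sketch below layers a hypothetical executeOne() helper over a single
execute() method. All interfaces here are simplified stand-ins, and
toyConnection(), fromList() and collect() are made-up helpers that run
synchronously; a real driver would behave asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ExecuteSketch {

    // Simplified, hypothetical stand-ins for the interfaces in the post.
    public interface JdbcOperation { String sql(); }
    public interface JdbcStatement { String describe(); }
    public interface JdbcConnection {
        Flow.Publisher<JdbcStatement> execute(Flow.Publisher<JdbcOperation> operations);
    }

    // Direct, non-grouped submission layered on the single execute() entry point.
    public static Flow.Publisher<JdbcStatement> executeOne(JdbcConnection conn, JdbcOperation op) {
        return conn.execute(fromList(List.of(op)));
    }

    // Toy synchronous publisher that honors request(n); not thread-safe.
    public static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Toy connection: turns each operation into a statement and passes demand straight through.
    public static JdbcConnection toyConnection() {
        return operations -> downstream -> operations.subscribe(new Flow.Subscriber<JdbcOperation>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                downstream.onSubscribe(subscription);
            }

            @Override
            public void onNext(JdbcOperation op) {
                JdbcStatement statement = () -> "executed: " + op.sql();
                downstream.onNext(statement);
            }

            @Override
            public void onError(Throwable error) {
                downstream.onError(error);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    // Drains a synchronous publisher into a list (works only because the toys above are synchronous).
    public static <T> List<T> collect(Flow.Publisher<T> source) {
        List<T> out = new ArrayList<>();
        Throwable[] failure = new Throwable[1];
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable error) { failure[0] = error; }
            @Override public void onComplete() { }
        });
        if (failure[0] != null) {
            throw new IllegalStateException(failure[0]);
        }
        return out;
    }

    public static void main(String[] args) {
        JdbcOperation op = () -> "SELECT 1 FROM DUAL";
        for (JdbcStatement statement : collect(executeOne(toyConnection(), op))) {
            System.out.println(statement.describe());
        }
    }
}
```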

Building a JdbcOperation should be straightforward, but there are two
additional properties:

- a JdbcOperation could be marked deferred so its execution doesn't begin
unless there is a consumer for its result.
- some properties may be set from a Flow of bytes, either in the form of
byte arrays or in the form of ByteBuffers that are NIO friendly.
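
The "deferred" property can be pictured with a small sketch: nothing is
started until a subscriber shows up. The defer() and single() helpers below
are hypothetical and only model the idea; they say nothing about how a
driver would actually implement it.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferredSketch {

    // Wraps a publisher factory so nothing is created or started until someone subscribes.
    public static <T> Flow.Publisher<T> defer(Supplier<Flow.Publisher<T>> factory) {
        return subscriber -> factory.get().subscribe(subscriber);
    }

    // Toy publisher that emits one item on the first positive request, then completes.
    public static <T> Flow.Publisher<T> single(T item) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (!done && n > 0) {
                    done = true;
                    subscriber.onNext(item);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Returns how often the "operation" had started before and after subscribing.
    public static int[] demo() {
        AtomicInteger started = new AtomicInteger();
        Flow.Publisher<String> deferred = defer(() -> {
            started.incrementAndGet(); // stands in for "begin executing the operation"
            return single("row");
        });
        int before = started.get();
        deferred.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable error) { }
            @Override public void onComplete() { }
        });
        int after = started.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("started before subscribe: " + counts[0]
                + ", after subscribe: " + counts[1]);
    }
}
```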

An open question is how to create multi-row operations, such as batch
inserts represented by a single JdbcOperation. If individual JdbcOperations
are cheap to create, the driver could batch up subsequent JdbcOperations
with the same structure on its own. Alternatively, a
JdbcOperation.parameters(Flow.Publisher<T> items, BiConsumer<T,
JdbcOperation.Builder>) method could consume a sequence of Ts and have
each of them set parameters on a per-item provided JdbcOperation.Builder.
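
A minimal sketch of the second alternative, assuming a made-up Builder
class with a single parameter(name, value) method: each item from the
publisher gets a fresh Builder, and the bound parameter sets are collected
as the "batch". Returning a CompletableFuture of the collected rows is a
choice made for this demo only; the method in the proposal would presumably
return something else.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.BiConsumer;

public class BatchParametersSketch {

    // Hypothetical, heavily simplified builder: it only records named parameters.
    public static final class Builder {
        private final Map<String, Object> values = new LinkedHashMap<>();

        public Builder parameter(String name, Object value) {
            values.put(name, value);
            return this;
        }
    }

    // Consumes the items one at a time and lets each one fill in a fresh Builder.
    public static <T> CompletableFuture<List<Map<String, Object>>> parameters(
            Flow.Publisher<T> items, BiConsumer<T, Builder> binder) {
        CompletableFuture<List<Map<String, Object>>> result = new CompletableFuture<>();
        items.subscribe(new Flow.Subscriber<T>() {
            private final List<Map<String, Object>> rows = new ArrayList<>();
            private Flow.Subscription upstream;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                upstream = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(T item) {
                Builder builder = new Builder();
                binder.accept(item, builder);
                rows.add(builder.values);
                upstream.request(1); // ask for the next row of the batch
            }

            @Override
            public void onError(Throwable error) {
                result.completeExceptionally(error);
            }

            @Override
            public void onComplete() {
                result.complete(rows);
            }
        });
        return result;
    }

    public static List<Map<String, Object>> demo() {
        SubmissionPublisher<String> names = new SubmissionPublisher<>();
        CompletableFuture<List<Map<String, Object>>> batch =
                parameters(names, (name, builder) -> builder.parameter("name", name));
        names.submit("alice");
        names.submit("bob");
        names.close();
        return batch.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```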

For each valid JdbcOperation, a JdbcStatement is created. The only way to
consume any result from an operation is through a user-provided row-to-T
transformer: Flow.Publisher<T> results(Function<JdbcRow, T>).

The function is called as each row of data becomes ready, so rows can be
consumed without blocking. POJO or ORM-based mapping could be implemented
on top of this.
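
The row mapping idea can be sketched as follows. JdbcRow here is a
simplified stand-in with a single get(column, type) method, and rowOf(),
results() over an in-memory list, and collect() are hypothetical helpers
that run synchronously. Note that this toy get() uses Class.cast, so it
needs Integer.class rather than Integer.TYPE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.function.Function;

public class ResultsSketch {

    // Hypothetical row abstraction: typed access to a column by name.
    public interface JdbcRow {
        <T> T get(String column, Class<T> type);
    }

    public static JdbcRow rowOf(Map<String, Object> columns) {
        return new JdbcRow() {
            @Override
            public <T> T get(String column, Class<T> type) {
                return type.cast(columns.get(column));
            }
        };
    }

    // Toy results(): applies the mapper to each in-memory row as it is requested.
    public static <T> Flow.Publisher<T> results(List<JdbcRow> rows, Function<JdbcRow, T> mapper) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && index < rows.size() && !done; i++) {
                    T mapped;
                    try {
                        mapped = mapper.apply(rows.get(index++));
                    } catch (RuntimeException ex) {
                        done = true;
                        subscriber.onError(ex); // a failing mapper ends the sequence
                        return;
                    }
                    subscriber.onNext(mapped);
                }
                if (!done && index == rows.size()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Drains a synchronous publisher into a list; mapper errors are ignored in this demo.
    public static <T> List<T> collect(Flow.Publisher<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { out.add(item); }
            @Override public void onError(Throwable error) { }
            @Override public void onComplete() { }
        });
        return out;
    }

    public static List<Integer> demo() {
        List<JdbcRow> rows = List.of(rowOf(Map.of("1", 1)), rowOf(Map.of("1", 2)));
        Flow.Publisher<Integer> values = results(rows, row -> row.get("1", Integer.class));
        return collect(values);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```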

Again, there could be BLOB-like columns which may or may not be reasonable
to consume via get(). These can be consumed by getBytes() as byte arrays,
by getBuffer() as a sequence of ByteBuffers, or by an overload where the
user can specify a supplier for custom ByteBuffers. These are the dual of
the JdbcOperation.parameterBuffer() methods, which should also help playing
nice with NIO.

Here is an example program that would consume such an API with the help of
the Reactive4JavaFlow library, which supports the Flow API directly:

import hu.akarnokd.reactive4javaflow.*;

public class JdbcExample {

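    // Placeholder only: as written, main() fails until a driver-provided
    // JdbcConnectionSource is returned here.
    static JdbcConnectionSource connectionSource() {
        return null;
    }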

    public static void main(String[] args) {
        Esetleg.fromPublisher(connectionSource().connect())
                .observeOn(SchedulerServices.computation())
                .flatMapPublisher(conn -> {
                    return Folyam.fromPublisher(conn.execute(
                            Esetleg.fromCallable(() -> {
                                return conn.query()
                                        .query("SELECT :v FROM DUAL")
                                        .parameter("v", JdbcDataType.INT, 1)
                                        .build();
                            })
                    ))
                    .flatMap(stmt -> {
                        return Folyam.fromPublisher(
                                    stmt.results(row -> row.get("1", Integer.TYPE))
                                ).observeOn(SchedulerServices.single());
                    })
                    .concatWith(Folyam.fromPublisher(conn.close()).map(v -> 0));
                })
                .blockingSubscribe(System.out::println, Throwable::printStackTrace);
    }
}


An open question is where execute() should consume data and where results()
should perform the row mapping (probably not on the NIO dispatch thread).
There are a couple of possibilities, e.g., the driver uses a thread pool
for them or asks the user to provide one. The latter may give an
opportunity to avoid one thread hop and utilize one of the user's existing
thread pools.
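
The "user provides the pool" option can be illustrated with the JDK's
SubmissionPublisher, which accepts an Executor for delivering signals to
subscribers. In this sketch the publisher merely stands in for a driver's
row source, and the "user-pool" thread name is made up so the demo can show
where the consumer ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class UserExecutorSketch {

    // Returns the name of the thread on which the row consumer ran.
    public static String demo() {
        // The user's existing pool; a single named daemon thread keeps the demo deterministic.
        ExecutorService userPool = Executors.newSingleThreadExecutor(task -> {
            Thread thread = new Thread(task, "user-pool");
            thread.setDaemon(true);
            return thread;
        });
        try {
            // Stand-in for the driver's row source: it delivers signals via the given executor.
            SubmissionPublisher<String> rows =
                    new SubmissionPublisher<>(userPool, Flow.defaultBufferSize());
            CompletableFuture<String> mappedOn = new CompletableFuture<>();
            // The consumer plays the role of the row mapper and records its thread.
            CompletableFuture<Void> done =
                    rows.consume(row -> mappedOn.complete(Thread.currentThread().getName()));
            rows.submit("row-1");
            rows.close();
            done.join();
            return mappedOn.join();
        } finally {
            userPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("row consumed on thread: " + demo());
    }
}
```

With the driver-owned pool alternative, the user would typically add their
own hop afterwards (as observeOn does in the example program), which is the
extra thread hop mentioned above.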

-- 
Best regards,
David Karnok


More information about the jdbc-spec-discuss mailing list