Example of a Flow-based JDBC API

Dávid Karnok akarnokd at gmail.com
Fri Oct 13 17:29:30 UTC 2017


In that case, using any Flow API in the async JDBC API appears to be a big
no-no as there is nothing but the SubmissionPublisher in Java SE that
implements parts of it and SubmissionPublisher is not a Processor
(backpressure doesn't compose through SP). I don't think there are any
plans to include Flow transformation and coordination logic in any future
Java SE.
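
To illustrate what Java SE alone offers here, below is a minimal sketch of
consuming a SubmissionPublisher with a hand-written Flow.Subscriber. The
names FlowOnlyExample, CollectingSubscriber and collect() are made up for
this illustration and are not part of any proposal; only java.util.concurrent
classes are used.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowOnlyExample {

    /** Hypothetical hand-written Subscriber that requests one item at a time. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        volatile Throwable error;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);              // ask for the first item
        }

        @Override public void onNext(T item) {
            items.add(item);
            subscription.request(1);   // ask for the next item only when ready
        }

        @Override public void onError(Throwable t) {
            error = t;
            done.countDown();
        }

        @Override public void onComplete() {
            done.countDown();
        }
    }

    /** Pushes the given items through a SubmissionPublisher and collects them. */
    static <T> List<T> collect(List<T> source) {
        CollectingSubscriber<T> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            source.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.items;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

Anything beyond this (mapping, flattening, switching threads) has to be
written by hand or taken from a library.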

Note though that implementing such Flow-based logic is a non-trivial task
and Reactive Streams users, who would be the main consumers of a reactive
JDBC, are quite accustomed to picking a vendor for their reactive
transformation needs. If you don't plan to provide an implementation behind
the async JDBC API, just the interfaces and enums, its nature as a service
provider interface should mean no additional disadvantage to driver vendors
or end-users.
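
As a rough idea of what hand-crafting even the simplest operator involves,
here is a sketch of a map operator written against the Flow interfaces only.
MapOperatorSketch, MapPublisher, CollectingSubscriber and mapAndCollect() are
hypothetical names for this illustration. Because map is 1:1, the upstream
Subscription is handed to the downstream unchanged so request() and cancel()
pass through; operators that change the item count cannot take this
shortcut and need their own request accounting.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

public class MapOperatorSketch {

    /** Hypothetical operator: applies mapper to every item of source. */
    static final class MapPublisher<T, R> implements Flow.Publisher<R> {
        private final Flow.Publisher<T> source;
        private final Function<? super T, ? extends R> mapper;

        MapPublisher(Flow.Publisher<T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }

        @Override public void subscribe(Flow.Subscriber<? super R> downstream) {
            source.subscribe(new Flow.Subscriber<T>() {
                private Flow.Subscription upstream;
                private boolean done;

                @Override public void onSubscribe(Flow.Subscription s) {
                    upstream = s;
                    downstream.onSubscribe(s); // 1:1 operator: pass demand through
                }

                @Override public void onNext(T item) {
                    if (done) return;
                    R mapped;
                    try {
                        mapped = Objects.requireNonNull(mapper.apply(item), "mapper returned null");
                    } catch (Throwable ex) {
                        upstream.cancel();     // stop the source, then report the failure
                        onError(ex);
                        return;
                    }
                    downstream.onNext(mapped);
                }

                @Override public void onError(Throwable t) {
                    if (done) return;
                    done = true;
                    downstream.onError(t);
                }

                @Override public void onComplete() {
                    if (done) return;
                    done = true;
                    downstream.onComplete();
                }
            });
        }
    }

    /** Hypothetical Subscriber that requests items one by one and stores them. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        volatile Throwable error;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
        @Override public void onNext(T item) { items.add(item); subscription.request(1); }
        @Override public void onError(Throwable t) { error = t; done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Maps the items through MapPublisher and returns what the subscriber received. */
    static <T, R> List<R> mapAndCollect(List<T> items, Function<? super T, ? extends R> mapper) {
        CollectingSubscriber<R> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            new MapPublisher<T, R>(publisher, mapper).subscribe(subscriber);
            items.forEach(publisher::submit);
        }
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (subscriber.error != null) throw new IllegalStateException(subscriber.error);
        return subscriber.items;
    }

    public static void main(String[] args) {
        System.out.println(mapAndCollect(List.of(1, 2, 3), x -> x * 10)); // prints [10, 20, 30]
    }
}
```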


2017-10-13 19:04 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:

> If you want to demonstrate that your ideas are appropriate for this
> project you need to show example code that uses only the stuff that will be
> in the Java SE release that includes this project, Java 10/19.x/whatever.
> The API this project specifies must be usable without any external library
> other than a vendor implementation. Sure, additional external libraries can
> be helpful e.g. Hibernate, but the API must be usable without any such
> libraries e.g. JDBC. We will not get approval of an API that is useless
> without support from a non-standard library.
>
> There is unlikely to be any internal user of any database access API in
> Java SE. The API will be used exclusively by code outside of Java SE. But
> the API cannot require a non-standard library (in addition to a vendor
> implementation) to be of use. It must be usable within Java SE.
>
> Douglas
>
>
> On Oct 13, 2017, at 9:19 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
>
> I don't understand your response.
>
> Isn't the aim of your project to have an API that will be utilized by
> other libraries and programs? Or is it intended to be used solely
> internally by Java SE? Does that mean that driver implementations should
> also be in Java SE otherwise demonstrating, let's say, a MySQL driver usage
> is by definition invalid?
>
> Do you expect me to demonstrate the usage of my proposed API by
> hand-crafting Flow operations on it?
>
>
> 2017-10-13 17:37 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
>
>> My understanding of the purpose of your example code is to demonstrate
>> how your ideas would be used. Since the goal is integration with Java SE
>> there is no value to an example that depends on an external library; an
>> external library is not Java SE. I am not aware of a project to add a
>> library such as the one you reference to Java SE. This project will not
>> undertake that effort. If you want to demonstrate the suitability of your
>> ideas for this project your example code can only use what will be in the
>> Java SE release that will include your ideas.
>>
>> Douglas
>>
>> On Oct 13, 2017, at 8:30 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
>>
>> The JdbcExample demonstrates how one can use an external fluent library
>> to talk to the proposed API. The interfaces I proposed don't use any
>> non-Java SE dependencies.
>>
>> In addition, the package the example interfaces reside in is by no means
>> an indication of where such interfaces would live in Java SE.
>>
>> 2017-10-13 17:03 GMT+02:00 Douglas Surber <douglas.surber at oracle.com>:
>>
>>> First observation, your sample code imports and uses extensively a
>>> library that does not exist in Java SE. This is contrary to one of the
>>> non-negotiable goals of this project, to integrate with Java SE.
>>>
>>> Douglas
>>>
>>>
>>> > On Oct 13, 2017, at 5:05 AM, Dávid Karnok <akarnokd at gmail.com> wrote:
>>> >
>>> > Hi again,
>>> >
>>> > I've tried to come up with an API for a reactive JDBC driver and
>>> > ended up with this set of interfaces:
>>> >
>>> > https://github.com/akarnokd/akarnokd-misc-java9/tree/master/src/main/java/hu/akarnokd/java9/jdbc
>>> >
>>> > The entry point is JdbcConnectionSource, which contains the
>>> > connection information. Its connect() method returns a
>>> > Flow.Publisher<JdbcConnection> where the JdbcConnection becomes
>>> > available once the async connection was successful.
>>> >
>>> > The JdbcConnection allows creating the connection-specific
>>> > JdbcOperations and there is only one way to present them to the
>>> > connection: execute(), which takes a Flow.Publisher of JdbcOperations
>>> > and returns a Flow.Publisher of JdbcStatements representing the
>>> > execution of those operations. One Flow.Publisher<JdbcOperation> is
>>> > considered as a group. The various execution modes (sequential,
>>> > parallel, in transaction) are to be expressed by specific
>>> > JdbcOperation instances included in the operations Flow.Publisher.
>>> > Too many calls to execute() may be rejected by the driver. Any other
>>> > non-grouped or direct operation submission can be implemented on top
>>> > of execute(). I'm not certain the complication of operation (group)
>>> > graphs should be part of the API - such a thing could be built on top
>>> > of just execute() by controlling the consumption of the
>>> > Flow.Publisher<JdbcOperation> and intercepting events through the
>>> > resulting Flow.Publisher<JdbcStatement>s.
>>> >
>>> > Building a JdbcOperation should be straightforward, but there are two
>>> > additional properties:
>>> >
>>> > - a JdbcOperation could be marked deferred so its execution doesn't
>>> >   begin unless there is a consumer for its result.
>>> > - some properties may be set from a Flow of bytes, either in the form
>>> >   of byte arrays or in the form of ByteBuffers that are NIO friendly.
>>> >
>>> > An open question is how to create multi-row operations, such as batch
>>> > inserts represented by a single JdbcOperation. If individual
>>> > JdbcOperations are cheap to create, the driver could batch up
>>> > subsequent JdbcOperations with the same structure on its own.
>>> > Alternatively, a JdbcOperation.parameters(Flow.Publisher<T> items,
>>> > BiConsumer<T, JdbcOperation.Builder>) method could consume a sequence
>>> > of Ts and have each of them set parameters on a per-item provided
>>> > JdbcOperation.Builder.
>>> >
>>> > For each valid JdbcOperation, a JdbcStatement is created. The only
>>> > way to consume any result from an operation is through a
>>> > user-provided row-to-T transformer:
>>> > Flow.Publisher<T> results(Function<JdbcRow, T>).
>>> >
>>> > The function is called when each row of data is ready to be consumed
>>> > in a non-blocking fashion. POJO or ORM-based mapping could be
>>> > implemented on top of this.
>>> >
>>> > Again, there could be BLOB-like columns which may or may not be
>>> > reasonable to consume via get(). These can be consumed by getBytes()
>>> > as byte arrays, by getBuffer() as a sequence of ByteBuffers, or by an
>>> > overload where the user can specify a supplier for custom
>>> > ByteBuffers. These are the dual of the
>>> > JdbcOperation.parameterBuffer() methods, which should also help with
>>> > playing nice with NIO.
>>> >
>>> > Here is an example program that would consume such an API with the
>>> > help of the Reactive4JavaFlow library, which supports the Flow API
>>> > directly:
>>> >
>>> > import hu.akarnokd.reactive4javaflow.*;
>>> >
>>> > public class JdbcExample {
>>> >
>>> >    static JdbcConnectionSource connectionSource() {
>>> >        return null;
>>> >    }
>>> >
>>> >    public static void main(String[] args) {
>>> >        Esetleg.fromPublisher(connectionSource().connect())
>>> >                .observeOn(SchedulerServices.computation())
>>> >                .flatMapPublisher(conn -> {
>>> >                    return Folyam.fromPublisher(conn.execute(
>>> >                            Esetleg.fromCallable(() -> {
>>> >                                return conn.query()
>>> >                                        .query("SELECT :v FROM DUAL")
>>> >                                        .parameter("v", JdbcDataType.INT, 1)
>>> >                                        .build();
>>> >                            })
>>> >                    ))
>>> >                    .flatMap(stmt -> {
>>> >                        return Folyam.fromPublisher(
>>> >                                    stmt.results(row -> row.get("1", Integer.TYPE))
>>> >                                ).observeOn(SchedulerServices.single());
>>> >                    })
>>> >                    .concatWith(Folyam.fromPublisher(conn.close()).map(v -> 0));
>>> >                })
>>> >                .blockingSubscribe(System.out::println, Throwable::printStackTrace);
>>> >    }
>>> > }
>>> >
>>> >
>>> > An open question is where execute() should consume data and where
>>> > results() should perform the row mapping (probably not on the NIO
>>> > dispatch thread). There are a couple of possibilities, i.e., the
>>> > driver uses a thread pool for them or asks the user to provide one.
>>> > The latter may give an opportunity to avoid one thread hop and
>>> > utilize one of the user's existing thread pools.
>>> >
>>> > --
>>> > Best regards,
>>> > David Karnok
>>>
>>>
>>
>>
>> --
>> Best regards,
>> David Karnok
>>
>>
>>
>
>
> --
> Best regards,
> David Karnok
>
>
>


-- 
Best regards,
David Karnok


More information about the jdbc-spec-discuss mailing list