loom-dev Digest, Vol 53, Issue 48
Cleber Muramoto
cleber.muramoto at gmail.com
Fri May 20 14:18:25 UTC 2022
>
> Date: Thu, 19 May 2022 13:00:57 -0700
> From: Swaranga Sarma <sarma.swaranga at gmail.com>
> To: loom-dev at openjdk.java.net
> Subject: Guidance on using the new Loom features
> Message-ID:
> <CAGZD2TCG2TDCM=
> pX3PdbQTg30n4DxHQcxZg+iPHis3MwfXqEXA at mail.gmail.com>
> Content-Type: text/plain; charset="UTF-8"
>
> I had posted this on a Reddit thread but in hindsight, it was perhaps not
> the best place to get eyes from the actual authors of Loom.
>
> I am hoping for some guidance on the "unlearning existing habits" comment
> from Ron Pressler in many Loom talks on how to use the new virtual threads
> and structured concurrency API.
>
> Say, I have an application that consumes messages from a distributed queue
> (example AWS SQS). During the processing of the messages, the application
> needs to batch them into a List of records and when the batch has
> accumulated ~X mins of data, it should flush them to a downstream
> database/storage system. Once all the records are successfully stored, we
> need to delete the message from the queue.
> Today the application is written with heavy use of ThreadPoolExecutors. We
> have a "poller" thread-pool whose threads are running an infinite loop
> where it polls for messages from the queue. When a message is polled, the
> poller thread submits a processing task to another thread-pool. On the
> CompletableFuture of the processing task, the pollers attach a thenRun
> callback to delete the message from the queue.
>
> The processor thread-pool takes the incoming messages, does the required
> processing (filtering, transformation), which may require a network call,
> and then adds the result to an in-memory BlockingQueue (along with the
> CompletableFuture).
>
> Finally, we have a single scheduled-thread that executes every X mins and
> drains the records from the BlockingQueue and flushes the records to the
> database. Once done, the thread marks all the CompletableFuture as
> completed.
>
> Here is a skeleton implementation:
>
> class App {
> public static void main(String[] args) {
> SQSQueue sqs = ...;
>
> BlockingQueue<Pair<Message, CompletableFuture>> dataQueue = ...;
> BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue =
> ...;
>
> ThreadPoolExecutor pollers = new ThreadPoolExecutor(numPollerThreads,
> ...);
> ThreadPoolExecutor processors = new
> ThreadPoolExecutor(numProcessorThreads, ...);
>
> for (int i = 0; i < numPollerThreads; i++) {
> pollers.submit(new Poller(sqs, dataQueue));
> }
>
> for (int i = 0; i < numProcessorThreads; i++) {
> processors.submit(new Processor(dataQueue, flushQueue));
> }
>
> Executors.newSingleThreadScheduledExecutor()
> .scheduleAtFixedRate(new Flusher(flushQueue)::run, 10, 10, MINUTES);
> }
> }
>
> class Poller implements Runnable {
> SQSQueue sqs;
> BlockingQueue<Pair<Message, CompletableFuture>> dataQueue;
>
> void run() {
> while(true) {
> Message message = sqs.poll(); // network call
>
> CompletableFuture future = new CompletableFuture();
> future.thenRun(() -> sqs.delete(message.id()));
>
> dataQueue.offer(new Pair(message, future));
> }
> }
> }
>
> class Processor implements Runnable {
> BlockingQueue<Pair<Message, CompletableFuture>> dataQueue;
> BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue;
>
> void run() {
> while(true) {
> Pair<Message, CompletableFuture> p = dataQueue.poll();
>
> TransformedMessage tm = transform(p.message()); // network call
>
> flushQueue.offer(new Pair(tm, p.future()));
> }
> }
> }
>
> class Flusher implements Runnable {
> BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue;
>
> void run() {
> List<Pair<TransformedMessage, CompletableFuture>> list = new ArrayList<>();
> flushQueue.drainTo(list);
> persistToDB(list); // network call
> list.forEach(p -> p.future().complete(null));
> }
> }
>
> First question I should ask is if this is even a good candidate for virtual
> threads. If so, then how and where would one go about making use of Virtual
> threads and possibly structured concurrency for such an app. This is a
> fairly large application processing millions of records per second today
> and I have omitted a lot of details including failure handling, retries but
> some starting guidance will be helpful for me to go back and implement it
> in a real production system and see how it works.
>
> Regards
> Swaranga
>
Given that SQS returns at most 10 messages per call, and that long polling
with batches is usually cheaper, an alternative flow that would be a
perfect fit for Loom might be:
1) Periodically query the queue attributes to get the approximate number of
messages, N.
2) If N is 0, fire a single consumer. Otherwise fire ceil(N/10) vthreads
(possibly capped at a threshold), each using batching and long polling.
3) Since transformation is also blocking, fire one vthread per message.
4) Regroup the transformed messages into M batches of 10 and fire M
vthreads, each of which persists its batch and then issues a single
deleteMessageBatch.
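
Steps 2 and 3 above can be sketched roughly as follows. This is only an
illustration under stated assumptions: the class name FanOutSketch, the
method names, and the cap parameter are made up for the example, and the
transform function stands in for whatever blocking call the application
makes. The executor is passed in, so nothing here depends on a specific
JDK release.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper; none of these names come from the original application.
final class FanOutSketch {
    // SQS returns at most 10 messages per receive call.
    static final int SQS_MAX_BATCH = 10;

    // Step 2: a single consumer when the queue looks empty,
    // otherwise ceil(n / 10) consumers, capped at `cap`.
    static int consumerCount(int approxMessages, int cap) {
        if (approxMessages <= 0) {
            return 1;
        }
        int needed = (approxMessages + SQS_MAX_BATCH - 1) / SQS_MAX_BATCH; // integer ceil
        return Math.max(1, Math.min(needed, cap));
    }

    // Step 3: submit one task per message. Each task may block on its own
    // network call; results are collected in submission order.
    static <M, T> List<T> transformAll(ExecutorService executor,
                                       List<M> messages,
                                       Function<M, T> transform) {
        List<Future<T>> futures = new ArrayList<>();
        for (M message : messages) {
            Callable<T> task = () -> transform.apply(message);
            futures.add(executor.submit(task));
        }
        List<T> results = new ArrayList<>();
        try {
            for (Future<T> future : futures) {
                results.add(future.get()); // blocks the caller until the task is done
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while transforming", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("transform failed", e.getCause());
        }
        return results;
    }
}
```

With Loom, the executor passed in would be a thread-per-task executor backed
by virtual threads (Executors.newVirtualThreadPerTaskExecutor() in the Loom
builds), so each submitted task gets its own vthread and blocking inside
transform is cheap. Failure handling and retries are left out, as in the
original skeleton.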
This flow issues more persistToDB calls but up to 10 times fewer SQS calls,
and could potentially let you reduce the current visibility timeout.
If you really need to accumulate more messages per DB write, you can
regroup the transformed messages into larger batches to persist, and then
partition each large batch into chunks of 10 to delete. Of course, if you
don't care whether the delete succeeds, you can again use a separate
vthread for deletion.
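
The "persist a large batch, then delete in chunks of 10" variant could look
something like the sketch below. The class ChunkedDelete and the persist and
deleteBatch callbacks are hypothetical placeholders for the real DB write
and the SQS batch-delete call; only the partitioning logic is meant
literally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; the callbacks stand in for the real DB and SQS clients.
final class ChunkedDelete {
    // A batch delete covers at most 10 messages, hence chunks of 10.
    static final int DELETE_BATCH_LIMIT = 10;

    // Splits `items` into consecutive chunks of at most `size` elements.
    static <T> List<List<T>> chunks(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            out.add(new ArrayList<>(items.subList(i, end)));
        }
        return out;
    }

    // Persists the whole batch once, then deletes it chunk by chunk.
    // Returns the number of delete calls issued.
    static <T> int persistThenDelete(List<T> largeBatch,
                                     Consumer<List<T>> persist,
                                     Consumer<List<T>> deleteBatch) {
        persist.accept(largeBatch);
        List<List<T>> parts = chunks(largeBatch, DELETE_BATCH_LIMIT);
        for (List<T> part : parts) {
            deleteBatch.accept(part);
        }
        return parts.size();
    }
}
```

If the outcome of the delete does not matter, the loop over the chunks is
the part that could be handed off to a separate vthread instead of running
inline.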
Cheers