Guidance on using the new Loom features
Swaranga Sarma
sarma.swaranga at gmail.com
Thu May 19 20:00:57 UTC 2022
I posted this on a Reddit thread, but in hindsight that was perhaps not
the best place to get the attention of the actual authors of Loom.
I am hoping for some guidance on the "unlearning existing habits" comment
that Ron Pressler makes in many Loom talks, specifically on how to use the
new virtual threads and the structured concurrency API.
Say I have an application that consumes messages from a distributed queue
(for example, AWS SQS). While processing the messages, the application
needs to batch them into a List of records, and once the batch has
accumulated ~X minutes of data, it should flush them to a downstream
database/storage system. Once all the records are successfully stored, we
need to delete the corresponding messages from the queue.
Today the application is written with heavy use of ThreadPoolExecutors. We
have a "poller" thread pool whose threads each run an infinite loop that
polls for messages from the queue. When a message is polled, the poller
thread submits a processing task to another thread pool. On the
CompletableFuture of the processing task, the poller attaches a thenRun
callback that deletes the message from the queue.
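To make the callback handoff concrete, here is a minimal, self-contained
sketch. It is an illustration under assumptions, not the production code:
DeleteOnCompleteDemo and track are hypothetical names, and a plain List
stands in for the sqs.delete call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; the "deleted" list stands in for sqs.delete(...).
class DeleteOnCompleteDemo {

    // Poller side: create a future for the message and attach the delete callback.
    static CompletableFuture<Void> track(String messageId, List<String> deleted) {
        CompletableFuture<Void> stored = new CompletableFuture<>();
        // thenRun takes a Runnable and fires only if "stored" completes normally.
        stored.thenRun(() -> deleted.add(messageId));
        return stored;
    }

    public static void main(String[] args) {
        List<String> deleted = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> ok = track("m-1", deleted);
        CompletableFuture<Void> failed = track("m-2", deleted);

        // Flusher side: the record for m-1 was persisted, the one for m-2 was not.
        ok.complete(null);
        failed.completeExceptionally(new RuntimeException("flush failed"));

        System.out.println(deleted); // prints [m-1]
    }
}
```

The point of the sketch is that the delete only happens for futures that
complete normally, so a message whose record was never stored stays on the
queue.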
The processor thread pool takes the incoming messages, does the required
processing (filtering, transformation), which may require a network call,
and then adds the result to an in-memory BlockingQueue (along with the
CompletableFuture).
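The processing stage can be sketched as a blocking queue-to-queue loop.
This is only an illustration: ProcessorStageDemo and stage are hypothetical
names, the transform function stands in for the filtering/transformation
(and its network call), and the CompletableFuture that travels with each
message is left out for brevity.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

// Hypothetical demo class for a single processing stage.
class ProcessorStageDemo {

    // Returns a task that moves items from "in" to "out" until it is interrupted.
    static <A, B> Runnable stage(BlockingQueue<A> in,
                                 BlockingQueue<B> out,
                                 Function<A, B> transform) {
        return () -> {
            try {
                while (true) {
                    // take() blocks while the queue is empty;
                    // the untimed poll() would return null instead.
                    A item = in.take();
                    // put() blocks if "out" is bounded and full (back-pressure).
                    out.put(transform.apply(item));
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop.
                Thread.currentThread().interrupt();
            }
        };
    }
}
```

A task built this way can be submitted to an executor once per worker;
interrupting the worker (for example via shutdownNow() on the executor)
ends the loop.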
Finally, we have a single scheduled thread that runs every X minutes,
drains the records from the BlockingQueue and flushes them to the
database. Once done, the thread marks all the CompletableFutures as
completed.
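One flush cycle might look roughly like the sketch below. FlushDemo,
Pending and flushOnce are hypothetical names, the persist callback stands
in for the database write, and failing the futures when the write throws is
an assumption of this sketch (the real failure handling is omitted here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical names throughout; "persist" stands in for the database write.
class FlushDemo {

    // A transformed record plus the future the poller attached its callback to.
    static final class Pending {
        final String record;
        final CompletableFuture<Void> stored;

        Pending(String record, CompletableFuture<Void> stored) {
            this.record = record;
            this.stored = stored;
        }
    }

    // One flush cycle: drain what is queued, persist it, then settle the futures.
    // Returns the number of records in the batch.
    static int flushOnce(BlockingQueue<Pending> queue, Consumer<List<String>> persist) {
        List<Pending> batch = new ArrayList<>();
        queue.drainTo(batch); // non-blocking; removes everything currently queued
        if (batch.isEmpty()) {
            return 0;
        }
        List<String> records = new ArrayList<>();
        for (Pending p : batch) {
            records.add(p.record);
        }
        try {
            persist.accept(records);
            batch.forEach(p -> p.stored.complete(null));
        } catch (RuntimeException e) {
            // Assumption: fail the futures so the delete callbacks never run.
            batch.forEach(p -> p.stored.completeExceptionally(e));
        }
        return batch.size();
    }
}
```

Such a cycle would be driven by
ScheduledExecutorService.scheduleAtFixedRate(task, initialDelay, period, unit),
with the period set to the X minutes mentioned above.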
Here is a skeleton implementation:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class App {
    public static void main(String[] args) {
        SQSQueue sqs = ...;
        BlockingQueue<Pair<Message, CompletableFuture>> dataQueue = ...;
        BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue =
            ...;
        ThreadPoolExecutor pollers = new ThreadPoolExecutor(numPollerThreads,
            ...);
        ThreadPoolExecutor processors = new
            ThreadPoolExecutor(numProcessorThreads, ...);
        for (int i = 0; i < numPollerThreads; i++) {
            pollers.submit(new Poller(sqs, dataQueue));
        }
        for (int i = 0; i < numProcessorThreads; i++) {
            processors.submit(new Processor(dataQueue, flushQueue));
        }
        // Run the flusher every 10 minutes, first run after 10 minutes.
        Executors.newSingleThreadScheduledExecutor()
            .scheduleAtFixedRate(new Flusher(flushQueue), 10, 10,
                TimeUnit.MINUTES);
    }
}
class Poller implements Runnable {
    SQSQueue sqs;
    BlockingQueue<Pair<Message, CompletableFuture>> dataQueue;

    @Override
    public void run() {
        while (true) {
            Message message = sqs.poll(); // network call
            CompletableFuture future = new CompletableFuture();
            // Delete the message once its record has been flushed.
            future.thenRun(() -> sqs.delete(message.id()));
            dataQueue.offer(new Pair<>(message, future));
        }
    }
}
class Processor implements Runnable {
    BlockingQueue<Pair<Message, CompletableFuture>> dataQueue;
    BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue;

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks until a message is available.
                Pair<Message, CompletableFuture> p = dataQueue.take();
                TransformedMessage tm = transform(p.message()); // network call
                flushQueue.offer(new Pair<>(tm, p.future()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // exit when interrupted
        }
    }
}
class Flusher implements Runnable {
    BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue;

    @Override
    public void run() {
        List<Pair<TransformedMessage, CompletableFuture>> list =
            new ArrayList<>();
        flushQueue.drainTo(list); // takes everything currently queued
        persistToDB(list); // network call
        list.forEach(p -> p.future().complete(null));
    }
}
The first question I should ask is whether this is even a good candidate
for virtual threads. If so, how and where would one go about using virtual
threads, and possibly structured concurrency, in such an app? This is a
fairly large application processing millions of records per second today,
and I have omitted a lot of details, including failure handling and
retries, but some starting guidance would help me go back, implement it in
a real production system and see how it works.
Regards
Swaranga
More information about the loom-dev
mailing list