Guidance on using the new Loom features

Swaranga Sarma sarma.swaranga at gmail.com
Thu May 19 20:00:57 UTC 2022


I had posted this on a Reddit thread but in hindsight, it was perhaps not
the best place to get eyes from the actual authors of Loom.

I am hoping for some guidance on the "unlearning existing habits" comment
from Ron Pressler in many Loom talks on how to use the new virtual threads
and structured concurrency API.

Say, I have an application that consumes messages from a distributed queue
(example AWS SQS). During the processing of the messages, the application
needs to batch them into a List of records and when the batch has
accumulated ~X mins of data, it should flush them to a downstream
database/storage system. Once all the records are successfully stored, we
need to delete the message from the queue.

Today the application is written with heavy use of ThreadPoolExecutors. We
have a "poller" thread-pool whose threads are running an infinite loop
where it polls for messages from the queue. When a message is polled, the
poller thread submits a processing task to another thread-pool. On the
CompletableFuture of the processing task, the pollers attach a thenRun
callback to delete the message from the queue.

The processor thread-pool takes the incoming messages, does the required
processing(filtering, transformation) which may require a network call and
then adds it to an in-memory BlockingQueue (along with the
CompletableFuture).

Finally, we have a single scheduled-thread that executes every X mins and
drains the records from the BlockingQueue and flushes the records to the
database. Once done, the thread marks all the CompletableFuture as
completed.

Here is a skeleton implementation:

class App {
  public static void main(String[] args) {
    SQSQueue sqs = ...;

    BlockingQueue<Pair<Message, CompletableFuture>> dataQueue = ...;
    BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue =
...;

    ThreadPoolExecutor pollers = new ThreadPoolExecutor(numPollerThreads,
...);
    ThreadPoolExecutor processors = new
ThreadPoolExecutor(numProcessorThreads, ...);

    for (int i = 0; i < numPollerThreads; i++) {
      pollers.submit(new Poller(sqs, dataQueue));
    }

    for (int i = 0; i < numProcessorThreads; i++) {
      processors.submit(new Processor(dataQueue, flushQueue));
    }

    newSingleThreadedScheduledExecutor()
      .scheduleAtFixedRate(10, MINUTES, new Flusher(flushQueue)::run);
  }
}

class Poller implements Runnable {
  SQSQueue sqs;
  BlockingQueue<Pair<Message, CompletableFuture>> dataQueue;

  void run() {
    while(true) {
      Message message = sqs.poll(); // network call

      CompletableFuture future = new CompletableFuture();
      future.thenRun(ignored -> sqs.delete(message.id()));

      dataQueue.offer(new Pair(message, future));
    }
  }
}

class Processor implements Runnable {
  BlockingQueue<Pair<Message, CompletableFuture>> dataQueue;
  BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue;

  void run() {
    while(true) {
      Pair<Message, CompletableFuture> p = dataQueue.poll();

      TransformedMessage tm = transform(p.message()); // network call

      flushQueue.offer(new Pair(tm, p.future()));
    }
  }
}

class Flusher implements Runnable {
  BlockingQueue<Pair<TransformedMessage, CompletableFuture>> flushQueue;

  void run() {
    List<Pair<TransformedMessage, CompletableFuture>> list =
flushQueue.drain();
    persistToDB(list); // network call
    list.forEach(p -> p.future().complete());
  }
}

First question I should ask is if this is even a good candidate for virtual
threads. If so, then how and where would one go about making use of Virtual
threads and possibly structured concurrency for such an app. This is a
fairly large application processing millions of records per second today
and I have omitted a lot of details including failure handling, retries but
some starting guidance will be helpful for me to go back and implement it
in a real production system and see how it works.

Regards
Swaranga


More information about the loom-dev mailing list