Performance of pooling virtual threads vs. semaphores

robert engels rengels at ix.netcom.com
Thu May 30 15:36:47 UTC 2024


Attached is a version that doesn’t throttle the submitter (it is not fully correct, but good enough for testing). It is better than scenario 2, though not as good as 3. The increased number of carrier threads (without the cores to back them) hurts performance, as expected. I would guess that if the OP tries scenario 4 on his test machine, it will perform similarly to scenario 3.

iMac:vt_test robertengels$ time java -Djdk.virtualThreadScheduler.parallelism=128 Main dummy 4 1000000
all tasks submitted

real	0m37.820s
user	2m56.453s
sys	0m9.973s

iMac:vt_test robertengels$ time java Main dummy 4 1000000
all tasks submitted

real	0m37.992s
user	2m25.169s
sys	0m3.943s

import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class Main {

  private static Semaphore semaphore = null;
  private static int sink = 0;

  public static void main(String[] args) {
    int strategy = 0;
    int parallelism = 600;
    int numTasks = 10000;

    if (args.length > 1) {
      strategy = Integer.parseInt(args[1]);
    }

    if (args.length > 2) {
      numTasks = Integer.parseInt(args[2]);
    }

    ExecutorService executor;
    switch (strategy) {
      case 1 -> {
        executor = new ForkJoinPool(parallelism);
      }
      case 2 -> {
        executor = Executors.newVirtualThreadPerTaskExecutor();
        semaphore = new Semaphore(parallelism);
      }
      case 3 -> {
        executor = Executors.newFixedThreadPool(parallelism, Thread.ofVirtual().factory());
      }
      case 4 -> {
        executor = new VirtualThreadExecutorService(parallelism);
      }
      default -> {
        throw new IllegalArgumentException();
      }
    }

    try (executor) {
      for (var i = 0; i < numTasks; ++i) {
        executor.execute(Main::task);
      }
      System.out.println("all tasks submitted");
    }
  }

  private static void task() {
    if (semaphore != null) {
      try {
        semaphore.acquire();
      } catch (InterruptedException e) {
        throw new IllegalStateException();
      }
    }

    try {
      sink += fibonacci(20);
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
      }
      sink += fibonacci(20);
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
      }
      sink += fibonacci(20);
    } finally {
      if (semaphore != null) {
        semaphore.release();
      }
    }
  }

  private static int fibonacci(int n) {
    if (n == 0) {
      return 0;
    } else if (n == 1) {
      return 1;
    } else {
      return fibonacci(n - 1) + fibonacci(n - 2);
    }
  }
}

final class VirtualThreadExecutorService extends AbstractExecutorService {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger count = new AtomicInteger(0);
    private final Semaphore semaphore;
    private volatile boolean shutdown = false;
    private volatile Thread waiter = null;

    public VirtualThreadExecutorService(int maxConcurrency) {
        semaphore = new Semaphore(maxConcurrency);
    }

    @Override
    public void shutdown() {
        shutdown=true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        return count.get()==0;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        waiter = Thread.currentThread();
        while(count.get()>0) {
            LockSupport.park();
        }
        return true;
    }

    private void maybeStartAnother() {
        if(!queue.isEmpty() && semaphore.tryAcquire()) {
            Runnable r = queue.poll();
            if(r!=null) {
                Thread.ofVirtual().start(r);
            } else {
                semaphore.release();
            }
        }
    }

    @Override
    public void execute(Runnable command) {
        if(shutdown) throw new IllegalStateException("executor is shutdown");
        count.incrementAndGet();
        queue.add(() -> {
            try {
                command.run();
            } finally {
                // release the permit even if the task throws, then wake the
                // awaitTermination() waiter once the last task has finished
                semaphore.release();
                if(count.decrementAndGet()==0) {
                    LockSupport.unpark(waiter);
                }
                maybeStartAnother();
            }
        });
        maybeStartAnother();
    }
}
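
For contrast, the "push back" approach Attila describes below (throttling the submitter rather than the tasks) would look roughly like the following minimal sketch. It is hypothetical and not part of the attached test: the ThrottledSubmitter class and runAll method are illustrative names, and the task passed in is assumed to be the same benchmark body as Main::task above.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of a throttled submitter: the producer acquires a permit
// *before* submitting, so at most `parallelism` virtual threads exist at any
// time, instead of queuing all tasks up front and parking them on a semaphore.
class ThrottledSubmitter {
    static void runAll(Runnable task, int parallelism, int numTasks) throws InterruptedException {
        Semaphore permits = new Semaphore(parallelism);
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < numTasks; ++i) {
                permits.acquire();                 // submitter blocks here (push back)
                exec.execute(() -> {
                    try {
                        task.run();                // e.g. Main::task from the code above
                    } finally {
                        permits.release();
                    }
                });
            }
        }                                          // close() waits for all tasks to finish
    }
}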



> On May 30, 2024, at 9:50 AM, robert engels <rengels at ix.netcom.com> wrote:
> 
> A minor rework would fix that - but ThreadPerTaskExecutor is not public, so it would have been more work. The total task submission time is insignificant - that is not the issue. It only adds an entry per task to a concurrent linked list.
> 
> My tests easily outperform on an ops-per-CPU basis, given that I have only 4 cores and the reporter has 64. The OP also states: "A benchmark run on a 128 core machine is included below." So it may be 64 cores, but probably with dual hardware threads. Also, look at the user and system CPU times - those matter more than wall time if you are trying to judge scheduler efficiency. My reported times are far lower across every scenario, which is why it appears something else may be affecting the OP’s testing.
> 
>> On May 30, 2024, at 9:08 AM, Attila Kelemen <attila.kelemen85 at gmail.com> wrote:
>> 
>> Your case 4 is unfair. It doesn't have the same behavior as the other 3, because you are pushing back (i.e., you are limiting the task producer thread), which is of course more efficient. Even though pushing back is normally necessary in real-world code, you are simply measuring something different, and I suppose the question is not how to make this code faster, but why case 2 scales so weirdly with the number of tasks compared to the other two scenarios.
>> 
>> Also, I might be misreading something, but you are not outperforming the reported numbers; you are about 4s slower. FYI: Liam mentioned the CPU near the end of his email: "AMD Ryzen Threadripper PRO 3995WX" (64 real cores).
>> 
>> robert engels <rengels at ix.netcom.com> wrote (on Thu, 30 May 2024, 15:44):
>> As a somewhat important aside, I am guessing, based on the originally reported timings, that it is not a real 128-core machine. It is most likely 128 virtual cores, and probably shared. I don’t think you should be performing CPU benchmarks in those environments. The fact that my 8-core machine (only 4 real cores) is outperforming a 128-core machine is not a good sign.
>> 
>>> On May 30, 2024, at 8:35 AM, robert engels <rengels at ix.netcom.com> wrote:
>>> 
>>> Reworking the design (scenario 4) brings the pooled and new-VT-per-task approaches in line:
>>> 
>>> iMac:vt_test robertengels$ time java -Djdk.virtualThreadScheduler.parallelism=128 Main dummy 2 1000000
>>> 
>>> real	0m38.405s
>>> user	3m42.240s
>>> sys	0m12.976s
>>> 
>>> iMac:vt_test robertengels$ time java -Djdk.virtualThreadScheduler.parallelism=128 Main dummy 3 1000000
>>> 
>>> real	0m37.710s
>>> user	2m28.901s
>>> sys	0m3.427s
>>> 
>>> iMac:vt_test robertengels$ time java -Djdk.virtualThreadScheduler.parallelism=128 Main dummy 4 1000000
>>> 
>>> real	0m38.441s
>>> user	2m39.547s
>>> sys	0m7.027s
>>> 
>>> My machine only has 8 real cores, so I expect greater system cpu usage due to kernel thread scheduling. There is also the additional semaphore management.
>>> 
>>> In scenario 2, the system starts and schedules all 1 million virtual threads, only to have them immediately park waiting on the semaphore and later be woken up and rescheduled. This is not insignificant. Scenario 4 avoids this.
>>> 
>>> import java.util.concurrent.ExecutorService;
>>> import java.util.concurrent.Executors;
>>> import java.util.concurrent.ForkJoinPool;
>>> import java.util.concurrent.Semaphore;
>>> import java.util.concurrent.ThreadFactory;
>>> 
>>> public class Main {
>>> 
>>>   private static Semaphore semaphore = null;
>>>   private static int sink = 0;
>>> 
>>>   public static void main(String[] args) {
>>>     int strategy = 0;
>>>     int parallelism = 600;
>>>     int numTasks = 10000;
>>> 
>>>     if (args.length > 1) {
>>>       strategy = Integer.parseInt(args[1]);
>>>     }
>>> 
>>>     if (args.length > 2) {
>>>       numTasks = Integer.parseInt(args[2]);
>>>     }
>>> 
>>>     ExecutorService executor;
>>>     switch (strategy) {
>>>       case 1 -> {
>>>         executor = new ForkJoinPool(parallelism);
>>>       }
>>>       case 2 -> {
>>>         executor = Executors.newVirtualThreadPerTaskExecutor();
>>>         semaphore = new Semaphore(parallelism);
>>>       }
>>>       case 3 -> {
>>>         executor = Executors.newFixedThreadPool(parallelism, Thread.ofVirtual().factory());
>>>       }
>>>       case 4 -> {
>>>         var factorySem = new Semaphore(parallelism);
>>>         ThreadFactory tf = (Runnable r) -> {
>>>             try {
>>>                 factorySem.acquire();
>>>             } catch (InterruptedException ex) {
>>>                 throw new IllegalStateException("interrupted");
>>>             }
>>>             return Thread.ofVirtual().unstarted(() -> 
>>>                 {
>>>                     try { 
>>>                         r.run(); 
>>>                     } finally { 
>>>                         factorySem.release();
>>>                     }
>>>                 });
>>>         };
>>>         executor = Executors.newThreadPerTaskExecutor(tf);
>>>       }
>>>       default -> {
>>>         throw new IllegalArgumentException();
>>>       }
>>>     }
>>> 
>>>     try (executor) {
>>>       for (var i = 0; i < numTasks; ++i) {
>>>         executor.execute(Main::task);
>>>       }
>>>     }
>>>   }
>>> 
>>>   private static void task() {
>>>     if (semaphore != null) {
>>>       try {
>>>         semaphore.acquire();
>>>       } catch (InterruptedException e) {
>>>         throw new IllegalStateException();
>>>       }
>>>     }
>>> 
>>>     try {
>>>       sink += fibonacci(20);
>>>       try {
>>>         Thread.sleep(10);
>>>       } catch (InterruptedException e) {
>>>       }
>>>       sink += fibonacci(20);
>>>       try {
>>>         Thread.sleep(10);
>>>       } catch (InterruptedException e) {
>>>       }
>>>       sink += fibonacci(20);
>>>     } finally {
>>>       if (semaphore != null) {
>>>         semaphore.release();
>>>       }
>>>     }
>>>   }
>>> 
>>>   private static int fibonacci(int n) {
>>>     if (n == 0) {
>>>       return 0;
>>>     } else if (n == 1) {
>>>       return 1;
>>>     } else {
>>>       return fibonacci(n - 1) + fibonacci(n - 2);
>>>     }
>>>   }
>>> }
>>> 
>>> 
>>>> On May 30, 2024, at 7:27 AM, Robert Engels <rengels at ix.netcom.com> wrote:
>>>> 
>>>> I am going to dig in some more today - interesting problem. Is it maybe that in scenario 2 you are creating the 1M queue entries and 1M VTs at the same time? I don’t remember whether the queue entry is actually GCable until it completes, in order to support error reporting.
>>>> 
>>>>> On May 30, 2024, at 7:20 AM, Attila Kelemen <attila.kelemen85 at gmail.com> wrote:
>>>>> 
>>>>> 
>>>>> They only create 600 VT, but they do create 1M queue entries for the executor, and the relative memory usage should be the same for the 10k-task and 1M-task scenarios (both in terms of bytes and number of objects). I would love to see the result of this experiment with the Epsilon GC (given that the total memory usage should be manageable even for 1M tasks) to confirm or rule out the possibility of the GC scaling this noticeably poorly.
>>>>> 
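
A hypothetical way to run that Epsilon GC experiment on the 1M-task case (Epsilon never reclaims memory, so the heap size below is a guess and may need to be raised if allocation fails):

time java -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xmx16g -Djdk.virtualThreadScheduler.parallelism=128 Main dummy 2 1000000
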
>>>>> Robert Engels <rengels at ix.netcom.com> wrote (on Thu, 30 May 2024, 14:10):
>>>>> That is what I pointed out - in scenario 2 you are creating 1M VT up front. The other cases only create at most 600 VT or platform threads. 
>>>>> 
>>>>> The peak memory usage in scenario 2 is much much higher. 
>>>>> 
>>>>>> On May 30, 2024, at 7:07 AM, Attila Kelemen <attila.kelemen85 at gmail.com> wrote:
>>>>>> 
>>>>>> 
>>>>>> The additional work the VTs have to do is understandable. However, I don't see that explaining these measurements, because in the case of 10k tasks VT wins over FJP, but with 1M tasks VT loses to FJP. What is the source of the scaling difference, when there are still only 128 carriers and 600 concurrent threads in both cases? If this were merely more work, then I would expect to see the same relative difference between FJP and VT with 10k tasks as with 1M tasks. Just a wild, naive guess: could the GC scale worse for that many VTs, or is that a stupid idea?
>>>>>>  
>>>>>> 
>>>>>> If the concurrency for the virtual thread run is limited to the same 
>>>>>> value as the thread count in the thread pool runs, then you are unlikely 
>>>>>> to see a benefit. The increased CPU time probably isn't too surprising 
>>>>>> either. In the two runs with threads, the N tasks are queued once. In 
>>>>>> the virtual thread run, the tasks for the N virtual threads may be 
>>>>>> queued up to 4 times: once for the initial submit, once waiting for a 
>>>>>> semaphore permit, and twice for the two sleeps. Also, when CPU 
>>>>>> utilization is low (as I assume it is here), the FJP scan does tend 
>>>>>> to show up in profiles.
>>>>>> 
>>>>>> Has Chi looked into increasing the concurrency so that it's not limited 
>>>>>> to 600? Concurrency may need to be limited at a finer grain in the "real 
>>>>>> world program", but maybe not by the number of threads.
>>>>>> 
>>>>>> -Alan
>>>>>> 
>>> 
>> 
> 
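
Regarding Alan's last point about limiting concurrency at a finer grain rather than capping the number of threads, a hypothetical sketch might look like the following. The permit count and the choice to guard only the CPU-bound section are assumptions for illustration, not part of the original benchmark.

import java.util.concurrent.Semaphore;

// Hypothetical sketch: run one virtual thread per task, but limit only the
// CPU-bound section to roughly the number of carrier threads. The sleeps stay
// outside the guarded region, so blocked tasks do not hold a permit.
class FinerGrainedLimit {
    static final Semaphore CPU_PERMITS = new Semaphore(Runtime.getRuntime().availableProcessors());

    static int guardedFibonacci(int n) throws InterruptedException {
        CPU_PERMITS.acquire();
        try {
            return fibonacci(n);   // only the compute competes for permits
        } finally {
            CPU_PERMITS.release();
        }
    }

    static int fibonacci(int n) {
        return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2);
    }
}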
