Running on Java 22-ea+27-2262 (Preview)
Home of The JavaSpecialists' Newsletter

311Virtual Threads and Parallel Streams

Author: Dr Heinz M. KabutzDate: 2023-08-31Java Version: 21Category: Concurrency
 

Abstract: Virtual threads should not be used for CPU intensive tasks. The recommended approach is to continue using parallel streams for processing large data sets. However, we should be careful when invoking a parallel stream from a virtual thread. In this newsletter we propose a "safety valve" that can protect us - sometimes.

 

Welcome to the 311th edition of The Java(tm) Specialists' Newsletter, sent whilst watching the super blue moon up in the evening sky. The hot Cretan weather is slowly turning towards the much more pleasant autumn. My local "parea" - friends - and I cannot figure out why so many tourists come during the worst time of year. July and early August are suffocatingly hot here on Crete. The rest of the year is way better!

On to a fun newsletter, just in time for the next Java LTS release.

javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.

Virtual Threads and Parallel Streams

Java 21 is around the corner, and it is one of the most exciting releases. Java 11 was great, with the Java Module System and many other amazing features. One of my biggest surprises is how slow some companies have been in moving off Java 8 (and 7). Even in 2023, I'm still selling courses on Mastering Java 11! Not every Java programmer has mastered modules and VarHandles yet. Java 17 was even better, with some game changing performance improvements and stunning language enhancements (records, enhanced switch, etc.). And yes, I am also booking courses for Mastering Java 17.

Java 21 is way way better.

We finally have virtual threads as part of a feature release. Ron Pressler, Alan Bateman, and their team have worked tirelessly for the last few years to give us a better concurrency model. We also have structured concurrency, but that is still in preview in Java 21. And yes, I also have a one-day course for Mastering Virtual Threads in Java. It takes one day if the student is already familiar with platform threads, otherwise we have a one day refresher in the course Mastering Platform Threads in Java (created for Java 21). This is a lot quicker than doing a full 4 day Reactive Programming Course. This is, of course, the selling point of virtual threads. It is much easier to code and to debug.

Whilst I'm sounding a bit like a loom fan boy right now, and I do like the technology not just a small amount, we do have to be careful. I mentioned Virtual Thread Deadlocks last year already, and this is particularly tricky with deadlocks in ReentrantLocks. Thread dumps with virtual threads are also tricky, as they might not show all the virtual threads currently running in the system (although in Java 21 they do by default).

Another issue that might cause surprises is what happens when we use parallel streams from within virtual threads. Virtual threads address the issue of scalability with blocking operations. However, they should not be used for high CPU activity. Why would anyone even think of using a parallel stream from within a virtual thread? Well, JEP 444: Virtual Threads appears to push us in that direction with the paragraph:

"It is not a goal to offer a new data parallelism construct in either the Java language or the Java libraries. The Stream API remains the preferred way to process large data sets in parallel."

Here is a trick question. How long will this take to run?

ForkJoinPool.commonPool().submit(() -> {
    long until = System.currentTimeMillis() + 1000;
    while (System.currentTimeMillis() <= until) ;
}).join();
  

We would think that it should take approximately one second. However, the trick is that it might never return if we have configured our common ForkJoinPool to have zero threads with the JVM parameter -Djava.util.concurrent.ForkJoinPool.common.parallelism=0. It thus depends on a JVM parameter. By default the parallelism is equal to the number of hardware threads minus one.

Now for the next quiz. This one is easier. How long will this take to execute?

IntStream.range(0, Runtime.getRuntime().availableProcessors())
        .parallel()
        .forEach(i -> {
            System.out.println(Thread.currentThread());
            long until = System.currentTimeMillis() + 1000;
            while (System.currentTimeMillis() <= until) ;
        });
  

The answer again depends on the size of the common ForkJoinPool. If it is the default value, in other words availableProcessors() - 1, then it will take roughly one second. If we set the parallelism to 0, then it will take roughly availableProcessors() seconds. Here is the output running it on my machine with one socket, 8 cores and 2 hardware threads per core:

Thread[#1,main,5,main]
Thread[#30,ForkJoinPool.commonPool-worker-2,5,main]
Thread[#33,ForkJoinPool.commonPool-worker-5,5,main]
Thread[#36,ForkJoinPool.commonPool-worker-8,5,main]
Thread[#31,ForkJoinPool.commonPool-worker-3,5,main]
Thread[#29,ForkJoinPool.commonPool-worker-1,5,main]
Thread[#35,ForkJoinPool.commonPool-worker-7,5,main]
Thread[#34,ForkJoinPool.commonPool-worker-6,5,main]
Thread[#38,ForkJoinPool.commonPool-worker-10,5,main]
Thread[#32,ForkJoinPool.commonPool-worker-4,5,main]
Thread[#39,ForkJoinPool.commonPool-worker-11,5,main]
Thread[#37,ForkJoinPool.commonPool-worker-9,5,main]
Thread[#41,ForkJoinPool.commonPool-worker-13,5,main]
Thread[#42,ForkJoinPool.commonPool-worker-14,5,main]
Thread[#40,ForkJoinPool.commonPool-worker-12,5,main]
Thread[#43,ForkJoinPool.commonPool-worker-15,5,main]

And if we set the parallelism to 0, we see this:

Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]
Thread[#1,main,5,main]

This is why parallel streams work, whether we have available threads in the common pool or not. The submitting thread always helps to do the work, and if no one helps, it does the work by itself.

Herein lies the issue with using parallel streams from within virtual threads. The virtual thread will be one of the threads doing the actual work, and this will reduce the number of available carrier threads for other virtual threads. Do this often enough, and we will prevent other virtual threads from being mounted.

There is a workaround, but it does not always work. We can create a safety valve around our parallel streams that will avoid using the submitter thread to do the work. However, it only works if we have at least two threads in the common pool. Ideally, we would check whether we have at least one thread in the common pool. However, the common pool reports that it has one thread, when it actually has zero. Yes, it lies to us deliberately, to avoid programmers seeing an ArithmeticException due to division by zero. In my opinion this is a mistake. Perhaps we should also never return size() == 0 from a collection for the same reason? (sarcasm) Parallel sorting of arrays also only works in parallel when we have at least 2 threads in the common pool.

Here is what the safety valve would look like:

public static <T> T safetyValve(Supplier<T> streamTask) {
    return Thread.currentThread().isVirtual()
            && ForkJoinPool.getCommonPoolParallelism() > 1 ?
            ForkJoinPool.commonPool()
                    .submit(streamTask::get).join() :
            streamTask.get();
}

We can then delegate to that whenever we need to use a parallel stream, regardless of whether we are running inside a virtual thread or not. For example, here we are calculating the IntSummaryStatistics of random values in parallel:

var stats = safetyValve(() ->
        ThreadLocalRandom.current().ints(100_000_000)
                .parallel()
                .sorted()
                .summaryStatistics());

If we run the code within a virtual thread without the safety valve, then the carrier thread would be one of the worker threads and would not be unmounted. But instead, we are creating a task, submitting that to the common pool, and then calling join(). This will unpark the virtual thread, making the carrier thread available to be used for other virtual threads. It won't work for single-core and dual-core machines, because in both cases the common pool parallelism will be reported as "1", same as when we deliberately set it to "0". In that case, I would recommend simply omitting the test for the getCommonPoolParallelism() size from the safety valve.

Here is a longer demo that shows what happens.

import java.util.concurrent.*;
import java.util.function.*;

public class SafetyValveDemo {
    public static <T> T safetyValve(Supplier<T> streamTask) {
        return Thread.currentThread().isVirtual()
                && ForkJoinPool.getCommonPoolParallelism() > 1 ?
                ForkJoinPool.commonPool()
                        .submit(streamTask::get).join() :
                streamTask.get();
    }

    public static void main(String... args) {
        long time = System.nanoTime();
        try (var pool = Executors.newVirtualThreadPerTaskExecutor()) {
            pool.submit(() -> {
                var thread1 = Thread.currentThread().toString();
                System.out.println(thread1);
                var stats = safetyValve(() ->
                        ThreadLocalRandom.current().ints(100_000_000)
                                .parallel()
                                .sorted()
                                .summaryStatistics());
                var thread2 = Thread.currentThread().toString();
                System.out.println(thread2);
                if (!thread1.equals(thread2))
                    System.out.println("unmounted");
                else
                    System.out.println("maybe not unmounted");
                return stats;
            });
            for (int i = 0; i < 100000; i++) {
                pool.submit(() -> {/* empty task */});
            }
        } finally {
            time = System.nanoTime() - time;
            System.out.printf("time = %dms%n", (time / 1_000_000));
        }
    }
}

Running this on my machine with the default settings results in output like this:

VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-10
unmounted
time = 3273ms

If I run it with -XX:ActiveProcessorCount=1 or -XX:ActiveProcessorCount=2, or with the common pool parallelism set to 0, then we will see output such as:

VirtualThread[#14]/runnable@ForkJoinPool-1-worker-1
VirtualThread[#14]/runnable@ForkJoinPool-1-worker-1
maybe not unmounted
time = 14223ms

I hope this helps a bit in case you need to work with large data sets using parallel streams and you want to be sure that your code will also work if being called from a virtual thread.

King regards

Heinz

P.S. Our Mastering Virtual Threads in Java Course is now available as an in-house course for 10 or more programmers, presented either virtually or in-person. Please contact me via email, on my website or via WhatsApp.

 

Comments

We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel (Get an invite here)

When you load these comments, you'll be connected to Disqus. Privacy Statement.

Related Articles

Browse the Newsletter Archive

About the Author

Heinz Kabutz Java Conference Speaker

Java Champion, author of the Javaspecialists Newsletter, conference speaking regular... About Heinz

Superpack '23

Superpack '23 Our entire Java Specialists Training in one huge bundle more...

Free Java Book

Dynamic Proxies in Java Book
Java Training

We deliver relevant courses, by top Java developers to produce more resourceful and efficient programmers within their organisations.

Java Consulting

We can help make your Java application run faster and trouble-shoot concurrency and performance bugs...