Abstract: Java has support for parallelism baked into the JDK. We have parallel streams, parallel sort and CompletableFutures, all using the same common ForkJoinPool. In this newsletter we explore how to measure the effectiveness of our parallelism.
Welcome to the 297th edition of The Java(tm) Specialists' Newsletter. Last week the country shut down for two days due to an expected snowfall. It was chaotic and my poor mother-in-law was stuck in Athens Airport for over 34 hours. Not fun. But the sun was shining again today, which meant we got our walk and run on the beach, followed by my daily push-up routine and a splash in the sea. My target for January was 2000 push-ups, and fortunately I even exceeded that by a bit. February will be tougher with a target of 3000.
javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.
One of my side projects is trying to add a parallelMultiply() method to BigInteger. It makes sense once the numbers become large enough. The Toom-Cook 3 algorithm is fairly easy to parallelize. Unfortunately we cannot achieve perfect parallelism, because memory access is a major bottleneck. One of the reviewers asked how efficient the parallel algorithm is compared to the sequential one. The latency is certainly lower, but what about the CPU consumption? Does it increase or stay the same?
This is not that easy to figure out. As far as I could tell, the Java Microbenchmarking Harness (JMH) does not give us easy access to these numbers. I had written some code in the past to tickle this information from the ForkJoinPool by writing a custom thread factory. It worked well enough, but at the time I didn't need it that desperately and thus stopped developing it.
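For a single thread, the raw numbers are available from the JDK's ThreadMXBean. The sketch below takes a before-and-after snapshot of the current thread's CPU time, user time and allocated bytes. CurrentThreadCost and its methods are hypothetical names. The allocation counter comes from the HotSpot-specific com.sun.management.ThreadMXBean extension and is reported as -1 if it is not available. The hard part is doing this for every worker thread that helps with a task, not just the calling thread.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class CurrentThreadCost {
  /** CPU and user time in nanoseconds, allocation in bytes (-1 if unavailable). */
  record Sample(long cpuNanos, long userNanos, long allocatedBytes) {}

  private static final ThreadMXBean TMB = ManagementFactory.getThreadMXBean();

  /** Reads the current thread's counters. */
  static Sample sample() {
    long bytes = -1;
    // Allocation counters are a HotSpot extension, so check before using them.
    if (TMB instanceof com.sun.management.ThreadMXBean hotspot
        && hotspot.isThreadAllocatedMemorySupported()) {
      bytes = hotspot.getThreadAllocatedBytes(Thread.currentThread().getId());
    }
    return new Sample(TMB.getCurrentThreadCpuTime(),
        TMB.getCurrentThreadUserTime(), bytes);
  }

  /** Runs the task on the current thread and returns the change in each counter. */
  static Sample measure(Runnable task) {
    Sample before = sample();
    task.run();
    Sample after = sample();
    long bytes = before.allocatedBytes() < 0 || after.allocatedBytes() < 0
        ? -1 : after.allocatedBytes() - before.allocatedBytes();
    return new Sample(after.cpuNanos() - before.cpuNanos(),
        after.userNanos() - before.userNanos(), bytes);
  }

  public static void main(String... args) {
    Sample cost = measure(() -> {
      long[] data = new long[1_000_000];
      for (int i = 0; i < data.length; i++) data[i] = i * 31L;
    });
    System.out.println("cpu " + cost.cpuNanos() + " ns, user "
        + cost.userNanos() + " ns, allocated " + cost.allocatedBytes() + " bytes");
  }
}
```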
The trick is to create a special ForkJoinWorkerThread factory by implementing the ForkJoinPool.ForkJoinWorkerThreadFactory interface. This is then set with the VM parameter -Djava.util.concurrent.ForkJoinPool.common.threadFactory=OurFactoryClassName. Things went wonky when I hit a rather gnarly class loader bug (JDK-8280772) that seems to have crept into Java 12.
To make things easier, I set the property in a static initializer inside my ForkJoinPoolBench, hopefully before the common ForkJoinPool is used for the first time. I also used the ideas from my ByteWatcher to measure the number of bytes allocated.
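To experiment with the factory idea without touching the common pool, here is a swapped-in variant, not the code that ForkJoinPoolBench uses. A dedicated pool receives a recording factory through the ForkJoinPool(int, ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean) constructor, so neither the system property nor the system class loader is involved. RecordingFactory, runJobs and the workload are hypothetical. The CPU total only covers workers that are still alive when it is read.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.stream.LongStream;

public class RecordingFactory
    implements ForkJoinPool.ForkJoinWorkerThreadFactory {
  /** Every worker this factory has created, in creation order. */
  final List<ForkJoinWorkerThread> workers = new CopyOnWriteArrayList<>();

  @Override
  public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    // Let the default factory build the thread, then remember it.
    ForkJoinWorkerThread thread =
        ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    workers.add(thread);
    return thread;
  }

  /** Total CPU time in ns of the recorded workers that are still alive. */
  long totalCpuNanos() {
    ThreadMXBean tmb = ManagementFactory.getThreadMXBean();
    return workers.stream()
        .mapToLong(t -> tmb.getThreadCpuTime(t.getId()))
        .filter(ns -> ns >= 0) // -1: thread has died or CPU timing is off
        .sum();
  }

  /** Hypothetical workload: 8 jobs, each summing i % 7 for i = 1..5,000,000. */
  static long runJobs(ForkJoinPool pool) throws Exception {
    List<Callable<Long>> jobs = new ArrayList<>();
    for (int j = 0; j < 8; j++) {
      jobs.add(() -> LongStream.rangeClosed(1, 5_000_000).map(i -> i % 7).sum());
    }
    long total = 0;
    for (Future<Long> f : pool.invokeAll(jobs)) total += f.get();
    return total;
  }

  public static void main(String... args) throws Exception {
    RecordingFactory factory = new RecordingFactory();
    ForkJoinPool pool = new ForkJoinPool(4, factory, null, false);
    try {
      long total = runJobs(pool);
      // Read the CPU time before shutdown, while the workers are still alive.
      System.out.println("total " + total + ", workers " + factory.workers.size()
          + ", cpu " + factory.totalCpuNanos() / 1_000_000 + " ms");
    } finally {
      pool.shutdown();
    }
  }
}
```

Passing the factory to a constructor is simpler to test. However, it only sees work submitted to that particular pool, whereas parallel streams, Arrays.parallelSort() and the default async methods of CompletableFuture all use the common pool. That is why the bench sets the property instead.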
Before I show the ForkJoinPoolBench code, I'd like to demonstrate its usefulness. We can easily measure that sorting an int array in parallel is faster on a multi-core machine than sorting it sequentially. But which uses more CPU time? Let's try it out:
```java
import eu.javaspecialists.tjsn.bench.*;

import java.util.*;
import java.util.concurrent.*;

public class ArraySortBenchTest {
  public static void main(String... args) {
    int[] array = ThreadLocalRandom.current()
        .ints(100_000_000).toArray();
    for (int i = 0; i < 3; i++) {
      int[] sequentialToSort = array.clone();
      ForkJoinPoolBench.test(
          () -> Arrays.sort(sequentialToSort),
          new DefaultStatsListener("sequentialSort" + i));
    }
    for (int i = 0; i < 3; i++) {
      int[] parallelToSort = array.clone();
      ForkJoinPoolBench.test(
          () -> Arrays.parallelSort(parallelToSort),
          new DefaultStatsListener("parallelSort" + i));
    }
  }
}
```
Running this on a 1-6-2 server (one socket, six cores, two hyperthreads per core) with Java 17, we get the following results for the third run of each sort:
```
Results for sequentialSort2
real 0m6.841s
user 0m6.840s
sys 0m0.000s
mem 344.0B
Results for parallelSort2
real 0m0.863s
user 0m9.200s
sys 0m0.010s
mem 387.0MB
```
We can thus say that the parallel sort was 7.9x faster in real time, but used 1.35x as much CPU time in total. The parallel sort also allocated memory at 448 MB/s (387 MB in 0.863 seconds). However, we must treat these figures with caution. We only have 6 cores on that particular server, so how can it be almost 8x faster in real time? It is possible that the two sorting algorithms are slightly different.
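These ratios are simple arithmetic on the printed figures. The speedup is the sequential real time divided by the parallel real time. The CPU ratio compares user plus sys time. The allocation rate is the bytes allocated divided by the real time. Here is a small sketch of that arithmetic, with hypothetical names and the numbers from the run above typed in by hand:

```java
import java.util.Locale;

public class RunComparison {
  /** One run: real, user and sys time in seconds, plus bytes allocated. */
  record Run(double realSec, double userSec, double sysSec, double bytes) {
    double cpuSec() {
      return userSec + sysSec;
    }
  }

  /** How many times faster the parallel run was in wall-clock time. */
  static double speedup(Run seq, Run par) {
    return seq.realSec() / par.realSec();
  }

  /** How many times as much CPU time (user + sys) the parallel run used. */
  static double cpuRatio(Run seq, Run par) {
    return par.cpuSec() / seq.cpuSec();
  }

  /** Allocation rate in MB/s, with 1 MB = 1024 * 1024 bytes. */
  static double allocMBPerSec(Run run) {
    return run.bytes() / (1024.0 * 1024.0) / run.realSec();
  }

  public static void main(String... args) {
    // Figures typed in from the sequentialSort2 and parallelSort2 output.
    Run seq = new Run(6.841, 6.840, 0.000, 344);
    Run par = new Run(0.863, 9.200, 0.010, 387.0 * 1024 * 1024);
    System.out.printf(Locale.US, "speedup %.1fx, cpu %.2fx, alloc %.0f MB/s%n",
        speedup(seq, par), cpuRatio(seq, par), allocMBPerSec(par));
    // prints: speedup 7.9x, cpu 1.35x, alloc 448 MB/s
  }
}
```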
Here is another example, calculating 200,000! first with a sequential stream and then with a parallel stream. The parallel version is faster, wildly so, but not for the reasons we might imagine. The parallelism is incidental to the performance improvement.
```java
import eu.javaspecialists.tjsn.bench.*;

import java.math.*;
import java.util.stream.*;

public class FactorialByStreamDemo {
  public static void main(String... args) {
    ForkJoinPoolBench.test(
        () -> IntStream.rangeClosed(1, 200_000)
            .mapToObj(BigInteger::valueOf)
            .reduce(BigInteger.ONE, BigInteger::multiply),
        new DefaultStatsListener("sequentialFactorial"));
    ForkJoinPoolBench.test(
        () -> IntStream.rangeClosed(1, 200_000)
            .parallel()
            .mapToObj(BigInteger::valueOf)
            .reduce(BigInteger.ONE, BigInteger::multiply),
        new DefaultStatsListener("parallelFactorial"));
  }
}
```
Here is the output from the run:
```
Results for sequentialFactorial
real 0m9.838s
user 0m9.670s
sys 0m0.106s
mem 52.9GB
Results for parallelFactorial
real 0m0.281s
user 0m1.760s
sys 0m0.056s
mem 1.5GB
```
On a 1-6-2 machine, the parallel version is 35x faster and allocates 35x less memory. What gives?
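One way to check whether the parallelism itself is responsible is to take it away. The sketch below uses hypothetical class and method names. It computes the same factorial on a single thread in two shapes. The first is a left fold, which is what a sequential reduce does. The second is a balanced product tree, which is closer to how a parallel reduce splits the range and recombines partial results. In OpenJDK, BigInteger.multiply() only switches from the schoolbook algorithm to Karatsuba or Toom-Cook 3 when both operands are large. A left fold that multiplies by one small number at a time therefore never benefits, and it allocates a new, ever larger result at every step. We would expect the product tree to be much faster even on one thread, but timings will vary by machine.

```java
import java.math.BigInteger;
import java.util.stream.IntStream;

public class FactorialShapes {
  /** Left fold: an ever-growing product times one small number per step. */
  static BigInteger leftFold(int n) {
    return IntStream.rangeClosed(1, n)
        .mapToObj(BigInteger::valueOf)
        .reduce(BigInteger.ONE, BigInteger::multiply);
  }

  /** Balanced product tree on one thread: combines partial products of similar size. */
  static BigInteger productTree(int from, int to) {
    if (from > to) return BigInteger.ONE; // empty range, e.g. 0!
    if (from == to) return BigInteger.valueOf(from);
    int mid = (from + to) >>> 1;
    return productTree(from, mid).multiply(productTree(mid + 1, to));
  }

  public static void main(String... args) {
    int n = 50_000;
    long t0 = System.nanoTime();
    BigInteger a = leftFold(n);
    long t1 = System.nanoTime();
    BigInteger b = productTree(1, n);
    long t2 = System.nanoTime();
    System.out.printf("left fold %d ms, product tree %d ms, equal %b%n",
        (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000, a.equals(b));
  }
}
```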
Let's rewrite the algorithm as a divide-and-conquer with CompletableFuture, combining the partial products either serially with thenCombine() or in parallel with thenCombineAsync().
```java
import eu.javaspecialists.tjsn.bench.*;

import java.math.*;
import java.util.concurrent.*;
import java.util.function.*;

public class FactorialByCompletableFutureDemo {
  // Both inputs are already complete, so thenCombine multiplies
  // in the calling thread.
  private static final BinaryOperator<CompletableFuture<BigInteger>>
      SERIAL = (a, b) -> a.thenCombine(b, BigInteger::multiply);
  // thenCombineAsync hands each multiplication to the common pool.
  private static final BinaryOperator<CompletableFuture<BigInteger>>
      PARALLEL = (a, b) -> a.thenCombineAsync(b, BigInteger::multiply);

  private static BigInteger factorial(
      int n, BinaryOperator<CompletableFuture<BigInteger>> op) {
    return factorial(0, n, op).join();
  }

  private static CompletableFuture<BigInteger> factorial(
      int from, int to,
      BinaryOperator<CompletableFuture<BigInteger>> op) {
    if (from == to) {
      // Leaf: 0 contributes 1, any other number contributes itself.
      BigInteger result = from == 0 ?
          BigInteger.ONE : BigInteger.valueOf(from);
      return CompletableFuture.completedFuture(result);
    }
    // Split the range in half and combine the two partial products.
    int mid = (from + to) >>> 1;
    return op.apply(factorial(from, mid, op),
        factorial(mid + 1, to, op));
  }

  public static void main(String... args) {
    ForkJoinPoolBench.test(
        () -> factorial(2_000_000, SERIAL),
        new DefaultStatsListener("sequentialFactorial"));
    ForkJoinPoolBench.test(
        () -> factorial(2_000_000, PARALLEL),
        new DefaultStatsListener("parallelFactorial"));
  }
}
```
Note that we increased the factorial to 2 million, ten times as large as before. Here are the performance results:
```
Results for sequentialFactorial
real 0m8.396s
user 0m8.190s
sys 0m0.131s
mem 27.3GB
Results for parallelFactorial
real 0m5.461s
user 0m13.260s
sys 0m1.706s
mem 27.4GB
```
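We see that the memory usage was roughly the same for sequential and parallel. The parallel version was only 1.54x faster in real time, and used 1.62x as much user time. System time was also substantial for the parallel version. In the parallel run, memory is being allocated at about 5 GB/s. Note also that even the sequential version computed 2,000,000! in less time than the sequential stream needed for 200,000!. This suggests that the divide-and-conquer shape of the computation, rather than the parallelism, is what made the parallel stream so much faster.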
Why is the performance not better? Well, the way the algorithm works, we are multiplying larger and larger numbers together. The final three multiplications take a big chunk of the time. For the last two seconds of the run, we are doing the final multiplication on a single thread. This is what my parallelMultiply() method improves (hopefully in Java 19, but it may end up in 20 or 21). Here are the results if we run this with my parallelMultiply() method:
```
Results for parallelFactorial
real 0m2.656s
user 0m18.160s
sys 0m3.271s
mem 27.4GB
```
The parallel version is now 3.16x faster in real time than the sequential version and uses 2.58x as much CPU time (user plus sys). At 10 GB/s of allocation, we are probably close to the memory bandwidth of my server.
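To get a feel for why the last few multiplications dominate, the sketch below splits n! at the top of a sequential product tree and prints the bit lengths of the two operands of the final multiplication. TopOfTheTree and product are hypothetical names, and the sketch uses the range 1..n rather than the 0..n of the demo. Each operand is already roughly half the size of the result. That one multiplication cannot be split up by the recursion, so only a parallel multiplication algorithm can spread it over several cores.

```java
import java.math.BigInteger;

public class TopOfTheTree {
  /** Sequential product tree for from * (from + 1) * ... * to. */
  static BigInteger product(int from, int to) {
    if (from > to) return BigInteger.ONE;
    if (from == to) return BigInteger.valueOf(from);
    int mid = (from + to) >>> 1;
    return product(from, mid).multiply(product(mid + 1, to));
  }

  public static void main(String... args) {
    int n = 200_000;
    int mid = (1 + n) >>> 1;
    BigInteger left = product(1, mid);      // 1 * 2 * ... * mid
    BigInteger right = product(mid + 1, n); // (mid + 1) * ... * n
    BigInteger all = left.multiply(right);  // the single final multiplication
    System.out.printf("left %d bits, right %d bits, n! %d bits%n",
        left.bitLength(), right.bitLength(), all.bitLength());
  }
}
```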
The ForkJoinPool may temporarily create additional threads when a worker blocks through a ManagedBlocker. One class that has incorporated the ManagedBlocker from the outset is the Phaser. Here is an example of the ForkJoinPoolBench using the Phaser:
```java
import eu.javaspecialists.tjsn.bench.*;

import java.util.concurrent.*;
import java.util.stream.*;

public class PhaserBenchTest {
  public static void main(String... args) {
    System.out.println("Phaser");
    ForkJoinPoolBench.test(() -> {
      int upto = 4 * Runtime.getRuntime().availableProcessors();
      var phaser = new Phaser(upto);
      // Every party waits until all upto parties have arrived,
      // so upto threads must be waiting at the same time.
      IntStream.range(0, upto)
          .parallel()
          .forEach(ignored -> phaser.arriveAndAwaitAdvance());
      System.out.println("done");
    }, (realTime, userTimeStats, cpuTimeStats, allocationStats) ->
        System.out.println(
            "realTime = " + realTime + ",\n" +
            "    userTimeStats = " + userTimeStats + ",\n" +
            "    cpuTimeStats = " + cpuTimeStats + ",\n" +
            "    allocationStats = " + allocationStats));
  }
}
```
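When we look at the output, we see that we now have 48 data points, one per measured thread. This matches the 48 parties of the Phaser (12 hardware threads x 4), all of which must be waiting at the same time. Since the common pool's default parallelism is one less than the number of hardware threads, most of these threads must have been added while the Phaser was blocking through its ManagedBlocker.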
```
Phaser
done
realTime = 23203070,
    userTimeStats = LongSummaryStatistics{count=48, sum=10000047, min=1, average=208334.312500, max=10000000},
    cpuTimeStats = LongSummaryStatistics{count=48, sum=37473564, min=282691, average=780699.250000, max=12217508},
    allocationStats = LongSummaryStatistics{count=48, sum=251047, min=1041, average=5230.145833, max=74336}
```
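The same compensation can be seen with a hand-written ManagedBlocker. In the sketch below, LatchBlocker and run are hypothetical names. Tasks in a pool of parallelism 2 each count down a shared CountDownLatch and then wait for it through ForkJoinPool.managedBlock(). With 8 tasks, they can only all finish if the pool adds threads while the first ones are blocked. The returned pool size reflects those extra threads, but the exact number is up to the pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class LatchBlockerDemo {
  /** Tells the pool that the current worker is about to wait on a latch. */
  static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) {
      this.latch = latch;
    }

    @Override
    public boolean block() throws InterruptedException {
      latch.await();
      return true; // true: no further blocking is needed
    }

    @Override
    public boolean isReleasable() {
      return latch.getCount() == 0; // no need to block once the latch is open
    }
  }

  /** Runs tasks that all wait for each other; returns the pool size afterwards. */
  static int run(int parallelism, int tasks) throws Exception {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      CountDownLatch latch = new CountDownLatch(tasks);
      List<Future<Object>> futures = new ArrayList<>();
      for (int i = 0; i < tasks; i++) {
        futures.add(pool.submit(() -> {
          latch.countDown();
          ForkJoinPool.managedBlock(new LatchBlocker(latch));
          return null;
        }));
      }
      for (Future<Object> f : futures) {
        f.get(10, TimeUnit.SECONDS); // time out rather than hang forever
      }
      return pool.getPoolSize();
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String... args) throws Exception {
    System.out.println("pool size after run: " + run(2, 8));
  }
}
```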
Without much more fanfare, here is my ForkJoinPoolBench. It works well enough, but is not suitable to be run in production. Please let me know if you find this useful.
```java
/*
 * (C)opyright 2022, Heinz Kabutz, All rights reserved
 */
package eu.javaspecialists.tjsn.bench;

import javax.management.*;
import java.lang.management.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
 * @author Dr Heinz M. Kabutz heinz@javaspecialists.eu
 */
public final class ForkJoinPoolBench {
  /**
   * The key to this bench is the ThreadFactory, which measures
   * the cpu time, user time and memory allocation of each thread
   * that is created in the common ForkJoinPool. The number of
   * threads might increase temporarily because of the
   * ManagedBlocker, and we never remove unused threads again.
   */
  public static class ThreadFactory
      implements ForkJoinPool.ForkJoinWorkerThreadFactory {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
      var thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory
          .newThread(pool);
      bench.addCounters(thread);
      return thread;
    }
  }

  /**
   * We run the task in our bench, measuring user time, cpu time,
   * real time and bytes allocated.
   */
  public static void test(Runnable task, StatsListener listener) {
    bench.test0(task, listener);
  }

  /**
   * Class to simplify the ConcurrentHashMap usage.
   */
  private static class MeasureMap
      extends ConcurrentHashMap<Thread, AtomicLong> {}

  private final MeasureMap userTime = new MeasureMap();
  private final MeasureMap cpuTime = new MeasureMap();
  private final MeasureMap allocation = new MeasureMap();

  /**
   * Interface to simplify the ToLongFunction code.
   */
  @FunctionalInterface
  private interface ExtractorFunction {
    long extract(Thread thread);
  }

  private ForkJoinPoolBench() {}

  /**
   * Only one thread at a time can run the test.
   */
  private static final Object TEST_MONITOR = new Object();

  private void test0(Runnable task, StatsListener listener) {
    LongSummaryStatistics userStats, cpuStats, memStats;
    long realTime;
    synchronized (TEST_MONITOR) {
      addCounters(Thread.currentThread());
      try {
        resetAllCounters();
        realTime = System.nanoTime();
        try {
          task.run();
        } finally {
          memStats = getStats(allocation, MEM_FUNCTION);
          realTime = System.nanoTime() - realTime;
          userStats = getStats(userTime, USER_TIME_FUNCTION);
          cpuStats = getStats(cpuTime, CPU_TIME_FUNCTION);
        }
      } finally {
        removeCounters(Thread.currentThread());
      }
    }
    listener.result(realTime, userStats, cpuStats, memStats);
  }

  private LongSummaryStatistics getStats(
      MeasureMap map, ExtractorFunction extractorFunction) {
    map.forEach((key, value) -> {
      long after = extractorFunction.extract(key);
      long before = value.get();
      value.set(after - before);
    });
    return map.values()
        .stream()
        .mapToLong(AtomicLong::get)
        .summaryStatistics();
  }

  private void addCounters(Thread thread) {
    add(userTime, thread, USER_TIME_FUNCTION);
    add(cpuTime, thread, CPU_TIME_FUNCTION);
    add(allocation, thread, MEM_FUNCTION);
  }

  private void removeCounters(Thread thread) {
    userTime.remove(thread);
    cpuTime.remove(thread);
    allocation.remove(thread);
  }

  private void add(MeasureMap map, Thread thread,
                   ExtractorFunction extractor) {
    map.put(thread, new AtomicLong(extractor.extract(thread)));
  }

  private void resetAllCounters() {
    resetCounter(userTime, USER_TIME_FUNCTION);
    resetCounter(cpuTime, CPU_TIME_FUNCTION);
    resetCounter(allocation, MEM_FUNCTION);
  }

  private void resetCounter(MeasureMap map,
                            ExtractorFunction extractor) {
    map.forEach((thread, value) ->
        value.set(extractor.extract(thread)));
  }

  private static long threadAllocatedBytes(Thread thread) {
    try {
      return (long) mBeanServer.invoke(name,
          "getThreadAllocatedBytes",
          new Object[]{thread.getId()},
          SIGNATURE);
    } catch (JMException e) {
      throw new IllegalArgumentException(e);
    }
  }

  private static final ThreadMXBean tmb =
      ManagementFactory.getThreadMXBean();
  private static final ExtractorFunction USER_TIME_FUNCTION =
      thread -> tmb.getThreadUserTime(thread.getId());
  private static final ExtractorFunction CPU_TIME_FUNCTION =
      thread -> tmb.getThreadCpuTime(thread.getId());
  private static final ExtractorFunction MEM_FUNCTION =
      ForkJoinPoolBench::threadAllocatedBytes;
  private static final String[] SIGNATURE = {long.class.getName()};
  private static final MBeanServer mBeanServer =
      ManagementFactory.getPlatformMBeanServer();
  private static final ObjectName name;
  private static final ForkJoinPoolBench bench;

  static {
    System.setProperty("java.util.concurrent.ForkJoinPool" +
        ".common.threadFactory", ThreadFactory.class.getName());
    if (!(ForkJoinPool.commonPool()
        .getFactory() instanceof ThreadFactory))
      throw new IllegalStateException(
          "Common pool thread factory should be a " +
              ThreadFactory.class.getName());
    try {
      name = new ObjectName(ManagementFactory.THREAD_MXBEAN_NAME);
    } catch (MalformedObjectNameException e) {
      throw new ExceptionInInitializerError(e);
    }
    bench = new ForkJoinPoolBench();
  }

  public interface StatsListener {
    void result(long realTime,
                LongSummaryStatistics userTimeStats,
                LongSummaryStatistics cpuTimeStats,
                LongSummaryStatistics allocationStats);
  }
}
```
And here is a DefaultStatsListener that prints the results in the format shown in the output above:
```java
package eu.javaspecialists.tjsn.bench;

import java.util.*;
import java.util.concurrent.*;

public class DefaultStatsListener
    implements ForkJoinPoolBench.StatsListener {
  private final String description;

  public DefaultStatsListener(String description) {
    this.description = description;
  }

  public void result(long realTime,
                     LongSummaryStatistics userTimeStats,
                     LongSummaryStatistics cpuTimeStats,
                     LongSummaryStatistics allocationStats) {
    long userTime = userTimeStats.getSum();
    long sysTime = cpuTimeStats.getSum() - userTime;
    long bytes = allocationStats.getSum();
    System.out.println("Results for " + description);
    System.out.println("real " + formatTime(realTime));
    System.out.println("user " + formatTime(userTime));
    System.out.println("sys " + formatTime(sysTime));
    System.out.println("mem " + formatMemory(bytes));
  }

  private static String formatMemory(double bytes) {
    double val;
    String unitStr;
    if (bytes < 1024) {
      val = bytes;
      unitStr = "B";
    } else if (bytes < 1024 * 1024) {
      val = bytes / 1024;
      unitStr = "KB";
    } else if (bytes < 1024 * 1024 * 1024) {
      val = bytes / (1024 * 1024);
      unitStr = "MB";
    } else if (bytes < 1024 * 1024 * 1024 * 1024L) {
      val = bytes / (1024 * 1024 * 1024L);
      unitStr = "GB";
    } else {
      val = bytes / (1024 * 1024 * 1024 * 1024L);
      unitStr = "TB";
    }
    return String.format(Locale.US, "%.1f%s", val, unitStr);
  }

  private static String formatTime(long nanos) {
    if (nanos < 0) nanos = 0;
    long timeInMs = TimeUnit.NANOSECONDS.toMillis(nanos);
    long minutes = timeInMs / 60_000;
    // seconds left over after the whole minutes, with millisecond precision
    double remainingSeconds = (timeInMs % 60_000) / 1000.0;
    return String.format(Locale.US, "%dm%.3fs", minutes, remainingSeconds);
  }
}
```
I hope you enjoyed this and found it interesting, perhaps even useful. Many greetings from Crete!
Kind regards
Heinz
We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel.