
223 ManagedBlocker

Author: Dr. Heinz M. Kabutz
Date: 2014-11-27
Java Version: 8
Category: Language
 

Abstract: Blocking methods should not be called from within parallel streams in Java 8, otherwise the shared threads in the common ForkJoinPool will become inactive. In this newsletter we look at a technique to maintain a certain liveliness in the pool, even when some threads are blocked.

 

Welcome to the 223rd issue of The Java(tm) Specialists' Newsletter, sent to you from the Island of Crete. We have lived here since 2006 and could not help but accumulate some olive trees. Our yield is still low, but this is the first year that we have a little bit of an excess. It is delicious! For 2015, if you come to Crete to attend one of my courses, I will give you a personally autographed tin containing one liter of our liquid gold to take home with you, compliments of JavaSpecialists.eu :-)


ManagedBlocker

Java 8 introduced the concept of a parallel stream, which gives us the ability to parallelize some of our work without very much effort on our part. All that is needed is to add a call to parallel() to our streams. Some guidelines by Doug Lea et al on how to do parallelism in Java 8 can be found here. I have already written in Issue 220 and Issue 220b about availableProcessors() being neither always accurate nor always useful.

One of the issues that can happen is that someone might call a blocking method from within a parallel stream. Something simple like System.out::println could even end up blocking, for example:

package eu.javaspecialists.tjsn.examples.issue223;

import java.util.stream.*;

public class PrintlnFun {
  public static void main(String... args) {
    synchronized (System.out) {
      System.out.println("Hello World");
      IntStream.range(0, 4).parallel().
          forEach(System.out::println);
    }
  }
}

Run the code without the call to parallel() and it outputs what you expect (Hello World, followed by 0, 1, 2, 3). However, when we try to run it in parallel, we deadlock the system. The main thread has acquired the lock on System.out, which means that the worker threads from ForkJoinPool.commonPool() block when they try to print. The main thread cannot complete until the worker threads are done, so we have our classic deadlock situation. The fun part is that the output is not going to be consistent. And even more fun is that a thread dump does not detect this as a Java deadlock.

My point with picking on System.out::println is that almost all examples that I've seen of parallel streams have used that call at some point, but this could also be a blocking call. Obviously we should avoid blocking calls from within parallel streams, but sometimes it is not so obvious that it might happen.
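One way to sidestep the problem in a case like PrintlnFun is to keep the parallel part free of I/O and to print afterwards from the calling thread. The following is only a sketch of that idea and is not taken from the newsletter's code; the class name CollectThenPrint and the squaring step are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical example class, not part of the newsletter's code base.
class CollectThenPrint {
  // The parallel part only computes; it takes no locks and does no I/O.
  static List<Integer> squares(int upto) {
    return IntStream.range(0, upto).parallel()
        .map(i -> i * i)
        .boxed()
        .collect(Collectors.toList()); // keeps encounter order
  }

  public static void main(String... args) {
    List<Integer> result = squares(4);
    synchronized (System.out) {
      // Only the calling thread prints, so no pool worker can get
      // stuck waiting for the System.out monitor.
      System.out.println("Hello World");
      result.forEach(System.out::println);
    }
  }
}
```

The worker threads never touch System.out here, so holding its monitor in the main thread cannot block them.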

One of the ways to deal with blocking methods in parallel streams is by using the ForkJoinPool.ManagedBlocker interface. Instead of blocking directly, we call the ForkJoinPool.managedBlock(<instance of ManagedBlocker>) method. I spoke about this at a short Nighthacking interview at Devoxx. I had heard about ManagedBlocker from Paul Sandoz a few minutes before the interview started, with the result that this really was a hacking interview. I made a silly mistake, but eventually we got it working. I knew that Phaser "played nice" with ForkJoinPool, but was not aware that we could also write our own blocking methods that could get along with that framework.
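To make the interface a little more concrete, here is a minimal sketch of a ManagedBlocker that takes an element from a BlockingQueue, modelled on the kind of example given in the ForkJoinPool.ManagedBlocker Javadoc. The names QueueTaker and managedTake are my own for this illustration and are not part of the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, named for this sketch only.
class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
  private final BlockingQueue<E> queue;
  private E item; // stays null until we have taken an element

  QueueTaker(BlockingQueue<E> queue) {
    this.queue = queue;
  }

  // Non-blocking attempt: succeeds if an element is already there.
  public boolean isReleasable() {
    return item != null || (item = queue.poll()) != null;
  }

  // Blocking attempt: only reached if isReleasable() returned false.
  public boolean block() throws InterruptedException {
    if (item == null) item = queue.take();
    return true; // no further blocking is needed
  }

  E getItem() {
    return item;
  }

  static <T> T managedTake(BlockingQueue<T> queue)
      throws InterruptedException {
    QueueTaker<T> taker = new QueueTaker<>(queue);
    ForkJoinPool.managedBlock(taker);
    return taker.getItem();
  }

  public static void main(String... args)
      throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("hello");
    System.out.println(managedTake(queue)); // prints "hello"
  }
}
```

Calling managedTake() from a task running in a ForkJoinPool gives the pool the chance to compensate for the blocked worker; from any other thread it simply behaves like an ordinary take().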

This week, I decided to try to write a ReentrantLock that would be compatible with parallel streams. Ideally we should never call a blocking operation from within a parallel stream. But if we need to, this will allow us to have some element of liveness in our shared pool that we otherwise would forfeit.

Interruptions

Before we get into the subtleties of ManagedBlocker, one of the parts of Java that programmers seem to struggle with a lot is how to deal with InterruptedException. A mechanism that is used in Lock.lock(), Condition.awaitUninterruptibly(), Semaphore.acquireUninterruptibly() and many others, is to save the interrupt until the action has been completed and to then return with a thread that is in the interrupted state. (If you don't understand interruptions, I can strongly recommend my new Java 8 concurrency course available on our Teachable platform). Here is the code to save interruptions for later, which would typically be called using lambdas:

package eu.javaspecialists.tjsn.concurrency.util;

public class Interruptions {
  public static void saveForLater(InterruptibleAction action) {
    saveForLaterTask(() -> {
      action.run();
      return null;
    });
  }

  public static <E> E saveForLaterTask(
      InterruptibleTask<E> task) {
    boolean interrupted = Thread.interrupted(); // clears flag
    try {
      while (true) {
        try {
          return task.run();
        } catch (InterruptedException e) {
          // flag would be cleared at this point too
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) Thread.currentThread().interrupt();
    }
  }

  @FunctionalInterface
  public interface InterruptibleAction {
    public void run() throws InterruptedException;
  }

  @FunctionalInterface
  public interface InterruptibleTask<E> {
    public E run() throws InterruptedException;
  }
}

We can use this to manage interruptions if we do not want to be interruptible. You will see it inside our ManagedReentrantLock in a moment.
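As a self-contained illustration of the same save-for-later pattern, the sketch below inlines the loop into a hypothetical takeUninterruptibly() method (the class SaveInterruptDemo and the method name are assumptions made for this example). It interrupts the current thread before the call to show that the blocking take() still completes and that the interrupt is restored afterwards.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; it repeats the pattern used in
// Interruptions.saveForLaterTask() so that it can run on its own.
class SaveInterruptDemo {
  static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
    // Remember a pending interrupt and clear the flag, otherwise
    // take() would throw InterruptedException immediately.
    boolean interrupted = Thread.interrupted();
    try {
      while (true) {
        try {
          return queue.take();
        } catch (InterruptedException e) {
          interrupted = true; // remember it and try again
        }
      }
    } finally {
      // Hand the interrupt back to the caller once we are done.
      if (interrupted) Thread.currentThread().interrupt();
    }
  }

  public static void main(String... args) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("value");
    Thread.currentThread().interrupt(); // simulate an interrupt
    System.out.println(takeUninterruptibly(queue)); // prints "value"
    // prints "true": the interrupt was saved, not swallowed
    System.out.println(Thread.currentThread().isInterrupted());
  }
}
```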

ManagedReentrantLock

Perhaps one day the AbstractQueuedSynchronizer, on which most of the concurrency constructs in Java are based, will be converted to support ManagedBlocker. This would solve the problem where some of the threads in the ForkJoinPool are blocked by using a concurrency synchronizer. Since Phaser already works with ManagedBlocker, we can use that instead of CountDownLatch and CyclicBarrier.
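As a rough sketch of how a Phaser could stand in for a CountDownLatch inside a parallel stream, the example below registers a single party (the releaser) and lets every stream element wait for phase 0 to end. The class name PhaserLatchDemo, the method releaseAll() and the 100 ms delay are all assumptions made for this illustration.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the newsletter's code base.
class PhaserLatchDemo {
  static int releaseAll(int waiters) {
    Phaser latch = new Phaser(1); // one party: the releaser
    AtomicInteger released = new AtomicInteger();

    Thread releaser = new Thread(() -> {
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
      latch.arrive(); // like countDown(): ends phase 0
    });
    releaser.start();

    IntStream.range(0, waiters).parallel().forEach(i -> {
      // Like await(): blocks while the phaser is still in phase 0
      // and returns immediately once the phase has advanced.
      latch.awaitAdvance(0);
      released.incrementAndGet();
    });
    return released.get();
  }

  public static void main(String... args) {
    System.out.println(releaseAll(20)); // prints 20
  }
}
```

The waiting elements still occupy threads while they wait, but because the Phaser cooperates with the pool, the ForkJoinPool is given the opportunity to compensate for them.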

In my experiment, I tried to adapt the ReentrantLock. I think my code works correctly. I have written several unit tests to check whether it is at least compatible with ReentrantLock's behaviour and also "plays nicely" with ForkJoinPool and therefore with parallel streams. However, I do not give any guarantees that it really works, nor that it is a good idea to use blocking calls inside parallel streams. In fact, I'd like to state that in general it is a really bad idea to use blocking calls in thread pools :-)

In a moment we will look at the code for my ManagedReentrantLock. It is fairly simple. Instead of calling lock.lock() directly, we delegate the call to ForkJoinPool.managedBlock(ManagedBlocker). The first thing managedBlock() does is check whether we even need to bother blocking. It does this by calling the blocker.isReleasable() method. If that returns true, then no further action is necessary. If it returns false, managedBlock() calls blocker.block(). If block() returns true, then no further blocking is necessary and we can return from the managedBlock() method. The method effectively does something like this:

while (!blocker.isReleasable() && !blocker.block()) {}

We should always try to achieve our goal within the isReleasable() method without blocking. In most cases, a lock would be available, so we would not even need to consider compensating the FJ pool with another thread to keep it live.
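The calling order described above can be observed with a small counting blocker. This is only a sketch: CountingBlocker and ManagedBlockOrderDemo are hypothetical names, and the demo is run from the main thread, where managedBlock() has no pool to compensate and simply runs the loop.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class, not part of the newsletter's code base.
class ManagedBlockOrderDemo {
  static class CountingBlocker
      implements ForkJoinPool.ManagedBlocker {
    private final boolean releasable;
    int releasableCalls;
    int blockCalls;

    CountingBlocker(boolean releasable) {
      this.releasable = releasable;
    }

    public boolean isReleasable() {
      releasableCalls++;
      return releasable;
    }

    public boolean block() {
      blockCalls++;
      return true; // pretend that blocking succeeded
    }
  }

  // Returns how often block() was called by managedBlock().
  static int blockCallsFor(boolean releasable)
      throws InterruptedException {
    CountingBlocker blocker = new CountingBlocker(releasable);
    ForkJoinPool.managedBlock(blocker);
    return blocker.blockCalls;
  }

  public static void main(String... args)
      throws InterruptedException {
    // isReleasable() is true, so block() is never called
    System.out.println(blockCallsFor(true));  // prints 0
    // isReleasable() is false, so block() is called once
    System.out.println(blockCallsFor(false)); // prints 1
  }
}
```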

The code is a bit tough to digest, so I would recommend you work through it one piece at a time. Start at the top and figure out how lock.lockInterruptibly() and lock.lock() work. For unlock() we are simply using the parent's method. The trickiest part is the conditions, which also have to be managed, so newCondition() wraps each one in a ManagedCondition. However, things go awry when we pass these wrapper objects back into the lock, for example with the hasWaiters(Condition) method. We thus need to first map back to the original Condition object produced by the superclass.

package eu.javaspecialists.tjsn.concurrency.locks;

import eu.javaspecialists.tjsn.concurrency.util.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

/**
 * The ManagedReentrantLock is a lock implementation that is
 * compatible with the Fork/Join framework, and therefore also
 * with the Java 8 parallel streams.  Instead of just blocking
 * when the lock is held by another thread, and thereby removing
 * one of the active threads from the Fork/Join pool, we instead
 * use a ManagedBlocker to manage it.
 * <p>
 * The ManagedReentrantLock subclasses ReentrantLock, which means
 * we can use it as a drop-in replacement.  See also the
 * ManagedBlockers facade, which can adapt several known
 * synchronizers with our new ManagedReentrantLock.
 *
 * @author Heinz Kabutz
 * @see eu.javaspecialists.tjsn.concurrency.util.ManagedBlockers
 */
public class ManagedReentrantLock extends ReentrantLock {
  public ManagedReentrantLock() {
  }

  public ManagedReentrantLock(boolean fair) {
    super(fair);
  }

  public void lockInterruptibly() throws InterruptedException {
    ForkJoinPool.managedBlock(new DoLockInterruptibly());
  }

  public void lock() {
    DoLock locker = new DoLock(); // we want to create this
    // before passing it into the lambda, to prevent it from
    // being created again if the thread is interrupted for some
    // reason
    Interruptions.saveForLater(
        () -> ForkJoinPool.managedBlock(locker));
  }

  public boolean tryLock(long time, TimeUnit unit)
      throws InterruptedException {
    // If we already have the lock, then the TryLocker will
    // immediately acquire the lock due to reentrancy.  We do not
    // really care whether we had a timeout inside the TryLocker,
    // but only want to return whether or not we hold the lock
    // at the end of the method.
    ForkJoinPool.managedBlock(new TryLocker(time, unit));
    return isHeldByCurrentThread();
  }

  public Condition newCondition() {
    return new ManagedCondition(super.newCondition());
  }

  public boolean hasWaiters(Condition c) {
    return super.hasWaiters(getRealCondition(c));
  }

  public int getWaitQueueLength(Condition c) {
    return super.getWaitQueueLength(getRealCondition(c));
  }

  protected Collection<Thread> getWaitingThreads(Condition c) {
    return super.getWaitingThreads(getRealCondition(c));
  }

  ////// Helper functions and inner classes /////////

  private Condition getRealCondition(Condition c) {
    if (!(c instanceof ManagedCondition))
      throw new IllegalArgumentException("not owner");
    return ((ManagedCondition) c).condition;
  }

  private class ManagedCondition implements Condition {
    private final Condition condition;

    private ManagedCondition(Condition condition) {
      this.condition = condition;
    }

    public void await() throws InterruptedException {
      managedBlock(() -> condition.await());
    }

    public void awaitUninterruptibly() {
      Interruptions.saveForLater(
          () -> managedBlock(
              () -> condition.awaitUninterruptibly())
      );
    }

    public long awaitNanos(long nanosTimeout)
        throws InterruptedException {
      long[] result = {nanosTimeout};
      managedBlock(
          () -> result[0] = condition.awaitNanos(nanosTimeout));
      return result[0];
    }

    public boolean await(long time, TimeUnit unit)
        throws InterruptedException {
      boolean[] result = {false};
      managedBlock(
          () -> result[0] = condition.await(time, unit));
      return result[0];
    }

    public boolean awaitUntil(Date deadline)
        throws InterruptedException {
      boolean[] result = {false};
      managedBlock(
          () -> result[0] = condition.awaitUntil(deadline));
      return result[0];
    }

    public void signal() {
      condition.signal();
    }

    public void signalAll() {
      condition.signalAll();
    }
  }

  private static void managedBlock(
      AlwaysBlockingManagedBlocker blocker)
      throws InterruptedException {
    ForkJoinPool.managedBlock(blocker);
  }

  // we should always try to achieve our goal within the
  // isReleasable() method instead of block().  This avoids
  // trying to compensate the loss of a thread by creating
  // a new one.
  private abstract class AbstractLockAction
      implements ForkJoinPool.ManagedBlocker {
    private boolean hasLock = false;

    public final boolean isReleasable() {
      return hasLock || (hasLock = tryLock());
    }
  }

  private class DoLockInterruptibly extends AbstractLockAction {
    public boolean block() throws InterruptedException {
      if (isReleasable()) return true;
      ManagedReentrantLock.super.lockInterruptibly();
      return true;
    }
  }

  private class DoLock extends AbstractLockAction {
    public boolean block() {
      if (isReleasable()) return true;
      ManagedReentrantLock.super.lock();
      return true;
    }
  }

  private class TryLocker extends AbstractLockAction {
    private final long time;
    private final TimeUnit unit;

    private TryLocker(long time, TimeUnit unit) {
      this.time = time;
      this.unit = unit;
    }

    public boolean block() throws InterruptedException {
      if (isReleasable()) return true;
      ManagedReentrantLock.super.tryLock(time, unit);
      return true;
    }
  }

  @FunctionalInterface
  private interface AlwaysBlockingManagedBlocker
      extends ForkJoinPool.ManagedBlocker {
    default boolean isReleasable() {
      return false;
    }

    default boolean block() throws InterruptedException {
      doBlock();
      return true;
    }

    void doBlock() throws InterruptedException;
  }
}

We can now use this ManagedReentrantLock just like we would use an ordinary ReentrantLock, with the only difference being that if a thread blocks on it inside a ForkJoinPool, the pool is compensated with additional threads to keep the system alive.

Here is a demo that suspends the threads, thereby jamming up the common ForkJoinPool:

package eu.javaspecialists.tjsn.examples.issue223;

import eu.javaspecialists.tjsn.concurrency.locks.*;

import java.util.concurrent.locks.*;
import java.util.stream.*;

public class ManagedReentrantLockDemo {
  public static void main(String... args) {
    ReentrantLock lock = new ReentrantLock();
//    ReentrantLock lock = new ManagedReentrantLock();
    Condition condition = lock.newCondition();
    int upto = Runtime.getRuntime().availableProcessors() * 10;
    IntStream.range(0, upto).parallel().forEach(
        i -> {
          lock.lock();
          try {
            System.out.println(i + ": Got lock, now waiting - " +
                Thread.currentThread().getName());
            condition.awaitUninterruptibly();
          } finally {
            lock.unlock();
          }
        }
    );
  }
}

On my MacBook Pro with 8 hardware threads, I see the following output when I use an ordinary ReentrantLock:

52: Got lock, now waiting - main
72: Got lock, now waiting - ForkJoinPool.commonPool-worker-2
65: Got lock, now waiting - ForkJoinPool.commonPool-worker-4
5: Got lock, now waiting - ForkJoinPool.commonPool-worker-6
35: Got lock, now waiting - ForkJoinPool.commonPool-worker-7
12: Got lock, now waiting - ForkJoinPool.commonPool-worker-3
25: Got lock, now waiting - ForkJoinPool.commonPool-worker-1
32: Got lock, now waiting - ForkJoinPool.commonPool-worker-5

However, when I run it with the ManagedReentrantLock, I see this output instead:

5: Got lock, now waiting - ForkJoinPool.commonPool-worker-6
65: Got lock, now waiting - ForkJoinPool.commonPool-worker-4
22: Got lock, now waiting - ForkJoinPool.commonPool-worker-7
35: Got lock, now waiting - ForkJoinPool.commonPool-worker-5
72: Got lock, now waiting - ForkJoinPool.commonPool-worker-2
25: Got lock, now waiting - ForkJoinPool.commonPool-worker-1
12: Got lock, now waiting - ForkJoinPool.commonPool-worker-3
52: Got lock, now waiting - main
27: Got lock, now waiting - ForkJoinPool.commonPool-worker-12
62: Got lock, now waiting - ForkJoinPool.commonPool-worker-0
2: Got lock, now waiting - ForkJoinPool.commonPool-worker-14
28: Got lock, now waiting - ForkJoinPool.commonPool-worker-11
60: Got lock, now waiting - ForkJoinPool.commonPool-worker-10
17: Got lock, now waiting - ForkJoinPool.commonPool-worker-9
63: Got lock, now waiting - ForkJoinPool.commonPool-worker-13
0: Got lock, now waiting - ForkJoinPool.commonPool-worker-15
15: Got lock, now waiting - ForkJoinPool.commonPool-worker-8
32: Got lock, now waiting - ForkJoinPool.commonPool-worker-18
77: Got lock, now waiting - ForkJoinPool.commonPool-worker-19
70: Got lock, now waiting - ForkJoinPool.commonPool-worker-27
73: Got lock, now waiting - ForkJoinPool.commonPool-worker-20
10: Got lock, now waiting - ForkJoinPool.commonPool-worker-30
75: Got lock, now waiting - ForkJoinPool.commonPool-worker-23
30: Got lock, now waiting - ForkJoinPool.commonPool-worker-31
3: Got lock, now waiting - ForkJoinPool.commonPool-worker-24
18: Got lock, now waiting - ForkJoinPool.commonPool-worker-17
38: Got lock, now waiting - ForkJoinPool.commonPool-worker-29
13: Got lock, now waiting - ForkJoinPool.commonPool-worker-22
45: Got lock, now waiting - ForkJoinPool.commonPool-worker-28
42: Got lock, now waiting - ForkJoinPool.commonPool-worker-21
48: Got lock, now waiting - ForkJoinPool.commonPool-worker-16
40: Got lock, now waiting - ForkJoinPool.commonPool-worker-26
47: Got lock, now waiting - ForkJoinPool.commonPool-worker-25
57: Got lock, now waiting - ForkJoinPool.commonPool-worker-42
33: Got lock, now waiting - ForkJoinPool.commonPool-worker-51
55: Got lock, now waiting - ForkJoinPool.commonPool-worker-44
50: Got lock, now waiting - ForkJoinPool.commonPool-worker-37
58: Got lock, now waiting - ForkJoinPool.commonPool-worker-61
53: Got lock, now waiting - ForkJoinPool.commonPool-worker-54
37: Got lock, now waiting - ForkJoinPool.commonPool-worker-47
68: Got lock, now waiting - ForkJoinPool.commonPool-worker-40
78: Got lock, now waiting - ForkJoinPool.commonPool-worker-33
8: Got lock, now waiting - ForkJoinPool.commonPool-worker-59
43: Got lock, now waiting - ForkJoinPool.commonPool-worker-52
67: Got lock, now waiting - ForkJoinPool.commonPool-worker-45
20: Got lock, now waiting - ForkJoinPool.commonPool-worker-38
7: Got lock, now waiting - ForkJoinPool.commonPool-worker-62
23: Got lock, now waiting - ForkJoinPool.commonPool-worker-55

We don't get to 80 threads, but that is because the number of leaves is limited inside the java.util.stream.AbstractTask class.

Note that the purpose of the managed blockers is not necessarily to create an unbounded number of threads, nor to maintain exactly the same number of active threads as the desired parallelism of the ForkJoinPool, but rather to improve the liveliness of the ForkJoinPool when faced with potentially blocking tasks.

ManagedBlockers Facade

Since we subclassed ReentrantLock in our ManagedReentrantLock solution, we can use it anywhere that we currently have a ReentrantLock. In the next piece of code, I use reflection to replace the locks inside some well-known classes, specifically LinkedBlockingQueue, ArrayBlockingQueue and PriorityBlockingQueue. Thus if you want to use a LinkedBlockingQueue from within parallel streams, but are concerned about using blocking operations (rightly so), you could instead make it managed with our ManagedBlockers class. Just make sure that you call the makeManaged() method prior to publishing the constructs!

package eu.javaspecialists.tjsn.concurrency.util;

import eu.javaspecialists.tjsn.concurrency.locks.*;

import java.lang.reflect.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class ManagedBlockers {
  public static <E> ArrayBlockingQueue<E> makeManaged(
      ArrayBlockingQueue<E> queue) {
    Class<?> clazz = ArrayBlockingQueue.class;

    try {
      Field lockField = clazz.getDeclaredField("lock");
      lockField.setAccessible(true);
      ReentrantLock old = (ReentrantLock) lockField.get(queue);
      boolean fair = old.isFair();
      ReentrantLock lock = new ManagedReentrantLock(fair);
      lockField.set(queue, lock);

      replace(queue, clazz, "notEmpty", lock.newCondition());
      replace(queue, clazz, "notFull", lock.newCondition());

      return queue;
    } catch (IllegalAccessException | NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  public static <E> LinkedBlockingQueue<E> makeManaged(
      LinkedBlockingQueue<E> queue) {
    Class<?> clazz = LinkedBlockingQueue.class;

    ReentrantLock takeLock = new ManagedReentrantLock();
    ReentrantLock putLock = new ManagedReentrantLock();

    try {
      replace(queue, clazz, "takeLock", takeLock);
      replace(queue, clazz, "notEmpty", takeLock.newCondition());
      replace(queue, clazz, "putLock", putLock);
      replace(queue, clazz, "notFull", putLock.newCondition());

      return queue;
    } catch (IllegalAccessException | NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  public static <E> PriorityBlockingQueue<E> makeManaged(
      PriorityBlockingQueue<E> queue) {
    Class<?> clazz = PriorityBlockingQueue.class;

    ReentrantLock lock = new ManagedReentrantLock();

    try {
      replace(queue, clazz, "lock", lock);
      replace(queue, clazz, "notEmpty", lock.newCondition());

      return queue;
    } catch (IllegalAccessException | NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  private static void replace(Object owner,
                              Class<?> clazz, String fieldName,
                              Object fieldValue)
      throws NoSuchFieldException, IllegalAccessException {
    Field field = clazz.getDeclaredField(fieldName);
    field.setAccessible(true);
    field.set(owner, fieldValue);
  }
}

Here is a small class that uses it to make the ForkJoinPool a bit more lively:

package eu.javaspecialists.tjsn.examples.issue223;

import eu.javaspecialists.tjsn.concurrency.util.*;

import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.stream.*;

public class ManagedLivenessQueueDemo {
  private static final LinkedBlockingQueue<Integer> numbers =
      new LinkedBlockingQueue<>();

  public static void main(String... args) {
    ManagedBlockers.makeManaged(numbers);
    Thread jamThread = makeJamThread();
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
    for (int i = 0; i < 100; i++) {
      numbers.add(i);
    }
  }

  private static Thread makeJamThread() {
    Thread jamup = new Thread(() -> {
      int par = Runtime.getRuntime().availableProcessors() * 4;
      IntStream.range(0, par).parallel().forEach(
          i -> {
            System.out.println(i + ": Waiting for number " +
                Thread.currentThread().getName());
            int num = Interruptions.saveForLaterTask(
                () -> numbers.take());
            System.out.println("Got number: " + num);
          }
      );
    });
    jamup.start();
    return jamup;
  }
}

Again, you will see different output if you use a plain unmanaged LinkedBlockingQueue.

I hope you enjoyed this newsletter. I know that reading the ManagedReentrantLock class is a bit hard, but I hope that you learned something new. I certainly did!

Kind regards from a chilly Crete

Heinz

 
