Abstract: The LinkedBlockingQueue and LinkedBlockingDeque behave slightly differently when methods put() and take() are called by a thread that happens to be in the "interrupted" state. The LBQ throws an InterruptedException immediately when we call take(), even if the queue contains elements. The LBD, however, only throws InterruptedException if the deque is empty and the thread would enter the WAITING state. In this newsletter we show how a dynamic proxy can make the behaviour consistent.
Welcome to the 322nd edition of The Java(tm) Specialists' Newsletter, sent from the Island of Crete. Today marks the two-year anniversary of the Tempe Train Crash, the worst rail disaster in Greek history. A freight train carrying dangerous flammable chemicals collided head-on with a passenger train. Half of the victims were young students returning to university after their semester break. The amount of corruption leading up to the disaster, and the subsequent poor handling of it, is hard to fathom. This will almost certainly bring down the current government. It is a day of mourning in Greece and we are angry.
javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.
In the early days of Java, we did not have a lot of methods that would throw the InterruptedException. We had Object.wait(), Thread.sleep() and Thread.join(). That was pretty much it. We were used to this annoying exception when we called Thread.sleep(), and the most common approach was to catch it and then ignore it. Then Java 5 came along, and with it almost 100 new methods throwing InterruptedException.
Here is what JavaDoc says of when InterruptedException would be thrown:
 * Thrown when a thread is waiting, sleeping, or otherwise occupied,
 * and the thread is interrupted, either before or during the activity.
 * Occasionally a method may wish to test whether the current
 * thread has been interrupted, and if so, to immediately throw
 * this exception. The following code can be used to achieve
 * this effect:
 * {@snippet lang=java :
 * if (Thread.interrupted())  // Clears interrupted status!
 *     throw new InterruptedException();
 * }
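To make the "before or during" part concrete, here is a minimal sketch that interrupts the current thread and then looks at what happens. The class name InterruptStatusDemo and its method names are made up for this illustration. It relies only on Thread.interrupt(), Thread.interrupted(), Thread.isInterrupted() and Thread.sleep().

```java
class InterruptStatusDemo {
  /** Returns true if sleep() threw because the status was already set. */
  static boolean sleepThrowsWhenAlreadyInterrupted() {
    Thread.currentThread().interrupt(); // self-interrupt before sleeping
    try {
      Thread.sleep(1_000);
      return false;
    } catch (InterruptedException e) {
      return true;
    } finally {
      Thread.interrupted(); // leave the thread with a clean status
    }
  }

  /** Thread.interrupted() reports the status once and clears it. */
  static boolean interruptedClearsStatus() {
    Thread.currentThread().interrupt();
    boolean first = Thread.interrupted();  // true, and clears the status
    boolean second = Thread.interrupted(); // false, already cleared
    return first && !second;
  }

  /** isInterrupted() only reads the status and leaves it set. */
  static boolean isInterruptedKeepsStatus() {
    Thread.currentThread().interrupt();
    boolean first = Thread.currentThread().isInterrupted();
    boolean second = Thread.currentThread().isInterrupted();
    Thread.interrupted(); // clean up after ourselves
    return first && second;
  }

  public static void main(String... args) {
    System.out.println("sleep() throws when already interrupted: " +
        sleepThrowsWhenAlreadyInterrupted());
    System.out.println("interrupted() clears the status: " +
        interruptedClearsStatus());
    System.out.println("isInterrupted() keeps the status: " +
        isInterruptedKeepsStatus());
  }
}
```

Thread.sleep() is one of the methods that checks the status up front, which is why the first method never actually sleeps for a second.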
Any thread can interrupt any other thread that it has a handle to. We could even interrupt all the threads in the JVM (no, rather don't do this):
public class CausingHavock {
  public static void main(String... args) {
    Thread.getAllStackTraces().keySet().forEach(Thread::interrupt);
  }
}
When we call shutdownNow() on an ExecutorService, a thread that is busy executing a task will be interrupted:
import java.util.concurrent.*;

public class ShutdownNowDemo {
  public static void main(String... args)
      throws InterruptedException {
    var pool = Executors.newSingleThreadExecutor();
    var future = pool.submit(() -> {
      Thread.sleep(10000);
      return "done";
    });
    pool.shutdown(); // worker threads are left in peace
    Thread.sleep(100);
    System.out.println("isTerminated? " + pool.isTerminated());
    pool.shutdownNow(); // interrupts all the worker threads in pool
    Thread.sleep(100);
    System.out.println("isTerminated? " + pool.isTerminated());
    try {
      future.get();
    } catch (ExecutionException e) {
      System.out.println("Future threw an exception: " +
          e.getCause());
    }
  }
}
We see the following output:
isTerminated? false
isTerminated? true
Future threw an exception: java.lang.InterruptedException: sleep interrupted
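In the demo above, the task was inside Thread.sleep(), so the interrupt surfaced as an InterruptedException. A task that never calls a blocking method has to check the interrupted status itself. Here is a small sketch of that idea. The class name CooperativeWorkerDemo and the busy loop are invented for illustration only.

```java
import java.util.concurrent.*;

class CooperativeWorkerDemo {
  /** Returns true if the pool terminated after shutdownNow(). */
  static boolean stopsOnShutdownNow() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    pool.submit(() -> {
      started.countDown();
      // No blocking calls here, so we poll the interrupted status
      while (!Thread.currentThread().isInterrupted()) {
        Thread.onSpinWait();
      }
    });
    try {
      started.await();    // make sure the task is running
      pool.shutdownNow(); // interrupts the worker thread
      return pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore status for caller
      pool.shutdownNow();
      return false;
    }
  }

  public static void main(String... args) {
    System.out.println("terminated? " + stopsOnShutdownNow());
  }
}
```

If the loop did not check isInterrupted(), shutdownNow() would set the status, but the task would simply carry on spinning.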
One of the interesting approaches to dealing with InterruptedException is mentioned in the JavaDoc: to exit early if a thread is interrupted, even without encountering a WAITING state. For example, with ReentrantLock, we can call lockInterruptibly(). If a thread tries to lock whilst it is in the interrupted state, we will still exit with an InterruptedException, even if the lock is immediately available. We can see the code inside the ReentrantLock.Sync class, which extends AbstractQueuedSynchronizer:
final void lockInterruptibly() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (!initialTryLock())
    acquireInterruptibly(1);
}
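We can try this out ourselves. The following sketch compares lock() and lockInterruptibly() on a lock that nobody else holds. The class name LockInterruptiblyDemo and its method names are made up for this example.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockInterruptiblyDemo {
  /** lock() acquires a free lock and leaves the status untouched. */
  static boolean plainLockIgnoresInterrupt() {
    ReentrantLock lock = new ReentrantLock();
    Thread.currentThread().interrupt(); // self-interrupt
    lock.lock(); // lock is free, so this succeeds
    try {
      // we hold the lock and the interrupted status is still set
      return lock.isHeldByCurrentThread()
          && Thread.currentThread().isInterrupted();
    } finally {
      lock.unlock();
      Thread.interrupted(); // clear the status
    }
  }

  /** lockInterruptibly() throws, even though the lock is free. */
  static boolean lockInterruptiblyThrowsEarly() {
    ReentrantLock lock = new ReentrantLock();
    Thread.currentThread().interrupt(); // self-interrupt
    try {
      lock.lockInterruptibly();
      lock.unlock();
      return false;
    } catch (InterruptedException e) {
      return !lock.isLocked(); // the lock was never taken
    } finally {
      Thread.interrupted(); // clear the status
    }
  }

  public static void main(String... args) {
    System.out.println("lock() ignores interrupt: " +
        plainLockIgnoresInterrupt());
    System.out.println("lockInterruptibly() throws early: " +
        lockInterruptiblyThrowsEarly());
  }
}
```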
The BlockingQueue implementations that we test below all use this approach. For example, if we call take() on one of them whilst the thread is interrupted, we see an early InterruptedException, even if there is an element in the queue that we could return:
import java.util.concurrent.*;

public class TakeOnBlockingQueue {
  public static void main(String... args) {
    test(new ArrayBlockingQueue<>(10));
    test(new LinkedBlockingQueue<>());
    test(new LinkedTransferQueue<>());
    test(new PriorityBlockingQueue<>());
    test(new SynchronousQueue<>());
  }

  public static void test(BlockingQueue<Integer> queue) {
    System.out.print(queue.getClass().getSimpleName());
    Thread.interrupted(); // clear interrupt
    queue.offer(42); // might not add if no available capacity
    try {
      Thread.currentThread().interrupt(); // self-interrupt
      queue.take(); // shouldn't really go into the WAITING state
      System.out.println(" - no early interrupt on take()");
    } catch (InterruptedException e) {
      System.out.println(" - early interrupt on take()");
    }
  }
}
We see the following output:
ArrayBlockingQueue - early interrupt on take()
LinkedBlockingQueue - early interrupt on take()
LinkedTransferQueue - early interrupt on take()
PriorityBlockingQueue - early interrupt on take()
SynchronousQueue - early interrupt on take()
We would not necessarily expect the same behaviour for put(), because PriorityBlockingQueue and LinkedTransferQueue are always unbounded, thus put() will never block. Those methods do not even throw the InterruptedException.
import java.util.concurrent.*;

public class PutOnBlockingQueue {
  public static void main(String... args) {
    test(new ArrayBlockingQueue<>(10));
    test(new LinkedBlockingQueue<>());
    test(new LinkedTransferQueue<>());
    test(new PriorityBlockingQueue<>());
    test(new SynchronousQueue<>());
  }

  public static void test(BlockingQueue<Integer> queue) {
    System.out.print(queue.getClass().getSimpleName());
    Thread.interrupted(); // clear interrupt
    queue.offer(42); // might not add if no available capacity
    try {
      Thread.currentThread().interrupt(); // self-interrupt
      queue.put(53); // shouldn't really go into the WAITING state
      System.out.println(" - no early interrupt on put()");
    } catch (InterruptedException e) {
      System.out.println(" - early interrupt on put()");
    }
  }
}
This time, we see that the LinkedTransferQueue and the PriorityBlockingQueue do not throw the exception. In fact, the put() methods on those classes also do not declare that they would throw the InterruptedException.
ArrayBlockingQueue - early interrupt on put()
LinkedBlockingQueue - early interrupt on put()
LinkedTransferQueue - no early interrupt on put()
PriorityBlockingQueue - no early interrupt on put()
SynchronousQueue - early interrupt on put()
It makes sense that the unbounded queues would not throw InterruptedException on put(). However, to be consistent, shouldn't a LinkedBlockingQueue also not throw the InterruptedException if it is constructed without a capacity? The JavaDoc for put() and take() is remarkably vague (inherited from the BlockingQueue interface):
* @throws InterruptedException if interrupted while waiting
If we split hairs, the threads in our experiment, except for the SynchronousQueue, never go into the WAITING state. Should the InterruptedException have been thrown in our experiments? It is convenient, but is it 100% "correct"?
I am busy preparing a new "Java Concurrency Teardown" class, where I examine the LinkedBlockingQueue and LinkedBlockingDeque with a fine-tooth comb. The course is going to consist of an overview of these two classes, together with the related interfaces SequencedCollection, Deque and BlockingDeque. It is a continuation of our other two teardown classes that looked at the ArrayBlockingQueue and the CopyOnWrite collections, both available in our Java Concurrency Aficionados 2024. This new "Teardown" class will be more extensive, with exercises to do, some fun experiments, plus a detailed walkthrough of the linked blocking collections.
Whilst writing one of the exercises, I noticed that LinkedBlockingDeque never exited early in put() and take():
import java.util.concurrent.*;

public class PutAndTakeOnBlockingDeque {
  public static void main(String... args) {
    PutOnBlockingQueue.test(new LinkedBlockingDeque<>());
    TakeOnBlockingQueue.test(new LinkedBlockingDeque<>());
  }
}
This time, we see that neither put() nor take() throws the exception early:
LinkedBlockingDeque - no early interrupt on put()
LinkedBlockingDeque - no early interrupt on take()
I thought that this might have been an oversight and dutifully logged a bug, as well as a PR for the fix. Martin Buchholz spent a fair amount of time researching the bug and determined that there were other cases in the java.util.concurrent package where InterruptedException was not thrown early. Thus the behaviour of throwing the InterruptedException early, as seen in the BlockingQueue implementations, was not as consistent as I had thought. For now, the PR is stuck, and Martin Buchholz correctly pointed out that if we want this early throwing behaviour, we can wrap our classes with another class that does this.
One way to add the early throwing behaviour to the LinkedBlockingDeque is to subclass it, like so:
import java.util.*;
import java.util.concurrent.*;

public class BetterLinkedBlockingDeque<E>
    extends LinkedBlockingDeque<E> {
  public BetterLinkedBlockingDeque() {
  }

  public BetterLinkedBlockingDeque(int capacity) {
    super(capacity);
  }

  public BetterLinkedBlockingDeque(Collection<? extends E> c) {
    super(c);
  }

  @Override
  public void putFirst(E e) throws InterruptedException {
    throwIfInterrupted();
    super.putFirst(e);
  }

  @Override
  public void putLast(E e) throws InterruptedException {
    throwIfInterrupted();
    super.putLast(e);
  }

  @Override
  public E takeFirst() throws InterruptedException {
    throwIfInterrupted();
    return super.takeFirst();
  }

  @Override
  public E takeLast() throws InterruptedException {
    throwIfInterrupted();
    return super.takeLast();
  }

  private static void throwIfInterrupted()
      throws InterruptedException {
    // interrupted() is static, so call it on the Thread class
    if (Thread.interrupted())
      throw new InterruptedException();
  }
}
This behaves consistently with the LinkedBlockingQueue, throwing InterruptedException early:
import java.util.concurrent.*;

public class PutAndTakeOnBetterBlockingDeque {
  public static void main(String... args) {
    PutOnBlockingQueue.test(new BetterLinkedBlockingDeque<>());
    TakeOnBlockingQueue.test(new BetterLinkedBlockingDeque<>());
  }
}
As expected, the InterruptedException is now always thrown early:
BetterLinkedBlockingDeque - early interrupt on put()
BetterLinkedBlockingDeque - early interrupt on take()
However, what if we also want the put() methods of LinkedTransferQueue and PriorityBlockingQueue to have this behaviour? We would have to subclass each one. Aha, it is time to dig out our dynamic proxies. Most of the concurrency classes are implementing some interface, and dynamic proxies can easily implement these. In our InvocationHandler, we check whether the method declares that it throws the InterruptedException. If it does, we throw it early:
import java.lang.reflect.*;

/**
 * The proxy() method returns an object that implements the given interface and
 * throws the InterruptedException early for all interruptible methods.
 */
public class EarlyInterruptedExceptionThrower {
  public static <T> T proxy(Class<T> intf, T target) {
    return intf.cast(Proxy.newProxyInstance(
        intf.getClassLoader(),
        new Class<?>[]{intf},
        (_, method, args) -> { // Unnamed variable _ - JEP 456 Java 22+
          if (isInterruptible(method) && Thread.interrupted())
            throw new InterruptedException();
          try {
            return method.invoke(target, args);
          } catch (InvocationTargetException e) {
            throw e.getCause();
          }
        }));
  }

  private static boolean isInterruptible(Method method) {
    for (Class<?> exceptionType : method.getExceptionTypes()) {
      if (exceptionType == InterruptedException.class) return true;
    }
    return false;
  }
}
Here is how we can use that:
import java.util.concurrent.*;

public class PutAndTakeEarlyThrowers {
  public static void main(String... args) {
    PutOnBlockingQueue.test(EarlyInterruptedExceptionThrower.proxy(
        BlockingDeque.class, new LinkedBlockingDeque<>()));
    TakeOnBlockingQueue.test(EarlyInterruptedExceptionThrower.proxy(
        BlockingDeque.class, new LinkedBlockingDeque<>()));

    // these also work
    PutOnBlockingQueue.test(EarlyInterruptedExceptionThrower.proxy(
        BlockingQueue.class, new LinkedTransferQueue<>()));
    PutOnBlockingQueue.test(EarlyInterruptedExceptionThrower.proxy(
        BlockingQueue.class, new PriorityBlockingQueue<>()));
  }
}
It behaves as we had hoped it would, but unfortunately getClass() no longer shows what the implementation class is:
$Proxy0 - early interrupt on put()
$Proxy0 - early interrupt on take()
$Proxy1 - early interrupt on put()
$Proxy1 - early interrupt on put()
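Why does the proxy work for LinkedTransferQueue and PriorityBlockingQueue, even though their put() methods do not declare the InterruptedException? The Method object handed to our InvocationHandler belongs to the interface, and BlockingQueue.put() does declare it. Here is a small reflection sketch to check this. The class name DeclaredExceptionsDemo and its helper method are made up for illustration.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.*;

class DeclaredExceptionsDemo {
  /** Does the public method in type declare InterruptedException? */
  static boolean declaresInterruptedException(
      Class<?> type, String name, Class<?>... parameterTypes) {
    try {
      Method method = type.getMethod(name, parameterTypes);
      return Arrays.asList(method.getExceptionTypes())
          .contains(InterruptedException.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String... args) {
    // put(E) is erased to put(Object), hence Object.class below
    for (Class<?> type : new Class<?>[]{
        BlockingQueue.class, LinkedBlockingQueue.class,
        LinkedTransferQueue.class, PriorityBlockingQueue.class}) {
      System.out.println(type.getSimpleName() + ".put() declares " +
          "InterruptedException? " +
          declaresInterruptedException(type, "put", Object.class));
    }
  }
}
```

This also means that the choice of interface matters. The early check is only applied to methods that declare the exception in the interface we pass to proxy().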
The dynamic proxy is not limited to the BlockingQueue. There are other places in the java.util.concurrent package where the InterruptedException is only thrown when we go into the WAITING state, not before. For example, in the ExecutorService, we have awaitTermination(). It only throws the InterruptedException if the pool has not been terminated and the thread needs to wait:
import java.util.concurrent.*;

public class ThreadPoolAwait {
  public static void main(String... args) {
    test(Executors.newSingleThreadExecutor());
    test(Executors.newCachedThreadPool());
    test(Executors.newWorkStealingPool());
    test(Executors.newVirtualThreadPerTaskExecutor());
  }

  public static void test(ExecutorService pool) {
    System.out.print(pool.getClass().getSimpleName());
    Thread.interrupted(); // clear interrupt
    pool.shutdown();
    try {
      Thread.currentThread().interrupt(); // self-interrupt
      pool.awaitTermination(10, TimeUnit.SECONDS);
      System.out.println(" - no early interrupt");
    } catch (InterruptedException e) {
      System.out.println(" - early interrupt");
    }
  }
}
Output is the following:
AutoShutdownDelegatedExecutorService - no early interrupt
ThreadPoolExecutor - no early interrupt
ForkJoinPool - no early interrupt
ThreadPerTaskExecutor - no early interrupt
Again, we can use our magic dynamic proxy to change that behaviour:
import java.util.concurrent.*;

public class ThreadPoolEarlyThrowers {
  public static void main(String... args) {
    ThreadPoolAwait.test(proxy(Executors.newSingleThreadExecutor()));
    ThreadPoolAwait.test(proxy(Executors.newCachedThreadPool()));
    ThreadPoolAwait.test(proxy(Executors.newWorkStealingPool()));
    ThreadPoolAwait.test(proxy(Executors.newVirtualThreadPerTaskExecutor()));
  }

  private static ExecutorService proxy(ExecutorService pool) {
    return EarlyInterruptedExceptionThrower.proxy(
        ExecutorService.class, pool
    );
  }
}
Again, we see that it now exits early if the thread is interrupted:
$Proxy0 - early interrupt
$Proxy0 - early interrupt
$Proxy0 - early interrupt
$Proxy0 - early interrupt
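Before wrapping an interface with the proxy, it may be worth knowing which of its methods would get the early check. The following sketch lists the names of all public methods that declare the InterruptedException. The class name InterruptibleMethodsDemo and the method interruptibleMethodNames() are hypothetical helpers, not part of the proxy above.

```java
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ExecutorService;

class InterruptibleMethodsDemo {
  /** Names of the public methods declaring InterruptedException. */
  static SortedSet<String> interruptibleMethodNames(Class<?> intf) {
    SortedSet<String> names = new TreeSet<>();
    for (Method method : intf.getMethods()) {
      if (Arrays.asList(method.getExceptionTypes())
          .contains(InterruptedException.class)) {
        names.add(method.getName()); // overloads collapse to one name
      }
    }
    return names;
  }

  public static void main(String... args) {
    System.out.println(
        interruptibleMethodNames(ExecutorService.class));
  }
}
```

For the ExecutorService, we would expect to see awaitTermination(), invokeAll() and invokeAny() in the list, but not methods such as submit() or shutdown().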
Perhaps one day the LinkedBlockingDeque will be made consistent with how the other BlockingQueue implementations work. It is an easy change: simply use lock.lockInterruptibly() instead of lock.lock() in the put() and take() methods. Interestingly, the timed offer() and poll() methods already throw the InterruptedException early. Until then, if you want this functionality, simply wrap the deque with our dynamic proxy or make a subclass like our BetterLinkedBlockingDeque above.
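To see why the choice between lock() and lockInterruptibly() makes this difference, here is a toy example. TinyBlockingStack is a made-up class for this newsletter and is not the JDK code. It is a simplified, unbounded stack guarded by a ReentrantLock and a Condition, with two variants of take().

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.*;

class TinyBlockingStack<E> {
  private final ArrayDeque<E> items = new ArrayDeque<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();

  void push(E e) {
    lock.lock();
    try {
      items.push(e);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /** With lock(), only await() can throw, so only when empty. */
  E takeLate() throws InterruptedException {
    lock.lock();
    try {
      while (items.isEmpty()) notEmpty.await();
      return items.pop();
    } finally {
      lock.unlock();
    }
  }

  /** With lockInterruptibly(), we throw before looking at items. */
  E takeEarly() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (items.isEmpty()) notEmpty.await();
      return items.pop();
    } finally {
      lock.unlock();
    }
  }

  /** Interrupted take on a non-empty stack: did it throw? */
  static boolean throwsWhenInterrupted(boolean early) {
    TinyBlockingStack<Integer> stack = new TinyBlockingStack<>();
    stack.push(42);
    Thread.currentThread().interrupt(); // self-interrupt
    try {
      if (early) stack.takeEarly(); else stack.takeLate();
      return false;
    } catch (InterruptedException e) {
      return true;
    } finally {
      Thread.interrupted(); // clear the status
    }
  }

  public static void main(String... args) {
    System.out.println("takeLate() throws: " +
        throwsWhenInterrupted(false));
    System.out.println("takeEarly() throws: " +
        throwsWhenInterrupted(true));
  }
}
```

The two take methods differ only in how they acquire the lock, which is the same one-line change described above.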
Kind regards
Heinz
P.S. Besides our self-study Java Concurrency Aficionados 2024, we also teach in-house courses to companies, either in person or live virtual. Please contact me via email on heinz@javaspecialists.eu and I will be happy to help you figure out what works best for you.
We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel.