Issue 261: Concurrent Queue Sizes and Hot Fields

Author: Dr. Heinz M. Kabutz
Date: 2018-09-20
Java Version: 11
Category: Concurrency

Abstract: ConcurrentLinkedQueue's size() method is not very useful in a multi-threaded environment, because it counts the number of elements in the queue, rather than relying on a "hot field" to store the size. The result might be completely incorrect, or in strange situations, never return.

 

Welcome to the 261st edition of The Java(tm) Specialists' Newsletter. Next month I am speaking at Oracle Code One (formerly JavaOne). My topics are concurrency (Phaser, StampedLock and VarHandles), performance (evolution of java.lang.String) and unconferences (a BoF about JCrete and how to disorganize such an event). Usually my talks fall in the category of "mildly entertaining", so unless you have something interesting going on during my slots, please join me :-)

At JCrete 2018 we put together a list of recommended books. Only 4/18 have Java in their title. Here they are - happy reading:

  • Streaming Data: Understanding the Real-Time Pipeline by Andrew Psaltis [ISBN 1617292281]
  • Thanks for the Feedback: The Science and Art of Receiving Feedback Well by Douglas Stone [ISBN 0670922633]
  • Optimizing Java: Practical techniques for improving JVM application performance by Benjamin Evans [ISBN 1492025798]
  • The Pyramid Principle: Logic in Writing and Thinking: Logical Writing, Thinking and Problem Solving by Barbara Minto [ISBN 0273710516]
  • The Java Module System by Nicolai Parlog [ISBN 1617294284]
  • Getting to Yes: Negotiating an agreement without giving in by Roger Fisher [ISBN 1847940935]
  • The Elements of Style by EB White [ISBN 020530902X]
  • Creativity, Inc.: Overcoming the Unseen Forces That Stand in the Way of True Inspiration by Ed Catmull [ISBN 0593070100]
  • Talking with Tech Leads: From Novices to Practitioners by Patrick Kua [ISBN 150581748X]
  • Testing Java Microservices by Alex Soto Bueno et al [ISBN 1617292893]
  • Godel, Escher, Bach: An Eternal Golden Braid by Douglas R. Hofstadter [ISBN 0465026567]
  • Java Concurrency in Practice by Brian Goetz [ISBN 0321349601]
  • Clean Architecture: A Craftsman's Guide to Software Structure and Design by Robert C. Martin [ISBN 0134494164]
  • Design Patterns: Elements of Reusable Object-Oriented Software by Erich Gamma et al [ISBN 0201633612]
  • Pattern Hatching: Design Patterns Applied by John M. Vlissides [ISBN 0201432935]
  • Soft Skills: The software developer's life manual by John Z. Sonmez [ISBN 1617292397]
  • The Switch: America's Global Energy Renaissance by Dan K. Eberhart [ISBN 162634258X]
  • The 5 Levels of Leadership: Proven Steps to Maximize Your Potential by John C. Maxwell [ISBN 1599953633]

Concurrent Queue Sizes and Hot Fields

java.util.Collection has a size() method. Who would not want to know how many elements are in their collection?

In concurrency, we have the concept of a "hot field". Such a field will need to be accessed whenever we call a method on the object. For example, inside the ArrayBlockingQueue, the items array is a "hot field". We cannot add an item whilst removing another, because both methods lock on the same ReentrantLock to access the array.
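To make the idea of a hot field concrete, here is a minimal sketch of a bounded queue in which every operation goes through one lock to reach the shared array. SingleLockQueue is a hypothetical class written for illustration; it is not the ArrayBlockingQueue source and it leaves out blocking, iterators and everything else a real queue needs.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified bounded queue (an illustration, NOT the JDK source).
final class SingleLockQueue<E> {
  private final Object[] items;  // the "hot field": every method touches it
  private final ReentrantLock lock = new ReentrantLock();
  private int takeIndex, putIndex, count;

  SingleLockQueue(int capacity) {
    items = new Object[capacity];
  }

  boolean offer(E e) {
    lock.lock();  // producers wait here ...
    try {
      if (count == items.length) return false;  // full
      items[putIndex] = e;
      putIndex = (putIndex + 1) % items.length;
      count++;
      return true;
    } finally {
      lock.unlock();
    }
  }

  @SuppressWarnings("unchecked")
  E poll() {
    lock.lock();  // ... and consumers wait on the very same lock
    try {
      if (count == 0) return null;  // empty
      E e = (E) items[takeIndex];
      items[takeIndex] = null;
      takeIndex = (takeIndex + 1) % items.length;
      count--;
      return e;
    } finally {
      lock.unlock();
    }
  }

  int size() {
    lock.lock();
    try {
      return count;  // exact and O(1), but paid for with contention
    } finally {
      lock.unlock();
    }
  }
}
```

The upside of funnelling everything through one lock is that size() is cheap and exact. The downside is that an add and a remove can never proceed at the same time.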

ConcurrentLinkedQueue was designed to be a generalized thread-safe FIFO queue for when we did not need a BlockingQueue. It should not matter whether we had multi-producer-multi-consumer systems (MPMC) or single-producer-single-consumer (SPSC) or a combination of both. In all cases it should work correctly. The queue should also be unbounded and non-blocking in its thread-safety implementation.

In the first version of ConcurrentLinkedQueue, thread safety was managed with AtomicReferenceFieldUpdater, but was changed in Java 7 to sun.misc.Unsafe and in Java 9 to use VarHandle. They fastidiously avoided "hot fields". For example, the head and tail of the ConcurrentLinkedQueue could be updated independently from each other. Thus one thread could add whilst another thread could remove elements from the queue without causing contention.
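As a rough illustration of what "updated independently" means, here is a small sketch using VarHandle. The class HeadAndTail and its method names are my own assumptions for this example and are not taken from the ConcurrentLinkedQueue source; all it shows is that a compare-and-set on one field neither reads nor writes the other.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Hypothetical holder of two independently updated references.
final class HeadAndTail {
  private volatile Object head;
  private volatile Object tail;

  private static final VarHandle HEAD;
  private static final VarHandle TAIL;

  static {
    try {
      MethodHandles.Lookup lookup = MethodHandles.lookup();
      HEAD = lookup.findVarHandle(HeadAndTail.class, "head", Object.class);
      TAIL = lookup.findVarHandle(HeadAndTail.class, "tail", Object.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  // Atomically swings head from expect to update; tail is not involved.
  boolean casHead(Object expect, Object update) {
    return HEAD.compareAndSet(this, expect, update);
  }

  // Atomically swings tail from expect to update; head is not involved.
  boolean casTail(Object expect, Object update) {
    return TAIL.compareAndSet(this, expect, update);
  }

  Object head() { return head; }
  Object tail() { return tail; }
}
```

A thread that only ever calls casHead() and another that only ever calls casTail() never compete for the same field. A single lock or a shared counter cannot offer that property.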

Another field where they wanted to avoid contention was the size of the queue. Even though java.util.Collection offers size() as a method, they decided that ConcurrentLinkedQueue would not have to implement it in a useful way. Yep, the JavaDocs even state this explicitly: "this method is typically not very useful in concurrent applications". Thus instead of storing the size inside an AtomicInteger, necessitating contended updates on both add and remove of the queue, they count the elements every time we call size().
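For comparison, this is roughly what the rejected design would look like if we bolted it on from the outside. CountedQueue is a hypothetical wrapper of my own and not part of the JDK. The AtomicInteger is exactly the kind of hot field described above: every offer() and every successful poll() has to update it.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper that trades contention for an O(1) size().
final class CountedQueue<E> {
  private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
  private final AtomicInteger size = new AtomicInteger();  // hot field

  boolean offer(E e) {
    // Count first, then enqueue, so that the counter never drops
    // below the number of elements actually in the queue.
    size.incrementAndGet();
    return queue.offer(e);  // always true for an unbounded queue
  }

  E poll() {
    E e = queue.poll();
    if (e != null) {
      size.decrementAndGet();
    }
    return e;
  }

  // O(1), but may briefly overcount by the number of operations in flight.
  int size() {
    return size.get();
  }
}
```

The counter and the queue are not updated as one atomic step, so the value can still be slightly off while other threads are busy. The difference is that the error is bounded by the number of threads in the middle of a call, not by how long the traversal takes.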

Two effects can be seen. Firstly, the time it takes to execute size() is directly related to the length of the queue. Secondly, in size() we start counting from the head of the queue. If whilst we are counting, new elements are added to the back and old ones removed from the front, the result might be larger than the true number of items ever in the queue. In rare situations, these two effects could combine so that size() never returns. Consider this InfiniteSize class, written with the new Java 10 var syntax where it makes sense:

import java.util.*;
import java.util.concurrent.*;

public class InfiniteSize {
  public static void main(String... args) {
    var queue = new ConcurrentLinkedQueue<String>();
    for (int i = 0; i < 40_000_000; i++) {
      queue.add("test");
    }

    var phaser = new Phaser(900);
    for (int i = 0; i < 1000; i++) { // 1000 threads?  Seriously?
      Thread t = new Thread(() -> {
        phaser.arriveAndAwaitAdvance();
        while (true) {
          queue.add("test");
          queue.remove();
        }
      });
      t.setDaemon(true);
      t.start();
    }
    var time = System.nanoTime();
    try {
      System.out.println("Measuring queue size");
      System.out.println(queue.size()); // might never return
    } finally {
      time = System.nanoTime() - time;
      System.out.printf("time = %dms%n", (time / 1_000_000));
    }
  }
}

The code in InfiniteSize is truly dreadful. 1000 threads are all contending for the queue. Note how I use Phaser to let the threads start, but then throttle them back a bit.

I ran this with the new OpenJDK 11 Epsilon Garbage Collector (-verbose:gc -Xmx16g -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC) and after it printed "Measuring queue size" it eventually crashed with all 16 GB exhausted. In case you have not heard of Epsilon, it does not collect any unused objects. It is a great collector to use for running performance tests if you want to eliminate GC costs from your experiment. I also tried with G1, but that was hopeless and spent 50% of time in GC. I let it run for 400 seconds before killing the process.

Since Epsilon GC does not spend resources trying to free memory, we can probably conclude that the call to size() was not keeping up with the other 1000 threads adding and removing elements. Or maybe there was something else. As I said, it is dreadful code and fortunately not at all realistic of what we would find in production. For normal code, it won't happen that size() takes forever to return. It is an O(n) method call, but if your queues are so long that this is significant, you have bigger problems and should perhaps consider a career at airport security ;-)

I think we can thus safely ignore the O(n) cost of size() and instead focus on how correct it is. It is not. The value might not be at all related to how many elements were ever in the queue at one time.

Here is another class that creates a queue with 10 elements and then adds and removes elements with another thread. Whilst that is going on, size() is called and we see what the largest value is. Note that I again use the Java 10 var syntax, but not to replace the for (int ... since that would neither make the code clearer nor save typing.

import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;

public class QueueSize {
  public static final int RESIDENT_QUEUE_SIZE = 10;
  public static final int ADD_AND_REMOVES = 1_000_000;

  public static void main(String... args) {
    for (int i = 0; i < 10; i++) {
      // size either 10 or 11
      test(LinkedBlockingQueue::new);

      // size always >= 10
      test(LinkedTransferQueue::new);
      test(ConcurrentLinkedQueue::new);
      System.out.println();
    }
  }

  private static void test(Supplier<Queue<String>> queueType) {
    var queue = queueType.get();
    for (int i = 0; i < RESIDENT_QUEUE_SIZE; i++) {
      queue.add("test" + i);
    }

    var thread = addRemoveThread(queue);

    var maxSize = 0;
    while (thread.isAlive()) {
      maxSize = Math.max(maxSize, queue.size());
    }
    System.out.printf(Locale.US, "%s: maxSize=%d%n",
        queue.getClass().getSimpleName(), maxSize);
  }

  private static Thread addRemoveThread(Queue<String> queue) {
    var thread = new Thread(() -> {
      for (int i = 0; i < ADD_AND_REMOVES; i++) {
        queue.add("test" + (i + RESIDENT_QUEUE_SIZE));
        queue.remove();
      }
    }, "addRemoveThread");
    thread.start();
    return thread;
  }
}

The largest value of size() with the LinkedBlockingQueue is 11. LinkedTransferQueue and ConcurrentLinkedQueue could return anything >= 10. For example, I saw:

LinkedBlockingQueue: maxSize=11
LinkedTransferQueue: maxSize=54
ConcurrentLinkedQueue: maxSize=38

The values for size() are not just stale, but can be completely wrong. It would have been more honest to either always return 0 or to throw an UnsupportedOperationException. In a single-threaded situation, size() would work, but then we would rather use an ArrayDeque or a LinkedList. The challenge is that the contract of Collection.size() does not say anything about the possibility of an UnsupportedOperationException, therefore implementations should also desist from throwing it.

Of course, if you know how your queue will be used, it is better to use a queue specific to your requirements. Have a look at Java Champion Nitsan Wakart's excellent JCTools, inspired by Martin Thompson's work on Mechanical Sympathy. The default queue that is created by QueueFactory when we specify an unbounded MPMC queue is the ConcurrentLinkedQueue. Make it bounded or change it to SPSC and the resultant queue can be factors faster.

Back to the hot field. I believe there was a thought recently to store ConcurrentLinkedQueue's size inside a LongAdder, since that has much better performance under contention than AtomicInteger. Whilst a good argument, it might also dramatically increase the size of the ConcurrentLinkedQueue object on multi-processor systems if the field ends up being contended.
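To give a feel for that idea, here is a minimal sketch of a size counter kept in a LongAdder. AdderCountedQueue is a hypothetical wrapper that I made up for this example; it is not how ConcurrentLinkedQueue is implemented, nor a proposal that was actually adopted. As far as I know, LongAdder spreads updates over additional internal cells once it detects contention, which is where the extra memory would come from.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical wrapper that keeps its element count in a LongAdder.
final class AdderCountedQueue<E> {
  private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
  private final LongAdder size = new LongAdder();

  boolean offer(E e) {
    size.increment();       // cheap under contention
    return queue.offer(e);  // always true for an unbounded queue
  }

  E poll() {
    E e = queue.poll();
    if (e != null) {
      size.decrement();
    }
    return e;
  }

  // sum() is not an atomic snapshot: updates that happen while it is
  // adding up may or may not be included in the result.
  long size() {
    return size.sum();
  }
}
```

Writes become cheaper than with a single AtomicInteger, but the read in size() has to add up whatever the adder holds internally and is only exact when no other thread is updating it at that moment.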

Thanks for reading this and please check out our concurrency training if you would like to learn more.

Kind regards

Heinz

 
