Abstract: LinkedHashSet is a set that also maintains insertion order. To make it thread-safe, we can wrap it with Collections.synchronizedSet(). However, this is not a good option, because iteration would still be fail-fast. And what's the point of a thread-safe LinkedHashSet that we cannot iterate over? In this newsletter we try to create a concurrent set that behaves like a LinkedHashSet, but with minimal locking and with weakly-consistent iteration.
Welcome to the 296th edition of The Java(tm) Specialists' Newsletter, and warm greetings from a wet and cold Island of Crete. Our summer tourists would be quite amazed at how drenched we get. Which other place has had its bad weather inscribed into the Holy Bible? (Saint Paul got caught in a hurricane while trying to navigate the South of Crete and after a shipwreck ended up on Malta.) Oh, and we have earthquakes galore. One of my friends sent me this email a few days ago: "Earthquakes happen very often now... Is it safe to stay there?" Good question indeed. No idea. But it is a nice place to live when it ain't rockin' and rollin', or raining cats and dogs.
And don't forget - today is the last day of 2021 and your last chance to spend 2021 training budget. For example, by grabbing our Java Specialists Superpack :-)
javaspecialists.teachable.com: Please visit our new self-study course catalog to see how you can upskill your Java knowledge.
What's wrong with this code?
private final Set<Connection> connections = Collections.synchronizedSet(new LinkedHashSet<>());
During a recent talk, a friend showed a class with such a field. Someone complained: this would "encourage programmers to use Collections.synchronizedSet() in production code. A concurrent collection would be preferable."

Since I have a particular interest in concurrency, I jumped to my friend's defense. Writing slides is difficult. Not every snippet of code we show is an example of perfect production code. Besides, a synchronized collection is not always slower than a ConcurrentHashMap. After all, ConcurrentHashMap itself maintains its invariants with synchronized.
I do not know whether the synchronized wrapped LinkedHashSet would be slow or not. Uncontended synchronization is fast. Since we are doing networking, I would expect that to be far more costly than a little lock. A bigger concern to me is that LinkedHashSet iteration is fail-fast. The synchronized wrapper would not protect us against a ConcurrentModificationException. We could lock the entire set during iteration, but iteration has O(n) cost and thus locking might not be such a good idea. In a threaded environment, weakly-consistent iteration is preferable.
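To make the fail-fast problem concrete, here is a minimal sketch (mine, not from the talk; the class and method names are invented): we take an iterator from the synchronized wrapped LinkedHashSet, modify the set, and the very next call to next() throws.

```java
import java.util.*;

public class FailFastDemo {
    // Returns true if iterating after a structural modification
    // throws ConcurrentModificationException
    static boolean throwsCme() {
        Set<String> set =
                Collections.synchronizedSet(new LinkedHashSet<>());
        set.add("hello");
        set.add("world");
        Iterator<String> it = set.iterator();
        set.add("Goodbye"); // structural modification after iterator creation
        try {
            while (it.hasNext()) it.next(); // next() checks the modCount
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String... args) {
        System.out.println("fail-fast? " + throwsCme()); // fail-fast? true
    }
}
```

Note that the synchronized wrapper makes the individual add() calls thread-safe, but does nothing for the iterator, which still belongs to the backing LinkedHashSet.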
I began to wonder how we could create a concurrent form of LinkedHashSet. Instead of TreeSet, we can use the concurrent, thread-safe ConcurrentSkipListSet. Similarly, instead of HashSet, we could use ConcurrentHashMap.newKeySet(). But LinkedHashSet did not seem to have an obvious concurrent alternative.
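To illustrate those two substitutions, a quick sketch (class and method names are mine): the ConcurrentSkipListSet iterates in natural sorted order, while ConcurrentHashMap.newKeySet() iterates in an unspecified order. Neither gives us LinkedHashSet's insertion order.

```java
import java.util.*;
import java.util.concurrent.*;

public class ConcurrentAlternativesDemo {
    // TreeSet -> ConcurrentSkipListSet: thread-safe, weakly-consistent
    // iteration, but always sorted by the natural order of the elements
    static List<String> skipListOrder() {
        NavigableSet<String> sorted = new ConcurrentSkipListSet<>();
        Collections.addAll(sorted, "world", "hello", "Goodbye");
        return new ArrayList<>(sorted);
    }

    // HashSet -> ConcurrentHashMap.newKeySet(): thread-safe, but the
    // iteration order is unspecified - insertion order is lost
    static Set<String> keySet() {
        Set<String> set = ConcurrentHashMap.newKeySet();
        Collections.addAll(set, "world", "hello", "Goodbye");
        return set;
    }

    public static void main(String... args) {
        System.out.println(skipListOrder()); // [Goodbye, hello, world]
        System.out.println(keySet()); // some unspecified order
    }
}
```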
Here is my attempt, combining a ConcurrentHashMap with a ConcurrentSkipListSet.
Our class offers a reduced interface of Set, with only the following methods:
add(e)
remove(e)
contains(e)
stream()
clear()
iterator()
toString()
The ConcurrentSkipListSet maintains the insertion order. The ConcurrentHashMap ensures that elements are distinct. We store the element and its insertion order inside the InsertionOrder record. These are then stored inside the ConcurrentSkipListSet, sorted by the insertion order. We store our element as a key inside the ConcurrentHashMap. The values are the same InsertionOrder instances that are inside the ConcurrentSkipListSet.
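In isolation, that ordering trick can be sketched as follows (the class and method names here are invented; the record mirrors the one in the full class below): a record pairs each element with a long sequence number, and a ConcurrentSkipListSet sorts by that number instead of by the element itself.

```java
import java.util.*;
import java.util.concurrent.*;

public class InsertionOrderSketch {
    // A tuple of the element and its insertion sequence number
    record InsertionOrder<T>(T value, long order) {}

    static List<String> values() {
        // Sort by the order field, not by the element's natural order
        Set<InsertionOrder<String>> set = new ConcurrentSkipListSet<>(
                Comparator.comparingLong(InsertionOrder::order));
        set.add(new InsertionOrder<>("world", 1));
        set.add(new InsertionOrder<>("hello", 0));
        set.add(new InsertionOrder<>("Goodbye", 2));
        List<String> result = new ArrayList<>();
        for (var holder : set) result.add(holder.value());
        return result;
    }

    public static void main(String... args) {
        // Iterates by sequence number, i.e. insertion order,
        // even though the elements were added out of order
        System.out.println(values()); // [hello, world, Goodbye]
    }
}
```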
When we add an element, we call the computeIfAbsent() method on our map. If the element does not exist in the map yet, we create our InsertionOrder and add it to the set. When we want to remove it again, we use computeIfPresent() to also remove it from the set. Both of these compute methods are performed atomically on a ConcurrentHashMap.
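That atomicity is what keeps the two collections in sync: the mapping function for a given key runs under the map's internal lock, so two threads cannot both decide that the element is absent. A small sketch of mine (method names invented) to observe this:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class ComputeAtomicityDemo {
    // Counts how often the mapping function runs when 8 threads race
    // to computeIfAbsent() the same absent key
    static long race() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Long>();
        var invocations = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 8; i++) {
            pool.submit(() ->
                    map.computeIfAbsent("key",
                            k -> invocations.incrementAndGet()));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return invocations.get(); // the function ran exactly once
    }

    public static void main(String... args) throws InterruptedException {
        System.out.println("mapping function ran " + race() + " time(s)");
    }
}
```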
Here is the class. Please shout if you can think of a better approach for a thread-safe concurrent LinkedHashSet. I have not done extensive testing on this class. Neither have I benchmarked the performance. I thus have no idea whether it is faster or slower than the original synchronized wrapped LinkedHashSet. I'm not even sure that the code is correct. Please do not use it in production without extensive testing. And when (not if) you find glaring errors, please let me know so that I can update this newsletter :-)
Note: Small change to the original newsletter, thanks to Jesper Udby. Instead of a static shared AtomicLong, we let each set contain its own and pass the order into the record. Thanks Jesper! Also thanks to Cor Takken for suggesting better names for the fields and record.
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.*;

public class ConcurrentLinkedReducedHashSet<E> implements Iterable<E> {
    /**
     * A tuple holding our value and its insertion order.
     */
    private record InsertionOrder<T>(T value, long order) {}

    /**
     * Contains a mapping from our element to its insertionOrder.
     */
    private final Map<E, InsertionOrder<E>> elements =
            new ConcurrentHashMap<>();

    /**
     * The insertion order maintained in a ConcurrentSkipListSet,
     * so that we iterate in the correct order.
     */
    private final Set<InsertionOrder<E>> elementsOrderedByInsertion =
            new ConcurrentSkipListSet<>(
                    Comparator.comparingLong(InsertionOrder::order));

    /**
     * AtomicLong for generating the next insertion order.
     */
    private final AtomicLong nextOrder = new AtomicLong();

    public boolean add(E e) {
        var added = new AtomicBoolean(false);
        elements.computeIfAbsent(e, key -> {
            var holder = new InsertionOrder<>(e,
                    nextOrder.getAndIncrement());
            elementsOrderedByInsertion.add(holder);
            added.set(true);
            return holder;
        });
        return added.get();
    }

    public boolean remove(E e) {
        var removed = new AtomicBoolean(false);
        elements.computeIfPresent(e, (key, holder) -> {
            elementsOrderedByInsertion.remove(holder);
            removed.set(true);
            return null; // will remove the entry
        });
        return removed.get();
    }

    public boolean contains(E e) {
        return elements.containsKey(e);
    }

    public Stream<E> stream() {
        return elementsOrderedByInsertion.stream()
                .map(InsertionOrder::value);
    }

    public void clear() {
        // slow, but ensures we remove all entries in both collections
        stream().forEach(this::remove);
    }

    @Override
    public Iterator<E> iterator() {
        return stream().iterator();
    }

    @Override
    public String toString() {
        return stream()
                .map(String::valueOf)
                .collect(Collectors.joining(", ", "[", "]"));
    }
}
In this demo, we iterate in insertion order without a ConcurrentModificationException:
import java.util.*;

public class WeaklyConsistentOrderedDemo {
    public static void main(String... args) {
        // var set = Collections.synchronizedSet(
        //         new LinkedHashSet<String>()); // CME
        // var set = ConcurrentHashMap.<String>newKeySet(); // works, wrong order
        var set = new ConcurrentLinkedReducedHashSet<String>(); // Perfect (maybe)
        set.add("hello");
        set.add("world");
        Iterator<String> it = set.iterator();
        set.add("Goodbye");
        while (it.hasNext()) {
            String next = it.next();
            System.out.println(next);
        }
    }
}
Output from our reduced set is:
hello
world
Goodbye
In the next demo, we remove "hello" after having iterated past it, and add it again. We would thus expect "hello" to show up at the end, and it does indeed.
import java.util.*;

public class WeaklyConsistentOrderedDemoComplex {
    public static void main(String... args) {
        var set = new ConcurrentLinkedReducedHashSet<String>();
        set.add("hello");
        set.add("world");
        set.add("Goodbye");
        Iterator<String> it = set.iterator();
        System.out.println(it.next()); // hello
        System.out.println(it.next()); // world
        set.remove("hello");
        set.add("hello");
        System.out.println(it.next()); // Goodbye
        System.out.println(it.next()); // hello
        System.out.println(it.hasNext()); // false
    }
}
The output is:
hello
world
Goodbye
hello
false
So far, so good. Next we have a demo in which 16 threads each remove and add 100,000 random numbers between 0 and 9, for a total of 1.6 million updates to our set. There should be no duplicates at the end of the run, if everything is working.
import java.util.concurrent.*;

public class ConcurrentUpdatesDemo {
    public static void main(String... args) throws InterruptedException {
        var set = new ConcurrentLinkedReducedHashSet<Integer>();
        ExecutorService pool = Executors.newFixedThreadPool(16);
        for (int i = 0; i < 16; i++) {
            pool.submit(() -> {
                ThreadLocalRandom random = ThreadLocalRandom.current();
                for (int j = 0; j < 100_000; j++) {
                    int value = random.nextInt(0, 10);
                    set.remove(value);
                    set.add(value);
                }
            });
        }
        pool.shutdown();
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            System.out.println("Waiting for pool to shut down");
        }
        System.out.println("set = " + set);
    }
}
That also seems to work:
set = [7, 6, 1, 3, 5, 2, 8, 9, 0, 4]
At the outset, I said that this would be a reduced set that only has the methods that we are going to use. But what if we need a java.util.Set? We could throw UnsupportedOperationException for all the methods besides those we implemented.

This is surprisingly easy to do if you've read my book on dynamic proxies :-) [Or grab my course on dynamic proxies in Java here.]

We first create a Set that throws UnsupportedOperationException for all methods:
Set<String> angrySet = Proxies.castProxy(
    Set.class,
    (p, m, a) -> {
        throw new UnsupportedOperationException(
            m.getName() + "() not implemented");
    }
);
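If you don't have the dynamic proxies library handy, the angry set alone can be built with the standard JDK's java.lang.reflect.Proxy. This sketch is mine, not from the book, and the class and method names are invented:

```java
import java.lang.reflect.*;
import java.util.*;

public class AngrySetSketch {
    // Every method call on the proxy lands in the InvocationHandler,
    // which simply throws UnsupportedOperationException
    @SuppressWarnings("unchecked")
    static Set<String> angrySet() {
        return (Set<String>) Proxy.newProxyInstance(
                Set.class.getClassLoader(),
                new Class<?>[] {Set.class},
                (proxy, method, args) -> {
                    throw new UnsupportedOperationException(
                            method.getName() + "() not implemented");
                });
    }

    static String messageFrom(Set<String> set) {
        try {
            set.add("hello");
            return "no exception";
        } catch (UnsupportedOperationException e) {
            return e.getMessage();
        }
    }

    public static void main(String... args) {
        System.out.println(messageFrom(angrySet())); // add() not implemented
    }
}
```

The dynamic object adapter step that follows still needs the library (or a hand-rolled InvocationHandler that dispatches to matching methods on the adapter).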
Next we wrap that inside a dynamic object adapter, with our ConcurrentLinkedReducedHashSet as the adapter. The dynamic object adapter will give precedence to our methods. It will thus throw an UnsupportedOperationException whenever we try to call any other method from the Set. The resulting type will be a Set and we could add more methods as needed.
Set<String> set = Proxies.adapt(
    Set.class, // target interface
    angrySet, // adaptee
    new ConcurrentLinkedReducedHashSet<>() // adapter
);
A complete demo would look like this:
// Maven: eu.javaspecialists.books.dynamicproxies:core:2.0.0
import eu.javaspecialists.books.dynamicproxies.*;
import java.util.*;

public class DynamicProxiesDemo {
    public static void main(String... args) {
        Set<String> angrySet = Proxies.castProxy(
            Set.class,
            (p, m, a) -> {
                throw new UnsupportedOperationException(
                    m.getName() + "() not implemented");
            }
        );
        Set<String> set = Proxies.adapt(
            Set.class, // target interface
            angrySet, // adaptee
            new ConcurrentLinkedReducedHashSet<>() // adapter
        );
        set.add("hello");
        set.add("world");
        Iterator<String> it = set.iterator();
        set.add("Goodbye");
        while (it.hasNext()) {
            String next = it.next();
            System.out.println(next);
        }
        set.clear();
        set.addAll(List.of("one")); // UnsupportedOperationException
    }
}
Again, no guarantees about how fast this will be, nor whether it works at all. I would be delighted to see a better solution, using the standard JDK classes.
Kind regards
Heinz
We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel (Get an invite here)
We deliver relevant courses, by top Java developers to produce more resourceful and efficient programmers within their organisations.