Abstract: We look at how we could internalize the iteration into a collection by introducing a Processor interface that is applied to each element. This allows us to manage concurrency from within the collection.
Welcome to the 178th issue of The Java(tm) Specialists' Newsletter, sent to you from Chania on the beautiful island of Crete. A few days ago, we spontaneously decided to make a spit roast with some friends from Denver. When we bought our house, it came with a fire pit and hooks for a spit, so recently I invested in a motor for turning the meat around. So here we were, sitting outside in our shorts at ten at night in Europe in November, watching the fat drip onto the coals. The spit roast must be one of the easiest ways of cooking meat: you simply put it on and then sit there watching it get ready, whilst chatting about politics and philosophy. Now why did I not buy this equipment when I still lived in South Africa? Do let me know if you come to Chania in Crete and we'll have a feast together, assuming that I am home ... :-)
The Iterator design pattern, described in the Gang-of-Four book, lists more implementation options than are available in the java.util.* classes. One of the questions asked is: who controls the iteration? In the standard Java Iterator idiom, the client controls it, for example like this:
    Iterator<String> it = names.iterator();
    while (it.hasNext()) {
      String name = it.next();
      System.out.println(name);
    }
When we use the Java 5 for-each loop, the iteration is still controlled by the client, even though we do not explicitly call the next() method.
    for (String name : names) {
      System.out.println(name);
    }
Combining concurrency and iteration is tricky. The approach in Java 1 was to synchronize everything. However, we could still iterate through a Vector whilst it was being changed, resulting in unpredictable behaviour. In the Java 2 idiom, collections were unsynchronized by default. We would thus need to lock the entire collection whilst we were iterating to avoid a ConcurrentModificationException. In the Java 5 idiom, we see classes like CopyOnWriteArrayList, where the underlying array is copied every time it is modified. This is expensive, but guarantees lock-free iteration.
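To make the difference between the Java 2 and Java 5 idioms concrete, here is a small sketch. The class and method names (IterationIdiomsDemo, failsWhenModifiedDuringIteration) are made up for this illustration. It modifies a list from inside a for-each loop on a single thread, which is already enough to trip the fail-fast check in ArrayList, whereas CopyOnWriteArrayList carries on iterating over its snapshot:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of the newsletter's code.
class IterationIdiomsDemo {
  // Adds an element whilst a for-each loop is running over the list.
  // Returns true if the iteration failed with a
  // ConcurrentModificationException, false if it completed normally.
  static boolean failsWhenModifiedDuringIteration(List<String> names) {
    try {
      for (String name : names) {
        if (name.equals("Heinz")) names.add("Dirk");
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    List<String> plain = new ArrayList<String>(
        Arrays.asList("John", "Heinz", "Anton"));
    List<String> cow = new CopyOnWriteArrayList<String>(
        Arrays.asList("John", "Heinz", "Anton"));

    // ArrayList iterators are fail-fast
    System.out.println("ArrayList failed: " +
        failsWhenModifiedDuringIteration(plain));
    // CopyOnWriteArrayList iterates over the array as it was when
    // the iterator was created, so the add() goes unnoticed
    System.out.println("CopyOnWriteArrayList failed: " +
        failsWhenModifiedDuringIteration(cow) +
        ", size = " + cow.size());
  }
}
```

The copy-on-write iteration never sees "Dirk", since it walks the old array; the element is only visible to iterators created after the add().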
In this alternative approach, we control the iteration from within the collection, making concurrency easier.
We define an interface Processor with a single method process() that returns a boolean. The return value determines whether we should carry on iterating: as soon as process() returns false, the iteration stops. Our collection would then apply this processor to every element in the collection. For example, here is my Processor interface with my collection's iterate() method.
    public interface Processor<E> {
      boolean process(E e);
    }

    // in our collection class
    public void iterate(Processor<E> processor) {
      for (E e : wrappedCollection) {
        if (!processor.process(e)) break;
      }
    }
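The boolean return value makes early termination straightforward. As a rough sketch, the following hypothetical FirstMatchProcessor stops the iteration at the first element that starts with a given prefix. The names EarlyExitDemo and FirstMatchProcessor are assumptions for this example, and the static iterate() helper merely stands in for the collection's own iterate() method:

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical demo class, not part of the newsletter's code.
class EarlyExitDemo {
  // Same shape as the Processor interface in the text
  interface Processor<E> {
    boolean process(E e);
  }

  // Stand-in for the collection's iterate() method
  static <E> void iterate(Collection<E> elements,
                          Processor<E> processor) {
    for (E e : elements) {
      if (!processor.process(e)) break;
    }
  }

  // Remembers the first element with the given prefix, then
  // returns false to stop the iteration.
  static class FirstMatchProcessor implements Processor<String> {
    private final String prefix;
    private String match;  // stays null if nothing matches
    private int visited;   // number of elements we were shown

    FirstMatchProcessor(String prefix) {
      this.prefix = prefix;
    }

    public boolean process(String s) {
      visited++;
      if (s.startsWith(prefix)) {
        match = s;
        return false; // found it, no need to look further
      }
      return true;
    }

    String getMatch() { return match; }
    int getVisited() { return visited; }
  }

  public static void main(String[] args) {
    FirstMatchProcessor finder = new FirstMatchProcessor("N");
    iterate(Arrays.asList(
        "John", "Andre", "Neil", "Heinz", "Anton"), finder);
    // prints "Neil after 3 elements"
    System.out.println(finder.getMatch() + " after " +
        finder.getVisited() + " elements");
  }
}
```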
We can then use the ReadWriteLock to differentiate between methods that modify the state of the collection and those that do not. For example, the add(E) method would lock on a write lock, but the size() method only on a read lock.
The easiest way to implement a collection is to extend AbstractCollection. All we have to do is implement the size() and iterator() methods, like so:
    import java.util.*;

    public class SimpleCollection<E> extends AbstractCollection<E> {
      private final E[] values;

      public SimpleCollection(E[] values) {
        this.values = values.clone();
      }

      public Iterator<E> iterator() {
        return new Iterator<E>() {
          private int pos = 0;
          public boolean hasNext() {
            return pos < values.length;
          }
          public E next() {
            // Thanks to Volker Glave for pointing out that we need
            // to throw a NoSuchElementException if we try to go past
            // the end of the array.
            if (!hasNext()) throw new NoSuchElementException();
            return values[pos++];
          }
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }

      public int size() {
        return values.length;
      }
    }
This is an immutable collection, but by implementing the add() and Iterator.remove() methods, we could make it into a normal mutable collection. Methods like isEmpty() and toString() simply use our internal iterator() and size() methods.
    public class SimpleCollectionTest {
      public static void main(String[] args) {
        String[] arr = {"John", "Andre", "Neil", "Heinz", "Anton"};
        SimpleCollection<String> names =
            new SimpleCollection<String>(arr);
        for (String name : names) { // works
          System.out.println(name);
        }
        System.out.println(names); // works
        System.out.println(names.isEmpty()); // works
        names.add("Dirk"); // throws UnsupportedOperationException
        names.remove("Neil"); // throws UnsupportedOperationException
      }
    }
In our WalkingCollection, we also use the concept of ReadWriteLocks to differentiate between read-only and read-write methods. We should be able to iterate with multiple threads at the same time, but only one thread may modify it at once. I am a bit wary of ReadWriteLocks due to the possibility of starvation, as described in Newsletter #165.
An initial implementation of the WalkingCollection just overrides the minimum number of methods. If you want to use bulk update methods like addAll(), it would be more efficient to only acquire the lock a single time. However, I have not called the bulk update methods very often in my last 12.5 years of Java programming, so will leave that as an "exercise to the reader".
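For readers who do want to try the exercise, here is one possible shape for such an override. This is only a sketch under assumptions: LockedBag is a hypothetical, cut-down class invented for this example, not the WalkingCollection itself, and its iterator() takes a shortcut by iterating over a copy. The point is only that the inherited AbstractCollection.addAll() calls add() once per element, so overriding it lets us take the write lock a single time:

```java
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, cut-down collection used only to illustrate the
// bulk update; it is not the newsletter's WalkingCollection.
class LockedBag<E> extends AbstractCollection<E> {
  private final Collection<E> wrapped = new ArrayList<E>();
  private final ReentrantReadWriteLock rwlock =
      new ReentrantReadWriteLock();

  @Override
  public boolean add(E e) {
    rwlock.writeLock().lock();
    try {
      return wrapped.add(e);
    } finally {
      rwlock.writeLock().unlock();
    }
  }

  // Without this override, AbstractCollection.addAll() would call
  // add() for every element, locking and unlocking each time.
  @Override
  public boolean addAll(Collection<? extends E> c) {
    rwlock.writeLock().lock();
    try {
      return wrapped.addAll(c);
    } finally {
      rwlock.writeLock().unlock();
    }
  }

  @Override
  public int size() {
    rwlock.readLock().lock();
    try {
      return wrapped.size();
    } finally {
      rwlock.readLock().unlock();
    }
  }

  // Shortcut for this sketch: iterate over a snapshot taken under
  // the read lock. remove() on this iterator only affects the copy.
  @Override
  public Iterator<E> iterator() {
    rwlock.readLock().lock();
    try {
      return new ArrayList<E>(wrapped).iterator();
    } finally {
      rwlock.readLock().unlock();
    }
  }
}
```

In a collection that guards against modification during iteration, the overridden addAll() would presumably need to perform the same check as add() before taking the write lock.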
One of the issues with the ReentrantReadWriteLock implementation is that you can downgrade a write lock to a read lock, but you cannot upgrade a read lock to a write lock. You do not even get an exception; the thread just hangs. In our implementation, we thus keep a ThreadLocal to indicate that we are busy iterating and check this before acquiring the write lock. An exception is better to see than for the thread to simply park forever in the WAITING state.
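The asymmetry between upgrading and downgrading can be observed safely with tryLock(), which returns immediately instead of hanging the way lock() would. The following is a small sketch; the class LockUpgradeDemo and its method names are made up for this illustration:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class, not part of the newsletter's code.
class LockUpgradeDemo {
  // Tries to take the write lock whilst the current thread already
  // holds the read lock. Returns true if that succeeded.
  static boolean canUpgrade(ReentrantReadWriteLock rwlock) {
    rwlock.readLock().lock();
    try {
      // lock() would hang here, tryLock() just reports failure
      boolean upgraded = rwlock.writeLock().tryLock();
      if (upgraded) rwlock.writeLock().unlock();
      return upgraded;
    } finally {
      rwlock.readLock().unlock();
    }
  }

  // Tries to take the read lock whilst the current thread already
  // holds the write lock. Returns true if that succeeded.
  static boolean canDowngrade(ReentrantReadWriteLock rwlock) {
    rwlock.writeLock().lock();
    try {
      boolean downgraded = rwlock.readLock().tryLock();
      if (downgraded) rwlock.readLock().unlock();
      return downgraded;
    } finally {
      rwlock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    // prints "upgrade possible: false"
    System.out.println("upgrade possible: " + canUpgrade(rwlock));
    // prints "downgrade possible: true"
    System.out.println("downgrade possible: " + canDowngrade(rwlock));
  }
}
```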
    import java.util.*;
    import java.util.concurrent.locks.*;

    public class WalkingCollection<E> extends AbstractCollection<E> {
      private static final ThreadLocal<Boolean> iterating =
          new ThreadLocal<Boolean>() {
            protected Boolean initialValue() {
              return false;
            }
          };

      private final Collection<E> wrappedCollection;
      private final ReentrantReadWriteLock rwlock =
          new ReentrantReadWriteLock();

      public WalkingCollection(Collection<E> wrappedCollection) {
        this.wrappedCollection = wrappedCollection;
      }

      public void iterate(Processor<E> processor) {
        rwlock.readLock().lock();
        try {
          iterating.set(true);
          for (E e : wrappedCollection) {
            if (!processor.process(e)) break;
          }
        } finally {
          iterating.set(false);
          rwlock.readLock().unlock();
        }
      }

      public Iterator<E> iterator() {
        rwlock.readLock().lock();
        try {
          final Iterator<E> wrappedIterator =
              wrappedCollection.iterator();
          return new Iterator<E>() {
            public boolean hasNext() {
              rwlock.readLock().lock();
              try {
                return wrappedIterator.hasNext();
              } finally {
                rwlock.readLock().unlock();
              }
            }

            public E next() {
              rwlock.readLock().lock();
              try {
                return wrappedIterator.next();
              } finally {
                rwlock.readLock().unlock();
              }
            }

            public void remove() {
              checkForIteration();
              rwlock.writeLock().lock();
              try {
                wrappedIterator.remove();
              } finally {
                rwlock.writeLock().unlock();
              }
            }
          };
        } finally {
          rwlock.readLock().unlock();
        }
      }

      public int size() {
        rwlock.readLock().lock();
        try {
          return wrappedCollection.size();
        } finally {
          rwlock.readLock().unlock();
        }
      }

      public boolean add(E e) {
        checkForIteration();
        rwlock.writeLock().lock();
        try {
          return wrappedCollection.add(e);
        } finally {
          rwlock.writeLock().unlock();
        }
      }

      private void checkForIteration() {
        if (iterating.get())
          throw new IllegalMonitorStateException(
              "Cannot modify whilst iterating");
      }
    }
To use this, we define processors that can do something with the contents of the collection. For example, the PrintProcessor prints each element to the console:
    public class PrintProcessor<E> implements Processor<E> {
      public boolean process(E o) {
        System.out.println(">>> " + o);
        return true;
      }
    }
The AddProcessor adds up the numbers in the collection and returns the total as a double:
    public class AddProcessor<N extends Number>
        implements Processor<N> {
      private double total = 0;

      public boolean process(N n) {
        total += n.doubleValue();
        return true;
      }

      public double getTotal() {
        return total;
      }

      public void reset() {
        total = 0;
      }
    }
We can even define a CompositeProcessor that aggregates several processors together:
    import java.util.*;

    public class CompositeProcessor<E> implements Processor<E> {
      private final List<Processor<E>> processors =
          new ArrayList<Processor<E>>();

      public void add(Processor<E> processor) {
        processors.add(processor);
      }

      public boolean process(E e) {
        for (Processor<E> processor : processors) {
          if (!processor.process(e)) return false;
        }
        return true;
      }
    }
We can combine these in a test class that processes a bunch of numbers, as such:
    public class WalkingCollectionTest {
      public static void main(String[] args) {
        WalkingCollection<Long> ages = new WalkingCollection<Long>(
            new java.util.ArrayList<Long>()
        );
        ages.add(10L);
        ages.add(35L);
        ages.add(12L);
        ages.add(33L);

        PrintProcessor<Long> pp = new PrintProcessor<Long>();
        ages.iterate(pp);

        AddProcessor<Long> ap = new AddProcessor<Long>();
        ages.iterate(ap);
        System.out.println("ap.getTotal() = " + ap.getTotal());

        // composite
        System.out.println("Testing Composite");
        ap.reset();
        CompositeProcessor<Long> composite =
            new CompositeProcessor<Long>();
        composite.add(new Processor<Long>() {
          private long previous = Long.MIN_VALUE;
          public boolean process(Long current) {
            boolean result = current >= previous;
            previous = current;
            return result;
          }
        });
        composite.add(ap);
        composite.add(pp);
        ages.iterate(composite);
        System.out.println("ap.getTotal() = " + ap.getTotal());
      }
    }
Here is the output from our test code:
    >>> 10
    >>> 35
    >>> 12
    >>> 33
    ap.getTotal() = 90.0
    Testing Composite
    >>> 10
    >>> 35
    ap.getTotal() = 45.0
One of the restrictions of the iterate() method is that we cannot modify the collection from within the process() methods. We can "downgrade" a write lock to a read lock, but not the other way round. During the iteration, we are holding the read lock. In our WalkingCollection, we throw an IllegalMonitorStateException when this happens:
    public class WalkingCollectionBrokenTest {
      public static void main(String[] args) {
        final WalkingCollection<String> names =
            new WalkingCollection<String>(
                new java.util.ArrayList<String>()
            );
        names.add("Maximilian");
        names.add("Constance");
        names.add("Priscilla");
        names.add("Evangeline");

        Processor<String> pp = new Processor<String>() {
          public boolean process(String s) {
            if ("Priscilla".equals(s)) names.remove(s);
            return true;
          }
        };
        names.iterate(pp);
      }
    }
We see the following output:
    Exception in thread "main" java.lang.IllegalMonitorStateException: Cannot modify whilst iterating
        at WalkingCollection.checkForIteration(WalkingCollection.java:93)
        ... etc.
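One way to live with this restriction, sketched here as an assumption rather than as part of the WalkingCollection design, is to let the processor merely record what should change and to apply the modification once iterate() has returned. The names DeferredRemovalDemo and CollectingProcessor are hypothetical, and the static iterate() helper stands in for the collection's own method so that the example is self-contained:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical demo class, not part of the newsletter's code.
class DeferredRemovalDemo {
  interface Processor<E> {
    boolean process(E e);
  }

  // Stand-in for the collection's iterate() method
  static <E> void iterate(Collection<E> elements,
                          Processor<E> processor) {
    for (E e : elements) {
      if (!processor.process(e)) break;
    }
  }

  // Only records the matching elements; never touches the
  // collection that is being iterated.
  static class CollectingProcessor implements Processor<String> {
    private final String unwanted;
    private final List<String> toRemove = new ArrayList<String>();

    CollectingProcessor(String unwanted) {
      this.unwanted = unwanted;
    }

    public boolean process(String s) {
      if (unwanted.equals(s)) toRemove.add(s);
      return true;
    }

    List<String> getToRemove() { return toRemove; }
  }

  static Collection<String> removeAfterIterating(
      Collection<String> names, String unwanted) {
    CollectingProcessor collector =
        new CollectingProcessor(unwanted);
    iterate(names, collector);                // read-only pass
    names.removeAll(collector.getToRemove()); // modify afterwards
    return names;
  }

  public static void main(String[] args) {
    Collection<String> names = new ArrayList<String>(Arrays.asList(
        "Maximilian", "Constance", "Priscilla", "Evangeline"));
    // prints "[Maximilian, Constance, Evangeline]"
    System.out.println(removeAfterIterating(names, "Priscilla"));
  }
}
```

Note that the two steps are not atomic: with a shared collection, another thread could modify it between the read-only pass and the removal.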
I created a course on Design Patterns before my daughter Connie was born. She is now 8 years old. Over the years, I added patterns and removed obsolete ones, but a surprising amount of material has survived 8 years of programming advances. Once a company has bought one of my Design Patterns courses, they usually end up wanting to train dozens (or even hundreds) of their developers, so it remains one of my best selling courses of all time.
Time to go out for a walk with my kids, so I hope you enjoyed this newsletter.
Kind regards
Heinz