Abstract: CompletionService queues finished tasks, making it easier to retrieve Futures in order of completion. But it lacks some basic functionality, such as a count of how many tasks have been submitted.
Welcome to the 214th issue of The Java(tm) Specialists' Newsletter, sent from sunny Crete. As a proud owner of a polyteknos card (meaning I have produced four new Greek taxpayers), I now get discounts almost everywhere I go in Greece. All forms of public transport, ferries, shops. Spar gives me 3% off everything, including beer. That's fair, I think. Beer is important.
ExecutorService provides a convenient way to submit tasks that can then be executed asynchronously by threads provided by the service. Instead of paying the price of thread start latency, the job can immediately be started by an available thread. The decrease in thread start latency, however, only gives us a performance advantage if a thread is currently idle waiting for work. If that is not the case, then either a thread will have to be started, as happens with the cached thread pool, or the task will have to be put in a waiting queue, as you would find in the fixed thread pool once all of its threads are busy. In either case, our latency can be greater than if we had just started a thread directly. As Kirk says: "When last did you see a queue and say to yerself - yay, that will go fast!"
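To make the queueing visible, here is a minimal sketch. The class name FixedPoolQueueDemo and the method queuedWhileBusy() are made up for this illustration. I construct the ThreadPoolExecutor directly, with the same settings that the Javadoc describes for Executors.newFixedThreadPool(1), so that I can peek at its queue. Latches keep the single worker busy, which makes the result independent of timing.

```java
import java.util.concurrent.*;

class FixedPoolQueueDemo {
  // Returns how many tasks sit in the pool's queue while the
  // only worker thread is still busy with the first task.
  static int queuedWhileBusy() throws InterruptedException {
    // Same configuration as Executors.newFixedThreadPool(1)
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>());
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    try {
      pool.submit(() -> {
        started.countDown();
        release.await(); // keep the worker occupied
        return null;
      });
      started.await(); // the worker is now busy
      pool.submit(() -> "second"); // no idle thread: queued
      pool.submit(() -> "third");  // queued behind "second"
      return pool.getQueue().size();
    } finally {
      release.countDown(); // let the queued tasks run
      pool.shutdown();
    }
  }

  public static void main(String... args) throws Exception {
    System.out.println("Queued tasks: " + queuedWhileBusy());
  }
}
```

This prints "Queued tasks: 2". Neither of the two later tasks can start until the first one is done, no matter how short they are.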
Let's think about queuing a bit, based on our experience in real life. Most of us have been to airports. Imagine you are walking towards the check-in queue. At the same time, you see a large family walking towards the queue, but from the other side. What do you do? You subtly up your pace a bit, hoping to get there first. Once you are in the queue, it does not really matter what is happening around you as you have to wait anyway. Until you get near the front, that is. For some reason, the person directly in front of me always seems fast asleep. The agent at the open counter is gesticulating wildly with her arms, but our hero is obliviously studying his fingernails. Even though a single queue served by multiple servers theoretically gives us the shortest average wait time, we get contention at the head and tail of the single queue. The same contention happens with the traditional thread pool design and is one of the reasons why the ForkJoinPool employs several queues and work stealing. Tasks may sometimes be executed out of order, but because tasks are supposed to be independent, this should not cause issues.
Usually when you submit a Callable<V> to an ExecutorService, you need to manage the Future<V> that is returned by the submit() method. However, since each task may take a different time to complete, you could quite easily block on a future whilst another future might already be available. Here is an example (using Java 8 lambdas - see Maurice Naftalin's Lambda FAQ):
import java.util.*;
import java.util.concurrent.*;

public class ExecutorServiceExample {
  public static void main(String... args) throws Exception {
    try (DotPrinter dp = new DotPrinter()) {
      ExecutorService pool = Executors.newCachedThreadPool();
      Collection<Future<Integer>> futures = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        int sleeptime = 5 - i % 5;
        int order = i;
        futures.add(pool.submit(() -> {
          TimeUnit.SECONDS.sleep(sleeptime);
          return order;
        }));
      }
      for (Future<Integer> future : futures) {
        System.out.printf("Job %d is done%n", future.get());
      }
      pool.shutdown();
    }
  }
}
Here is the code for our DotPrinter, whose entire purpose in life is to keep our attention occupied by printing a meaningless dot once every second:
import java.util.concurrent.*;

public class DotPrinter implements AutoCloseable {
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();

  public DotPrinter() {
    timer.scheduleAtFixedRate(() -> {
      System.out.print(".");
      System.out.flush();
    }, 1, 1, TimeUnit.SECONDS);
  }

  public void close() {
    timer.shutdown();
  }
}
Output would be the following:
.....Job 0 is done
Job 1 is done
Job 2 is done
Job 3 is done
Job 4 is done
Job 5 is done
Job 6 is done
Job 7 is done
Job 8 is done
Job 9 is done
As we see in the output, as we iterate through the futures in the order we submitted them, we get the results in order of submission. However, because the first job takes the longest, we have to wait until that is done before seeing the results of jobs that have been available for some time.
In order to solve that, Java also has an ExecutorCompletionService. It is a very simple class that just contains a BlockingQueue of completed tasks and specialized Futures that enqueue themselves when the done() method is called.
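The self-enqueueing trick can be sketched in a few lines. This is not the JDK source, just my own illustration of the idea: the names DoneHookSketch, SelfQueueingFuture and completionOrder() are invented, and I use FutureTask's protected done() hook directly. A latch holds back the first task so that the second one always finishes first.

```java
import java.util.*;
import java.util.concurrent.*;

class DoneHookSketch {
  // A FutureTask that adds itself to a queue once it has completed.
  static class SelfQueueingFuture<V> extends FutureTask<V> {
    private final BlockingQueue<Future<V>> completed;

    SelfQueueingFuture(Callable<V> task,
                       BlockingQueue<Future<V>> completed) {
      super(task);
      this.completed = completed;
    }

    @Override
    protected void done() {
      // called by FutureTask when the task has finished
      completed.add(this);
    }
  }

  // Submits "slow" first and "fast" second, then returns the
  // results in the order in which the tasks completed.
  static List<String> completionOrder() throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    BlockingQueue<Future<String>> completed =
        new LinkedBlockingQueue<>();
    CountDownLatch release = new CountDownLatch(1);
    try {
      pool.execute(new SelfQueueingFuture<String>(() -> {
        release.await(); // blocked until "fast" has been taken
        return "slow";
      }, completed));
      pool.execute(
          new SelfQueueingFuture<String>(() -> "fast", completed));
      List<String> order = new ArrayList<>();
      order.add(completed.take().get());
      release.countDown();
      order.add(completed.take().get());
      return order;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String... args) throws Exception {
    System.out.println(completionOrder());
  }
}
```

This prints [fast, slow], even though "slow" was handed to the pool first. Since every future in the queue is already done, calling get() on it returns straight away.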
The ExecutorCompletionService lacks some basic functionality, such as a way to find out how many tasks have been submitted. In our next example, we show how we could use the CompletionService to improve the ExecutorServiceExample above.
import java.util.concurrent.*;

public class CompletionServiceExample {
  public static void main(String... args) throws Exception {
    try (DotPrinter dp = new DotPrinter()) {
      ExecutorService pool = Executors.newCachedThreadPool();
      CompletionService<Integer> service =
          new ExecutorCompletionService<>(pool);
      for (int i = 0; i < 10; i++) {
        int sleeptime = 5 - i % 5;
        int order = i;
        service.submit(() -> { // time to get used to lambdas?
          TimeUnit.SECONDS.sleep(sleeptime);
          return order;
        });
      }
      for (int i = 0; i < 10; i++) {
        System.out.printf("Job %d is done%n", service.take().get());
      }
      pool.shutdown();
    }
  }
}
The output now comes to:
.Job 4 is done
Job 9 is done
.Job 3 is done
Job 8 is done
.Job 2 is done
Job 7 is done
.Job 1 is done
Job 6 is done
.Job 0 is done
Job 5 is done
Nice. However, we do need to remember how many tasks we added to the CompletionService. Instead, we could extend the ExecutorCompletionService and add that functionality:
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V>
    extends ExecutorCompletionService<V> {
  private final AtomicLong submittedTasks = new AtomicLong();
  private final AtomicLong completedTasks = new AtomicLong();

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  public Future<V> submit(Callable<V> task) {
    Future<V> future = super.submit(task);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> submit(Runnable task, V result) {
    Future<V> future = super.submit(task, result);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}
Once service is declared as a CountingCompletionService<Integer>, we can replace the result-iterating loop like this:
for (int i = 0; i < service.getNumberOfSubmittedTasks(); i++) {
  System.out.printf("Job %d is done%n", service.take().get());
}
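The hasUncompletedTasks() method could drive the loop just as well. Here is a rough sketch that compiles on its own, so it carries a cut-down copy of the counting idea (only submit(Callable) and take()) as a nested class. The names CountingDrainDemo, Counting and submitAndDrain() are invented for this example. Note that "completed" here really means "taken from the service", as in the class above.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

class CountingDrainDemo {
  // Cut-down counting service: just enough for this demo.
  static class Counting<V> extends ExecutorCompletionService<V> {
    private final AtomicLong submitted = new AtomicLong();
    private final AtomicLong completed = new AtomicLong();

    Counting(Executor executor) {
      super(executor);
    }

    @Override
    public Future<V> submit(Callable<V> task) {
      Future<V> future = super.submit(task);
      submitted.incrementAndGet();
      return future;
    }

    @Override
    public Future<V> take() throws InterruptedException {
      Future<V> future = super.take();
      completed.incrementAndGet();
      return future;
    }

    boolean hasUncompletedTasks() {
      return completed.get() < submitted.get();
    }
  }

  // Submits the given number of tasks, then drains the service
  // without keeping a separate count of what was submitted.
  static List<Integer> submitAndDrain(int tasks) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    try {
      Counting<Integer> service = new Counting<>(pool);
      for (int i = 0; i < tasks; i++) {
        int order = i;
        service.submit(() -> order);
      }
      List<Integer> results = new ArrayList<>();
      while (service.hasUncompletedTasks()) {
        results.add(service.take().get());
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String... args) throws Exception {
    System.out.println(
        submitAndDrain(5).size() + " results collected");
  }
}
```

This works because the same thread submits everything before it starts draining. If other threads were still submitting, hasUncompletedTasks() could return false too early.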
Of course, an even nicer solution would be to build an iterable CompletionService. The loop above could then be replaced with simply:
for (Future<Integer> future : service) {
  System.out.printf("Job %d is done%n", future.get());
}
One of the exercises in my Concurrency Specialist Course is writing such an Iterable. It looks really simple, but there are a few gotchas that you need to be aware of. If you'd like to test your skill, here is an outline of what you need to do:
import java.util.*;
import java.util.concurrent.*;

public class CompletionServiceIterable<V>
    implements Iterable<Future<V>> {
  public CompletionServiceIterable() {
    // TODO
  }

  public void submit(Callable<V> task) {
    // TODO
  }

  public Iterator<Future<V>> iterator() {
    // TODO
  }

  public void shutdown() {
    // TODO
  }

  public boolean isTerminated() {
    // TODO
  }
}
If you send me your solution, I'll run it against my unit tests and then give you my personal feedback. You should solve it without using any external libraries, just the JDK (6, 7 or 8) and your own code. Good luck :-)
Kind regards
Heinz