Abstract: Timers in Java have suffered from the typical Command Pattern characteristics. Exceptions could stop the timer altogether, and even with the new ScheduledThreadPoolExecutor, a task that fails is cancelled. In this newsletter we explore how we could reschedule periodic tasks automatically.
Welcome to the 154th issue of The Java(tm) Specialists' Newsletter, which I started writing at 36,000 feet, flying in a brand-new Airbus 320-200 en route to the Frankfurt Sun Tech Days. Lovely plane, nice and smooth ride, probably partly due to the altitude it was flying at. The idea for this newsletter came from one of the participants of my recent talk in London on the Secrets of Concurrency; unfortunately, this particular talk was not open to the general public.
Attention New Yorkers: I have been invited to present a talk on the Secrets of Concurrency at the NYC Java SIG on the 11th of December 2007. If you are in the area and available that evening, it would be great to meet you! On the website they say that you need to RSVP. Our meeting place is unfortunately not that large, so I would strongly encourage you to let them know if you would like to come.
In the early days of Java, when you needed a timer to periodically do some task, you would create a thread, do the task, sleep for some set time, and repeat ad infinitum. If an Error or RuntimeException occurred during the execution of the task, the thread would typically die, also stopping the scheduled task. This approach also suffered from another problem: we needed a new thread for every scheduled task in the system, and most of these threads would sit idle most of the time.
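To make that failure mode concrete, here is a minimal sketch of such a hand-rolled timer, assuming Java 8 or later for the lambdas. SleepLoopTimer and its methods are hypothetical names. Nothing catches the RuntimeException, so it escapes the loop and the thread terminates. The default uncaught-exception handler prints the stack trace to System.err.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SleepLoopTimer {
  // Pre-JDK 1.3 style timer: one dedicated thread per periodic task.
  // Nothing catches exceptions, so an unchecked exception from `task`
  // escapes the loop and the thread dies, ending the schedule for good.
  public static Thread start(Runnable task, long periodMillis) {
    Thread thread = new Thread(() -> {
      try {
        while (true) {
          task.run();
          Thread.sleep(periodMillis);
        }
      } catch (InterruptedException e) {
        // interruption is treated as a request to stop
      }
    });
    thread.start();
    return thread;
  }

  // Runs a task that fails on its third execution and reports how
  // many times it ran before its thread died.
  public static int runsBeforeThreadDies() {
    AtomicInteger runs = new AtomicInteger();
    Thread thread = start(() -> {
      if (runs.incrementAndGet() == 3) {
        throw new IllegalStateException("failure on run 3");
      }
    }, 10);
    try {
      thread.join(); // returns once the thread has died
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return runs.get();
  }

  public static void main(String[] args) {
    System.out.println("runs = " + runsBeforeThreadDies()); // prints "runs = 3"
  }
}
```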
Enter JDK 1.3. With this version, we had a java.util.Timer class that shared one thread between lots of different periodic tasks. We therefore have fewer unnecessary threads, but this brings with it other problems. First off, if one of the tasks causes an Error or a RuntimeException, the Timer's thread dies, so none of the other tasks progress either. Here is a demonstration program:
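```java
import java.util.*;

public class TimerTest {
  public static void main(String[] args) {
    Timer timer = new Timer();
    for (int i = 0; i < 5; i++) {
      final int i1 = i;
      // all five tasks share the Timer's single background thread
      timer.schedule(new TimerTask() {
        public void run() {
          System.out.println("i = " + i1);
          // roughly one run in ten fails; the exception kills the
          // Timer's thread, which stops all five tasks
          if (Math.random() < 0.1) {
            throw new RuntimeException();
          }
        }
      }, 1000, 1000);
    }
  }
}
```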
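The Timer does not just stop running the existing tasks; it also refuses new ones. In the following sketch (DeadTimerDemo is a hypothetical name), a single failing TimerTask kills the Timer's background thread, after which schedule() throws an IllegalStateException. We poll, because the thread dies asynchronously.

```java
import java.util.Timer;
import java.util.TimerTask;

public class DeadTimerDemo {
  // Returns true once the Timer refuses new work because its single
  // background thread was killed by an exception in a task.
  public static boolean timerRejectsTasksAfterFailure() {
    Timer timer = new Timer(true); // daemon, so it cannot keep the JVM alive
    timer.schedule(new TimerTask() {
      public void run() {
        throw new RuntimeException("task failure");
      }
    }, 0);
    // The thread dies asynchronously, so probe for a while.
    for (int attempt = 0; attempt < 100; attempt++) {
      try {
        Thread.sleep(20);
        // Far in the future: this only probes whether the Timer
        // still accepts tasks.
        timer.schedule(new TimerTask() {
          public void run() { }
        }, 60_000);
      } catch (IllegalStateException e) {
        return true; // the dead Timer rejected the new task
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    timer.cancel();
    return false;
  }

  public static void main(String[] args) {
    System.out.println(timerRejectsTasksAfterFailure()); // prints "true"
  }
}
```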
Secondly, if a task takes a long time to complete, the other tasks might be held up longer than desired, since the Timer uses only one thread to execute all of its tasks.
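The following rough sketch measures this second problem. TimerDelayDemo is a hypothetical name, and the 300 ms sleep stands in for any slow task. A quick task that is due 10 ms after a slow one cannot start until the slow one returns, because both share the Timer's single thread. The exact lag depends on the machine, but it should not be noticeably less than the slow task's sleep.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TimerDelayDemo {
  // Returns how many milliseconds after scheduling the quick task
  // actually started, when it shares a Timer with a slow task.
  public static long lagMillis() {
    Timer timer = new Timer(true);
    CountDownLatch quickRan = new CountDownLatch(1);
    AtomicLong quickStart = new AtomicLong();
    long scheduledAt = System.nanoTime();
    timer.schedule(new TimerTask() {
      public void run() {
        try {
          Thread.sleep(300); // the slow task
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }, 0);
    timer.schedule(new TimerTask() {
      public void run() {
        quickStart.set(System.nanoTime());
        quickRan.countDown();
      }
    }, 10); // due after 10 ms, but must wait for the slow task
    try {
      quickRan.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      timer.cancel();
    }
    return TimeUnit.NANOSECONDS.toMillis(quickStart.get() - scheduledAt);
  }

  public static void main(String[] args) {
    System.out.println("quick task started after " + lagMillis() + " ms");
  }
}
```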
These problems were solved in Java 5 with the ScheduledThreadPoolExecutor class. First off, when one task misbehaves by throwing an unchecked exception, only that task is cancelled and the thread pool continues execution. Secondly, since you can create this ExecutorService with several threads, one slow task is less likely to hold up the others.
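Here is a small sketch of the first point, assuming Java 8 or later. IsolatedFailureDemo is a hypothetical name. A periodic task that throws completes its future with an ExecutionException and is never run again. A healthy task on the same single-threaded pool keeps going. Note that isDone() is true only for the failed task.

```java
import java.util.concurrent.*;

public class IsolatedFailureDemo {
  // Runs a failing and a healthy periodic task on one pool thread and
  // returns true if only the failing one was stopped.
  public static boolean failureIsIsolated() {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    CountDownLatch healthyRuns = new CountDownLatch(5);
    try {
      ScheduledFuture<?> failing = pool.scheduleAtFixedRate(() -> {
        throw new IllegalStateException("always fails");
      }, 0, 10, TimeUnit.MILLISECONDS);
      ScheduledFuture<?> healthy = pool.scheduleAtFixedRate(
          healthyRuns::countDown, 0, 10, TimeUnit.MILLISECONDS);

      boolean failedWithCause;
      try {
        failing.get(5, TimeUnit.SECONDS);
        failedWithCause = false; // a periodic future never completes normally
      } catch (ExecutionException e) {
        failedWithCause = e.getCause() instanceof IllegalStateException;
      }
      // the healthy task keeps running after the failure
      boolean healthyKeptRunning = healthyRuns.await(5, TimeUnit.SECONDS);
      return failedWithCause && failing.isDone()
          && healthyKeptRunning && !healthy.isDone();
    } catch (InterruptedException | TimeoutException e) {
      return false;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(failureIsIsolated()); // prints "true"
  }
}
```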
According to the definition in Java Concurrency in Practice by Brian Goetz et al., livelock is a form of liveness failure in which a thread, while not blocked, still cannot make progress because it keeps retrying an operation that will always fail. This form of livelock often comes from overeager error-recovery code that mistakenly treats an unrecoverable error as a recoverable one.
Therefore, cancelling the task that causes an exception is the only safe and sensible thing to do in general; otherwise we could potentially create a livelock. However, there are cases where we might want to reschedule a task that has failed.
By extending the ScheduledThreadPoolExecutor class and overriding the afterExecute(Runnable,Throwable) method, we are able to discover immediately when a task has failed. Unfortunately, the Runnable that we see in the afterExecute() method parameter is not the same as the submitted Runnable. It is a wrapper object that does not allow us to obtain the original submitted task. However, it is the same object that the schedule() methods return. We also need to know what additional parameters were submitted to the ScheduledThreadPoolExecutor, so that we can resubmit the task.
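We can check both claims with a small subclass that records what afterExecute() receives. AfterExecuteIdentityDemo and RecordingExecutor are hypothetical names. The sketch assumes Java 8 or later and the default decorateTask(), which returns the task unchanged. Note also that the Throwable parameter is null for these tasks. The wrapper is a FutureTask, which catches the exception itself, so the only way to see the exception is to call get() on the future.

```java
import java.util.concurrent.*;

public class AfterExecuteIdentityDemo {
  // Hypothetical subclass that records the Runnable given to afterExecute().
  static class RecordingExecutor extends ScheduledThreadPoolExecutor {
    final CompletableFuture<Runnable> seen = new CompletableFuture<>();

    RecordingExecutor() {
      super(1);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
      super.afterExecute(r, t);
      seen.complete(r);
    }
  }

  // Returns {is it the returned future?, is it the submitted command?}
  public static boolean[] compare() {
    RecordingExecutor executor = new RecordingExecutor();
    try {
      Runnable command = () -> { };
      ScheduledFuture<?> future =
          executor.schedule(command, 0, TimeUnit.MILLISECONDS);
      Runnable r = executor.seen.get(5, TimeUnit.SECONDS);
      return new boolean[] { r == future, r == command };
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    boolean[] result = compare();
    System.out.println("same as returned future: " + result[0]); // true
    System.out.println("same as submitted task: " + result[1]);  // false
  }
}
```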
As a start, we define a ScheduledExceptionHandler interface that is notified when an exception occurs. The exceptionOccurred() method then needs to return true if the task must be rescheduled, and false otherwise.
```java
public interface ScheduledExceptionHandler {
  /**
   * @return true if the task should be rescheduled;
   *         false otherwise
   */
  boolean exceptionOccurred(Throwable e);
}
```
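Since ScheduledExceptionHandler has a single abstract method, a handler can be written as a lambda on Java 8 or later. The policy below is only an illustration, and RETRY_RUNTIME_ONLY is a hypothetical name. It reschedules after a RuntimeException but gives up on an Error, which is usually the kind of unrecoverable failure that the livelock warning above is about.

```java
public class HandlerExamples {
  // Repeats the ScheduledExceptionHandler interface so that this sketch
  // compiles on its own.
  public interface ScheduledExceptionHandler {
    boolean exceptionOccurred(Throwable e);
  }

  // Hypothetical policy: reschedule after a RuntimeException, but treat
  // an Error (e.g. OutOfMemoryError) as unrecoverable and give up.
  public static final ScheduledExceptionHandler RETRY_RUNTIME_ONLY =
      e -> e instanceof RuntimeException;

  public static void main(String[] args) {
    System.out.println(RETRY_RUNTIME_ONLY.exceptionOccurred(
        new IllegalArgumentException())); // true
    System.out.println(RETRY_RUNTIME_ONLY.exceptionOccurred(
        new StackOverflowError()));       // false
  }
}
```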
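The subclass now uses that to bind the cancelled task to the submitted task. scheduleAtFixedRate() stores the command, period and time unit, keyed on the returned ScheduledFuture, so that we can look them up again when that future fails. We use an IdentityHashMap to make sure that keys are compared as actual objects (with ==), not with equals() and their calculated hash codes. It also helps that Future.isDone() returns true once a task has completed in any way, including when it has thrown an exception or been cancelled. A periodic task that is still scheduled is never done.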
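The difference between identity and equality is easy to see with two distinct but equal keys. IdentityMapDemo is a hypothetical name. A HashMap treats them as one key. An IdentityHashMap keeps both, because it compares keys with == and uses System.identityHashCode() instead of the keys' own hashCode().

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityMapDemo {
  // Puts two distinct but equal String objects into both kinds of map
  // and returns {HashMap size, IdentityHashMap size}.
  public static int[] sizes() {
    String a = new String("task");
    String b = new String("task"); // a.equals(b), but a != b
    Map<Object, String> byEquals = new HashMap<>();
    Map<Object, String> byIdentity = new IdentityHashMap<>();
    for (Object key : new Object[] { a, b }) {
      byEquals.put(key, "params");
      byIdentity.put(key, "params");
    }
    return new int[] { byEquals.size(), byIdentity.size() };
  }

  public static void main(String[] args) {
    int[] s = sizes();
    System.out.println("HashMap: " + s[0] + ", IdentityHashMap: " + s[1]);
    // prints "HashMap: 1, IdentityHashMap: 2"
  }
}
```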
```java
import java.util.*;
import java.util.concurrent.*;

public class ResubmittingScheduledThreadPoolExecutor
    extends ScheduledThreadPoolExecutor {
  /** Default exception handler, always reschedules */
  private static final ScheduledExceptionHandler NULL_HANDLER =
      new ScheduledExceptionHandler() {
        public boolean exceptionOccurred(Throwable e) {
          return true;
        }
      };
  // Keyed on the ScheduledFuture returned by scheduleAtFixedRate(),
  // which is the same object later passed to afterExecute().
  // Synchronized, since callers and pool threads access it concurrently.
  private final Map<Object, FixedRateParameters> runnables =
      Collections.synchronizedMap(
          new IdentityHashMap<Object, FixedRateParameters>());
  private final ScheduledExceptionHandler handler;

  /**
   * @param poolSize number of threads; tasks aborted by an exception
   *                 are always rescheduled
   */
  public ResubmittingScheduledThreadPoolExecutor(int poolSize) {
    this(poolSize, NULL_HANDLER);
  }

  /**
   * @param handler notified when an exception aborts a task; its
   *                return value decides whether to reschedule it
   */
  public ResubmittingScheduledThreadPoolExecutor(
      int poolSize, ScheduledExceptionHandler handler) {
    super(poolSize);
    this.handler = handler;
  }

  private static class FixedRateParameters {
    private final Runnable command;
    private final long period;
    private final TimeUnit unit;

    /**
     * We do not need initialDelay, since we can set it to period
     */
    public FixedRateParameters(Runnable command, long period,
                               TimeUnit unit) {
      this.command = command;
      this.period = period;
      this.unit = unit;
    }
  }

  public ScheduledFuture<?> scheduleAtFixedRate(
      Runnable command, long initialDelay, long period, TimeUnit unit) {
    ScheduledFuture<?> future = super.scheduleAtFixedRate(
        command, initialDelay, period, unit);
    runnables.put(future,
        new FixedRateParameters(command, period, unit));
    return future;
  }

  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    ScheduledFuture<?> future = (ScheduledFuture<?>) r;
    // future.isDone() is always false for periodic tasks,
    // unless they threw an exception or were cancelled
    if (future.isDone()) {
      try {
        future.get();
      } catch (ExecutionException e) {
        Throwable problem = e.getCause();
        FixedRateParameters parms = runnables.remove(r);
        if (problem != null && parms != null) {
          boolean resubmitThisTask =
              handler.exceptionOccurred(problem);
          if (resubmitThisTask) {
            scheduleAtFixedRate(parms.command,
                parms.period, parms.period, parms.unit);
          }
        }
      } catch (CancellationException e) {
        // cancelled by the caller: forget it, do not reschedule
        runnables.remove(r);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```
We can see how this works by looking at an example that gives up after five failures. Such a system is more robust than one that gives up after the first failure, and since we put a limit on the retries, we also avoid livelock issues.
```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ResubmittingTest {
  public static void main(String[] args)
      throws InterruptedException {
    ScheduledExecutorService service2 =
        new ResubmittingScheduledThreadPoolExecutor(
            5, new MyScheduledExceptionHandler());
    // The pool is never shut down, so the JVM keeps running even
    // after the handler gives up; stop the program manually.
    service2.scheduleAtFixedRate(
        new MyRunnable(), 2, 1, TimeUnit.SECONDS);
  }

  private static class MyRunnable implements Runnable {
    public void run() {
      // fails in roughly 30% of the runs
      if (Math.random() < 0.3) {
        System.out.println("I have a problem");
        throw new IllegalArgumentException("I have a problem");
      }
      System.out.println("I'm happy");
    }
  }

  /** As an example, we give up after 5 failures. */
  private static class MyScheduledExceptionHandler
      implements ScheduledExceptionHandler {
    private final AtomicInteger problems = new AtomicInteger();

    public boolean exceptionOccurred(Throwable e) {
      e.printStackTrace();
      if (problems.incrementAndGet() >= 5) {
        System.err.println("We give up!");
        return false;
      }
      System.err.println("Resubmitting task to scheduler");
      return true;
    }
  }
}
```
That's all for now - hope you enjoyed this newsletter and will find it useful in your work!
Heinz
P.S. Another solution to managing exceptions is to catch them in the tasks themselves, so that they never bubble up. In our case, we were looking for a general solution to the problem of cancelled tasks.
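A minimal sketch of that approach, assuming Java 8 or later (CatchingTasks and catching() are hypothetical names). The wrapper reports a RuntimeException to a callback instead of letting it escape, so the executor keeps running the task. Errors are deliberately not caught, so they still stop the task.

```java
import java.util.concurrent.*;
import java.util.function.Consumer;

public class CatchingTasks {
  // Wraps a task so that RuntimeExceptions go to `onError` instead of
  // escaping run(); the executor therefore never suppresses the task.
  // Errors are not caught and still stop the task.
  public static Runnable catching(Runnable task,
                                  Consumer<RuntimeException> onError) {
    return () -> {
      try {
        task.run();
      } catch (RuntimeException e) {
        onError.accept(e);
      }
    };
  }

  // Schedules a task that throws on every run and returns true if it
  // was still executed at least three times.
  public static boolean keepsRunningDespiteFailures() {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    CountDownLatch failures = new CountDownLatch(3);
    try {
      pool.scheduleAtFixedRate(catching(() -> {
        throw new IllegalStateException("fails every time");
      }, e -> failures.countDown()), 0, 10, TimeUnit.MILLISECONDS);
      return failures.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(keepsRunningDespiteFailures()); // prints "true"
  }
}
```

Keep the livelock warning in mind. A wrapper like this retries forever, so it only suits failures that really are transient.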
We are always happy to receive comments from our readers. Feel free to send me a comment via email or discuss the newsletter in our JavaSpecialists Slack Channel.