Java 5 Threadpools

Vanaf de introductie van Java is threading (het gelijktijdig uitvoeren van verschillende taken) een standaard feature van de taal. Het aanmaken en starten van een thread is eenvoudig maar Java bood tot versie 1.5 weinig hulpmiddelen voor programmatische locking, thread pools, schedulers, wachtrijen e.d. In de loop der tijd zijn er daarom verschillende externe bibliotheken op de markt gekomen (bijvoorbeeld het uitstekende Quartz).

Met Java 1.5 heeft Sun een aantal toevoegingen aan de standaard bibliotheek gedaan ten behoeve van thread management in de vorm van het java.util.concurrent package. Het doel van dit artikel is om met enkele eenvoudige voorbeelden te laten zien hoe gebruik gemaakt kan worden van met name de thread pools en Schedulers in dit package.
Er wordt vanuit gegaan dat de lezer bekend is met de basiseigenschappen van threading. Redenen om wel- of geen threads te gebruiken, thread safe programmeren, en de geavanceerde eigenschappen van de hulpmiddelen worden hier niet bespoken, zie hiervoor de referenties of praat eens met de auteur!


Threads starten in Java
Een thread is op 2 manieren aan te maken in Java:
1. De Thread class extenden en logica aan de run() methode toevoegen.
2. Een Thread instantiëren en aan de constructor een class meegeven die de Runnable implementeert.

De laatste methode hanteren we hier, omdat deze de logica scheidt van het thread management. In dit artikel gebruiken we telkens de volgende Runnable class:

public class FibonacciRunnable implements Runnable {
    private static AtomicInteger newId = new AtomicInteger(1);
 
    private int id;
 
    private long n;
 
    public FibonacciRunnable(long n) {
        this.id = newId.getAndIncrement();
        this.n = n;
    }
 
    private long fib(long n) {
        return (n == 0 || n == 1) ? n : fib(n - 1) + fib(n - 2);
    }
 
    public void run() {
        System.out.println(String.format("Taak %s gestart", id));
        fib(n);
        System.out.println(String.format("Taak %s beeindigd", id));
    }
}

Een Runnable heeft 1 vereiste methode, run(). Deze methode moet de gewenste logica uitvoeren. Bij het uitvoeren van de run() methode in dit voorbeeld wordt het Fibonacci getal uitgerekend. Als we dit in een aparte thread zouden willen laten uitvoeren in Java gebruiken we de volgende code:

long n = 35;
Thread myThread = new Thread(new FibonacciRunnable(n));
myThread.start();

Het starten van de Thread met de start() methode zorgt ervoor dat de run() methode van de Runnable aangeroepen wordt, in de nieuwe Thread. Dit betekent dat de start() methode direct retourneert en de uitvoering van het hoofdprogramma doorgaat.

Tot zover lijkt het starten van een Thread niet erg ingewikkeld. Denk je echter eens het volgende scenario in, je bouwt een applicatie welke taken aangeboden krijgt, bijvoorbeeld van verschillende client applicaties. Het starten van een nieuwe thread voor elke taak is een relatief dure operatie. En is het wel slim om voor alle aangeboden taken een nieuwe thread te starten? Alle lopende threads belasten de CPU, waardoor de throughput niet wordt bevordert. Logischer is het om een beperkt aantal threads te instantiëren en deze vervolgens her te gebruiken voor taken die via een wachtrij worden aangeboden. Dit mechanisme wordt thread pooling genoemd. Er zijn diverse externe bibliotheken beschikbaar om dit te doen, maar Java 1.5 heeft hier nu ook standaard hulpmiddelen voor in de vorm van het Executor framework.

Executor framework
In Java 1.5 is het executor framework toegevoegd in het java.util.concurrent package. Het framework bevat een thread pool en een taak Scheduler implementatie, met de belangrijke eigenschap dat het Thread management verborgen wordt en hierdoor de ontwikkelaar zich kan concentreren op applicatie-logica.

In dit volgende paragrafen zal de eerder gedefinieerde Runnable telkens gebruikt worden als voorbeeld taak. Het executor framework biedt ook de mogelijkheid om een taak die de Callable interface implementeert op analoge wijze uit te voeren. Een Callable kan, in tegenstelling tot een Runnable, een waarde retourneren of een checked exception gooien na uitvoering. De laatste voorbeeld paragraaf demonstreert het gebruik van een Callable.

Executor Threadpool
Een thread pool werkt met een x aantal threads om taken uit te voeren en een wachtrij (queue) implementatie waarin de aangeboden taken worden geplaatst. Een aangeboden taak wordt in de wachtrij geplaatst totdat er een thread beschikbaar komt om de taak uit te voeren.
Het volgende voorbeeld laat zien hoe de eerder gedefinieerde Runnable taak via een executor uitgevoerd kan worden:

// Maak een wachtrij voor taken
BlockingQueue<Runnable> workqueue = new LinkedBlockingQueue<Runnable>();
 
// Maak de thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2,              // De omvang van de core pool (aantal threads)
    3,              // De maximale pool omvang (aantal threads)
    60,             // Inactieve threads blijven 60s actief (keep alive time)
    TimeUnit.SECONDS,
    workqueue);     // De te gebruiken wachtrij
 
// Fibonacci nummer
int n = 35;
 
// Plaats een Fibonacci taak 40 maal in de wachtrij
for (int i = 0; i <40; i++) {
    FibonacciRunnable myRunnable = new FibonacciRunnable(n);
    executor.execute(myRunnable);
}
 
// Bij het beëindigen van het programma sluiten we de scheduler netjes af
executor.shutdown();

In dit voorbeeld wordt er eerst een ThreadPoolExecutor aangemaakt, welke gebruik maakt van een LinkedBlockingQueue als wachtrij (een First In First Out wachtrij met onbeperkte capaciteit), de minimale thread pool omvang is 2 threads (dit is de omvang van de core pool) en een maximale omvang van 3 (maximum van de pool). Indien er meer dan het minimum aantal threads (2) beschikbaar zijn en deze geen taken uitvoeren (dus idle zijn) worden deze 60 seconden beschikbaar gehouden na het beëindigen van een taak (keepAliveTime).
De for-loop zorgt ervoor dat de FibonacciRunnable 40 keer in de wachtrij wordt geplaatst met de execute() methode. De Runnable wordt niet direct uitgevoerd, wat betekend dat bij het verlaten van de loop de wachtrij met taken gevuld is en deze stuk voor stuk door de threads van de executor worden uitgevoerd. de aanroep van shutdown() zorgt ervoor dat de thread pool netjes wordt afgesloten, waarbij eerst alle reeds wachtende taken nog worden uitgevoerd. Dit betekent dat de shutdown() methode dus blokkeert totdat alle taken klaar zijn.

Nog een paar opmerkingen bij dit voorbeeld:

  • Het is mogelijk om een wachtrij te gebruiken met een beperkte capaciteit, indien de wachtrij vol zitten worden de taak geweigerd, in dat geval wordt een RejectionHandler gebruikt om logica aan deze situatie te koppelen. De RejectionHandler kan zelf geschreven worden of er kan een standaard handler worden gebruikt.
  • Het aanmaken van threads gebeurt met behulp van een zogenaamde ThreadFactory. Indien je als ontwikkelaar invloed wilt uitoefenen over het aanmaken van threads (bijvoorbeeld om de prioriteit in te kunnen stellen) kan er een eigen ThreadFactory implementatie worden meegegeven aan de Executor.
  • In dit voorbeeld wordt gelijk na het in de wachtrij plaatsen van taken shutdown() aangeroepen. In een eigen applicatie zal een Executor vaker worden gebruikt en pas bij het beëindigen van de applicatie worden afgesloten.
  • In plaats van de shutdown() methode kan ook shutdownNow() worden gebruikt, deze zorgt ervoor dat de actieve taken worden afgebroken (deze dienen dat te ondersteunen) en wachtende taken worden niet meer uitgevoerd.

ScheduledExecutor
In de voorgaande paragraaf is de thread pool implementatie van het executor framework besproken. De aangeboden taken worden uitgevoerd zodra er een vrije thread beschikbaar komt. Het kan ook voorkomen dat je taken hebt die je niet direct wilt uitvoeren maar na een bepaalde tijd of dat bepaalde taken herhaaldelijk uitgevoerd moeten worden. In Java bestond sinds versie 1.3 hier reeds een hulpmiddel voor, java.util.Timer. Deze class maakt gebruik van een thread om aangeboden taken op bepaalde momenten, al dan niet herhalend, uit te voeren. De Timer class is simpel in het gebruik maar heeft echter een aantal nadelen:

  • Er wordt maar 1 thread gebruikt, indien er 2 taken zijn die op hetzelfde moment uitgevoerd moeten worden zal er 1 moeten wachten tot de ander de thread weer vrijgeeft.
  • Taken moeten de TimerTask class extenden, waardoor het niet meer mogelijk is om (door het single inheritance principe van Java) een eigen class te extenden.
  • Doordat een TimerTask de Runnable interface implementeert kunnen taken geen waardes worden geretourneerd door taken (zoals bij de Callable interface wel kan).

De ScheduledExecutor van het Executor framework lost deze problemen op. De ScheduledExector extend de Executor thread pool (dit maakt dat de scheduler meerdere threads kan gebruiken). De taken kunnen de Runnable of de Callable interface implementeren (een return waarde is dus mogelijk). Het volgende voorbeeld toont het (tweemaal) uitgesteld uitvoeren van de eerder beschreven taak:

// Maak de scheduler aan, de enige parameter is de omvang van de core pool (2)
ScheduledThreadPoolExecutor scheduledExecutor =
    new ScheduledThreadPoolExecutor(2);
 
// Fibonacci nummer
int n = 40;
 
// Maak een FibonacciRunnable en voer deze na 500 milliseconden uit
FibonacciRunnable myRunnable = new FibonacciRunnable(n);
scheduledExecutor.schedule(myRunnable, 500, TimeUnit.MILLISECONDS);
 
// Maak een FibonacciRunnable en voer deze na 700 milliseconden uit
myRunnable = new FibonacciRunnable(n);
scheduledExecutor.schedule(myRunnable, 700, TimeUnit.MILLISECONDS);
 
// Bij het beëindigen van het programma sluiten we de scheduler netjes af
scheduledExecutor.shutdown();

Vergeleken met het voorbeeld uit de vorige paragraaf zijn er een aantal verschillen:

  • De executor wordt aangemaakt met enkel een core omvang. Het maximum aantal threads is bij de ScheduledExecutor altijd gelijk aan de core omvang.
  • Er is geen keepAliveTime, omdat er geen verschil is in het maximum en core aantal threads is dit niet nodig.

Bij het aanroepen van shutdown() worden alle nog klaarstaande taken uitgevoerd, dit betekent in het voorbeeld dat beide taken, die nog niet actief zijn bij het aanroepen van shutdown(), nog uitgevoerd gaan worden. De shutdown() methode blokkeert totdat beide taken klaar zijn.

Nog een paar aanvullende opmerkingen bij dit voorbeeld:

  • Het is mogelijk om met de methodes scheduleAtFixedRate() en scheduleWithFixedDelay() taken herhaaldelijk uit te voeren.
  • Het schedulen van een taak retourneert een ScheduledFuture object, dit object kan gebruikt worden om de status van de taak op te vragen (wanneer uitvoeren, reeds uitgevoerd), de taak te annuleren of, indien de taak een Callable is, het resultaat van de taak op te vragen.
  • Met continueExistingPeriodicTaskAfterShutdownPolicy en executeExistingDelayedTaskAfterShutdownPolicy kan het gedrag van de executor in het geval van herhalende of uitgestelde taken bij een shutdown worden bepaald.
  • Een ScheduledExecutor is iets ingewikkelder in het gebruik dan een Timer, indien de genoemde nadelen niet van toepassing zijn op de applicatie die je ontwikkelt kan de Timer implementatie ook een goede keuze zijn.

Callables
In plaats van een taak de Runnable interface te laten implementeren kan deze ook de Callable interface implementeren. Dit zou voor het Fibonacci voorbeeld ook logischer zijn geweest, aangezien deze een waarde uitrekent die we graag terug willen geven aan het hoofdprogramma. Als we de eerder gebruikte Runnable herschrijven zodat deze de Callable interface implementeert krijgen we het volgende:

 public class FibonacciCallable implements Callable<Long> {
    private static AtomicInteger newId = new AtomicInteger(1);
 
    private int id;
 
    private long n;
 
    public FibonacciCallable(long n) {
        this.id = newId.getAndIncrement();
        this.n = n;
    }
 
    private long fib(long n) {
        return (n == 0 || n == 1) ? n : fib(n - 1) + fib(n - 2);
    }
 
    public Long call() throws Exception {
        System.out.println(String.format("Taak %s gestart", id));
        long result = fib(n);
        System.out.println(String.format("Taak %s beeindigd", id));
        return result;
    }
}

Het enige verschil met de oorspronkelijke Runnable is dat nu de methode call() wordt geïmplementeerd, welke een Long retourneert. Om de Callable in een thread uit te voeren gebruiken we de volgende code:

// Maak een wachtrij voor taken
BlockingQueue<Runnable> workqueue = new LinkedBlockingQueue<Runnable>();
 
// Maak de thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, // De omvang van de core pool (aantal threads)
    3, // De maximale pool omvang (aantal threads)
    60, // De tijd dat inactieve threads actief blijven (keep alive time)
    TimeUnit.SECONDS, workqueue); // De te gebruiken wachtrij
 
// Fibonacci nummer
int n = 35;
 
// Plaats een Fibonacci callable in de wachtrij
FibonacciCallable myCallable = new FibonacciCallable(n);
Future<Long> future = executor.submit(myCallable);
 
try {
    // Door get op de Future aan te roepen wordt de applicatie
    // geblokkeerd totdat de taak klaar is en zijn resultaat terug
    // heeft gegeven.
    Long callableResult = future.get();
 
    // Druk het resultaat op het scherm af
    System.out.println(callableResult);
 
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
 
// Bij het beëindigen van het programma sluiten we de scheduler netjes af
executor.shutdown();

Dit voorbeeld gebruikt dezelfde executor als uit het eerste thread pool voorbeeld. Nu wordt echter de methode submit() gebruikt om de callable in de wachtrij te plaatsen. Deze methode retourneert een object dat de Future interface implementeert. De Future geeft ons de mogelijkheid om te controleren of de taak klaar is, deze eventueel te annuleren en deze kan gebruikt worden om het resultaat van de Callable taak op te vragen. Dit doen we met behulp van de get() methode. Op het moment dat we de get() methode aanroepen hoeft de taak nog niet uitgevoerd te zijn, get blokkeert daarom totdat de taak klaar is en retourneert vervolgens het resultaat van de Callable.

Opmerkingen bij dit voorbeeld:

  • Een Callable kan een checked exception gooien bij het uitvoeren van de taak. Indien dit gebeurt wordt deze door de Future opgevangen en bij Indien het aanroepen van de get() methode worden doorgegooid, gewrapt in een ExecutionException.
  • Bij een ScheduledExecutor kan op analoge wijze ook gebruik worden gemaakt van een Callable taak.
  • Er is ook een implementatie van de ExecutorService.submit() methode waar een Runnable aan kan worden meegegeven. Deze methode retourneert ook een Future welke dezelfde mogelijkheden heeft als een Future bij een Callable. De get() methode van de Future zorgt er ook voor dat de uitvoering van de thread waarin deze wordt aangeroepen blokkeert totdat de taak klaar is. De get() methode retourneert voor een Runnable null of een bij het aanroepen van submit() gespecificeerd object.

Executor hulpmiddelen
Het Executor framework bevat een utility class, Excutors. Met de methodes in deze class kunnen verschillende standaard Executor implementatie eenvoudig aangemaakt worden.

Conclusie
Met het executor framework is het gebruik van threads in (standaard) Java een stuk handiger geworden. Het framework is flexibel opgezet waardoor het goed toepasbaar is voor applicaties waar verder geen uitgebreide taakmanagement mogelijkheden of een taakpersistentie mechanisme nodig zijn. Indien dergelijke eigenschappen gewenst zijn is een externe bibliotheek zoals het Quartz framework van OpenSymphony aan te raden.

Referenties


Reageer

RSS feed for comments on this post · TrackBack URI