6.3.2 Thread-Pools

Wenn wir uns auf Threads beschränken, können wir die Techniken des vorangegangenen Beispiels in einer Hilfsklasse ThreadPool zusammenfassen. Diese Klasse übernimmt die Verwaltung der Arbeitsthreads und stellt Methoden zum Einfügen von Arbeitseinheiten in den internen Arbeitsstapel zur Verfügung. Die Anzahl der Arbeitsthreads wählt man entsprechend dem Minimum der verfügbaren CPUs und dem maximalen Parallelisierungsgrad der Anwendung. Die Anzahl der Teilaufgaben soll dann sehr viel größer als die Anzahl der Arbeitsthreads sein, damit eine gute Lastverteilung erreicht wird.

Im letzten Beispiel wurde der Arbeitsstapel nach dem FIFO-Prinzip abgearbeitet, hier wollen wir nun das LIFO-Prinzip anwenden. Daher benutzen wir als Arbeitsstapel einen einfachen Stack aus dem Packet java.util. Die Wahl des Einfüge-/Entnahme-Prinzips hängt natürlich von der jeweiligen Anwendung ab. Wenn die Teilaufgaben voneinander unabhängig sind (wie auch im Primzahl-Beispiel), so spielt das verwendete Prinzip keine Rolle. Hängen die späteren Teilaufgaben von Ergebnissen von früheren Teilaufgaben ab (wie bei manchen Iterationen), so muss der Arbeitsstapel nach dem FIFO-Prinzip verwaltet werden. Wenn aber Ergebnisse von späteren Teilaufgaben für die früheren Aufgaben benötigt werden (wie bei Rekursionen), so ist der Arbeitsstapel nach dem LIFO-Prinzip zu organisieren. In diesen Fällen muss allerdings die Anwendung durch Warten an geeigneten Bedingungsvariablen selbst sicherstellen, dass die jeweiligen benötigten Vorergebnisse fertig gestellt sind, wenn sie benötigt werden. Im nächsten Abschnitt 6.3.3 werden wir für den Fall der Rekursion diese Logik in die RecursionThreads-Klasse fest einbauen.

Wir beginnen mit der Beschreibung der Klasse ThreadPool und stellen anschließend die Hilfsklasse PoolThread vor, bevor wir ein einfaches Beispiel für den Einsatz dieser Klassen besprechen. Der Konstruktor der Klasse ThreadPool nimmt als Parameter die gewünschte Anzahl der zu startenden Threads und startet diese dann in der Variante PoolThread. Natürlich wird auch der Arbeitsstapel jobstack aufgesetzt. Die Methode terminate() wartet, bis der Arbeitsstapel leer ist, und terminiert dann die Arbeitsthreads. terminate() darf erst aufgerufen werden, wenn sicher keine neuen Teilaufgaben mehr erzeugt werden.

import java.util.Stack;

public class ThreadPool {
    static final int SIZE = 3;
    int size;
    PoolThread[] workers;
    int nojobs = 0;
    Stack jobstack;

    public ThreadPool() {
        this(SIZE);
    }

    public ThreadPool(int size) {
        jobstack = new Stack();
        this.size = size;
        workers = new PoolThread[size];
        for (int i=0; i<size; i++) {
            workers[i] = new PoolThread(this);
            workers[i].start();
        }
    }
    public void terminate() {
        while (!jobstack.empty()) {
            try {
                Thread.currentThread().sleep(100);
            }
            catch (InterruptedException e) {
            }
        }
        for (int i=0; i<size; i++) {
            try { 
                while ( workers[i].isAlive() ) {
                    workers[i].interrupt(); 
                    workers[i].join(100);
                }
            }
            catch (InterruptedException e) { 
            }
        }
    }

Für die weitere Benutzung der Klasse ist nur noch die Methode addJob(job) wichtig, mit der ein Objekt job, das das Runnable-Interface implementiert, auf den Arbeitsstapel gelegt wird. Intern wird geprüft, ob es Arbeitsthreads gibt, die nichts zu tun haben (nojobs > 0). Wenn solche existieren, wird einer davon mit notify() informiert. Für Arbeitsthreads, die durch PoolThread implementiert werden, ist die Methode getJob() gedacht. Diese liefert eine Arbeitseinheit, wenn der Arbeitsstapel nicht leer ist, und wartet mit wait(), falls gerade keine Arbeit vorhanden ist.

Die Methoden addJob() und getJob() müssen beide synchronisiert werden, da sie gemeinsam auf den Arbeitsstapel zugreifen und gemeinsam die Variable nojobs modifizieren. Der Arbeitsstapel jobstack muss hier nicht Threadsicher sein (d.h., die Methoden push() und pop() müssen nicht synchronisiert sein), da im Wesentlichen nur innerhalb von addJob() und getJob() der Stack verändert wird. Die Methode terminate() ist nicht synchronisiert und darf es auch nicht sein, da sonst nach ihrem Aufruf keine Jobs mehr entnommen werden könnten. Wenn eine Anwendung sehr viele kleine Arbeitseinheiten erzeugt, entsteht allerdings ein Engpass, dadurch dass addJob() und getJob() an demselben Objekt synchronisieren. Das heißt, wenn eine neue Arbeitseinheit hinzugefügt wird, kann nicht gleichzeitig eine Arbeitseinheit aus dem Stapel entnommen werden.

    public synchronized void addJob(Runnable job) {
        jobstack.push(job);
        System.out.println("Thread["
               +Thread.currentThread().getName()
               +"] adding job" );
        if (nojobs > 0) {
            nojobs--;
            System.out.println("Thread["
               +Thread.currentThread().getName()
               +"] notifying a jobless worker");
            notify();
        }
    }
    public synchronized Runnable getJob() 
                        throws InterruptedException {
        while (jobstack.empty()) {
            nojobs++;
            System.out.println("Thread["
               +Thread.currentThread().getName()
               +"] waiting");
            wait();
            nojobs--;
        }
        return (Runnable)jobstack.pop();
    }
}

Die Klasse PoolThread erweitert die Klasse Thread. Der Konstruktor benötigt als Parameter ein ThreadPool-Objekt, über dessen getJob()-Methode die Arbeitseinheiten geholt werden. Die run()-Methode holt in einer while-Schleife jeweils eine neue Arbeitseinheit und führt deren run()-Methode aus. getJob() blockiert, falls keine Arbeit anliegt. Tritt dabei eine Unterbrechung auf, terminiert die while-Schleife, da dann die Variable running auf false gesetzt wird. Damit terminiert der entsprechende Thread.

class PoolThread extends Thread {
    ThreadPool pool;

    public PoolThread(ThreadPool pool) {
        this.pool = pool;
    }

    public void run() {
        System.out.println( "Thread["
               +Thread.currentThread().getName()
               +"] ready" );
        boolean running = true;
        while (running) {
            try {
                System.out.println( "Thread["
                   +Thread.currentThread().getName()
                   +"] looking for a job" );
                pool.getJob().run();
            }
            catch (InterruptedException e) { 
                  running = false; 
            }
        }
        System.out.println( "Thread["
               +Thread.currentThread().getName()
               +"] terminated" );
    }
}

Im folgenden Beispiel werden einfach 10 `schwierige' Jobs in den Arbeitsstapel gelegt. new ThreadPool() erzeugt einen neuen Pool von Threads, dann werden mit addJob(new HardJob()) die Arbeitseinheiten erzeugt und abgelegt. Mit pool.terminate() wird schließlich auf die Beendigung der Threads gewartet.

public class ExThreadPool {

    public static void main (String[] args) {
        ThreadPool pool = new ThreadPool();
        for (int i=0; i<10; i++) {
            pool.addJob(new HardJob("job-"+i, 5));
        }
        pool.terminate();
    }
}

Die `schwierigen' Jobs werden hier durch die Klasse HardJob implementiert. Die run()-Methode macht weiter nichts, als den Arbeitsthread eine zufällige Zeitdauer schlafen zu legen.

public class HardJob implements Runnable {
    int delay;
    String name;

    public HardJob(String n, int d) {
        name = n; 
        delay = d;
    }

    public String getName() {
        return name;
    }

    public void run() {
        System.out.println("Thread["
               +Thread.currentThread().getName()
               +"] found "+name);
        try {
            Thread.currentThread().sleep(
                   delay*(long)(1000.0*Math.random()));
        }
        catch (InterruptedException e) {
        }
        System.out.println("Thread["
               +Thread.currentThread().getName()
               +"] finished "+name);
    }
}

Die Ausgabe von obigem Beispiel könnte wie folgt aussehen. Vom Hauptthread main werden die 10 Aufgaben hinzugefügt. Dann melden sich drei Threads Thread-4, Thread-5 und Thread-6 als gestartet und beginnen mit der Bearbeitung der Aufgaben. Da der Arbeitsstapel ein Stack ist, werden der Reihe nach die Jobs job-9, job-8 bis job-0 bearbeitet. Schließlich terminieren alle Threads und damit auch das Programm.

Thread[main] adding job
...
Thread[main] adding job
Thread[main] adding job
Thread[main] adding job
Thread[main] adding job
Thread[main] adding job
Thread[Thread-4] ready
Thread[Thread-4] looking for a job
Thread[Thread-4] found job-9
Thread[Thread-5] ready
Thread[Thread-5] looking for a job
Thread[Thread-5] found job-8
Thread[Thread-6] ready
Thread[Thread-6] looking for a job
...
Thread[Thread-4] finished job-1
Thread[Thread-4] looking for a job
Thread[Thread-4] waiting
Thread[Thread-4] terminated
Thread[Thread-5] finished job-3
Thread[Thread-5] looking for a job
Thread[Thread-5] waiting
Thread[Thread-5] terminated
Thread[Thread-6] finished job-0
Thread[Thread-6] looking for a job
Thread[Thread-6] waiting
Thread[Thread-6] terminated

Damit ist die Besprechung der Thread-Pools beendet. Wir wenden uns jetzt dem schwierigeren Problem der Parallelisierung von Rekursionen zu.

6.3.3 Rekursion

Für die Implementierung von rekursiven Algorithmen würde man gerne die rekursiven Methodenaufrufe einfach mit con umgeben, um die Rekursion zu parallelisieren. So würde man zum Beispiel beim Quicksort-Algorithmus (siehe Abbildung 6.1 und Seite [*]) einfach die beiden rekursiven Aufrufe von Quicksort(A,l,q); und Quicksort(A,q+1,r); durch con Quicksort(A,l,q); Quicksort(A,q+1,r); end parallelisieren. Die beiden Methoden würden dann wie in dem Programmfragment in Abbildung 6.1 aussehen.

Abbildung 6.1: Quicksort
\framebox{
\begin{minipage}[t]{5cm}
\begin{tabbing}
// sequenziell \\
{Quicksor...
...sort($A,q+1,r$)}; \- \\
{\bf end}\- \\
{\bf end}
\end{tabbing}\end{minipage}}

Wenn wir nun con wie in Kapitel 3 durch Erzeugen und Starten von zwei Threads implementieren würden, gäbe es folgende Nachteile:

  1. Je nach Größe des zu sortierenden Arrays, und damit der notwendigen Rekursionstiefe, müssten sehr viele Threads erzeugt werden.
  2. Da die zu sortierenden Teilarrays unterschiedliche Größe haben können, würde sich eine schlechte Lastverteilung ergeben.
Punkt 1 kann man dadurch abmildern, dass für Teilarrrays, die eine bestimmte Größe unterschreiten, der normale sequenzielle Algorithmus verwendet wird.

Zur allgemeinen Lösung empfiehlt es sich hier eine Variante des Arbeitsstapel-Verfahrens zu versuchen. Die rekursiven Aufrufe werden dabei als Arbeitseinheiten definiert und auf den Arbeitsstapel abgelegt. Die Anzahl der zu startenden Threads könnte dann wieder der Anzahl der verfügbaren CPUs angepasst werden, wodurch Nachteil 1 erledigt wäre. Entsprechend dem Arbeitsaufwand würden die Threads zwar unterschiedlich lange mit den einzelnen Teilen zu tun haben, aber wenn ein Thread mit einer Aufgabe fertig wäre, würde er sich vom Arbeitsstapel die nächste Aufgabe holen. Dadurch würde eine gute Lastverteilung erreicht werden und auch Nachteil 2 wäre erledigt.

Für den Arbeitsstapel sind allerdings einige Nebenbedingungen zu beachten:

  1. Der Arbeitsstapel muss nach dem LIFO-Prinzip verwaltet werden, da die Ergebnisse der zuletzt erzeugten Aufgaben als Erstes wieder benötigt werden.
  2. Die Threads müssen warten, bis die rekursiven Aufgaben erledigt sind. Dadurch würde sich sehr schnell eine Deadlock-Situation ergeben, da ja nur eine begrenzte Anzahl von Threads bereitstehen. Es empfiehlt sich also, sie in dieser Zeit mit anderen Aufgaben zu beschäftigen.
  3. Zur Verbesserung der Datenlokalität sollte ein Thread möglichst an den Subproblemen arbeiten, die er selbst generiert hat. Nur wenn für einen Thread keine Aufgaben mehr vorhanden sind, soll er von anderen Threads Teilaufgaben übernehmen.
Nebenbedingung 1 lässt sich einfach erfüllen, indem wir den Arbeitsstapel durch einen Stack implementieren. Nebenbedingung 3 lässt sich erfüllen, indem wir für jeden Thread einen eigenen Arbeitsstapel vorsehen. Diese müssen natürlich auch allen anderen Threads zugänglich sein, damit sie sich bei leerem eigenem Arbeitsstapel von dort Aufgaben holen können. Wenn wir die Arbeitsstapel als Deque realisieren, können wir an einem Ende in LIFO-Ordnung und am anderen Ende in FIFO-Ordnung Aufgaben entnehmen. In FIFO-Ordnung holen wir die Aufgaben für fremde Threads und in LIFO-Ordnung entnehmen wir die Aufgaben für den eigenen Thread. Damit werden relativ große Aufgaben zwischen Threads ausgetauscht und relativ kleine Aufgaben bleiben beim erzeugenden Thread.

Nebenbedingung 2 ist am aufwendigsten zu realisieren. Zunächst müssen alle Teilaufgaben mit einer Bedingungsvariablen versehen werden, über die festgestellt werden kann, ob die Aufgabe erledigt ist, und an der bei Bedarf gewartet werden kann, bis sie erledigt ist. Wir werden dazu unsere bekannten Semaphore einsetzen. Das con-Statement werden wir durch eine Methode concurrent() implementieren. Diese wird nicht blind auf die Beendigung der Teilaufgaben warten, sondern sich an der Abarbeitung des eigenen Arbeitsstapels beteiligen.

Im Folgenden wollen wir eine Implementierung vorstellen, die sich an den Artikeln [Lea00b] und [FLR98] orientiert. Die parallelen Teilaufgaben werden jetzt nicht wie im vorangegangenen Abschnitt als Klassen, die das Runnable-Interface implementieren, realisiert, sondern als Klassen, die die abstrakte Klasse RecursionTask implementieren. RecursionTask stellt zum einen alle notwendigen internen Methoden zur Verfügung, wie das Testen auf Beendigung der Aufgabe, und zum anderen die von einer Anwendung benutzten Methoden compute() und concurrent().

public abstract class RecursionTask  {
    public abstract void compute();
    public void concurrent(RecursionTask t1, 
                           RecursionTask t2);
}
Die compute()-Methode ist abstract deklariert und muss auf jeden Fall von der Anwendung implementiert werden. Sie entspricht in etwa der run()-Methode bei Threads. Die Arbeitsstsstapel und die Threads werden in einer Klasse RecursionThreads verwaltet.
public class RecursionThreads extends Thread {
    public RecursionThreads(int number);
    public static void terminate();
    public void invoke(RecursionTask t);
}
Über den Konstruktor wird die gewünschte Anzahl von Arbeitsthreads gestartet. Die Methode terminate() dient dem geordneten Beenden aller Threads. Die invoke()-Methode wird von der Anwendung beim ersten Aufruf des Rekursionssystems verwendet.

Unser Quicksort-Beispiel lässt sich damit wie folgt skizzieren:

public class ParQuickSort extends RecursionTask {

    private static int[] A; 
            // QuickSort uses same Array
    private int l, r;

    private ParQuickSort(int l, int r) {
        this.l = l; this.r = r;
    }

    public ParQuickSort(int[] A) {
        this.A = A;
        l = 0; r = A.length-1;
        workers = new RecursionThreads(number);
    }

    public synchronized void sort() {
        workers.invoke(this); // calls compute
        workers.terminate();
    }

    public void compute() {
        cormenQuicksort(l,r);
    }

    private void cormenQuicksort(int l, int r) {
        if (l >= r) return;
        int q = partition(l,r);
        // sequential recursion 
        if ( (r-l) < thresh ) { 
           cormenQuicksort(l,q); 
           cormenQuicksort(q+1,r);
           return;
        } 
        // recursion in parallel
        ParQuickSort t1 = new ParQuickSort(l,q);
        ParQuickSort t2 = new ParQuickSort(q+1,r);
        concurrent(t1,t2);
    }
}

Mit new ParQuickSort() werden neue Arbeitseinheiten t1 und t2 generiert, die anschließend mit concurrent(t1,t2) nebeneinander ausgeführt werden. In der Methode cormenQuicksort(int l, int r) findet die eigentliche Sortierung statt. Diese Methode wird in der Rekursion immer über die Methode compute() entsprechend den eingestellten linken und rechten Feldindizes aufgerufen. Im sequenziellen Fall wird sie direkt rekursiv von sich selbst aufgerufen. Zum ersten Mal wird compute() über die Methode invoke() benutzt. Die Methoden invoke() und concurrent() terminieren erst, wenn die jeweilige Aufgabe erledigt ist.

Der public-Konstruktor von ParQuickSort wird extern verwendet und der private-Konstruktor wird nur innerhalb der Rekursion eingesetzt. Die Klasse ParQuickSort könnte dann wie folgt verwendet werden:

    int[] A ... 
    new ParQuickSort(A).sort();

Wir kommen nun zur Entwicklung und der Implementierung der Klassen RecursionTask und RecursionThreads.

6.3.3.1 Recursion Tasks und Threads

Das Rekursionssystem besteht wie schon skizziert aus den zwei Klassen RecursionTask und RecursionThreads. Die Klasse RecursionTask implementiert dabei die Arbeitseinheiten und die Klasse RecursionThreads die Arbeitsstapel und die Arbeitsthreads.

Die Klasse RecursionTask definiert einen Semaphor done, mit dem geprüft werden kann, ob die Arbeit erledigt ist (Methode isDone()). Die Klasse Semaphore entspricht unserer in Abschnitt 3.4 entwickelten Klasse Sema, mit dem Unterschied, dass sie die Lösung zu Aufgabe 9 enthält. Falls erforderlich kann mit waitDone() gewartet werden, bis die Aufgabe fertig bearbeitet ist. Die Methode runit(), die von den Arbeitsthreads aufgerufen wird, ruft ihrerseits die von der Anwendung definierte Methode compute() auf und inkrementiert dann den Semaphor. In der privaten Variablen mythread ist der Arbeitsthread vermerkt, der diese Arbeitseinheit bearbeitet. Diese Variable wird mit der Methode setThread() von RecursionThreads gesetzt. Die beiden Methoden concurrent(RecursionTask t1, RecursionTask t2) und concurrent(RecursionTask[] t) stoßen die Verarbeitung von zwei (t1, t2) oder von einem Array t von parallelen rekursiven Arbeitseinheiten an. Die beiden Methoden delegieren diese an die entsprechenden Methoden des Arbeitsthreads.

public abstract class RecursionTask  {

    private Semaphore done = new Semaphore(0);
    private RecursionThreads mythread = null;

    public boolean isDone() {
        return done.isPositive();
    }

    public void waitDone() 
           throws InterruptedException {
        done.P();
    }

    public void runit() {
        compute();
        done.V();
    }

    public abstract void compute();

    public void setThread(RecursionThreads t) {
        mythread = t;
    }

    public  void concurrent(RecursionTask t1, 
                            RecursionTask t2) {
        mythread.concurrent(t1,t2);
    }

    public  void concurrent(RecursionTask[] t) {
        mythread.concurrent(t);
    }
}

Die Klasse RecursionThreads hat eine Doppelfunktion: Sie verwaltet über statische Variablen die Arbeitsstapel und die Arbeitsthreads und stellt selbst die Implementierung der Arbeitsthreads zur Verfügung.

Der Konstruktor unterscheidet, ob der Verwaltungsfall oder der Arbeitsfall vorliegt, an der Frage, ob die Variablen works (die Arbeitsstapel) und threads (die Arbeitsthreads) gesetzt sind oder nicht. Im Verwaltungsfall werden entsprechend der Anzahl der Arbeitsthreads die Arbeitsstapel als Deque (Double Ended Queue) initialisiert und anschließend die Arbeitsthreads erzeugt und gestartet. Die Verwaltungsmethode terminate() kann verwendet werden, um die Arbeitssthreads geordnet herunterzufahren. Dabei wird gewartet, bis alle Arbeitsstapel leer sind und alle Threads ihre Arbeit beendet haben, dann werden die Threads mit interrupt() unterbrochen und mit join() beendet. Der Interrupt wird ggf. wiederholt, wenn der Thread nach einer gewissen Zeit nicht beendet ist. Die letzte Verwaltungsmethode ist invoke(). Sie stellt die Arbeitseinheit in einen zufällig ausgewählten Arbeitsstapel und wartet dann auf die Beendigung der Arbeitseinheit.

import java.io.*;
import java.util.*;

public class RecursionThreads extends Thread {

    private static int numw = 1000;
    private static Deque[] works = null;
    private static int wunit = 0;

    private static int numt = 4;
    private static RecursionThreads[] threads = null;
    private static int tcount = -1;
    private int myw = -1;

    private static boolean running = false;
    private boolean working = false;
    private static Random rand = new Random();

    public RecursionThreads() {
        this(numt);
    }

    public RecursionThreads(int number) {
        numt = number;
        if (numt <= 0) numt = 1;
        tcount++; myw = tcount;
        if ( (works == null) && (threads == null) ) {
           wunit = 0;
           works = new Deque[numt];
           for ( int i = 0; i < numt; i++ ) {
                 works[i] = new Deque(numw);
           }
           threads = new RecursionThreads[numt];
           threads[0] = this; 
           this.start();
           for ( int i = 1; i < numt; i++ ) {
                 RecursionThreads t = 
                        new RecursionThreads();
                 threads[i] = t;
                 t.start();
           }
        }
    }
    public static void terminate() {
        RecursionThreads t = null;
        RecursionTask w = null;
        int i = numt-1;
        while ( i >= 0 ) {
              while ( !works[i].empty() ) {
                      try { sleep(100); } 
                      catch (InterruptedException e) { 
                      }
              }
              i--;
        }
        running = false;
        i = numt-1;
        while ( i > 0 ) {
            try { 
                t = threads[i]; i--;
                while ( t.isWorking() ) {
                    sleep(100);
                }
                while ( t.isAlive() ) {
                        t.interrupt(); t.join(100);
                }
            } catch (InterruptedException e) { }
        }
        threads = null;
        works = null;
        tcount = -1;
    }

    public  void invoke(RecursionTask t) {
        int myt = Math.abs( rand.nextInt() % numt );
        t.setThread(threads[myt]);
        try {
            works[myt].push( t ); wunit++;
            t.waitDone(); 
        } catch (InterruptedException e) { }
    }

Zu den Arbeitsmethoden gehören die Methoden isWorking() sowie run() und die beiden Methoden concurrent(). Über die Methode isWorking lässt sich feststellen, ob der Thread gerade an einer Aufgabe arbeitet oder auf eine neue Aufgabe wartet. Die run()-Methode besteht aus einer while-Schleife, in der zunächst nach einer Arbeit gesucht wird, die anschließend bearbeitet wird (durch Ausführen der runit()-Methode). Der Suchalgorithmus beginnt mit dem eigenen Arbeitsstapel (myw) in LIFO-Ordnung (Methode pop()). Falls dort nichts gefunden wird bzw. auch innerhalb von 100 ms keine Arbeit eintrifft, werden die Arbeitsstapel der anderen Threads in FIFO-Ordnung (Methode get()) durchsucht. Auch hier wird bis zu 100 ms auf eine Arbeitseinheit gewartet, bevor der nächste Arbeitsstapel durchsucht wird. Dieser Algorithmus hat den Nachteil, dass eventuell unnötig lange an einem Arbeitsstapel gewartet wird, bevor zum nächsten weitergegangen wird. Hier könnte man als Verbesserung mit kleineren Wartezeiten beginnen und nach jedem Suchdurchlauf die Wartezeiten erhöhen. Eine andere Verbesserung bestünde in einem direkten Weiterspringen zu einem nicht leeren Arbeitsstapel.

    public boolean isWorking() {
        return working;
    }

    public void run() {
        running = true;
        RecursionTask w;
        Object o;
        while (running) {
            w = null;
            try { // search some work
              int i = myw; 
              while ( w == null && running ) { 
                  if (i == myw) { 
                     o = works[i].pop(100); }
                  else { o = works[i].get(100); 
                  }
                  if (o instanceof RecursionTask) {
                     w = (RecursionTask)o; 
                     w.setThread(this);
                     if (i != myw) {
                        System.out.println(myw 
                               +" has stolen from "+i); 
                     }
                  }
                  i++; if (i >= numt) i -= numt;
              }
            } 
            catch (InterruptedException e) { 
                  running = false; 
            }
            if ( w != null ) { 
               working = true;
               w.runit(); 
               working = false;
            }
        }
    }

Die beiden Methoden concurrent() werden schon von einem Arbeitsthread aus aufgerufen. Sie fügen zunächst die Arbeitseinheiten RecursionTask t1, t2 bzw. t[i] in ihren Arbeitsstapel ein. Dann warten sie, bis diese Arbeitseinheiten fertig bearbeitet sind (Methoden t.isDone()). Während der Wartezeit beteiligen sie sich an der Abarbeitung des eigenen Arbeitsstapels. Im Normalfall werden sie dabei ihre eigenen gerade gestapelten Einheiten abarbeiten. Es kann allerdings auch vorkommen, dass die eigenen Einheiten von einem anderen Thread übernommen wurden und der eigene Arbeitsstapel leer ist. In diesem Fall wird einfach mit sleep() 100 ms lang gewartet und dann wieder geprüft, ob die Arbeiten erledigt sind. Eine Alternative zu sleep() bestünde im Beenden der Schleife und im Aufrufen von waitDone(). Nach der while-Schleife sind zur Illustration noch die waitDone()-Aufrufe eingefügt, obwohl sie hier überflüssig sind. Aus Gründen der Performance ist es vermutlich besser, mit waitDone() auf die Beendigung zu warten, da sonst der Thread unnötig lange mit anderen Arbeiten beschäftigt wird. Das heißt, sleep() sollte besser durch break ersetzt werden, um die while-Schleife zu verlassen. Aus dem gleichen Grund ist es vermutlich auch besser, bei der Entnahme vom Arbeitsstapel (Methode pop(100)) nicht erst 100 ms zu warten, da sonst unnötige Fremdarbeiten angenommen werden, die nur die Terminierung von concurrent() verzögern.

    public  void concurrent(RecursionTask t1, 
                            RecursionTask t2) {
        int myt = myw; 
        try {
            t1.setThread(threads[myt]);
            works[myt].push( t1 ); wunit++;
            t2.setThread(threads[myt]);
            works[myt].push( t2 ); wunit++; 
        } catch (InterruptedException e) { }
        RecursionTask t = null;
        while ( !t1.isDone() || !t2.isDone() ) {
            Object o;
              try { 
                  o = works[myt].pop(100);
                  if (o instanceof RecursionTask) {
                     t = (RecursionTask)o; 
                     if (t != null) t.runit();
                  } else sleep(100);
              } catch (InterruptedException e) { }
        }
        try { 
            t1.waitDone(); t2.waitDone();
        } catch (InterruptedException e) { }
    }

    public  void concurrent(RecursionTask[] rt) {
        int myt = myw; 
        try {
            for (int i = 0; i < rt.length; i++) {
                rt[i].setThread(threads[myt]);
                works[myt].push( rt[i] ); wunit++;
            }
        } catch (InterruptedException e) { }
        RecursionTask t = null;
        Object o = null;
        boolean done = false;
        while ( !done ) {
              done = true;
              for (int i = 0; i < rt.length; i++) {
                  done = done && rt[i].isDone();
              }
              if (done) break;
              try { 
                  o = works[myt].pop(100);
                  if (o instanceof RecursionTask) {
                     t = (RecursionTask)o; 
                     if (t != null) t.runit();
                  } else sleep(100);
              } catch (InterruptedException e) { }

        }
        try { 
            for (int i = 0; i < rt.length; i++) {
                rt[i].waitDone(); 
            }
        } catch (InterruptedException e) { }
    }
}

Bemerkung: Durch das Blockieren bzw. das Schlafenlegen der Threads in den concurrent()-Methoden, besteht theoretisch die Gefahr von Deadlocks. Das Blockieren bzw. Schlafen kann eintreten, wenn in concurrent() neue Aufgaben auf den Arbeitsstapel abgelegt werden, die dann sofort von anderen Threads übernommen werden. Diese Situation kann in Folge dann in jedem weiteren Thread eintreten. Allerdings kann das Übernehmen von Arbeitseinheiten nur in der run()-Methode stattfinden. Beim letzten Thread, bei dem das Blockieren/Schlafen eintritt, kann aber keine andere run()-Methode mehr aktiv werden und Arbeit übernehmen. Dieser letzte Thread muss eventuell die gesamte restliche Arbeit alleine machen, aber es tritt kein Deadlock ein. Nur die Lastverteilung ist in dieser Situation extrem schlecht.

Damit haben wir die beiden Klassen des Rekursionssystems vorgestellt und sehen uns jetzt eine Anwendung in einem parallelen Quicksort-Algorithmus an.

6.3.3.2 Paraller Quicksort

Der Quicksort-Algorithmus sortiert ein Feld von ganzen Zahlen durch Aufteilen des Feldes in zwei Teile und rekursives Sortieren der beiden Teile. Das Aufteilen des Feldes wird von einer Methode partition() vorgenommen (die auch Elemente des Feldes vertauscht). Der im Folgenden verwendete Algorithmus folgt der Version in [CLR90]. Wir parallelisieren den Algorithmus mit Hilfe des oben entwickelten Rekursionssystems, indem wir die rekursive Sortierung der beiden Teilfelder parallel abarbeiten (siehe Abbildung 6.1).

Unsere Implementierung in der Klasse ParQuickSort benutzt zwei Konstruktoren. Einer wird nur intern verwendet und einer dient der externen Anwendung der Klasse. Der externe Konstruktor initialisiert die RecursionThreads, der interne Konstruktor definiert die jeweiligen linken und rechten Feldgrenzen, in denen sortiert werden soll. Mit der externen (public) Methode sort() wird die Sortierung angestoßen. sort() ruft die Methode invoke() auf, die in einem Thread dann die Methode compute() anstößt. Danach ruft die Methode sort() die Methode terminate() auf, um die RekursionThreads herunterzufahren. In der compute()-Methode wird hier wiederum nur die Methode cormenQuicksort() mit den entsprechenden aktuellen Feldgrenzen aufgerufen. Innerhalb von cormenQuicksort() findet dann die Partitionierung des Feldes statt (Methode partition()). In Abhängigkeit von der Größe des restlichen zu sortierenden Feldbereichs wird dann der normale sequenzielle Algorithmus verwendet oder es werden die beiden RekusionTasks erzeugt, die dann mit concurrent() rekursiv parallel ausgeführt werden.

import java.io.*;

public class ParQuickSort extends RecursionTask {

    private static int[] A; 
            // QuickSort uses same Array
    private int l, r;

    private static final int thresh = 1000;
    private int numw = 4;
    private static RecursionThreads workers = null;

    private ParQuickSort(int l, int r) {
        this.l = l;
        this.r = r;
    }

    public ParQuickSort(int[] A) {
        this.A = A;
        l = 0;
        r = A.length-1;
        numw = Math.min(numw,r/thresh);
        if (workers == null) {
            workers = new RecursionThreads(numw);
        }
    }

    public synchronized void sort() {
        workers.invoke(this); 
          // cormenQuicksort(0, A.length-1);
        workers.terminate(); 
        workers = null;
        return;
    }

    public void compute() {
        cormenQuicksort(l,r);
        return;
    }

    private int partition(int l, int r) {
        // if (l >= r) return l;
        int x, t, i, j;
        x = A[l]; i = l-1; j = r+1;
        while (true) {
            do { j--; } while ( A[j] > x );
            do { i++; } while ( A[i] < x );
            if ( i < j ) {
               t = A[i]; A[i] = A[j]; A[j] = t; 
            } else return j;
        }
    }
    private void cormenQuicksort(int l, int r) {
        if (l >= r) return;
        int q = partition(l,r);

        // sequential recursion 
        if ( (r-l) < thresh ) { 
           cormenQuicksort(l,q); 
           cormenQuicksort(q+1,r);
           return;
        } 

        // recursion in parallel
        ParQuickSort t1 = new ParQuickSort(l,q);
        ParQuickSort t2 = new ParQuickSort(q+1,r);
        concurrent(t1,t2);

        return;
    }
}

Die Anwendung der Klasse ParQuickSort zeigt folgendes Programm ExParQuickSort. Zuerst wird ein Feld mit Zufallszahlen besetzt, dieses wird dann sortiert und zum Schluss wird die richtige Sortierung überprüft.

import java.io.*;
import java.util.*;

public class ExParQuickSort {

   public static void main (String[] args) {
     Random rand = new Random();
     PrintWriter out = new PrintWriter(System.out,true);
     int size = 1000000;
     int[] A = new int[size];
     for (int i = 0; i < size; i++) {
         A[i] = Math.abs(rand.nextInt() % size);
     }

     ParQuickSort qs = new ParQuickSort(A);
     qs.sort();

     String ok = "ok";
     for (int i = 0; i < size-1; i++) {
         if ( A[i] > A[i+1] ) {
             ok = "not ok";
         }
     }
     out.println("Sorting done "+ok);
   }
}

Die Ausgabe der sortierten Felder ist hier unnötig, da wir die korrekte Sortierung getestet haben. Die Ausgabe zur Arbeitssuche könnte wie folgt aussehen:

  1 has stolen from 2
  0 has stolen from 1
  3 has stolen from 0
  Sorting done ok

Wir haben mit dem Workpile-Verfahren nun ein neues Verfahren zur Parallelisierung kennen gelernt. Es ist das fexibelste und effizienteste Verfahren für unstrukturierte Aufgaben.


© Universität Mannheim, Rechenzentrum, 2000-2017.

Last modified: Mon Jan 20 23:54:40 CET 2017