/*
 * $Id$
 */

package edu.jas.gb;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.atomic.AtomicInteger;

import mpi.Comm;
import mpi.MPIException;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import edu.jas.kern.MPIEngine;
import edu.jas.poly.ExpVector;
import edu.jas.poly.GenPolynomial;
import edu.jas.structure.RingElem;
import edu.jas.util.DistHashTableMPI;
import edu.jas.util.MPIChannel;
import edu.jas.util.Terminator;
import edu.jas.util.ThreadPool;


/**
 * Groebner Base distributed hybrid algorithm with MPI. Implements a
 * distributed-memory parallel version of Groebner bases for multi-core CPUs
 * using MPI. Using the pairlist class, distributed multi-threaded tasks
 * perform the reductions, with one communication channel per remote node.
 * @param <C> coefficient type
 * @author Heinz Kredel
 */
public class GroebnerBaseDistributedHybridMPI<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {


    private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedHybridMPI.class);


    public final boolean debug = logger.isDebugEnabled();


    /**
     * Number of threads to use.
     */
    protected final int threads;


    /**
     * Default number of threads.
     */
    protected static final int DEFAULT_THREADS = 2;


    /**
     * Number of threads per node to use.
     */
    protected final int threadsPerNode;


    /**
     * Default number of threads per compute node.
     */
    protected static final int DEFAULT_THREADS_PER_NODE = 1;


    /**
     * Pool of threads to use.
     */
    //protected final ExecutorService pool; // not for single node tests
    protected transient final ThreadPool pool;


    /**
     * Underlying MPI engine.
     */
    protected transient final Comm engine;


    /**
     * Message tag for pairs.
     */
    public static final int pairTag = GroebnerBaseDistributedHybridEC.pairTag.intValue();


    /**
     * Message tag for results.
     */
    public static final int resultTag = GroebnerBaseDistributedHybridEC.resultTag.intValue();


    /**
     * Message tag for acknowledgments.
     */
    public static final int ackTag = GroebnerBaseDistributedHybridEC.ackTag.intValue();


    /**
     * Constructor.
     */
    public GroebnerBaseDistributedHybridMPI() throws IOException {
        this(DEFAULT_THREADS);
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     */
    public GroebnerBaseDistributedHybridMPI(int threads) throws IOException {
        this(threads, new ThreadPool(threads));
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param threadsPerNode threads per node to use.
     */
    public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode) throws IOException {
        this(threads, threadsPerNode, new ThreadPool(threads));
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param pool ThreadPool to use.
     */
    public GroebnerBaseDistributedHybridMPI(int threads, ThreadPool pool) throws IOException {
        this(threads, DEFAULT_THREADS_PER_NODE, pool);
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param threadsPerNode threads per node to use.
     * @param pl pair selection strategy.
     */
    public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode, PairList<C> pl)
                    throws IOException {
        this(threads, threadsPerNode, new ThreadPool(threads), pl);
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param threadsPerNode threads per node to use.
     * @param pool ThreadPool to use.
     */
    public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode, ThreadPool pool)
                    throws IOException {
        this(threads, threadsPerNode, pool, new OrderedPairlist<C>());
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param threadsPerNode threads per node to use.
     * @param pool ThreadPool to use.
     * @param pl pair selection strategy.
     */
    public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl)
                    throws IOException {
        super(new ReductionPar<C>(), pl);
        int size = 0;
        try {
            engine = MPIEngine.getCommunicator();
            size = engine.Size();
        } catch (MPIException e) {
            throw new IOException(e);
        }
        if (size < 2) {
            throw new IllegalArgumentException("at least 2 MPI processes required, not " + size);
        }
        if (threads != size || pool.getNumber() != size) {
            throw new IllegalArgumentException("threads != size: " + threads + " != " + size + ", #pool "
                            + pool.getNumber());
        }
        this.threads = threads;
        this.pool = pool;
        this.threadsPerNode = threadsPerNode;
        //logger.info("generated pool: " + pool);
    }
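

    /*
     * Usage sketch (not part of the API; the launcher command and the
     * coefficient field are illustrative assumptions): the same program is
     * started on every MPI rank, e.g. via "mpirun -np 4 java ...". Rank 0
     * acts as master, all other ranks become reduction clients, and the
     * number of MPI processes must equal the threads parameter, as checked
     * in the constructor above.
     *
     *   GroebnerBaseDistributedHybridMPI<BigRational> bb
     *       = new GroebnerBaseDistributedHybridMPI<BigRational>(4, 2);
     *   List<GenPolynomial<BigRational>> G = bb.GB(F); // returns null on client ranks
     *   bb.terminate();
     */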


    /**
     * Cleanup and terminate.
     */
    @Override
    public void terminate() {
        if (pool == null) {
            return;
        }
        //pool.terminate();
        pool.cancel();
    }


    /**
     * Distributed Groebner base.
     * @param modv number of module variables.
     * @param F polynomial list.
     * @return GB(F) a Groebner base of F, or null if an IOException occurs or
     *         on an MPI client rank.
     */
    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
        try {
            if (engine.Rank() == 0) {
                return GBmaster(modv, F);
            }
        } catch (MPIException e) {
            logger.info("GBmaster: " + e);
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            logger.info("GBmaster: " + e);
            e.printStackTrace();
            return null;
        }
        pool.terminate(); // not used on clients
        try {
            clientPart(0); // the master runs only on rank 0
        } catch (IOException e) {
            logger.info("clientPart: " + e);
            e.printStackTrace();
        } catch (MPIException e) {
            logger.info("clientPart: " + e);
            e.printStackTrace();
        }
        return null;
    }


    /**
     * Distributed hybrid Groebner base.
     * @param modv number of module variables.
     * @param F polynomial list.
     * @return GB(F) a Groebner base of F, or null if an IOException occurs.
     */
    public List<GenPolynomial<C>> GBmaster(int modv, List<GenPolynomial<C>> F) throws MPIException,
                    IOException {
        long t = System.currentTimeMillis();
        GenPolynomial<C> p;
        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
        PairList<C> pairlist = null;
        boolean oneInGB = false;
        //int l = F.size();
        int unused = 0;
        ListIterator<GenPolynomial<C>> it = F.listIterator();
        while (it.hasNext()) {
            p = it.next();
            if (p.length() > 0) {
                p = p.monic();
                if (p.isONE()) {
                    oneInGB = true;
                    G.clear();
                    G.add(p);
                    //return G; must signal termination to others
                }
                if (!oneInGB) {
                    G.add(p);
                }
                if (pairlist == null) {
                    //pairlist = new OrderedPairlist<C>(modv, p.ring);
                    pairlist = strategy.create(modv, p.ring);
                    if (!p.ring.coFac.isField()) {
                        throw new IllegalArgumentException("coefficients not from a field");
                    }
                }
                // theList not updated here
                if (p.isONE()) {
                    unused = pairlist.putOne();
                } else {
                    unused = pairlist.put(p);
                }
            } else {
                //l--;
            }
        }
        //if (l <= 1) {
        //return G; must signal termination to others
        //}
        logger.info("pairlist " + pairlist + ": " + unused);

        logger.debug("looking for clients");
        DistHashTableMPI<Integer, GenPolynomial<C>> theList = new DistHashTableMPI<Integer, GenPolynomial<C>>(
                        engine);
        theList.init();

        List<GenPolynomial<C>> al = pairlist.getList();
        for (int i = 0; i < al.size(); i++) {
            // no wait required
            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
            if (nn != null) {
                logger.info("duplicate polynomial " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
            }
        }

        Terminator finner = new Terminator((threads - 1) * threadsPerNode);
        HybridReducerServerMPI<C> R;
        logger.info("using pool = " + pool);
        for (int i = 1; i < threads; i++) {
            MPIChannel chan = new MPIChannel(engine, i); // closed in server
            R = new HybridReducerServerMPI<C>(i, threadsPerNode, finner, chan, theList, pairlist);
            pool.addJob(R);
            //logger.info("server submitted " + R);
        }
        logger.info("main loop waiting " + finner);
        finner.waitDone();
        int ps = theList.size();
        logger.info("#distributed list = " + ps);
        // make sure all polynomials arrived: not needed in master
        G = pairlist.getList();
        if (ps != G.size()) {
            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
        }
        for (GenPolynomial<C> q : theList.getValueList()) {
            if (q != null && !q.isZERO()) {
                logger.debug("final q = " + q.leadingExpVector());
            }
        }
        logger.debug("distributed list end");
        long time = System.currentTimeMillis();
        List<GenPolynomial<C>> Gp;
        Gp = minimalGB(G); // not yet distributed but threaded
        time = System.currentTimeMillis() - time;
        logger.debug("parallel gbmi time = " + time);
        G = Gp;
        logger.info("server theList.terminate() " + theList.size());
        theList.terminate();
        t = System.currentTimeMillis() - t;
        logger.info("server GB end, time = " + t + ", " + pairlist.toString());
        return G;
    }
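

    /*
     * Message protocol between master and clients, per reduction step (a
     * summary of the run() loops in the worker classes below, not an
     * additional API):
     *
     *   client -> server  pairTag    GBTransportMessReq       request a pair
     *   server -> client  pairTag    GBTransportMessPairIndex pair indices, or
     *                                GBTransportMess          no pair available, or
     *                                GBTransportMessEnd       terminate
     *   client -> server  resultTag  GBTransportMessPoly      reduced polynomial H
     *   server -> client  ackTag     GBTransportMess          acknowledgment
     *
     * The polynomials themselves travel via the DistHashTableMPI; only
     * indices are sent over the pair channel.
     */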


    /**
     * GB distributed client.
     * @param rank MPI rank of the process on which the master runs.
     * @throws IOException
     * @throws MPIException
     */
    public void clientPart(int rank) throws IOException, MPIException {
        if (rank != 0) {
            throw new UnsupportedOperationException("only master at rank 0 implemented: " + rank);
        }
        Comm engine = MPIEngine.getCommunicator();

        DistHashTableMPI<Integer, GenPolynomial<C>> theList = new DistHashTableMPI<Integer, GenPolynomial<C>>(
                        engine);
        theList.init();

        MPIChannel chan = new MPIChannel(engine, rank);

        ThreadPool pool = new ThreadPool(threadsPerNode);
        logger.info("client using pool = " + pool);
        for (int i = 0; i < threadsPerNode; i++) {
            HybridReducerClientMPI<C> Rr = new HybridReducerClientMPI<C>(chan, theList); // i
            pool.addJob(Rr);
        }
        if (debug) {
            logger.info("clients submitted");
        }
        pool.terminate();
        logger.info("client pool.terminate()");
        chan.close();
        theList.terminate();
        return;
    }


    /**
     * Minimal ordered Groebner basis.
     * @param Fp a Groebner base.
     * @return a reduced Groebner base of Fp.
     */
    @SuppressWarnings("unchecked")
    @Override
    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
        GenPolynomial<C> a;
        ArrayList<GenPolynomial<C>> G;
        G = new ArrayList<GenPolynomial<C>>(Fp.size());
        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
        while (it.hasNext()) {
            a = it.next();
            if (a.length() != 0) { // always true
                // already monic  a = a.monic();
                G.add(a);
            }
        }
        if (G.size() <= 1) {
            return G;
        }

        ExpVector e;
        ExpVector f;
        GenPolynomial<C> p;
        ArrayList<GenPolynomial<C>> F;
        F = new ArrayList<GenPolynomial<C>>(G.size());
        boolean mt;

        while (G.size() > 0) {
            a = G.remove(0);
            e = a.leadingExpVector();

            it = G.listIterator();
            mt = false;
            while (it.hasNext() && !mt) {
                p = it.next();
                f = p.leadingExpVector();
                mt = e.multipleOf(f);
            }
            it = F.listIterator();
            while (it.hasNext() && !mt) {
                p = it.next();
                f = p.leadingExpVector();
                mt = e.multipleOf(f);
            }
            if (!mt) {
                F.add(a);
            } else {
                // System.out.println("dropped " + a.length());
            }
        }
        G = F;
        if (G.size() <= 1) {
            return G;
        }
        Collections.reverse(G); // important for lex GB

        MiMPIReducerServer<C>[] mirs = (MiMPIReducerServer<C>[]) new MiMPIReducerServer[G.size()];
        int i = 0;
        F = new ArrayList<GenPolynomial<C>>(G.size());
        while (G.size() > 0) {
            a = G.remove(0);
            // System.out.println("doing " + a.length());
            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
            R.addAll(G);
            R.addAll(F);
            mirs[i] = new MiMPIReducerServer<C>(R, a);
            pool.addJob(mirs[i]);
            i++;
            F.add(a);
        }
        G = F;
        F = new ArrayList<GenPolynomial<C>>(G.size());
        for (i = 0; i < mirs.length; i++) {
            a = mirs[i].getNF();
            F.add(a);
        }
        return F;
    }
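

    /*
     * Worked example for the first minimization phase above (a sketch): for
     * G = { x^2, x^2*y, y - 1 } the element x^2*y is dropped, since its
     * leading term x^2*y is a multiple of lt(x^2) = x^2. The second phase
     * then computes the normal form of each surviving element with respect
     * to all others, in parallel via MiMPIReducerServer jobs.
     */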

}


/**
 * Distributed server thread, a proxy for the reducing worker threads on one
 * remote node.
 * @param <C> coefficient type
 */
class HybridReducerServerMPI<C extends RingElem<C>> implements Runnable {


    private static final Logger logger = LogManager.getLogger(HybridReducerServerMPI.class);


    public final boolean debug = logger.isDebugEnabled();


    private final Terminator finner;


    private final MPIChannel pairChannel;


    //protected transient final Comm engine;


    private final DistHashTableMPI<Integer, GenPolynomial<C>> theList;


    private final PairList<C> pairlist;


    private final int threadsPerNode;


    final int rank;


    /**
     * Message tag for pairs.
     */
    public static final int pairTag = GroebnerBaseDistributedHybridMPI.pairTag;


    /**
     * Constructor.
     * @param r MPI rank of the partner node.
     * @param tpn number of threads per node.
     * @param fin terminator.
     * @param chan MPIChannel.
     * @param dl distributed hash table.
     * @param L ordered pair list.
     */
    HybridReducerServerMPI(int r, int tpn, Terminator fin, MPIChannel chan,
                    DistHashTableMPI<Integer, GenPolynomial<C>> dl, PairList<C> L) {
        rank = r;
        threadsPerNode = tpn;
        finner = fin;
        this.pairChannel = chan;
        theList = dl;
        pairlist = L;
        //logger.info("reducer server created " + this);
    }


    /**
     * Work loop.
     * @see java.lang.Runnable#run()
     */
    @Override
    @SuppressWarnings("unchecked")
    public void run() {
        //logger.info("reducer server running with " + engine);
        // try {
        //     pairChannel = new MPIChannel(engine, rank); //,pairTag
        // } catch (IOException e) {
        //     e.printStackTrace();
        //     return;
        // } catch (MPIException e) {
        //     e.printStackTrace();
        //     return;
        // }
        if (logger.isInfoEnabled()) {
            logger.info("reducer server running: pairChannel = " + pairChannel);
        }
        // record idle remote workers (minus one?)
        //finner.beIdle(threadsPerNode-1);
        finner.initIdle(threadsPerNode);
        AtomicInteger active = new AtomicInteger(0);

        // start receiver
        HybridReducerReceiverMPI<C> receiver = new HybridReducerReceiverMPI<C>(rank, finner, active,
                        pairChannel, theList, pairlist);
        receiver.start();

        Pair<C> pair;
        //boolean set = false;
        boolean goon = true;
        //int polIndex = -1;
        int red = 0;
        int sleeps = 0;

        // while more requests
        while (goon) {
            // receive request if a thread is reported inactive
            logger.debug("receive request");
            Object req = null;
            try {
                req = pairChannel.receive(pairTag);
                //} catch (InterruptedException e) {
                //goon = false;
                //e.printStackTrace();
            } catch (IOException e) {
                goon = false;
                e.printStackTrace();
            } catch (MPIException e) {
                e.printStackTrace();
                return;
            } catch (ClassNotFoundException e) {
                goon = false;
                e.printStackTrace();
            }
            logger.debug("received request, req = " + req);
            if (req == null) {
                goon = false;
                break;
            }
            if (!(req instanceof GBTransportMessReq)) {
                goon = false;
                break;
            }

            // find pair and manage termination status
            logger.debug("find pair");
            while (!pairlist.hasNext()) { // wait
                if (!finner.hasJobs() && !pairlist.hasNext()) {
                    goon = false;
                    break;
                }
                try {
                    sleeps++;
                    if (sleeps % 3 == 0) {
                        logger.info("waiting for reducers, remaining = " + finner.getJobs());
                    }
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    goon = false;
                    break;
                }
            }
            if (!pairlist.hasNext() && !finner.hasJobs()) {
                logger.info("termination detection: no pairs and no jobs left");
                goon = false;
                break; //continue; //break?
            }
            finner.notIdle(); // before pairlist get!!
            pair = pairlist.removeNext();
            // send pair to client, even if null
            if (debug) {
                logger.info("active count = " + active.get());
                logger.info("send pair = " + pair);
            }
            GBTransportMess msg = null;
            if (pair != null) {
                msg = new GBTransportMessPairIndex(pair);
            } else {
                msg = new GBTransportMess(); //not End(); at this time
                // goon ?= false;
            }
            try {
                red++;
                pairChannel.send(pairTag, msg);
                @SuppressWarnings("unused")
                int a = active.getAndIncrement();
            } catch (IOException e) {
                e.printStackTrace();
                goon = false;
                break;
            } catch (MPIException e) {
                e.printStackTrace();
                goon = false;
                break;
            }
            //logger.debug("#distributed list = " + theList.size());
        }
        logger.info("terminated, sent " + red + " reduction pairs");

        /*
         * send end mark to clients
         */
        logger.debug("send end");
        try {
            for (int i = 0; i < threadsPerNode; i++) { // -1
                pairChannel.send(pairTag, new GBTransportMessEnd());
            }
            logger.info("sent end to clients");
            // send also end to receiver, no more
            //pairChannel.send(resultTag, new GBTransportMessEnd(), engine.Rank());
        } catch (IOException e) {
            if (logger.isDebugEnabled()) {
                e.printStackTrace();
            }
        } catch (MPIException e) {
            e.printStackTrace();
        }
        int d = active.get();
        if (d > 0) {
            logger.info("remaining active tasks = " + d);
        }
        receiver.terminate();
        //logger.info("terminated, send " + red + " reduction pairs");
        pairChannel.close();
        logger.info("redServ pairChannel.close()");
        finner.release();
    }
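

    /*
     * Termination accounting, as used in run() above (a summary, not
     * additional behavior): each remote worker thread counts as one job in
     * the Terminator. initIdle(threadsPerNode) registers this node's workers
     * as idle; notIdle() marks one as busy just before a pair is sent; the
     * receiver calls initIdle(1) again once the corresponding result has
     * been recorded in the pairlist. The loop ends when the pairlist is
     * empty and finner.hasJobs() reports no running jobs.
     */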
}


/**
 * Distributed server thread receiving results from remote reducing workers.
 * @param <C> coefficient type
 */
class HybridReducerReceiverMPI<C extends RingElem<C>> extends Thread {


    private static final Logger logger = LogManager.getLogger(HybridReducerReceiverMPI.class);


    public final boolean debug = logger.isDebugEnabled();


    private final DistHashTableMPI<Integer, GenPolynomial<C>> theList;


    private final PairList<C> pairlist;


    private final MPIChannel pairChannel;


    final int rank;


    private final Terminator finner;


    //private final int threadsPerNode;


    private final AtomicInteger active;


    private volatile boolean goon;


    /**
     * Message tag for results.
     */
    public static final int resultTag = GroebnerBaseDistributedHybridMPI.resultTag;


    /**
     * Message tag for acknowledgments.
     */
    public static final int ackTag = GroebnerBaseDistributedHybridMPI.ackTag;


    /**
     * Constructor.
     * @param r MPI rank of the partner node.
     * @param fin terminator.
     * @param a active remote tasks count.
     * @param pc MPI channel.
     * @param dl distributed hash table.
     * @param L ordered pair list.
     */
    HybridReducerReceiverMPI(int r, Terminator fin, AtomicInteger a, MPIChannel pc,
                    DistHashTableMPI<Integer, GenPolynomial<C>> dl, PairList<C> L) {
        rank = r;
        active = a;
        //threadsPerNode = tpn;
        finner = fin;
        pairChannel = pc;
        theList = dl;
        pairlist = L;
        goon = true;
        //logger.info("reducer receiver created " + this);
    }


    /**
     * Work loop.
     * @see java.lang.Thread#run()
     */
    @Override
    @SuppressWarnings("unchecked")
    public void run() {
        //Pair<C> pair = null;
        GenPolynomial<C> H = null;
        int red = 0;
        int polIndex = -1;
        //Integer senderId; // obsolete

        // while more requests
        while (goon) {
            // receive request
            logger.debug("receive result");
            //senderId = null;
            Object rh = null;
            try {
                rh = pairChannel.receive(resultTag);
                @SuppressWarnings("unused")
                int i = active.getAndDecrement();
                //} catch (InterruptedException e) {
                //goon = false;
                ////e.printStackTrace();
                ////?? finner.initIdle(1);
                //break;
            } catch (IOException e) {
                e.printStackTrace();
                goon = false;
                finner.initIdle(1);
                break;
            } catch (MPIException e) {
                e.printStackTrace();
                goon = false;
                finner.initIdle(1);
                break;
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                goon = false;
                finner.initIdle(1);
                break;
            }
            logger.info("received result");
            if (rh == null) {
                if (this.isInterrupted()) {
                    goon = false;
                    finner.initIdle(1);
                    break;
                }
                //finner.initIdle(1);
            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
                logger.info("received GBTransportMessEnd");
                goon = false;
                //?? finner.initIdle(1);
                break;
            } else if (rh instanceof GBTransportMessPoly) {
                // update pair list
                red++;
                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
                H = mpi.pol;
                //senderId = mpi.threadId;
                if (H != null) {
                    if (logger.isInfoEnabled()) { // debug
                        logger.info("H = " + H.leadingExpVector());
                    }
                    if (!H.isZERO()) {
                        if (H.isONE()) {
                            polIndex = pairlist.putOne();
                            //GenPolynomial<C> nn =
                            theList.putWait(Integer.valueOf(polIndex), H);
                            //goon = false; must wait for other clients
                            //finner.initIdle(1);
                            //break;
                        } else {
                            polIndex = pairlist.put(H);
                            // use putWait? but still not all distributed
                            //GenPolynomial<C> nn =
                            theList.putWait(Integer.valueOf(polIndex), H);
                        }
                    }
                }
            }
            // only after recording in pairlist!
            finner.initIdle(1);
            try {
                pairChannel.send(ackTag, new GBTransportMess());
                logger.debug("sent acknowledgment");
            } catch (IOException e) {
                e.printStackTrace();
                goon = false;
                break;
            } catch (MPIException e) {
                e.printStackTrace();
                goon = false;
                break;
            }
        } // end while
        goon = false;
        logger.info("terminated, received " + red + " reductions");
    }


    /**
     * Terminate.
     */
    public void terminate() {
        goon = false;
        try {
            this.join();
            //this.interrupt();
        } catch (InterruptedException e) {
            // nonsense: Thread.currentThread().interrupt();
        }
        logger.info("terminate end");
    }

}


/**
 * Distributed client reducing worker thread.
 * @param <C> coefficient type
 */
class HybridReducerClientMPI<C extends RingElem<C>> implements Runnable {


    private static final Logger logger = LogManager.getLogger(HybridReducerClientMPI.class);


    public final boolean debug = logger.isDebugEnabled();


    private final MPIChannel pairChannel;


    private final DistHashTableMPI<Integer, GenPolynomial<C>> theList;


    private final ReductionPar<C> red;


    //private final int threadsPerNode;


    /*
     * Identification number for this thread.
     */
    //public final Integer threadId; // obsolete


    /**
     * Message tag for pairs.
     */
    public static final int pairTag = GroebnerBaseDistributedHybridMPI.pairTag;


    /**
     * Message tag for results.
     */
    public static final int resultTag = GroebnerBaseDistributedHybridMPI.resultTag;


    /**
     * Message tag for acknowledgments.
     */
    public static final int ackTag = GroebnerBaseDistributedHybridMPI.ackTag;


    /**
     * Constructor.
     * @param tc MPI channel.
     * @param dl distributed hash table.
     */
    HybridReducerClientMPI(MPIChannel tc, DistHashTableMPI<Integer, GenPolynomial<C>> dl) {
        //this.threadsPerNode = tpn;
        pairChannel = tc;
        //threadId = 100 + tid; // keep distinct from other tags
        theList = dl;
        red = new ReductionPar<C>();
    }


    /**
     * Work loop.
     * @see java.lang.Runnable#run()
     */
    @Override
    @SuppressWarnings("unchecked")
    public void run() {
        if (debug) {
            logger.info("pairChannel = " + pairChannel + " reducer client running");
        }
        Pair<C> pair = null;
        GenPolynomial<C> pi, pj, ps;
        GenPolynomial<C> S;
        GenPolynomial<C> H = null;
        //boolean set = false;
        boolean goon = true;
        boolean doEnd = true;
        int reduction = 0;
        //int sleeps = 0;
        Integer pix, pjx, psx;

        while (goon) {
            /* protocol:
             * request pair, process pair, send result, receive acknowledgment
             */
            // pair = (Pair) pairlist.removeNext();
            Object req = new GBTransportMessReq();
            logger.debug("send request = " + req);
            try {
                pairChannel.send(pairTag, req);
            } catch (IOException e) {
                goon = false;
                if (debug) {
                    e.printStackTrace();
                }
                logger.info("send pair request, exception");
                break;
            } catch (MPIException e) {
                goon = false;
                if (debug) {
                    e.printStackTrace();
                }
                logger.info("send pair request, exception");
                break;
            }
            logger.debug("receive pair, goon = " + goon);
            doEnd = true;
            Object pp = null;
            try {
                pp = pairChannel.receive(pairTag);
                //} catch (InterruptedException e) {
                //goon = false;
                //e.printStackTrace();
            } catch (IOException e) {
                goon = false;
                if (debug) {
                    e.printStackTrace();
                }
                break;
            } catch (MPIException e) {
                goon = false;
                if (debug) {
                    e.printStackTrace();
                }
                break;
            } catch (ClassNotFoundException e) {
                goon = false;
                e.printStackTrace();
            }
            if (debug) {
                logger.info("received pair = " + pp);
            }
            H = null;
            if (pp == null) { // should not happen
                continue;
            }
            if (pp instanceof GBTransportMessEnd) {
                goon = false;
                //doEnd = false; // bug
                continue;
            }
            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
                pi = pj = ps = null;
                if (pp instanceof GBTransportMessPair) {
                    pair = ((GBTransportMessPair<C>) pp).pair;
                    if (pair != null) {
                        pi = pair.pi;
                        pj = pair.pj;
                        //logger.debug("pair: pix = " + pair.i
                        //               + ", pjx = " + pair.j);
                    }
                }
                if (pp instanceof GBTransportMessPairIndex) {
                    pix = ((GBTransportMessPairIndex) pp).i;
                    pjx = ((GBTransportMessPairIndex) pp).j;
                    psx = ((GBTransportMessPairIndex) pp).s;
                    pi = theList.getWait(pix);
                    pj = theList.getWait(pjx);
                    ps = theList.getWait(psx);
                    //logger.info("pix = " + pix + ", pjx = " + pjx + ", psx = " + psx);
                }

                if (pi != null && pj != null) {
                    S = red.SPolynomial(pi, pj);
                    //System.out.println("S   = " + S);
                    logger.info("ht(S) = " + S.leadingExpVector());
                    if (S.isZERO()) {
                        // pair.setZero(); does not work in dist
                        H = S;
                    } else {
                        if (debug) {
                            logger.debug("ht(S) = " + S.leadingExpVector());
                        }
                        H = red.normalform(theList, S);
                        reduction++;
                        if (H.isZERO()) {
                            // pair.setZero(); does not work in dist
                        } else {
                            H = H.monic();
                            if (logger.isInfoEnabled()) {
                                logger.info("ht(H) = " + H.leadingExpVector());
                            }
                        }
                    }
                } else {
                    logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps);
                }
            }
            if (pp instanceof GBTransportMess) {
                logger.debug("null pair results in null H poly");
            }

            // send H, or null if not at end
            if (debug) {
                logger.debug("#distributed list = " + theList.size());
                logger.debug("send H polynomial = " + H);
            }
            try {
                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
                doEnd = false;
            } catch (IOException e) {
                goon = false;
                e.printStackTrace();
            } catch (MPIException e) {
                goon = false;
                e.printStackTrace();
            }
            logger.debug("done sending poly message for " + pp);
            try {
                pp = pairChannel.receive(ackTag);
                //} catch (InterruptedException e) {
                //goon = false;
                //e.printStackTrace();
            } catch (IOException e) {
                goon = false;
                if (debug) {
                    e.printStackTrace();
                }
                break;
            } catch (MPIException e) {
                goon = false;
                if (debug) {
                    e.printStackTrace();
                }
                break;
            } catch (ClassNotFoundException e) {
                goon = false;
                e.printStackTrace();
            }
            if (!(pp instanceof GBTransportMess)) {
                logger.error("invalid acknowledgment " + pp);
            }
            logger.debug("received acknowledgment " + pp);
        }
        logger.info("terminated, done " + reduction + " reductions");
        if (doEnd) {
            try {
                pairChannel.send(resultTag, new GBTransportMessEnd());
            } catch (IOException e) {
                //e.printStackTrace();
            } catch (MPIException e) {
                //e.printStackTrace();
            }
            logger.info("terminated, sent done message");
        }
    }
}