001/*
002 * $Id: GroebnerBaseDistributedHybridMPJ.java 4952 2014-10-12 19:41:46Z axelclk
003 * $
004 */
005
006package edu.jas.gb;
007
008
009import java.io.IOException;
010import java.util.ArrayList;
011import java.util.Collections;
012import java.util.List;
013import java.util.ListIterator;
014import java.util.concurrent.atomic.AtomicInteger;
015
016import mpi.Comm;
017
018import org.apache.logging.log4j.Logger;
019import org.apache.logging.log4j.LogManager;
020
021import edu.jas.kern.MPJEngine;
022import edu.jas.poly.ExpVector;
023import edu.jas.poly.GenPolynomial;
024import edu.jas.poly.GenPolynomialRing;
025import edu.jas.poly.PolyUtil;
026import edu.jas.structure.RingElem;
027import edu.jas.util.DistHashTableMPJ;
028import edu.jas.util.MPJChannel;
029import edu.jas.util.Terminator;
030import edu.jas.util.ThreadPool;
031
032
033/**
034 * Groebner Base distributed hybrid algorithm with MPJ. Implements a distributed
035 * memory with multi-core CPUs parallel version of Groebner bases with MPJ.
036 * Using pairlist class, distributed multi-threaded tasks do reduction, one
037 * communication channel per remote node.
038 * @param <C> coefficient type
039 * @author Heinz Kredel
040 */
041
042public class GroebnerBaseDistributedHybridMPJ<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
043
044
045    private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedHybridMPJ.class);
046
047
048    public final boolean debug = logger.isDebugEnabled();
049
050
051    /**
052     * Number of threads to use.
053     */
054    protected final int threads;
055
056
057    /**
058     * Default number of threads.
059     */
060    protected static final int DEFAULT_THREADS = 2;
061
062
063    /**
064     * Number of threads per node to use.
065     */
066    protected final int threadsPerNode;
067
068
069    /**
070     * Default number of threads per compute node.
071     */
072    protected static final int DEFAULT_THREADS_PER_NODE = 1;
073
074
075    /**
076     * Pool of threads to use.
077     */
078    //protected final ExecutorService pool; // not for single node tests
079    protected transient final ThreadPool pool;
080
081
082    /*
083     * Underlying MPJ engine.
084     */
085    protected transient final Comm engine;
086
087
088    /**
089     * Message tag for pairs.
090     */
091    public static final int pairTag = GroebnerBaseDistributedHybridEC.pairTag.intValue();
092
093
094    /**
095     * Message tag for results.
096     */
097    public static final int resultTag = GroebnerBaseDistributedHybridEC.resultTag.intValue();
098
099
100    /**
101     * Message tag for acknowledgments.
102     */
103    public static final int ackTag = GroebnerBaseDistributedHybridEC.ackTag.intValue();
104
105
106    /**
107     * Constructor.
108     */
109    public GroebnerBaseDistributedHybridMPJ() throws IOException {
110        this(DEFAULT_THREADS);
111    }
112
113
114    /**
115     * Constructor.
116     * @param threads number of threads to use.
117     */
118    public GroebnerBaseDistributedHybridMPJ(int threads) throws IOException {
119        this(threads, new ThreadPool(threads));
120    }
121
122
123    /**
124     * Constructor.
125     * @param threads number of threads to use.
126     * @param threadsPerNode threads per node to use.
127     */
128    public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode) throws IOException {
129        this(threads, threadsPerNode, new ThreadPool(threads));
130    }
131
132
133    /**
134     * Constructor.
135     * @param threads number of threads to use.
136     * @param pool ThreadPool to use.
137     */
138    public GroebnerBaseDistributedHybridMPJ(int threads, ThreadPool pool) throws IOException {
139        this(threads, DEFAULT_THREADS_PER_NODE, pool);
140    }
141
142
143    /**
144     * Constructor.
145     * @param threads number of threads to use.
146     * @param threadsPerNode threads per node to use.
147     * @param pl pair selection strategy
148     */
149    public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode, PairList<C> pl)
150                    throws IOException {
151        this(threads, threadsPerNode, new ThreadPool(threads), pl);
152    }
153
154
155    /**
156     * Constructor.
157     * @param threads number of threads to use.
158     * @param threadsPerNode threads per node to use.
159     */
160    public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode, ThreadPool pool)
161                    throws IOException {
162        this(threads, threadsPerNode, pool, new OrderedPairlist<C>());
163    }
164
165
166    /**
167     * Constructor.
168     * @param threads number of threads to use.
169     * @param threadsPerNode threads per node to use.
170     * @param pool ThreadPool to use.
171     * @param pl pair selection strategy
172     */
173    public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl)
174                    throws IOException {
175        super(new ReductionPar<C>(), pl);
176        this.engine = MPJEngine.getCommunicator();
177        int size = engine.Size();
178        if (size < 2) {
179            throw new IllegalArgumentException("Minimal 2 MPJ processes required, not " + size);
180        }
181        if (threads != size || pool.getNumber() != size) {
182            throw new IllegalArgumentException("threads != size: " + threads + " != " + size + ", #pool "
183                            + pool.getNumber());
184        }
185        this.threads = threads;
186        this.pool = pool;
187        this.threadsPerNode = threadsPerNode;
188        //logger.info("generated pool: " + pool);
189    }
190
191
192    /**
193     * Cleanup and terminate.
194     */
195    @Override
196    public void terminate() {
197        if (pool == null) {
198            return;
199        }
200        //pool.terminate();
201        pool.cancel();
202    }
203
204
205    /**
206     * Distributed Groebner base.
207     * @param modv number of module variables.
208     * @param F polynomial list.
209     * @return GB(F) a Groebner base of F or null, if a IOException occurs or on
210     *         MPJ client part.
211     */
212    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
213        try {
214            if (engine.Rank() == 0) {
215                return GBmaster(modv, F);
216            }
217        } catch (IOException e) {
218            logger.info("GBmaster: " + e);
219            e.printStackTrace();
220            return null;
221        }
222        pool.terminate(); // not used on clients
223        try {
224            clientPart(0); // only 0
225        } catch (IOException e) {
226            logger.info("clientPart: " + e);
227            e.printStackTrace();
228        }
229        return null;
230    }
231
232
233    /**
234     * Distributed hybrid Groebner base.
235     * @param modv number of module variables.
236     * @param F polynomial list.
237     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
238     */
239    List<GenPolynomial<C>> GBmaster(int modv, List<GenPolynomial<C>> F) throws IOException {
240        long t = System.currentTimeMillis();
241
242        List<GenPolynomial<C>> G = normalizeZerosOnes(F);
243        G = PolyUtil.<C> monic(G);
244        if (G.size() <= 1) {
245            //return G; 
246        }
247        if (G.isEmpty()) {
248            throw new IllegalArgumentException("empty F / zero ideal not allowed");
249        }
250        GenPolynomialRing<C> ring = G.get(0).ring;
251        if (!ring.coFac.isField()) {
252            throw new IllegalArgumentException("coefficients not from a field");
253        }
254        PairList<C> pairlist = strategy.create(modv, ring);
255        pairlist.put(G);
256
257        /*
258        GenPolynomial<C> p;
259        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
260        PairList<C> pairlist = null;
261        boolean oneInGB = false;
262        int l = F.size();
263        int unused = 0;
264        ListIterator<GenPolynomial<C>> it = F.listIterator();
265        while (it.hasNext()) {
266            p = it.next();
267            if (p.length() > 0) {
268                p = p.monic();
269                if (p.isONE()) {
270                    oneInGB = true;
271                    G.clear();
272                    G.add(p);
273                    //return G; must signal termination to others
274                }
275                if (!oneInGB) {
276                    G.add(p);
277                }
278                if (pairlist == null) {
279                    //pairlist = new OrderedPairlist<C>(modv, p.ring);
280                    pairlist = strategy.create(modv, p.ring);
281                    if (!p.ring.coFac.isField()) {
282                        throw new IllegalArgumentException("coefficients not from a field");
283                    }
284                }
285                // theList not updated here
286                if (p.isONE()) {
287                    unused = pairlist.putOne();
288                } else {
289                    unused = pairlist.put(p);
290                }
291            } else {
292                l--;
293            }
294        }
295        //if (l <= 1) {
296        //return G; must signal termination to others
297        //}
298        */
299        logger.info("pairlist " + pairlist);
300
301        logger.debug("looking for clients");
302        DistHashTableMPJ<Integer, GenPolynomial<C>> theList = new DistHashTableMPJ<Integer, GenPolynomial<C>>(
303                        engine);
304        theList.init();
305
306        List<GenPolynomial<C>> al = pairlist.getList();
307        for (int i = 0; i < al.size(); i++) {
308            // no wait required
309            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
310            if (nn != null) {
311                logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
312            }
313        }
314
315        Terminator finner = new Terminator((threads - 1) * threadsPerNode);
316        HybridReducerServerMPJ<C> R;
317        logger.info("using pool = " + pool);
318        for (int i = 1; i < threads; i++) {
319            MPJChannel chan = new MPJChannel(engine, i); // closed in server
320            R = new HybridReducerServerMPJ<C>(i, threadsPerNode, finner, chan, theList, pairlist);
321            pool.addJob(R);
322            //logger.info("server submitted " + R);
323        }
324        logger.info("main loop waiting " + finner);
325        finner.waitDone();
326        int ps = theList.size();
327        logger.info("#distributed list = " + ps);
328        // make sure all polynomials arrived: not needed in master
329        G = pairlist.getList();
330        if (ps != G.size()) {
331            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
332        }
333        for (GenPolynomial<C> q : theList.getValueList()) {
334            if (q != null && !q.isZERO()) {
335                logger.debug("final q = " + q.leadingExpVector());
336            }
337        }
338        logger.debug("distributed list end");
339        long time = System.currentTimeMillis();
340        List<GenPolynomial<C>> Gp;
341        Gp = minimalGB(G); // not jet distributed but threaded
342        time = System.currentTimeMillis() - time;
343        logger.debug("parallel gbmi time = " + time);
344        G = Gp;
345        logger.info("server theList.terminate() " + theList.size());
346        theList.terminate();
347        t = System.currentTimeMillis() - t;
348        logger.info("server GB end, time = " + t + ", " + pairlist.toString());
349        return G;
350    }
351
352
353    /**
354     * GB distributed client.
355     * @param rank of the MPJ where the server runs on.
356     * @throws IOException
357     */
358    public void clientPart(int rank) throws IOException {
359        if (rank != 0) {
360            throw new UnsupportedOperationException("only master at rank 0 implemented: " + rank);
361        }
362        Comm engine = MPJEngine.getCommunicator();
363
364        DistHashTableMPJ<Integer, GenPolynomial<C>> theList = new DistHashTableMPJ<Integer, GenPolynomial<C>>();
365        theList.init();
366
367        MPJChannel chan = new MPJChannel(engine, rank);
368
369        ThreadPool pool = new ThreadPool(threadsPerNode);
370        logger.info("client using pool = " + pool);
371        for (int i = 0; i < threadsPerNode; i++) {
372            HybridReducerClientMPJ<C> Rr = new HybridReducerClientMPJ<C>(chan, theList); // i
373            pool.addJob(Rr);
374        }
375        if (debug) {
376            logger.info("clients submitted");
377        }
378        pool.terminate();
379        logger.info("client pool.terminate()");
380
381        chan.close();
382        theList.terminate();
383        return;
384    }
385
386
387    /**
388     * Minimal ordered groebner basis.
389     * @param Fp a Groebner base.
390     * @return a reduced Groebner base of Fp.
391     */
392    @SuppressWarnings("unchecked")
393    @Override
394    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
395        GenPolynomial<C> a;
396        ArrayList<GenPolynomial<C>> G;
397        G = new ArrayList<GenPolynomial<C>>(Fp.size());
398        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
399        while (it.hasNext()) {
400            a = it.next();
401            if (a.length() != 0) { // always true
402                // already monic  a = a.monic();
403                G.add(a);
404            }
405        }
406        if (G.size() <= 1) {
407            return G;
408        }
409
410        ExpVector e;
411        ExpVector f;
412        GenPolynomial<C> p;
413        ArrayList<GenPolynomial<C>> F;
414        F = new ArrayList<GenPolynomial<C>>(G.size());
415        boolean mt;
416
417        while (G.size() > 0) {
418            a = G.remove(0);
419            e = a.leadingExpVector();
420
421            it = G.listIterator();
422            mt = false;
423            while (it.hasNext() && !mt) {
424                p = it.next();
425                f = p.leadingExpVector();
426                mt = e.multipleOf(f);
427            }
428            it = F.listIterator();
429            while (it.hasNext() && !mt) {
430                p = it.next();
431                f = p.leadingExpVector();
432                mt = e.multipleOf(f);
433            }
434            if (!mt) {
435                F.add(a);
436            } else {
437                // System.out.println("dropped " + a.length());
438            }
439        }
440        G = F;
441        if (G.size() <= 1) {
442            return G;
443        }
444        Collections.reverse(G); // important for lex GB
445
446        MiMPJReducerServer<C>[] mirs = (MiMPJReducerServer<C>[]) new MiMPJReducerServer[G.size()];
447        int i = 0;
448        F = new ArrayList<GenPolynomial<C>>(G.size());
449        while (G.size() > 0) {
450            a = G.remove(0);
451            // System.out.println("doing " + a.length());
452            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
453            R.addAll(G);
454            R.addAll(F);
455            mirs[i] = new MiMPJReducerServer<C>(R, a);
456            pool.addJob(mirs[i]);
457            i++;
458            F.add(a);
459        }
460        G = F;
461        F = new ArrayList<GenPolynomial<C>>(G.size());
462        for (i = 0; i < mirs.length; i++) {
463            a = mirs[i].getNF();
464            F.add(a);
465        }
466        return F;
467    }
468
469}
470
471
472/**
473 * Distributed server reducing worker proxy threads.
474 * @param <C> coefficient type
475 */
476
477class HybridReducerServerMPJ<C extends RingElem<C>> implements Runnable {
478
479
480    private static final Logger logger = LogManager.getLogger(HybridReducerServerMPJ.class);
481
482
483    public final boolean debug = logger.isDebugEnabled();
484
485
486    private final Terminator finner;
487
488
489    private final MPJChannel pairChannel;
490
491
492    //protected transient final Comm engine;
493
494
495    private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList;
496
497
498    private final PairList<C> pairlist;
499
500
501    private final int threadsPerNode;
502
503
504    final int rank;
505
506
507    /**
508     * Message tag for pairs.
509     */
510    public static final int pairTag = GroebnerBaseDistributedHybridMPJ.pairTag;
511
512
513    /**
514     * Constructor.
515     * @param r MPJ rank of partner.
516     * @param tpn number of threads per node
517     * @param fin terminator
518     * @param chan MPJ channel
519     * @param dl distributed hash table
520     * @param L ordered pair list
521     */
522    HybridReducerServerMPJ(int r, int tpn, Terminator fin, MPJChannel chan,
523                    DistHashTableMPJ<Integer, GenPolynomial<C>> dl, PairList<C> L) {
524        rank = r;
525        threadsPerNode = tpn;
526        finner = fin;
527        this.pairChannel = chan;
528        theList = dl;
529        pairlist = L;
530        //logger.info("reducer server created " + this);
531    }
532
533
534    /**
535     * Work loop.
536     * @see java.lang.Runnable#run()
537     */
538    @Override
539    @SuppressWarnings("unchecked")
540    public void run() {
541        //logger.info("reducer server running with " + engine);
542        // try {
543        //     pairChannel = new MPJChannel(engine, rank); //,pairTag
544        // } catch (IOException e) {
545        //     e.printStackTrace();
546        //     return;
547        // }
548        if (logger.isInfoEnabled()) {
549            logger.info("reducer server running: pairChannel = " + pairChannel);
550        }
551        // record idle remote workers (minus one?)
552        //finner.beIdle(threadsPerNode-1);
553        finner.initIdle(threadsPerNode);
554        AtomicInteger active = new AtomicInteger(0);
555
556        // start receiver
557        HybridReducerReceiverMPJ<C> receiver = new HybridReducerReceiverMPJ<C>(rank, finner, active,
558                        pairChannel, theList, pairlist);
559        receiver.start();
560
561        Pair<C> pair;
562        //boolean set = false;
563        boolean goon = true;
564        //int polIndex = -1;
565        int red = 0;
566        int sleeps = 0;
567
568        // while more requests
569        while (goon) {
570            // receive request if thread is reported incactive
571            logger.debug("receive request");
572            Object req = null;
573            try {
574                req = pairChannel.receive(pairTag);
575                //} catch (InterruptedException e) {
576                //goon = false;
577                //e.printStackTrace();
578            } catch (IOException e) {
579                goon = false;
580                e.printStackTrace();
581            } catch (ClassNotFoundException e) {
582                goon = false;
583                e.printStackTrace();
584            }
585            logger.debug("received request, req = " + req);
586            if (req == null) {
587                goon = false;
588                break;
589            }
590            if (!(req instanceof GBTransportMessReq)) {
591                goon = false;
592                break;
593            }
594
595            // find pair and manage termination status
596            logger.debug("find pair");
597            while (!pairlist.hasNext()) { // wait
598                if (!finner.hasJobs() && !pairlist.hasNext()) {
599                    goon = false;
600                    break;
601                }
602                try {
603                    sleeps++;
604                    if (sleeps % 3 == 0) {
605                        logger.info("waiting for reducers, remaining = " + finner.getJobs());
606                    }
607                    Thread.sleep(100);
608                } catch (InterruptedException e) {
609                    goon = false;
610                    break;
611                }
612            }
613            if (!pairlist.hasNext() && !finner.hasJobs()) {
614                logger.info("termination detection: no pairs and no jobs left");
615                goon = false;
616                break; //continue; //break?
617            }
618            finner.notIdle(); // before pairlist get!!
619            pair = pairlist.removeNext();
620            // send pair to client, even if null
621            if (debug) {
622                logger.info("active count = " + active.get());
623                logger.info("send pair = " + pair);
624            }
625            GBTransportMess msg = null;
626            if (pair != null) {
627                msg = new GBTransportMessPairIndex(pair);
628            } else {
629                msg = new GBTransportMess(); //not End(); at this time
630                // goon ?= false;
631            }
632            try {
633                red++;
634                pairChannel.send(pairTag, msg);
635                @SuppressWarnings("unused")
636                int a = active.getAndIncrement();
637            } catch (IOException e) {
638                e.printStackTrace();
639                goon = false;
640                break;
641            }
642            //logger.debug("#distributed list = " + theList.size());
643        }
644        logger.info("terminated, send " + red + " reduction pairs");
645
646        /*
647         * send end mark to clients
648         */
649        logger.debug("send end");
650        try {
651            for (int i = 0; i < threadsPerNode; i++) { // -1
652                pairChannel.send(pairTag, new GBTransportMessEnd());
653            }
654            logger.info("sent end to clients");
655            // send also end to receiver, no more
656            //pairChannel.send(resultTag, new GBTransportMessEnd(), engine.Rank());
657        } catch (IOException e) {
658            if (logger.isDebugEnabled()) {
659                e.printStackTrace();
660            }
661        }
662        int d = active.get();
663        if (d > 0) {
664            logger.info("remaining active tasks = " + d);
665        }
666        receiver.terminate();
667        //logger.info("terminated, send " + red + " reduction pairs");
668        pairChannel.close();
669        logger.info("redServ pairChannel.close()");
670        finner.release();
671    }
672}
673
674
675/**
676 * Distributed server receiving worker thread.
677 * @param <C> coefficient type
678 */
679
680class HybridReducerReceiverMPJ<C extends RingElem<C>> extends Thread {
681
682
683    private static final Logger logger = LogManager.getLogger(HybridReducerReceiverMPJ.class);
684
685
686    public final boolean debug = logger.isDebugEnabled();
687
688
689    private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList;
690
691
692    private final PairList<C> pairlist;
693
694
695    private final MPJChannel pairChannel;
696
697
698    final int rank;
699
700
701    private final Terminator finner;
702
703
704    //private final int threadsPerNode;
705
706
707    private final AtomicInteger active;
708
709
710    private volatile boolean goon;
711
712
713    /**
714     * Message tag for results.
715     */
716    public static final int resultTag = GroebnerBaseDistributedHybridMPJ.resultTag;
717
718
719    /**
720     * Message tag for acknowledgments.
721     */
722    public static final int ackTag = GroebnerBaseDistributedHybridMPJ.ackTag;
723
724
725    /**
726     * Constructor.
727     * @param r MPJ rank of partner.
728     * @param fin terminator
729     * @param a active remote tasks count
730     * @param pc tagged socket channel
731     * @param dl distributed hash table
732     * @param L ordered pair list
733     */
734    HybridReducerReceiverMPJ(int r, Terminator fin, AtomicInteger a, MPJChannel pc,
735                    DistHashTableMPJ<Integer, GenPolynomial<C>> dl, PairList<C> L) {
736        rank = r;
737        active = a;
738        //threadsPerNode = tpn;
739        finner = fin;
740        pairChannel = pc;
741        theList = dl;
742        pairlist = L;
743        goon = true;
744        //logger.info("reducer server created " + this);
745    }
746
747
748    /**
749     * Work loop.
750     * @see java.lang.Thread#run()
751     */
752    @Override
753    @SuppressWarnings("unchecked")
754    public void run() {
755        //Pair<C> pair = null;
756        GenPolynomial<C> H = null;
757        int red = 0;
758        int polIndex = -1;
759        //Integer senderId; // obsolete
760
761        // while more requests
762        while (goon) {
763            // receive request
764            logger.debug("receive result");
765            //senderId = null;
766            Object rh = null;
767            try {
768                rh = pairChannel.receive(resultTag);
769                @SuppressWarnings("unused")
770                int i = active.getAndDecrement();
771                //} catch (InterruptedException e) {
772                //goon = false;
773                ////e.printStackTrace();
774                ////?? finner.initIdle(1);
775                //break;
776            } catch (IOException e) {
777                e.printStackTrace();
778                goon = false;
779                finner.initIdle(1);
780                break;
781            } catch (ClassNotFoundException e) {
782                e.printStackTrace();
783                goon = false;
784                finner.initIdle(1);
785                break;
786            }
787            logger.info("received result");
788            if (rh == null) {
789                if (this.isInterrupted()) {
790                    goon = false;
791                    finner.initIdle(1);
792                    break;
793                }
794                //finner.initIdle(1);
795            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
796                logger.info("received GBTransportMessEnd");
797                goon = false;
798                //?? finner.initIdle(1);
799                break;
800            } else if (rh instanceof GBTransportMessPoly) {
801                // update pair list
802                red++;
803                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
804                H = mpi.pol;
805                //senderId = mpi.threadId;
806                if (H != null) {
807                    if (logger.isInfoEnabled()) { // debug
808                        logger.info("H = " + H.leadingExpVector());
809                    }
810                    if (!H.isZERO()) {
811                        if (H.isONE()) {
812                            polIndex = pairlist.putOne();
813                            //GenPolynomial<C> nn = 
814                            theList.putWait(Integer.valueOf(polIndex), H);
815                            //goon = false; must wait for other clients
816                            //finner.initIdle(1);
817                            //break;
818                        } else {
819                            polIndex = pairlist.put(H);
820                            // use putWait ? but still not all distributed
821                            //GenPolynomial<C> nn = 
822                            theList.putWait(Integer.valueOf(polIndex), H);
823                        }
824                    }
825                }
826            }
827            // only after recording in pairlist !
828            finner.initIdle(1);
829            try {
830                pairChannel.send(ackTag, new GBTransportMess());
831                logger.debug("send acknowledgement");
832            } catch (IOException e) {
833                e.printStackTrace();
834                goon = false;
835                break;
836            }
837        } // end while
838        goon = false;
839        logger.info("terminated, received " + red + " reductions");
840    }
841
842
843    /**
844     * Terminate.
845     */
846    public void terminate() {
847        goon = false;
848        try {
849            this.join();
850            //this.interrupt();
851        } catch (InterruptedException e) {
852            // unfug Thread.currentThread().interrupt();
853        }
854        logger.info("terminate end");
855    }
856
857}
858
859
/**
 * Distributed clients reducing worker threads.
 * @param <C> coefficient type
 */
863
864class HybridReducerClientMPJ<C extends RingElem<C>> implements Runnable {
865
866
867    private static final Logger logger = LogManager.getLogger(HybridReducerClientMPJ.class);
868
869
870    public final boolean debug = logger.isDebugEnabled();
871
872
873    private final MPJChannel pairChannel;
874
875
876    private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList;
877
878
879    private final ReductionPar<C> red;
880
881
882    //private final int threadsPerNode;
883
884
885    /*
886     * Identification number for this thread.
887     */
888    //public final Integer threadId; // obsolete
889
890
891    /**
892     * Message tag for pairs.
893     */
894    public static final int pairTag = GroebnerBaseDistributedHybridMPJ.pairTag;
895
896
897    /**
898     * Message tag for results.
899     */
900    public static final int resultTag = GroebnerBaseDistributedHybridMPJ.resultTag;
901
902
903    /**
904     * Message tag for acknowledgments.
905     */
906    public static final int ackTag = GroebnerBaseDistributedHybridMPJ.ackTag;
907
908
909    /**
910     * Constructor.
911     * @param tc tagged socket channel
912     * @param dl distributed hash table
913     */
914    HybridReducerClientMPJ(MPJChannel tc, DistHashTableMPJ<Integer, GenPolynomial<C>> dl) {
915        //this.threadsPerNode = tpn;
916        pairChannel = tc;
917        //threadId = 100 + tid; // keep distinct from other tags
918        theList = dl;
919        red = new ReductionPar<C>();
920    }
921
922
923    /**
924     * Work loop.
925     * @see java.lang.Runnable#run()
926     */
927    @Override
928    @SuppressWarnings("unchecked")
929    public void run() {
930        if (debug) {
931            logger.info("pairChannel   = " + pairChannel + " reducer client running");
932        }
933        Pair<C> pair = null;
934        GenPolynomial<C> pi, pj, ps;
935        GenPolynomial<C> S;
936        GenPolynomial<C> H = null;
937        //boolean set = false;
938        boolean goon = true;
939        boolean doEnd = true;
940        int reduction = 0;
941        //int sleeps = 0;
942        Integer pix, pjx, psx;
943
944        while (goon) {
945            /* protocol:
946             * request pair, process pair, send result, receive acknowledgment
947             */
948            // pair = (Pair) pairlist.removeNext();
949            Object req = new GBTransportMessReq();
950            logger.debug("send request = " + req);
951            try {
952                pairChannel.send(pairTag, req);
953            } catch (IOException e) {
954                goon = false;
955                if (debug) {
956                    e.printStackTrace();
957                }
958                logger.info("receive pair, exception ");
959                break;
960            }
961            logger.debug("receive pair, goon = " + goon);
962            doEnd = true;
963            Object pp = null;
964            try {
965                pp = pairChannel.receive(pairTag);
966                //} catch (InterruptedException e) {
967                //goon = false;
968                //e.printStackTrace();
969            } catch (IOException e) {
970                goon = false;
971                if (debug) {
972                    e.printStackTrace();
973                }
974                break;
975            } catch (ClassNotFoundException e) {
976                goon = false;
977                e.printStackTrace();
978            }
979            if (debug) {
980                logger.info("received pair = " + pp);
981            }
982            H = null;
983            if (pp == null) { // should not happen
984                continue;
985            }
986            if (pp instanceof GBTransportMessEnd) {
987                goon = false;
988                //doEnd = false; // bug
989                continue;
990            }
991            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
992                pi = pj = ps = null;
993                if (pp instanceof GBTransportMessPair) {
994                    pair = ((GBTransportMessPair<C>) pp).pair;
995                    if (pair != null) {
996                        pi = pair.pi;
997                        pj = pair.pj;
998                        //logger.debug("pair: pix = " + pair.i 
999                        //               + ", pjx = " + pair.j);
1000                    }
1001                }
1002                if (pp instanceof GBTransportMessPairIndex) {
1003                    pix = ((GBTransportMessPairIndex) pp).i;
1004                    pjx = ((GBTransportMessPairIndex) pp).j;
1005                    psx = ((GBTransportMessPairIndex) pp).s;
1006                    pi = theList.getWait(pix);
1007                    pj = theList.getWait(pjx);
1008                    ps = theList.getWait(psx);
1009                    //logger.info("pix = " + pix + ", pjx = " +pjx + ", psx = " +psx);
1010                }
1011
1012                if (pi != null && pj != null) {
1013                    S = red.SPolynomial(pi, pj);
1014                    //System.out.println("S   = " + S);
1015                    logger.info("ht(S) = " + S.leadingExpVector());
1016                    if (S.isZERO()) {
1017                        // pair.setZero(); does not work in dist
1018                        H = S;
1019                    } else {
1020                        if (debug) {
1021                            logger.debug("ht(S) = " + S.leadingExpVector());
1022                        }
1023                        H = red.normalform(theList, S);
1024                        reduction++;
1025                        if (H.isZERO()) {
1026                            // pair.setZero(); does not work in dist
1027                        } else {
1028                            H = H.monic();
1029                            if (logger.isInfoEnabled()) {
1030                                logger.info("ht(H) = " + H.leadingExpVector());
1031                            }
1032                        }
1033                    }
1034                } else {
1035                    logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps);
1036                }
1037            }
1038            if (pp instanceof GBTransportMess) {
1039                logger.debug("null pair results in null H poly");
1040            }
1041
1042            // send H or must send null, if not at end
1043            if (debug) {
1044                logger.debug("#distributed list = " + theList.size());
1045                logger.debug("send H polynomial = " + H);
1046            }
1047            try {
1048                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1049                doEnd = false;
1050            } catch (IOException e) {
1051                goon = false;
1052                e.printStackTrace();
1053            }
1054            logger.debug("done send poly message of " + pp);
1055            try {
1056                pp = pairChannel.receive(ackTag);
1057                //} catch (InterruptedException e) {
1058                //goon = false;
1059                //e.printStackTrace();
1060            } catch (IOException e) {
1061                goon = false;
1062                if (debug) {
1063                    e.printStackTrace();
1064                }
1065                break;
1066            } catch (ClassNotFoundException e) {
1067                goon = false;
1068                e.printStackTrace();
1069            }
1070            if (!(pp instanceof GBTransportMess)) {
1071                logger.error("invalid acknowledgement " + pp);
1072            }
1073            logger.debug("received acknowledgment " + pp);
1074        }
1075        logger.info("terminated, done " + reduction + " reductions");
1076        if (doEnd) {
1077            try {
1078                pairChannel.send(resultTag, new GBTransportMessEnd());
1079            } catch (IOException e) {
1080                //e.printStackTrace();
1081            }
1082            logger.info("terminated, send done");
1083        }
1084    }
1085}