/*
 * $Id$
 */

package edu.jas.gb;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Semaphore;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import edu.jas.kern.MPIEngine;
import edu.jas.poly.ExpVector;
import edu.jas.poly.GenPolynomial;
import edu.jas.structure.RingElem;
import edu.jas.util.DistHashTableMPI;
import edu.jas.util.MPIChannel;
import edu.jas.util.Terminator;
import edu.jas.util.ThreadPool;

import mpi.Comm;
import mpi.MPIException;


/**
 * Groebner Base distributed algorithm with MPI. Implements a distributed memory
 * parallel version of Groebner bases. Using MPI and a pairlist class, the
 * distributed tasks perform the reductions.
 * @param <C> coefficient type
 * @author Heinz Kredel
 */

public class GroebnerBaseDistributedMPI<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {


    private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedMPI.class);


    /**
     * Number of threads to use.
     */
    protected final int threads;


    /**
     * Default number of threads.
     */
    public static final int DEFAULT_THREADS = 2;


    /*
     * Pool of threads to use. <b>Note:</b> No ComputerThreads for one node
     * tests
     */
    protected transient final ThreadPool pool;


    /*
     * Underlying MPI engine.
     */
    protected transient final Comm engine;


    /**
     * Constructor.
     */
    public GroebnerBaseDistributedMPI() throws IOException {
        this(DEFAULT_THREADS);
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     */
    public GroebnerBaseDistributedMPI(int threads) throws IOException {
        this(threads, new ThreadPool(threads));
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param pool ThreadPool to use.
     */
    public GroebnerBaseDistributedMPI(int threads, ThreadPool pool) throws IOException {
        this(threads, pool, new OrderedPairlist<C>());
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param pl pair selection strategy
     */
    public GroebnerBaseDistributedMPI(int threads, PairList<C> pl) throws IOException {
        this(threads, new ThreadPool(threads), pl);
    }


    /**
     * Constructor.
     * @param threads number of threads to use.
     * @param pool ThreadPool to use.
     * @param pl pair selection strategy
     */
    public GroebnerBaseDistributedMPI(int threads, ThreadPool pool, PairList<C> pl) throws IOException {
        super(new ReductionPar<C>(), pl);
        int size = 0;
        try {
            engine = MPIEngine.getCommunicator();
            size = engine.Size();
        } catch (MPIException e) {
            throw new IOException(e);
        }
        if (size < 2) {
            throw new IllegalArgumentException("At least 2 MPI processes required, not " + size);
        }
        if (threads != size || pool.getNumber() != size) {
            throw new IllegalArgumentException(
                            "threads != size: " + threads + " != " + size + ", #pool " + pool.getNumber());
        }
        this.threads = threads;
        this.pool = pool;
    }
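
    /*
     * Construction sketch (illustration only, exception handling omitted): the
     * number of threads must equal the number of MPI processes, so a caller
     * would typically size the instance from the communicator; BigRational is
     * just an example coefficient type.
     *
     *   int size = MPIEngine.getCommunicator().Size();
     *   GroebnerBaseDistributedMPI<BigRational> bb = new GroebnerBaseDistributedMPI<BigRational>(size);
     */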


    /**
     * Cleanup and terminate ThreadPool.
     */
    @Override
    public void terminate() {
        if (pool == null) {
            return;
        }
        //pool.terminate();
        pool.cancel();
    }


    /**
     * Distributed Groebner base.
     * @param modv number of module variables.
     * @param F polynomial list.
     * @return GB(F) a Groebner base of F, or null if an IOException occurs or
     *         on the MPI client part.
     */
    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
        try {
            if (engine.Rank() == 0) {
                return GBmaster(modv, F);
            }
        } catch (MPIException e) {
            logger.info("GBmaster: " + e);
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            logger.info("GBmaster: " + e);
            e.printStackTrace();
            return null;
        }
        pool.terminate(); // not used on clients
        try {
            clientPart(0);
        } catch (IOException e) {
            logger.info("clientPart: " + e);
            e.printStackTrace();
        } catch (MPIException e) {
            logger.info("clientPart: " + e);
            e.printStackTrace();
        }
        return null;
    }
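
    /*
     * Usage sketch (illustration only): the algorithm is used SPMD style, every
     * MPI process runs the same driver and calls GB(); only rank 0 returns the
     * Groebner base, all other ranks act as reduction clients and return null.
     * The coefficient type BigRational and the input list F are assumptions of
     * this sketch; exception handling is omitted.
     *
     *   int size = MPIEngine.getCommunicator().Size();
     *   GroebnerBaseDistributedMPI<BigRational> bb = new GroebnerBaseDistributedMPI<BigRational>(size);
     *   List<GenPolynomial<BigRational>> G = bb.GB(0, F); // basis on rank 0, null on clients
     *   bb.terminate();
     */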


    /**
     * Distributed Groebner base, part for the MPI master.
     * @param modv number of module variables.
     * @param F polynomial list.
     * @return GB(F) a Groebner base of F, or null if an IOException occurs.
     * @throws MPIException on MPI errors.
     * @throws IOException on communication errors.
     */
    public List<GenPolynomial<C>> GBmaster(int modv, List<GenPolynomial<C>> F)
                    throws MPIException, IOException {
        List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>();
        GenPolynomial<C> p;
        PairList<C> pairlist = null;
        boolean oneInGB = false;
        //int l = F.size();
        int unused = 0;
        ListIterator<GenPolynomial<C>> it = F.listIterator();
        while (it.hasNext()) {
            p = it.next();
            if (p.length() > 0) {
                p = p.monic();
                if (p.isONE()) {
                    oneInGB = true;
                    G.clear();
                    G.add(p);
                    //return G; must signal termination to others
                }
                if (!oneInGB) {
                    G.add(p);
                }
                if (pairlist == null) {
                    pairlist = strategy.create(modv, p.ring);
                    if (!p.ring.coFac.isField()) {
                        throw new IllegalArgumentException("coefficients not from a field");
                    }
                }
                // theList not updated here
                if (p.isONE()) {
                    unused = pairlist.putOne();
                } else {
                    unused = pairlist.put(p);
                }
            } else {
                //l--;
            }
        }
        //if (l <= 1) {
        //return G; must signal termination to others
        //}
        logger.info("done pairlist, initialize DHT: " + unused);

        DistHashTableMPI<Integer, GenPolynomial<C>> theList = new DistHashTableMPI<Integer, GenPolynomial<C>>(
                        engine);
        theList.init();
        //logger.info("done DHT: " + theList);

        List<GenPolynomial<C>> al = pairlist.getList();
        for (int i = 0; i < al.size(); i++) {
            // no wait required
            GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i));
            if (nn != null) {
                logger.info("duplicate polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i));
            }
        }

        Terminator fin = new Terminator(threads - 1);
        MPIReducerServer<C> R;
        for (int i = 1; i < threads; i++) {
            logger.debug("addJob " + i + " of " + threads);
            MPIChannel chan = new MPIChannel(engine, i); // closed in server
            R = new MPIReducerServer<C>(i, fin, chan, theList, pairlist);
            pool.addJob(R);
        }
        logger.debug("main loop waiting");
        fin.waitDone();
        int ps = theList.size();
        logger.info("#distributed list = " + ps);
        // make sure all polynomials arrived: not needed in master
        // G = (ArrayList)theList.values();
        G = pairlist.getList();
        if (ps != G.size()) {
            logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size());
        }
        long time = System.currentTimeMillis();
        List<GenPolynomial<C>> Gp = minimalGB(G); // not yet distributed, but threaded
        time = System.currentTimeMillis() - time;
        logger.debug("parallel gbmi = " + time);
        G = Gp;
        logger.info("theList.terminate()");
        theList.terminate();
        logger.info("end " + pairlist);
        return G;
    }


    /**
     * Distributed Groebner base, client part.
     * @param rank MPI rank of the process on which the master runs (must be 0).
     * @throws IOException on communication errors.
     * @throws MPIException on MPI errors.
     */
    public void clientPart(int rank) throws IOException, MPIException {
        if (rank != 0) {
            throw new UnsupportedOperationException("only master at rank 0 implemented: " + rank);
        }
        Comm engine = MPIEngine.getCommunicator();

        DistHashTableMPI<Integer, GenPolynomial<C>> theList = new DistHashTableMPI<Integer, GenPolynomial<C>>();
        theList.init();

        MPIChannel chan = new MPIChannel(engine, rank);

        MPIReducerClient<C> R = new MPIReducerClient<C>(chan, theList);
        R.run();

        chan.close();
        theList.terminate();
        return;
    }
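
    /*
     * Message protocol between GBmaster and the clients, one pairChannel per
     * client, as implemented by MPIReducerServer and MPIReducerClient below:
     *
     *   client -> server : GBTransportMessReq        request for work
     *   server -> client : GBTransportMessPairIndex  indices of a critical pair,
     *                      or GBTransportMess        if no pair is currently available
     *   client -> server : GBTransportMessPoly(H)    reduced S-polynomial (may be null or zero)
     *   ... repeated until ...
     *   server -> client : GBTransportMessEnd        client terminates
     *
     * The pair polynomials are fetched by index from the DistHashTableMPI; the
     * reduced result H is sent back over the channel and then distributed to
     * all processes via the DHT by the server.
     */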


    /**
     * Minimal ordered Groebner basis.
     * @param Fp a Groebner base.
     * @return a reduced Groebner base of Fp.
     */
    @SuppressWarnings("unchecked")
    @Override
    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
        GenPolynomial<C> a;
        ArrayList<GenPolynomial<C>> G;
        G = new ArrayList<GenPolynomial<C>>(Fp.size());
        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
        while (it.hasNext()) {
            a = it.next();
            if (a.length() != 0) { // always true
                // already monic  a = a.monic();
                G.add(a);
            }
        }
        if (G.size() <= 1) {
            return G;
        }

        ExpVector e;
        ExpVector f;
        GenPolynomial<C> p;
        ArrayList<GenPolynomial<C>> F;
        F = new ArrayList<GenPolynomial<C>>(G.size());
        boolean mt;

        while (G.size() > 0) {
            a = G.remove(0);
            e = a.leadingExpVector();

            it = G.listIterator();
            mt = false;
            while (it.hasNext() && !mt) {
                p = it.next();
                f = p.leadingExpVector();
                mt = e.multipleOf(f);
            }
            it = F.listIterator();
            while (it.hasNext() && !mt) {
                p = it.next();
                f = p.leadingExpVector();
                mt = e.multipleOf(f);
            }
            if (!mt) {
                F.add(a);
            } else {
                // System.out.println("dropped " + a.length());
            }
        }
        G = F;
        if (G.size() <= 1) {
            return G;
        }
        Collections.reverse(G); // important for lex GB

        MiMPIReducerServer<C>[] mirs = (MiMPIReducerServer<C>[]) new MiMPIReducerServer[G.size()];
        int i = 0;
        F = new ArrayList<GenPolynomial<C>>(G.size());
        while (G.size() > 0) {
            a = G.remove(0);
            // System.out.println("doing " + a.length());
            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
            R.addAll(G);
            R.addAll(F);
            mirs[i] = new MiMPIReducerServer<C>(R, a);
            pool.addJob(mirs[i]);
            i++;
            F.add(a);
        }
        G = F;
        F = new ArrayList<GenPolynomial<C>>(G.size());
        for (i = 0; i < mirs.length; i++) {
            a = mirs[i].getNF();
            F.add(a);
        }
        return F;
    }
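
    /*
     * Filtering sketch for minimalGB(): in its first elimination loop a
     * polynomial is dropped as soon as its leading exponent vector is a
     * multiple of the leading exponent vector of another remaining or already
     * kept polynomial. For example, with G = { x^2, x^2*y - y, y^3 } the
     * element x^2*y - y is removed, since lt(x^2*y - y) = x^2*y is a multiple
     * of lt(x^2) = x^2; the survivors are then reduced against each other in
     * parallel by MiMPIReducerServer jobs.
     */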

}


/**
 * Distributed GB reducer, server-side worker thread. One instance serves one
 * MPI client process.
 * @param <C> coefficient type
 */

class MPIReducerServer<C extends RingElem<C>> implements Runnable {


    /*
     * Termination detection coordinator.
     */
    private final Terminator finaler;


    /*
     * Underlying MPI engine.
     */
    //protected transient final Comm engine;


    /*
     * MPI channel.
     */
    private final MPIChannel pairChannel;


    /*
     * MPI rank of the partner client.
     */
    final int rank;


    /*
     * Distributed HashTable of polynomials.
     */
    private final DistHashTableMPI<Integer, GenPolynomial<C>> theList;


    /*
     * Critical pair list of polynomials.
     */
    private final PairList<C> pairlist;


    private static final Logger logger = LogManager.getLogger(MPIReducerServer.class);


    /**
     * Constructor.
     * @param r MPI rank of the partner client.
     * @param fin termination coordinator to use.
     * @param c MPI channel to use.
     * @param dl DHT to use.
     * @param L pair selection strategy.
     */
    MPIReducerServer(int r, Terminator fin, MPIChannel c, DistHashTableMPI<Integer, GenPolynomial<C>> dl,
                    PairList<C> L) {
        rank = r;
        finaler = fin;
        //engine = e;
        theList = dl;
        pairlist = L;
        pairChannel = c;
        logger.debug("reducer server constructor: "); // + r);
    }


    /**
     * Main method.
     */
    @SuppressWarnings("unchecked")
    public void run() {
        logger.debug("reducer server running: "); // + rank);
        // try {
        //     pairChannel = new MPIChannel(engine, rank);
        // } catch (IOException e) {
        //     e.printStackTrace();
        //     return;
        // } catch (MPIException e) {
        //     e.printStackTrace();
        //     return;
        // }
        if (logger.isInfoEnabled()) {
            logger.info("reducer server running: pairChannel = " + pairChannel);
        }
        Pair<C> pair;
        GenPolynomial<C> H = null;
        boolean set = false;
        boolean goon = true;
        int polIndex = -1;
        int red = 0;
        int sleeps = 0;

        // while more requests
        while (goon) {
            // receive request
            logger.debug("receive request");
            Object req = null;
            try {
                req = pairChannel.receive();
            } catch (IOException e) {
                goon = false;
                e.printStackTrace();
            } catch (MPIException e) {
                goon = false;
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                goon = false;
                e.printStackTrace();
            }
            //logger.debug("received request, req = " + req);
            if (req == null) {
                goon = false;
                break;
            }
            if (!(req instanceof GBTransportMessReq)) {
                goon = false;
                break;
            }

            // find pair
            logger.debug("find pair");
            while (!pairlist.hasNext()) { // wait
                if (!set) {
                    finaler.beIdle();
                    set = true;
                }
                if (!finaler.hasJobs() && !pairlist.hasNext()) {
                    goon = false;
                    break;
                }
                try {
                    sleeps++;
                    if (sleeps % 10 == 0) {
                        logger.info(" reducer is sleeping");
                    }
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    goon = false;
                    break;
                }
            }
            if (!pairlist.hasNext() && !finaler.hasJobs()) {
                goon = false;
                break; //continue; //break?
            }
            if (set) {
                set = false;
                finaler.notIdle();
            }

            pair = pairlist.removeNext();
            /*
             * send pair to client, receive H
             */
            logger.debug("send pair = " + pair);
            GBTransportMess msg = null;
            if (pair != null) {
                msg = new GBTransportMessPairIndex(pair);
            } else {
                msg = new GBTransportMess(); //End();
                // goon ?= false;
            }
            try {
                pairChannel.send(msg);
            } catch (IOException e) {
                e.printStackTrace();
                goon = false;
                break;
            } catch (MPIException e) {
                e.printStackTrace();
                goon = false;
                break;
            }
            logger.debug("#distributed list = " + theList.size());
            Object rh = null;
            try {
                rh = pairChannel.receive();
            } catch (IOException e) {
                e.printStackTrace();
                goon = false;
                break;
            } catch (MPIException e) {
                e.printStackTrace();
                goon = false;
                break;
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                goon = false;
                break;
            }
            //logger.debug("received H polynomial");
            if (rh == null) {
                if (pair != null) {
                    pair.setZero();
                }
            } else if (rh instanceof GBTransportMessPoly) {
                // update pair list
                red++;
                H = ((GBTransportMessPoly<C>) rh).pol;
                if (logger.isDebugEnabled()) {
                    logger.debug("H = " + H);
                }
                if (H == null) {
                    if (pair != null) {
                        pair.setZero();
                    }
                } else {
                    if (H.isZERO()) {
                        pair.setZero();
                    } else {
                        if (H.isONE()) {
                            polIndex = pairlist.putOne();
                            //GenPolynomial<C> nn =
                            theList.putWait(Integer.valueOf(polIndex), H);
                            goon = false;
                            break;
                        }
                        polIndex = pairlist.put(H);
                        // use putWait ? but still not all distributed
                        //GenPolynomial<C> nn =
                        theList.putWait(Integer.valueOf(polIndex), H);
                    }
                }
            }
        }
        logger.info("terminated, done " + red + " reductions");

        /*
         * send end mark to client
         */
        logger.debug("send end");
        try {
            pairChannel.send(new GBTransportMessEnd());
        } catch (IOException e) {
            if (logger.isDebugEnabled()) {
                e.printStackTrace();
            }
        } catch (MPIException e) {
            if (logger.isDebugEnabled()) {
                e.printStackTrace();
            }
        }
        finaler.beIdle();
        pairChannel.close();
    }

}


/**
 * Distributed GB reducer, client-side worker thread.
 * @param <C> coefficient type
 */

class MPIReducerClient<C extends RingElem<C>> implements Runnable {


    private final MPIChannel pairChannel;


    private final DistHashTableMPI<Integer, GenPolynomial<C>> theList;


    private final ReductionPar<C> red;


    private static final Logger logger = LogManager.getLogger(MPIReducerClient.class);


    /**
     * Constructor.
     * @param pc MPI communication channel.
     * @param dl DHT to use.
     */
    MPIReducerClient(MPIChannel pc, DistHashTableMPI<Integer, GenPolynomial<C>> dl) {
        pairChannel = pc;
        theList = dl;
        red = new ReductionPar<C>();
    }


    /**
     * Main run method.
     */
    @SuppressWarnings("unchecked")
    public void run() {
        logger.debug("reducer client running");
        Pair<C> pair = null;
        GenPolynomial<C> pi, pj, ps;
        GenPolynomial<C> S;
        GenPolynomial<C> H = null;
        //boolean set = false;
        boolean goon = true;
        int reduction = 0;
        //int sleeps = 0;
        Integer pix, pjx, psx;

        while (goon) {
            /* protocol:
             * request pair, process pair, send result
             */
            // pair = (Pair) pairlist.removeNext();
            Object req = new GBTransportMessReq();
            logger.debug("send request");
            try {
                pairChannel.send(req);
            } catch (IOException e) {
                goon = false;
                e.printStackTrace();
                break;
            } catch (MPIException e) {
                goon = false;
                e.printStackTrace();
                break;
            }
            logger.debug("receive pair, goon");
            Object pp = null;
            try {
                pp = pairChannel.receive();
            } catch (IOException e) {
                goon = false;
                if (logger.isDebugEnabled()) {
                    e.printStackTrace();
                }
                break;
            } catch (MPIException e) {
                goon = false;
                if (logger.isDebugEnabled()) {
                    e.printStackTrace();
                }
                break;
            } catch (ClassNotFoundException e) {
                goon = false;
                e.printStackTrace();
            }
            if (logger.isDebugEnabled()) {
                logger.debug("received pair = " + pp);
            }
            H = null;
            if (pp == null) { // should not happen
                continue;
            }
            if (pp instanceof GBTransportMessEnd) {
                goon = false;
                continue;
            }
            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
                pi = pj = ps = null;
                if (pp instanceof GBTransportMessPair) {
                    pair = ((GBTransportMessPair<C>) pp).pair;
                    if (pair != null) {
                        pi = pair.pi;
                        pj = pair.pj;
                        //logger.debug("pair: pix = " + pair.i
                        //               + ", pjx = " + pair.j);
                    }
                }
                if (pp instanceof GBTransportMessPairIndex) {
                    pix = ((GBTransportMessPairIndex) pp).i;
                    pjx = ((GBTransportMessPairIndex) pp).j;
                    psx = ((GBTransportMessPairIndex) pp).s;
                    pi = theList.getWait(pix);
                    pj = theList.getWait(pjx);
                    ps = theList.getWait(psx);
                    //logger.info("pix = " + pix + ", pjx = " + pjx + ", psx = " + psx);
                }
                if (pi != null && pj != null) {
                    S = red.SPolynomial(pi, pj);
                    //System.out.println("S   = " + S);
                    if (S.isZERO()) {
                        // pair.setZero(); does not work in dist
                    } else {
                        if (logger.isDebugEnabled()) {
                            logger.debug("ht(S) = " + S.leadingExpVector());
                        }
                        H = red.normalform(theList, S);
                        reduction++;
                        if (H.isZERO()) {
                            // pair.setZero(); does not work in dist
                        } else {
                            H = H.monic();
                            if (logger.isInfoEnabled()) {
                                logger.info("ht(H) = " + H.leadingExpVector());
                            }
                        }
                    }
                } else {
                    logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps);
                }
            }

            // send H or must send null
            if (logger.isDebugEnabled()) {
                logger.debug("#distributed list = " + theList.size());
                logger.debug("send H polynomial = " + H);
            }
            try {
                pairChannel.send(new GBTransportMessPoly<C>(H));
            } catch (IOException e) {
                goon = false;
                e.printStackTrace();
            } catch (MPIException e) {
                goon = false;
                e.printStackTrace();
            }
        }
        logger.info("terminated, done " + reduction + " reductions");
        pairChannel.close();
    }
}


/**
 * Reducing worker thread for the minimal GB computation. Not yet distributed,
 * but threaded.
 * @param <C> coefficient type
 */

class MiMPIReducerServer<C extends RingElem<C>> implements Runnable {


    private final List<GenPolynomial<C>> G;


    private GenPolynomial<C> H;


    private final Semaphore done = new Semaphore(0);


    private final Reduction<C> red;


    private static final Logger logger = LogManager.getLogger(MiMPIReducerServer.class);


    /**
     * Constructor.
     * @param G polynomial list.
     * @param p polynomial.
     */
    MiMPIReducerServer(List<GenPolynomial<C>> G, GenPolynomial<C> p) {
        this.G = G;
        H = p;
        red = new ReductionPar<C>();
    }


    /**
     * getNF. Blocks until the normal form is computed.
     * @return the computed normal form.
     */
    public GenPolynomial<C> getNF() {
        try {
            done.acquire(); //done.P();
        } catch (InterruptedException e) {
        }
        return H;
    }
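
    /*
     * Usage pattern (see minimalGB above, illustration only): the job is
     * submitted to the ThreadPool and getNF() blocks on the semaphore until
     * run() has computed the normal form and released it.
     *
     *   MiMPIReducerServer<C> job = new MiMPIReducerServer<C>(R, a);
     *   pool.addJob(job);
     *   // ... submit the remaining jobs ...
     *   GenPolynomial<C> nf = job.getNF(); // blocks until run() is done
     */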


    /**
     * Main run method.
     */
    public void run() {
        if (logger.isDebugEnabled()) {
            logger.debug("ht(H) = " + H.leadingExpVector());
        }
        H = red.normalform(G, H); //mod
        done.release(); //done.V();
        if (logger.isDebugEnabled()) {
            logger.debug("ht(H) = " + H.leadingExpVector());
        }
        // H = H.monic();
    }
}