001/*
002 * $Id$
003 */
004
005package edu.jas.gb;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Collections;
011import java.util.List;
012import java.util.ListIterator;
013import java.util.concurrent.atomic.AtomicInteger;
014import java.util.concurrent.Executors;
015import java.util.concurrent.ExecutorService;
016import java.util.concurrent.ThreadPoolExecutor;
017import java.util.concurrent.TimeUnit;
018
019import org.apache.logging.log4j.Logger;
020import org.apache.logging.log4j.LogManager; 
021
022import edu.jas.poly.ExpVector;
023import edu.jas.poly.GenPolynomial;
024import edu.jas.poly.GenPolynomialRing;
025import edu.jas.poly.PolyUtil;
026import edu.jas.structure.RingElem;
027import edu.jas.util.ChannelFactory;
028import edu.jas.util.DistHashTable;
029import edu.jas.util.DistHashTableServer;
030import edu.jas.util.DistThreadPool;
031import edu.jas.util.RemoteExecutable;
032import edu.jas.util.SocketChannel;
033import edu.jas.util.TaggedSocketChannel;
034import edu.jas.util.Terminator;
035
036
037/**
038 * Groebner Base distributed hybrid algorithm. Implements a distributed memory
039 * with multi-core CPUs parallel version of Groebner bases with executable
040 * channels. Using pairlist class, distributed multi-threaded tasks do
041 * reduction, one communication channel per remote node.
042 * @param <C> coefficient type
043 * @author Heinz Kredel
044 */
045
046public class GroebnerBaseDistributedHybridEC<C extends RingElem<C>> extends GroebnerBaseAbstract<C> {
047
048
049    private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedHybridEC.class);
050
051
052    private static final boolean debug = logger.isDebugEnabled();
053
054
055    /**
056     * Number of threads to use.
057     */
058    protected final int threads;
059
060
061    /**
062     * Default number of threads.
063     */
064    protected static final int DEFAULT_THREADS = 2;
065
066
067    /**
068     * Number of threads per node to use.
069     */
070    protected final int threadsPerNode;
071
072
073    /**
074     * Default number of threads per compute node.
075     */
076    protected static final int DEFAULT_THREADS_PER_NODE = 1;
077
078
079    /**
080     * Pool of threads to use.
081     */
082    //protected final ExecutorService pool; // not for single node tests
083    protected transient final ExecutorService pool;
084
085
086    /**
087     * Default server port.
088     */
089    protected static final int DEFAULT_PORT = 55711;
090
091
092    /**
093     * Default distributed hash table server port.
094     */
095    protected final int DHT_PORT;
096
097
098    /**
099     * machine file to use.
100     */
101    protected final String mfile;
102
103
104    /**
105     * Server port to use.
106     */
107    protected final int port;
108
109
110    /**
111     * Distributed thread pool to use.
112     */
113    private final transient DistThreadPool dtp;
114
115
116    /**
117     * Distributed hash table server to use.
118     */
119    private final transient DistHashTableServer<Integer> dhts;
120
121
122    /**
123     * Message tag for pairs.
124     */
125    public static final Integer pairTag = Integer.valueOf(1);
126
127
128    /**
129     * Message tag for results.
130     */
131    public static final Integer resultTag = Integer.valueOf(2);
132
133
134    /**
135     * Message tag for acknowledgments.
136     */
137    public static final Integer ackTag = Integer.valueOf(3);
138
139
140    /**
141     * Constructor.
142     * @param mfile name of the machine file.
143     */
144    public GroebnerBaseDistributedHybridEC(String mfile) {
145        this(mfile, DEFAULT_THREADS, DEFAULT_PORT);
146    }
147
148
149    /**
150     * Constructor.
151     * @param mfile name of the machine file.
152     * @param threads number of threads to use.
153     */
154    public GroebnerBaseDistributedHybridEC(String mfile, int threads) {
155        this(mfile, threads, Executors.newFixedThreadPool(threads), DEFAULT_PORT);
156    }
157
158
159    /**
160     * Constructor.
161     * @param mfile name of the machine file.
162     * @param threads number of threads to use.
163     * @param port server port to use.
164     */
165    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int port) {
166        this(mfile, threads, Executors.newFixedThreadPool(threads), port);
167    }
168
169
170    /**
171     * Constructor.
172     * @param mfile name of the machine file.
173     * @param threads number of threads to use.
174     * @param threadsPerNode threads per node to use.
175     * @param port server port to use.
176     */
177    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, int port) {
178        this(mfile, threads, threadsPerNode, Executors.newFixedThreadPool(threads), port);
179    }
180
181
182    /**
183     * Constructor.
184     * @param mfile name of the machine file.
185     * @param threads number of threads to use.
186     * @param pool ExecutorService to use.
187     * @param port server port to use.
188     */
189    public GroebnerBaseDistributedHybridEC(String mfile, int threads, ExecutorService pool, int port) {
190        this(mfile, threads, DEFAULT_THREADS_PER_NODE, pool, port);
191    }
192
193
194    /**
195     * Constructor.
196     * @param mfile name of the machine file.
197     * @param threads number of threads to use.
198     * @param threadsPerNode threads per node to use.
199     * @param pl pair selection strategy
200     * @param port server port to use.
201     */
202    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, PairList<C> pl,
203                    int port) {
204        this(mfile, threads, threadsPerNode, Executors.newFixedThreadPool(threads), pl, port);
205    }
206
207
208    /**
209     * Constructor.
210     * @param mfile name of the machine file.
211     * @param threads number of threads to use.
212     * @param threadsPerNode threads per node to use.
213     * @param port server port to use.
214     */
215    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ExecutorService pool,
216                    int port) {
217        this(mfile, threads, threadsPerNode, pool, new OrderedPairlist<C>(), port);
218    }
219
220
221    /**
222     * Constructor.
223     * @param mfile name of the machine file.
224     * @param threads number of threads to use.
225     * @param threadsPerNode threads per node to use.
226     * @param pool ExecutorService to use.
227     * @param pl pair selection strategy
228     * @param port server port to use.
229     */
230    public GroebnerBaseDistributedHybridEC(String mfile, int threads, int threadsPerNode, ExecutorService pool,
231                    PairList<C> pl, int port) {
232        super(new ReductionPar<C>(), pl);
233        this.threads = threads;
234        if (mfile == null || mfile.length() == 0) {
235            this.mfile = "../util/machines"; // contains localhost
236        } else {
237            this.mfile = mfile;
238        }
239        if (threads < 1) {
240            threads = 1;
241        }
242        this.threadsPerNode = threadsPerNode;
243        if (pool == null) {
244            pool = Executors.newFixedThreadPool(threads);
245            logger.error("pool must not be null: {}", pool);
246            //throw new IllegalArgumentException("pool must not be null");
247        }
248        this.pool = pool;
249        this.port = port;
250        logger.info("machine file {}, port = {}, pool = {}", mfile, port, pool);
251        this.dtp = new DistThreadPool(this.threads, this.mfile);
252        logger.info("running {}", dtp);
253        this.DHT_PORT = this.dtp.getEC().getMasterPort() + 100;
254        this.dhts = new DistHashTableServer<Integer>(this.DHT_PORT);
255        this.dhts.init();
256        logger.info("running {}", dhts);
257    }
258
259
260    /**
261     * Cleanup and terminate ExecutorService.
262     */
263    @Override
264    public void terminate() {
265        terminate(true);
266    }
267
268
269    /**
270     * Terminates the distributed thread pools.
271     * @param shutDown true, if shut-down of the remote executable servers is
272     *            requested, false, if remote executable servers stay alive.
273     */
274    public void terminate(boolean shutDown) {
275        pool.shutdown();
276        try {
277            while (!pool.isTerminated()) {
278                //logger.info("await");
279                boolean rest = pool.awaitTermination(1000L, TimeUnit.MILLISECONDS);
280            }
281        } catch (InterruptedException e) {
282            e.printStackTrace();
283        }
284        logger.info(pool.toString());
285        dtp.terminate(shutDown);
286        logger.info("dhts.terminate()");
287        dhts.terminate();
288    }
289
290
291    /**
292     * Distributed Groebner base.
293     * @param modv number of module variables.
294     * @param F polynomial list.
295     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
296     */
297    public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) {
298        List<GenPolynomial<C>> Fp = normalizeZerosOnes(F);
299        Fp = PolyUtil.<C> monic(Fp);
300        if (Fp.size() <= 1) {
301            return Fp;
302        }
303        if (!Fp.get(0).ring.coFac.isField()) {
304            throw new IllegalArgumentException("coefficients not from a field");
305        }
306
307        String master = dtp.getEC().getMasterHost();
308        //int port = dtp.getEC().getMasterPort(); // wrong port
309        GBHybridExerClient<C> gbc = new GBHybridExerClient<C>(master, threadsPerNode, port, DHT_PORT);
310        for (int i = 0; i < threads; i++) {
311            // schedule remote clients
312            dtp.addJob(gbc);
313        }
314        try {
315            Thread.currentThread().sleep(1000);
316        } catch (InterruptedException e) {
317            e.printStackTrace();
318        }
319        // run master
320        List<GenPolynomial<C>> G = GBMaster(modv, Fp);
321        return G;
322    }
323
324
325    /**
326     * Distributed hybrid Groebner base.
327     * @param modv number of module variables.
328     * @param F non empty monic polynomial list without zeros.
329     * @return GB(F) a Groebner base of F or null, if a IOException occurs.
330     */
331    List<GenPolynomial<C>> GBMaster(int modv, List<GenPolynomial<C>> F) {
332        long t = System.currentTimeMillis();
333        ChannelFactory cf = new ChannelFactory(port);
334        cf.init();
335
336        List<GenPolynomial<C>> G = F;
337        if (G.isEmpty()) {
338            throw new IllegalArgumentException("empty polynomial list not allowed");
339        }
340        GenPolynomialRing<C> ring = G.get(0).ring;
341        PairList<C> pairlist = strategy.create(modv, ring);
342        pairlist.put(G);
343        logger.info("start {}", pairlist);
344        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(
345                        "localhost", DHT_PORT);
346        theList.init();
347        List<GenPolynomial<C>> al = pairlist.getList();
348        for (int i = 0; i < al.size(); i++) {
349            // no wait required
350            GenPolynomial<C> nn = null;
351            theList.putWait(Integer.valueOf(i), al.get(i));
352            if (nn != null) {
353                logger.info("double polynomials {}, nn = {}, al(i) = {}", i, nn, al.get(i));
354            }
355        }
356
357        Terminator finner = new Terminator(threads * threadsPerNode);
358        HybridReducerServerEC<C> R;
359        logger.info("using pool = {}", pool);
360        for (int i = 0; i < threads; i++) {
361            R = new HybridReducerServerEC<C>(threadsPerNode, finner, cf, theList, pairlist);
362            pool.execute(R);
363            //logger.info("server submitted {}", R);
364        }
365        try {
366            Thread.currentThread().sleep(1000);
367        } catch (InterruptedException e) {
368            e.printStackTrace();
369        }
370        logger.info("main loop waiting {}", finner);
371        finner.waitDone();
372        int ps = theList.size();
373        logger.info("#distributed list = {}", ps);
374        // make sure all polynomials arrived: not needed in master
375        // G = (ArrayList)theList.values();
376        G = pairlist.getList();
377        if (ps != G.size()) {
378            logger.info("#distributed list = {}, #pairlist list = {}", theList.size(), G.size());
379        }
380        for (GenPolynomial<C> q : theList.getValueList()) {
381            if (debug && q != null && !q.isZERO()) {
382                logger.debug("final q = {}", q.leadingExpVector());
383            }
384        }
385        logger.debug("distributed list end");
386        long time = System.currentTimeMillis();
387        List<GenPolynomial<C>> Gp;
388        Gp = minimalGB(G); // not jet distributed but threaded
389        time = System.currentTimeMillis() - time;
390        logger.debug("parallel gbmi time = {}", time);
391        G = Gp;
392        logger.debug("server cf.terminate()");
393        cf.terminate();
394        logger.debug("server theList.terminate() {}", theList.size());
395        theList.clear();
396        theList.terminate();
397        t = System.currentTimeMillis() - t;
398        logger.info("server GB end, time = {} with {}", t, pairlist.toString());
399        return G;
400    }
401
402
403    /**
404     * GB distributed client part.
405     * @param host the server runs on.
406     * @param port the server runs.
407     * @param dhtport of the DHT server.
408     * @throws IOException
409     */
410    public static <C extends RingElem<C>> void clientPart(String host, int threadsPerNode, int port,
411                    int dhtport) throws IOException {
412        ChannelFactory cf = new ChannelFactory(port + 10); // != port for localhost
413        cf.init();
414        logger.info("clientPart connecting to {}, port = {}, dhtport = {}", host, port, dhtport);
415        SocketChannel channel = cf.getChannel(host, port);
416        TaggedSocketChannel pairChannel = new TaggedSocketChannel(channel);
417        pairChannel.init();
418
419        DistHashTable<Integer, GenPolynomial<C>> theList = new DistHashTable<Integer, GenPolynomial<C>>(host,
420                        dhtport);
421        theList.init();
422
423        ExecutorService pool = Executors.newFixedThreadPool(threadsPerNode);
424        //ThreadPool pool = new ThreadPool(threadsPerNode);
425        logger.info("client using pool = {}", pool);
426        for (int i = 0; i < threadsPerNode; i++) {
427            HybridReducerClientEC<C> Rr = new HybridReducerClientEC<C>(/*threadsPerNode,*/pairChannel, /*i,*/
428            theList);
429            pool.execute(Rr);
430        }
431        logger.info("clients submitted");
432
433        pool.shutdown();
434        try {
435            while (!pool.isTerminated()) {
436                //logger.info("await");
437                boolean rest = pool.awaitTermination(1000L, TimeUnit.MILLISECONDS);
438            }
439        } catch (InterruptedException e) {
440            e.printStackTrace();
441        }
442        //pool.terminate();
443        logger.info("client pool.terminate()");
444
445        pairChannel.close();
446        logger.debug("client pairChannel.close()");
447
448        //master only: theList.clear();
449        theList.terminate();
450        cf.terminate();
451        logger.info("client cf.terminate()");
452
453        channel.close();
454        logger.info("client channel.close()");
455        return;
456    }
457
458
459    /**
460     * Minimal ordered groebner basis.
461     * @param Fp a Groebner base.
462     * @return a reduced Groebner base of Fp.
463     */
464    @Override
465    public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) {
466        GenPolynomial<C> a;
467        ArrayList<GenPolynomial<C>> G;
468        G = new ArrayList<GenPolynomial<C>>(Fp.size());
469        ListIterator<GenPolynomial<C>> it = Fp.listIterator();
470        while (it.hasNext()) {
471            a = it.next();
472            if (a.length() != 0) { // always true
473                // already monic  a = a.monic();
474                G.add(a);
475            }
476        }
477        if (G.size() <= 1) {
478            return G;
479        }
480
481        ExpVector e;
482        ExpVector f;
483        GenPolynomial<C> p;
484        ArrayList<GenPolynomial<C>> F;
485        F = new ArrayList<GenPolynomial<C>>(G.size());
486        boolean mt;
487
488        while (G.size() > 0) {
489            a = G.remove(0);
490            e = a.leadingExpVector();
491
492            it = G.listIterator();
493            mt = false;
494            while (it.hasNext() && !mt) {
495                p = it.next();
496                f = p.leadingExpVector();
497                mt = e.multipleOf(f);
498            }
499            it = F.listIterator();
500            while (it.hasNext() && !mt) {
501                p = it.next();
502                f = p.leadingExpVector();
503                mt = e.multipleOf(f);
504            }
505            if (!mt) {
506                F.add(a);
507            } else {
508                // System.out.println("dropped " + a.length());
509            }
510        }
511        G = F;
512        if (G.size() <= 1) {
513            return G;
514        }
515        Collections.reverse(G); // important for lex GB
516
517        @SuppressWarnings("unchecked")
518        MiReducerServer<C>[] mirs = (MiReducerServer<C>[]) new MiReducerServer[G.size()];
519        int i = 0;
520        F = new ArrayList<GenPolynomial<C>>(G.size());
521        while (G.size() > 0) {
522            a = G.remove(0);
523            // System.out.println("doing " + a.length());
524            List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size());
525            R.addAll(G);
526            R.addAll(F);
527            mirs[i] = new MiReducerServer<C>(R, a);
528            pool.execute(mirs[i]);
529            i++;
530            F.add(a);
531        }
532        G = F;
533        F = new ArrayList<GenPolynomial<C>>(G.size());
534        for (i = 0; i < mirs.length; i++) {
535            a = mirs[i].getNF();
536            F.add(a);
537        }
538        return F;
539    }
540
541}
542
543
544/**
545 * Distributed server reducing worker proxy threads.
546 * @param <C> coefficient type
547 */
548class HybridReducerServerEC<C extends RingElem<C>> implements Runnable {
549
550
551    private static final Logger logger = LogManager.getLogger(HybridReducerServerEC.class);
552
553
554    private static final boolean debug = logger.isDebugEnabled();
555
556
557    private final Terminator finner;
558
559
560    private final ChannelFactory cf;
561
562
563    private TaggedSocketChannel pairChannel;
564
565
566    private final DistHashTable<Integer, GenPolynomial<C>> theList;
567
568
569    private final PairList<C> pairlist;
570
571
572    private final int threadsPerNode;
573
574
575    /**
576     * Message tag for pairs.
577     */
578    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;
579
580
581    /**
582     * Message tag for results.
583     */
584    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;
585
586
587    /**
588     * Message tag for acknowledgments.
589     */
590    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;
591
592
593    /**
594     * Constructor.
595     * @param tpn number of threads per node
596     * @param fin terminator
597     * @param cf channel factory
598     * @param dl distributed hash table
599     * @param L ordered pair list
600     */
601    HybridReducerServerEC(int tpn, Terminator fin, ChannelFactory cf,
602                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
603        threadsPerNode = tpn;
604        finner = fin;
605        this.cf = cf;
606        theList = dl;
607        pairlist = L;
608        //logger.info("reducer server created {}", this);
609    }
610
611
612    /**
613     * Work loop.
614     * @see java.lang.Runnable#run()
615     */
616    @Override
617    public void run() {
618        logger.debug("reducer server running with {}", cf);
619        SocketChannel channel = null;
620        try {
621            channel = cf.getChannel();
622            pairChannel = new TaggedSocketChannel(channel);
623            pairChannel.init();
624        } catch (InterruptedException e) {
625            logger.debug("get pair channel interrupted");
626            e.printStackTrace();
627            return;
628        }
629        if (logger.isInfoEnabled()) {
630            logger.info("pairChannel   = {}", pairChannel);
631        }
632        // record idle remote workers (minus one?)
633        //finner.beIdle(threadsPerNode-1);
634        finner.initIdle(threadsPerNode);
635        AtomicInteger active = new AtomicInteger(0);
636
637        // start receiver
638        HybridReducerReceiverEC<C> receiver = new HybridReducerReceiverEC<C>(/*threadsPerNode,*/finner,
639                        active, pairChannel, theList, pairlist);
640        receiver.start();
641
642        Pair<C> pair;
643        //boolean set = false;
644        boolean goon = true;
645        //int polIndex = -1;
646        int red = 0;
647        int sleeps = 0;
648
649        // while more requests
650        while (goon) {
651            // receive request if thread is reported incactive
652            logger.info("receive request");
653            Object req = null;
654            try {
655                req = pairChannel.receive(pairTag);
656            } catch (InterruptedException e) {
657                goon = false;
658                //e.printStackTrace();
659            } catch (IOException e) {
660                goon = false;
661                //e.printStackTrace();
662            } catch (ClassNotFoundException e) {
663                goon = false;
664                e.printStackTrace();
665            }
666            logger.info("received request, req = {}", req);
667            if (req == null) {
668                goon = false;
669                break;
670            }
671            if (!(req instanceof GBTransportMessReq)) {
672                goon = false;
673                break;
674            }
675
676            // find pair and manage termination status
677            logger.debug("find pair");
678            while (!pairlist.hasNext()) { // wait
679                if (!finner.hasJobs() && !pairlist.hasNext()) {
680                    goon = false;
681                    break;
682                }
683                try {
684                    sleeps++;
685                    if (sleeps % 3 == 0) {
686                        logger.info("waiting for reducers, remaining = {}", finner);
687                    }
688                    Thread.sleep(100);
689                } catch (InterruptedException e) {
690                    goon = false;
691                    break;
692                }
693            }
694            if (Thread.currentThread().isInterrupted()) {
695                goon = false;
696                break;
697            }
698            if (!pairlist.hasNext() && !finner.hasJobs()) {
699                logger.info("termination detection: no pairs and no jobs left");
700                goon = false;
701                break; //continue; //break?
702            }
703            finner.notIdle(); // before pairlist get!!
704            pair = pairlist.removeNext();
705            // send pair to client, even if null
706            if (logger.isInfoEnabled()) {
707                logger.info("active count = {}", active.get());
708                logger.info("send pair = {}", pair);
709            }
710            GBTransportMess msg = null;
711            if (pair != null) {
712                msg = new GBTransportMessPairIndex(pair); //,pairlist.size()-1); // size-1
713            } else {
714                msg = new GBTransportMess(); // not End(); at this time
715                // goon ?= false;
716            }
717            try {
718                red++;
719                pairChannel.send(pairTag, msg);
720                @SuppressWarnings("unused")
721                int a = active.getAndIncrement();
722            } catch (IOException e) {
723                e.printStackTrace();
724                goon = false;
725                break;
726            }
727            //logger.debug("#distributed list = {}", theList.size());
728        }
729        logger.info("terminated, send {} reduction pairs", red);
730
731        /*
732         * send end mark to clients
733         */
734        logger.debug("send end");
735        try {
736            for (int i = 0; i < threadsPerNode; i++) { // -1
737                //do not wait: Object rq = pairChannel.receive(pairTag);
738                pairChannel.send(pairTag, new GBTransportMessEnd());
739            }
740            // send also end to receiver
741            pairChannel.send(resultTag, new GBTransportMessEnd());
742            //beware of race condition 
743        } catch (IOException e) {
744            if (logger.isDebugEnabled()) {
745                e.printStackTrace();
746            }
747        }
748        receiver.terminate();
749
750        int d = active.get();
751        if (d > 0) {
752            logger.info("remaining active tasks = {}", d);
753        }
754        //logger.info("terminated, send {} reduction pairs", red);
755        pairChannel.close();
756        logger.debug("redServ pairChannel.close()");
757        finner.release();
758
759        channel.close();
760        logger.info("redServ channel.close()");
761    }
762}
763
764
765/**
766 * Distributed server receiving worker thread.
767 * @param <C> coefficient type
768 */
769class HybridReducerReceiverEC<C extends RingElem<C>> extends Thread {
770
771
772    private static final Logger logger = LogManager.getLogger(HybridReducerReceiverEC.class);
773
774
775    private static final boolean debug = logger.isDebugEnabled();
776
777
778    private final DistHashTable<Integer, GenPolynomial<C>> theList;
779
780
781    private final PairList<C> pairlist;
782
783
784    private final TaggedSocketChannel pairChannel;
785
786
787    private final Terminator finner;
788
789
790    //private final int threadsPerNode;
791
792
793    private final AtomicInteger active;
794
795
796    private volatile boolean goon;
797
798
799    /**
800     * Message tag for pairs.
801     */
802    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;
803
804
805    /**
806     * Message tag for results.
807     */
808    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;
809
810
811    /**
812     * Message tag for acknowledgments.
813     */
814    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;
815
816
817    /**
818     * Constructor.
819     * @param fin terminator
820     * @param a active remote tasks count
821     * @param pc tagged socket channel
822     * @param dl distributed hash table
823     * @param L ordered pair list
824     */
825    //param tpn number of threads per node
826    HybridReducerReceiverEC(/*int tpn,*/Terminator fin, AtomicInteger a, TaggedSocketChannel pc,
827                    DistHashTable<Integer, GenPolynomial<C>> dl, PairList<C> L) {
828        active = a;
829        //threadsPerNode = tpn;
830        finner = fin;
831        pairChannel = pc;
832        theList = dl;
833        pairlist = L;
834        goon = true;
835        //logger.info("reducer server created {}", this);
836    }
837
838
839    /**
840     * Work loop.
841     * @see java.lang.Thread#run()
842     */
843    @Override
844    public void run() {
845        //Pair<C> pair = null;
846        GenPolynomial<C> H = null;
847        int red = 0;
848        int polIndex = -1;
849        //Integer senderId; // obsolete
850
851        // while more requests
852        while (goon) {
853            // receive request
854            logger.info("receive result");
855            //senderId = null;
856            Object rh = null;
857            try {
858                rh = pairChannel.receive(resultTag);
859                @SuppressWarnings("unused")
860                int i = active.getAndDecrement();
861            } catch (InterruptedException e) {
862                goon = false;
863                //e.printStackTrace();
864                //?? finner.initIdle(1);
865                break;
866            } catch (IOException e) {
867                //e.printStackTrace();
868                goon = false;
869                //finner.initIdle(1);
870                break;
871            } catch (ClassNotFoundException e) {
872                e.printStackTrace();
873                goon = false;
874                finner.initIdle(1);
875                break;
876            }
877            logger.info("received H polynomial {}", rh);
878            if (rh == null) {
879                if (this.isInterrupted()) {
880                    goon = false;
881                    finner.initIdle(1);
882                    break;
883                }
884                //finner.initIdle(1);
885            } else if (rh instanceof GBTransportMessEnd) { // should only happen from server
886                logger.info("received GBTransportMessEnd");
887                goon = false;
888                //?? finner.initIdle(1);
889                break;
890            } else if (rh instanceof GBTransportMessPoly) {
891                // update pair list
892                red++;
893                @SuppressWarnings("unchecked")
894                GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh;
895                H = mpi.pol;
896                //senderId = mpi.threadId;
897                if (H != null) {
898                    if (logger.isInfoEnabled()) {
899                        logger.info("H = {}", H.leadingExpVector());
900                    }
901                    if (!H.isZERO()) {
902                        if (H.isONE()) {
903                            // finner.allIdle();
904                            polIndex = pairlist.putOne();
905                            theList.putWait(Integer.valueOf(polIndex), H);
906                            //goon = false; must wait for other clients
907                            //finner.initIdle(1);
908                            //break;
909                        } else {
910                            polIndex = pairlist.put(H);
911                            // use putWait ? but still not all distributed
912                            //GenPolynomial<C> nn = 
913                            theList.putWait(Integer.valueOf(polIndex), H);
914                        }
915                    }
916                }
917            }
918            // only after recording in pairlist !
919            finner.initIdle(1);
920            try {
921                pairChannel.send(ackTag, new GBTransportMess());
922                logger.debug("send acknowledgement");
923            } catch (IOException e) {
924                e.printStackTrace();
925                goon = false;
926                break;
927            }
928        } // end while
929        goon = false;
930        logger.info("terminated, received {} reductions", red);
931    }
932
933
934    /**
935     * Terminate.
936     */
937    public void terminate() {
938        goon = false;
939        //this.interrupt();
940        try {
941            this.join();
942        } catch (InterruptedException e) {
943            // unfug Thread.currentThread().interrupt();
944        }
945        logger.debug("HybridReducerReceiver terminated");
946    }
947
948}
949
950
951/**
952 * Distributed clients reducing worker threads.
953 */
954class HybridReducerClientEC<C extends RingElem<C>> implements Runnable {
955
956
957    private static final Logger logger = LogManager.getLogger(HybridReducerClientEC.class);
958
959
960    private static final boolean debug = logger.isDebugEnabled();
961
962
963    private final TaggedSocketChannel pairChannel;
964
965
966    private final DistHashTable<Integer, GenPolynomial<C>> theList;
967
968
969    private final ReductionPar<C> red;
970
971
972    //private final int threadsPerNode;
973
974
975    /*
976     * Identification number for this thread.
977     */
978    //public final Integer threadId; // obsolete
979
980
981    /**
982     * Message tag for pairs.
983     */
984    public final Integer pairTag = GroebnerBaseDistributedHybridEC.pairTag;
985
986
987    /**
988     * Message tag for results.
989     */
990    public final Integer resultTag = GroebnerBaseDistributedHybridEC.resultTag;
991
992
993    /**
994     * Message tag for acknowledgments.
995     */
996    public final Integer ackTag = GroebnerBaseDistributedHybridEC.ackTag;
997
998
999    /**
1000     * Constructor.
1001     * @param tc tagged socket channel
1002     * @param dl distributed hash table
1003     */
1004    //param tpn number of threads per node
1005    //param tid thread identification
1006    HybridReducerClientEC(/*int tpn,*/TaggedSocketChannel tc, /*Integer tid,*/
1007                    DistHashTable<Integer, GenPolynomial<C>> dl) {
1008        //threadsPerNode = tpn;
1009        pairChannel = tc;
1010        //threadId = 100 + tid; // keep distinct from other tags
1011        theList = dl;
1012        red = new ReductionPar<C>();
1013    }
1014
1015
1016    /**
1017     * Work loop.
1018     * @see java.lang.Runnable#run()
1019     */
1020    @Override
1021    public void run() {
1022        if (logger.isInfoEnabled()) {
1023            logger.info("pairChannel   = {} reducer client running", pairChannel);
1024        }
1025        Pair<C> pair = null;
1026        GenPolynomial<C> pi, pj, ps;
1027        GenPolynomial<C> S;
1028        GenPolynomial<C> H = null;
1029        //boolean set = false;
1030        boolean goon = true;
1031        boolean doEnd = true;
1032        int reduction = 0;
1033        //int sleeps = 0;
1034        Integer pix, pjx, psx;
1035
1036        while (goon) {
1037            /* protocol:
1038             * request pair, process pair, send result, receive acknowledgment
1039             */
1040            // pair = (Pair) pairlist.removeNext();
1041            Object req = new GBTransportMessReq();
1042            logger.info("send request");
1043            try {
1044                pairChannel.send(pairTag, req);
1045            } catch (IOException e) {
1046                goon = false;
1047                if (logger.isDebugEnabled()) {
1048                    e.printStackTrace();
1049                }
1050                logger.info("receive pair, IOexception ");
1051                break;
1052            }
1053            logger.info("receive on pairTag, goon = {}", goon);
1054            doEnd = true;
1055            Object pp = null;
1056            try {
1057                pp = pairChannel.receive(pairTag);
1058            } catch (InterruptedException e) {
1059                goon = false;
1060                e.printStackTrace();
1061            } catch (IOException e) {
1062                goon = false;
1063                if (logger.isDebugEnabled()) {
1064                    e.printStackTrace();
1065                }
1066                break;
1067            } catch (ClassNotFoundException e) {
1068                goon = false;
1069                e.printStackTrace();
1070            }
1071            if (logger.isInfoEnabled()) {
1072                logger.info("received pair = {}", pp);
1073            }
1074            H = null;
1075            if (pp == null) { // should not happen
1076                continue;
1077            }
1078            if (pp instanceof GBTransportMessEnd) {
1079                goon = false;
1080                //doEnd = false;
1081                continue;
1082            }
1083            if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) {
1084                pi = pj = ps = null;
1085                if (pp instanceof GBTransportMessPair) { // obsolet, for tests
1086                    @SuppressWarnings("unchecked")
1087                    GBTransportMessPair<C> tmp = (GBTransportMessPair<C>) pp;
1088                    pair = tmp.pair;
1089                    if (pair != null) {
1090                        pi = pair.pi;
1091                        pj = pair.pj;
1092                        //logger.debug("pair: pix = {}, pjx = {}", pair.i, pair.j);
1093                    }
1094                }
1095                if (pp instanceof GBTransportMessPairIndex) {
1096                    GBTransportMessPairIndex tmpi = (GBTransportMessPairIndex) pp;
1097                    pix = tmpi.i;
1098                    pjx = tmpi.j;
1099                    psx = tmpi.s;
1100                    pi = theList.getWait(pix);
1101                    pj = theList.getWait(pjx);
1102                    ps = theList.getWait(psx);
1103                    //logger.info("pix = {}, pjx = {}, psx = {}", pix, pjx, psx);
1104                }
1105                if (pi != null && pj != null) {
1106                    S = red.SPolynomial(pi, pj);
1107                    //logger.info("ht(S) = {}", S.leadingExpVector());
1108                    if (S.isZERO()) {
1109                        // pair.setZero(); does not work in dist
1110                    } else {
1111                        if (logger.isDebugEnabled()) {
1112                            logger.debug("ht(S) = {}", S.leadingExpVector());
1113                        }
1114                        H = red.normalform(theList, S);
1115                        //logger.info("ht(H) = {}", H.leadingExpVector());
1116                        reduction++;
1117                        if (H.isZERO()) {
1118                            // pair.setZero(); does not work in dist
1119                        } else {
1120                            H = H.monic();
1121                            if (logger.isInfoEnabled()) {
1122                                logger.info("ht(H) = {}", H.leadingExpVector());
1123                            }
1124                        }
1125                    }
1126                } else {
1127                    logger.info("pi = {}, pj = {}, ps = {}", pi, pj, ps);
1128                }
1129            }
1130            if (pp instanceof GBTransportMess) {
1131                logger.debug("null pair results in null H poly");
1132            }
1133
1134            // send H or must send null, if not at end
1135            if (logger.isDebugEnabled()) {
1136                logger.debug("#distributed list = {}", theList.size());
1137                logger.debug("send H polynomial = {}", H);
1138            }
1139            try {
1140                pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId));
1141                doEnd = false;
1142            } catch (IOException e) {
1143                goon = false;
1144                e.printStackTrace();
1145            }
1146            //logger.info("done send poly message of {}", pp);
1147            try {
1148                //pp = pairChannel.receive(threadId);
1149                pp = pairChannel.receive(ackTag);
1150            } catch (InterruptedException e) {
1151                goon = false;
1152                e.printStackTrace();
1153            } catch (IOException e) {
1154                goon = false;
1155                if (logger.isDebugEnabled()) {
1156                    e.printStackTrace();
1157                }
1158                break;
1159            } catch (ClassNotFoundException e) {
1160                goon = false;
1161                e.printStackTrace();
1162            }
1163            if (!(pp instanceof GBTransportMess)) {
1164                logger.error("invalid acknowledgement {}", pp);
1165            }
1166            logger.info("received acknowledgment ");
1167        }
1168        logger.info("terminated, {} reductions, {} polynomials", reduction, theList.size());
1169        if (doEnd) {
1170            try {
1171                pairChannel.send(resultTag, new GBTransportMessEnd());
1172            } catch (IOException e) {
1173                //e.printStackTrace();
1174            }
1175            logger.debug("terminated, send done");
1176        }
1177    }
1178}
1179
1180
1181/**
1182 * Objects of this class are to be send to a ExecutableServer.
1183 */
1184class GBHybridExerClient<C extends RingElem<C>> implements RemoteExecutable {
1185
1186
1187    String host;
1188
1189
1190    int port;
1191
1192
1193    int dhtport;
1194
1195
1196    int threadsPerNode;
1197
1198
1199    /**
1200     * GBHybridExerClient.
1201     * @param host
1202     * @param port
1203     * @param dhtport
1204     */
1205    public GBHybridExerClient(String host, int threadsPerNode, int port, int dhtport) {
1206        this.host = host;
1207        this.threadsPerNode = threadsPerNode;
1208        this.port = port;
1209        this.dhtport = dhtport;
1210    }
1211
1212
1213    /**
1214     * run.
1215     */
1216    public void run() {
1217        try {
1218            GroebnerBaseDistributedHybridEC.<C> clientPart(host, threadsPerNode, port, dhtport);
1219        } catch (Exception e) {
1220            e.printStackTrace();
1221        }
1222    }
1223
1224
1225    /**
1226     * String representation.
1227     */
1228    @Override
1229    public String toString() {
1230        StringBuffer s = new StringBuffer("GBHybridExerClient(");
1231        s.append("host=" + host);
1232        s.append(", threadsPerNode=" + threadsPerNode);
1233        s.append(", port=" + port);
1234        s.append(", dhtport=" + dhtport);
1235        s.append(")");
1236        return s.toString();
1237    }
1238
1239}