001/* 002 * $Id$ 003 */ 004 005package edu.jas.gb; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Collections; 011import java.util.List; 012import java.util.ListIterator; 013import java.util.concurrent.atomic.AtomicInteger; 014 015import mpi.Comm; 016import mpi.MPIException; 017 018import org.apache.logging.log4j.Logger; 019import org.apache.logging.log4j.LogManager; 020 021import edu.jas.kern.MPIEngine; 022import edu.jas.poly.ExpVector; 023import edu.jas.poly.GenPolynomial; 024import edu.jas.structure.RingElem; 025import edu.jas.util.DistHashTableMPI; 026import edu.jas.util.MPIChannel; 027import edu.jas.util.Terminator; 028import edu.jas.util.ThreadPool; 029 030 031/** 032 * Groebner Base distributed hybrid algorithm with MPI. Implements a distributed 033 * memory with multi-core CPUs parallel version of Groebner bases with MPI. 034 * Using pairlist class, distributed multi-threaded tasks do reduction, one 035 * communication channel per remote node. 036 * @param <C> coefficient type 037 * @author Heinz Kredel 038 */ 039 040public class GroebnerBaseDistributedHybridMPI<C extends RingElem<C>> extends GroebnerBaseAbstract<C> { 041 042 043 private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedHybridMPI.class); 044 045 046 public final boolean debug = logger.isDebugEnabled(); 047 048 049 /** 050 * Number of threads to use. 051 */ 052 protected final int threads; 053 054 055 /** 056 * Default number of threads. 057 */ 058 protected static final int DEFAULT_THREADS = 2; 059 060 061 /** 062 * Number of threads per node to use. 063 */ 064 protected final int threadsPerNode; 065 066 067 /** 068 * Default number of threads per compute node. 069 */ 070 protected static final int DEFAULT_THREADS_PER_NODE = 1; 071 072 073 /** 074 * Pool of threads to use. 075 */ 076 //protected final ExecutorService pool; // not for single node tests 077 protected transient final ThreadPool pool; 078 079 080 /* 081 * Underlying MPI engine. 082 */ 083 protected transient final Comm engine; 084 085 086 /** 087 * Message tag for pairs. 088 */ 089 public static final int pairTag = GroebnerBaseDistributedHybridEC.pairTag.intValue(); 090 091 092 /** 093 * Message tag for results. 094 */ 095 public static final int resultTag = GroebnerBaseDistributedHybridEC.resultTag.intValue(); 096 097 098 /** 099 * Message tag for acknowledgments. 100 */ 101 public static final int ackTag = GroebnerBaseDistributedHybridEC.ackTag.intValue(); 102 103 104 /** 105 * Constructor. 106 */ 107 public GroebnerBaseDistributedHybridMPI() throws IOException { 108 this(DEFAULT_THREADS); 109 } 110 111 112 /** 113 * Constructor. 114 * @param threads number of threads to use. 115 */ 116 public GroebnerBaseDistributedHybridMPI(int threads) throws IOException { 117 this(threads, new ThreadPool(threads)); 118 } 119 120 121 /** 122 * Constructor. 123 * @param threads number of threads to use. 124 * @param threadsPerNode threads per node to use. 125 */ 126 public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode) throws IOException { 127 this(threads, threadsPerNode, new ThreadPool(threads)); 128 } 129 130 131 /** 132 * Constructor. 133 * @param threads number of threads to use. 134 * @param pool ThreadPool to use. 135 */ 136 public GroebnerBaseDistributedHybridMPI(int threads, ThreadPool pool) throws IOException { 137 this(threads, DEFAULT_THREADS_PER_NODE, pool); 138 } 139 140 141 /** 142 * Constructor. 143 * @param threads number of threads to use. 144 * @param threadsPerNode threads per node to use. 145 * @param pl pair selection strategy 146 */ 147 public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode, PairList<C> pl) 148 throws IOException { 149 this(threads, threadsPerNode, new ThreadPool(threads), pl); 150 } 151 152 153 /** 154 * Constructor. 155 * @param threads number of threads to use. 156 * @param threadsPerNode threads per node to use. 157 */ 158 public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode, ThreadPool pool) 159 throws IOException { 160 this(threads, threadsPerNode, pool, new OrderedPairlist<C>()); 161 } 162 163 164 /** 165 * Constructor. 166 * @param threads number of threads to use. 167 * @param threadsPerNode threads per node to use. 168 * @param pool ThreadPool to use. 169 * @param pl pair selection strategy 170 */ 171 public GroebnerBaseDistributedHybridMPI(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl) 172 throws IOException { 173 super(new ReductionPar<C>(), pl); 174 int size = 0; 175 try { 176 engine = MPIEngine.getCommunicator(); 177 size = engine.Size(); 178 } catch (MPIException e) { 179 throw new IOException(e); 180 } 181 if (size < 2) { 182 throw new IllegalArgumentException("Minimal 2 MPI processes required, not " + size); 183 } 184 if (threads != size || pool.getNumber() != size) { 185 throw new IllegalArgumentException("threads != size: " + threads + " != " + size + ", #pool " 186 + pool.getNumber()); 187 } 188 this.threads = threads; 189 this.pool = pool; 190 this.threadsPerNode = threadsPerNode; 191 //logger.info("generated pool: " + pool); 192 } 193 194 195 /** 196 * Cleanup and terminate. 197 */ 198 @Override 199 public void terminate() { 200 if (pool == null) { 201 return; 202 } 203 //pool.terminate(); 204 pool.cancel(); 205 } 206 207 208 /** 209 * Distributed Groebner base. 210 * @param modv number of module variables. 211 * @param F polynomial list. 212 * @return GB(F) a Groebner base of F or null, if a IOException occurs or on 213 * MPI client part. 214 */ 215 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) { 216 try { 217 if (engine.Rank() == 0) { 218 return GBmaster(modv, F); 219 } 220 } catch (MPIException e) { 221 logger.info("GBmaster: " + e); 222 e.printStackTrace(); 223 return null; 224 } catch (IOException e) { 225 logger.info("GBmaster: " + e); 226 e.printStackTrace(); 227 return null; 228 } 229 pool.terminate(); // not used on clients 230 try { 231 clientPart(0); // only 0 232 } catch (IOException e) { 233 logger.info("clientPart: " + e); 234 e.printStackTrace(); 235 } catch (MPIException e) { 236 logger.info("clientPart: " + e); 237 e.printStackTrace(); 238 } 239 return null; 240 } 241 242 243 /** 244 * Distributed hybrid Groebner base. 245 * @param modv number of module variables. 246 * @param F polynomial list. 247 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 248 */ 249 public List<GenPolynomial<C>> GBmaster(int modv, List<GenPolynomial<C>> F) throws MPIException, 250 IOException { 251 long t = System.currentTimeMillis(); 252 GenPolynomial<C> p; 253 List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>(); 254 PairList<C> pairlist = null; 255 boolean oneInGB = false; 256 //int l = F.size(); 257 int unused = 0; 258 ListIterator<GenPolynomial<C>> it = F.listIterator(); 259 while (it.hasNext()) { 260 p = it.next(); 261 if (p.length() > 0) { 262 p = p.monic(); 263 if (p.isONE()) { 264 oneInGB = true; 265 G.clear(); 266 G.add(p); 267 //return G; must signal termination to others 268 } 269 if (!oneInGB) { 270 G.add(p); 271 } 272 if (pairlist == null) { 273 //pairlist = new OrderedPairlist<C>(modv, p.ring); 274 pairlist = strategy.create(modv, p.ring); 275 if (!p.ring.coFac.isField()) { 276 throw new IllegalArgumentException("coefficients not from a field"); 277 } 278 } 279 // theList not updated here 280 if (p.isONE()) { 281 unused = pairlist.putOne(); 282 } else { 283 unused = pairlist.put(p); 284 } 285 } else { 286 //l--; 287 } 288 } 289 //if (l <= 1) { 290 //return G; must signal termination to others 291 //} 292 logger.info("pairlist " + pairlist + ": " + unused); 293 294 logger.debug("looking for clients"); 295 DistHashTableMPI<Integer, GenPolynomial<C>> theList = new DistHashTableMPI<Integer, GenPolynomial<C>>( 296 engine); 297 theList.init(); 298 299 List<GenPolynomial<C>> al = pairlist.getList(); 300 for (int i = 0; i < al.size(); i++) { 301 // no wait required 302 GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i)); 303 if (nn != null) { 304 logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i)); 305 } 306 } 307 308 Terminator finner = new Terminator((threads - 1) * threadsPerNode); 309 HybridReducerServerMPI<C> R; 310 logger.info("using pool = " + pool); 311 for (int i = 1; i < threads; i++) { 312 MPIChannel chan = new MPIChannel(engine, i); // closed in server 313 R = new HybridReducerServerMPI<C>(i, threadsPerNode, finner, chan, theList, pairlist); 314 pool.addJob(R); 315 //logger.info("server submitted " + R); 316 } 317 logger.info("main loop waiting " + finner); 318 finner.waitDone(); 319 int ps = theList.size(); 320 logger.info("#distributed list = " + ps); 321 // make sure all polynomials arrived: not needed in master 322 G = pairlist.getList(); 323 if (ps != G.size()) { 324 logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size()); 325 } 326 for (GenPolynomial<C> q : theList.getValueList()) { 327 if (q != null && !q.isZERO()) { 328 logger.debug("final q = " + q.leadingExpVector()); 329 } 330 } 331 logger.debug("distributed list end"); 332 long time = System.currentTimeMillis(); 333 List<GenPolynomial<C>> Gp; 334 Gp = minimalGB(G); // not jet distributed but threaded 335 time = System.currentTimeMillis() - time; 336 logger.debug("parallel gbmi time = " + time); 337 G = Gp; 338 logger.info("server theList.terminate() " + theList.size()); 339 theList.terminate(); 340 t = System.currentTimeMillis() - t; 341 logger.info("server GB end, time = " + t + ", " + pairlist.toString()); 342 return G; 343 } 344 345 346 /** 347 * GB distributed client. 348 * @param rank of the MPI where the server runs on. 349 * @throws IOException 350 */ 351 public void clientPart(int rank) throws IOException, MPIException { 352 if (rank != 0) { 353 throw new UnsupportedOperationException("only master at rank 0 implemented: " + rank); 354 } 355 Comm engine = MPIEngine.getCommunicator(); 356 357 DistHashTableMPI<Integer, GenPolynomial<C>> theList = new DistHashTableMPI<Integer, GenPolynomial<C>>( 358 engine); 359 theList.init(); 360 361 MPIChannel chan = new MPIChannel(engine, rank); 362 363 ThreadPool pool = new ThreadPool(threadsPerNode); 364 logger.info("client using pool = " + pool); 365 for (int i = 0; i < threadsPerNode; i++) { 366 HybridReducerClientMPI<C> Rr = new HybridReducerClientMPI<C>(chan, theList); // i 367 pool.addJob(Rr); 368 } 369 if (debug) { 370 logger.info("clients submitted"); 371 } 372 pool.terminate(); 373 logger.info("client pool.terminate()"); 374 chan.close(); 375 theList.terminate(); 376 return; 377 } 378 379 380 /** 381 * Minimal ordered groebner basis. 382 * @param Fp a Groebner base. 383 * @return a reduced Groebner base of Fp. 384 */ 385 @SuppressWarnings("unchecked") 386 @Override 387 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) { 388 GenPolynomial<C> a; 389 ArrayList<GenPolynomial<C>> G; 390 G = new ArrayList<GenPolynomial<C>>(Fp.size()); 391 ListIterator<GenPolynomial<C>> it = Fp.listIterator(); 392 while (it.hasNext()) { 393 a = it.next(); 394 if (a.length() != 0) { // always true 395 // already monic a = a.monic(); 396 G.add(a); 397 } 398 } 399 if (G.size() <= 1) { 400 return G; 401 } 402 403 ExpVector e; 404 ExpVector f; 405 GenPolynomial<C> p; 406 ArrayList<GenPolynomial<C>> F; 407 F = new ArrayList<GenPolynomial<C>>(G.size()); 408 boolean mt; 409 410 while (G.size() > 0) { 411 a = G.remove(0); 412 e = a.leadingExpVector(); 413 414 it = G.listIterator(); 415 mt = false; 416 while (it.hasNext() && !mt) { 417 p = it.next(); 418 f = p.leadingExpVector(); 419 mt = e.multipleOf(f); 420 } 421 it = F.listIterator(); 422 while (it.hasNext() && !mt) { 423 p = it.next(); 424 f = p.leadingExpVector(); 425 mt = e.multipleOf(f); 426 } 427 if (!mt) { 428 F.add(a); 429 } else { 430 // System.out.println("dropped " + a.length()); 431 } 432 } 433 G = F; 434 if (G.size() <= 1) { 435 return G; 436 } 437 Collections.reverse(G); // important for lex GB 438 439 MiMPIReducerServer<C>[] mirs = (MiMPIReducerServer<C>[]) new MiMPIReducerServer[G.size()]; 440 int i = 0; 441 F = new ArrayList<GenPolynomial<C>>(G.size()); 442 while (G.size() > 0) { 443 a = G.remove(0); 444 // System.out.println("doing " + a.length()); 445 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size()); 446 R.addAll(G); 447 R.addAll(F); 448 mirs[i] = new MiMPIReducerServer<C>(R, a); 449 pool.addJob(mirs[i]); 450 i++; 451 F.add(a); 452 } 453 G = F; 454 F = new ArrayList<GenPolynomial<C>>(G.size()); 455 for (i = 0; i < mirs.length; i++) { 456 a = mirs[i].getNF(); 457 F.add(a); 458 } 459 return F; 460 } 461 462} 463 464 465/** 466 * Distributed server reducing worker proxy threads. 467 * @param <C> coefficient type 468 */ 469 470class HybridReducerServerMPI<C extends RingElem<C>> implements Runnable { 471 472 473 private static final Logger logger = LogManager.getLogger(HybridReducerServerMPI.class); 474 475 476 public final boolean debug = logger.isDebugEnabled(); 477 478 479 private final Terminator finner; 480 481 482 private final MPIChannel pairChannel; 483 484 485 //protected transient final Comm engine; 486 487 488 private final DistHashTableMPI<Integer, GenPolynomial<C>> theList; 489 490 491 private final PairList<C> pairlist; 492 493 494 private final int threadsPerNode; 495 496 497 final int rank; 498 499 500 /** 501 * Message tag for pairs. 502 */ 503 public static final int pairTag = GroebnerBaseDistributedHybridMPI.pairTag; 504 505 506 /** 507 * Constructor. 508 * @param r MPI rank of partner. 509 * @param tpn number of threads per node 510 * @param fin terminator 511 * @param chan MPIChannel 512 * @param dl distributed hash table 513 * @param L ordered pair list 514 */ 515 HybridReducerServerMPI(int r, int tpn, Terminator fin, MPIChannel chan, 516 DistHashTableMPI<Integer, GenPolynomial<C>> dl, PairList<C> L) { 517 rank = r; 518 threadsPerNode = tpn; 519 finner = fin; 520 this.pairChannel = chan; 521 theList = dl; 522 pairlist = L; 523 //logger.info("reducer server created " + this); 524 } 525 526 527 /** 528 * Work loop. 529 * @see java.lang.Runnable#run() 530 */ 531 @Override 532 @SuppressWarnings("unchecked") 533 public void run() { 534 //logger.info("reducer server running with " + engine); 535 // try { 536 // pairChannel = new MPIChannel(engine, rank); //,pairTag 537 // } catch (IOException e) { 538 // e.printStackTrace(); 539 // return; 540 // } catch (MPIException e) { 541 // e.printStackTrace(); 542 // return; 543 // } 544 if (logger.isInfoEnabled()) { 545 logger.info("reducer server running: pairChannel = " + pairChannel); 546 } 547 // record idle remote workers (minus one?) 548 //finner.beIdle(threadsPerNode-1); 549 finner.initIdle(threadsPerNode); 550 AtomicInteger active = new AtomicInteger(0); 551 552 // start receiver 553 HybridReducerReceiverMPI<C> receiver = new HybridReducerReceiverMPI<C>(rank, finner, active, 554 pairChannel, theList, pairlist); 555 receiver.start(); 556 557 Pair<C> pair; 558 //boolean set = false; 559 boolean goon = true; 560 //int polIndex = -1; 561 int red = 0; 562 int sleeps = 0; 563 564 // while more requests 565 while (goon) { 566 // receive request if thread is reported incactive 567 logger.debug("receive request"); 568 Object req = null; 569 try { 570 req = pairChannel.receive(pairTag); 571 //} catch (InterruptedException e) { 572 //goon = false; 573 //e.printStackTrace(); 574 } catch (IOException e) { 575 goon = false; 576 e.printStackTrace(); 577 } catch (MPIException e) { 578 e.printStackTrace(); 579 return; 580 } catch (ClassNotFoundException e) { 581 goon = false; 582 e.printStackTrace(); 583 } 584 logger.debug("received request, req = " + req); 585 if (req == null) { 586 goon = false; 587 break; 588 } 589 if (!(req instanceof GBTransportMessReq)) { 590 goon = false; 591 break; 592 } 593 594 // find pair and manage termination status 595 logger.debug("find pair"); 596 while (!pairlist.hasNext()) { // wait 597 if (!finner.hasJobs() && !pairlist.hasNext()) { 598 goon = false; 599 break; 600 } 601 try { 602 sleeps++; 603 if (sleeps % 3 == 0) { 604 logger.info("waiting for reducers, remaining = " + finner.getJobs()); 605 } 606 Thread.sleep(100); 607 } catch (InterruptedException e) { 608 goon = false; 609 break; 610 } 611 } 612 if (!pairlist.hasNext() && !finner.hasJobs()) { 613 logger.info("termination detection: no pairs and no jobs left"); 614 goon = false; 615 break; //continue; //break? 616 } 617 finner.notIdle(); // before pairlist get!! 618 pair = pairlist.removeNext(); 619 // send pair to client, even if null 620 if (debug) { 621 logger.info("active count = " + active.get()); 622 logger.info("send pair = " + pair); 623 } 624 GBTransportMess msg = null; 625 if (pair != null) { 626 msg = new GBTransportMessPairIndex(pair); 627 } else { 628 msg = new GBTransportMess(); //not End(); at this time 629 // goon ?= false; 630 } 631 try { 632 red++; 633 pairChannel.send(pairTag, msg); 634 @SuppressWarnings("unused") 635 int a = active.getAndIncrement(); 636 } catch (IOException e) { 637 e.printStackTrace(); 638 goon = false; 639 break; 640 } catch (MPIException e) { 641 e.printStackTrace(); 642 goon = false; 643 break; 644 } 645 //logger.debug("#distributed list = " + theList.size()); 646 } 647 logger.info("terminated, send " + red + " reduction pairs"); 648 649 /* 650 * send end mark to clients 651 */ 652 logger.debug("send end"); 653 try { 654 for (int i = 0; i < threadsPerNode; i++) { // -1 655 pairChannel.send(pairTag, new GBTransportMessEnd()); 656 } 657 logger.info("sent end to clients"); 658 // send also end to receiver, no more 659 //pairChannel.send(resultTag, new GBTransportMessEnd(), engine.Rank()); 660 } catch (IOException e) { 661 if (logger.isDebugEnabled()) { 662 e.printStackTrace(); 663 } 664 } catch (MPIException e) { 665 e.printStackTrace(); 666 } 667 int d = active.get(); 668 if (d > 0) { 669 logger.info("remaining active tasks = " + d); 670 } 671 receiver.terminate(); 672 //logger.info("terminated, send " + red + " reduction pairs"); 673 pairChannel.close(); 674 logger.info("redServ pairChannel.close()"); 675 finner.release(); 676 } 677} 678 679 680/** 681 * Distributed server receiving worker thread. 682 * @param <C> coefficient type 683 */ 684 685class HybridReducerReceiverMPI<C extends RingElem<C>> extends Thread { 686 687 688 private static final Logger logger = LogManager.getLogger(HybridReducerReceiverMPI.class); 689 690 691 public final boolean debug = logger.isDebugEnabled(); 692 693 694 private final DistHashTableMPI<Integer, GenPolynomial<C>> theList; 695 696 697 private final PairList<C> pairlist; 698 699 700 private final MPIChannel pairChannel; 701 702 703 final int rank; 704 705 706 private final Terminator finner; 707 708 709 //private final int threadsPerNode; 710 711 712 private final AtomicInteger active; 713 714 715 private volatile boolean goon; 716 717 718 /** 719 * Message tag for results. 720 */ 721 public static final int resultTag = GroebnerBaseDistributedHybridMPI.resultTag; 722 723 724 /** 725 * Message tag for acknowledgments. 726 */ 727 public static final int ackTag = GroebnerBaseDistributedHybridMPI.ackTag; 728 729 730 /** 731 * Constructor. 732 * @param r MPI rank of partner. 733 * @param fin terminator 734 * @param a active remote tasks count 735 * @param pc tagged socket channel 736 * @param dl distributed hash table 737 * @param L ordered pair list 738 */ 739 HybridReducerReceiverMPI(int r, Terminator fin, AtomicInteger a, MPIChannel pc, 740 DistHashTableMPI<Integer, GenPolynomial<C>> dl, PairList<C> L) { 741 rank = r; 742 active = a; 743 //threadsPerNode = tpn; 744 finner = fin; 745 pairChannel = pc; 746 theList = dl; 747 pairlist = L; 748 goon = true; 749 //logger.info("reducer server created " + this); 750 } 751 752 753 /** 754 * Work loop. 755 * @see java.lang.Thread#run() 756 */ 757 @Override 758 @SuppressWarnings("unchecked") 759 public void run() { 760 //Pair<C> pair = null; 761 GenPolynomial<C> H = null; 762 int red = 0; 763 int polIndex = -1; 764 //Integer senderId; // obsolete 765 766 // while more requests 767 while (goon) { 768 // receive request 769 logger.debug("receive result"); 770 //senderId = null; 771 Object rh = null; 772 try { 773 rh = pairChannel.receive(resultTag); 774 @SuppressWarnings("unused") 775 int i = active.getAndDecrement(); 776 //} catch (InterruptedException e) { 777 //goon = false; 778 ////e.printStackTrace(); 779 ////?? finner.initIdle(1); 780 //break; 781 } catch (IOException e) { 782 e.printStackTrace(); 783 goon = false; 784 finner.initIdle(1); 785 break; 786 } catch (MPIException e) { 787 e.printStackTrace(); 788 goon = false; 789 finner.initIdle(1); 790 break; 791 } catch (ClassNotFoundException e) { 792 e.printStackTrace(); 793 goon = false; 794 finner.initIdle(1); 795 break; 796 } 797 logger.info("received result"); 798 if (rh == null) { 799 if (this.isInterrupted()) { 800 goon = false; 801 finner.initIdle(1); 802 break; 803 } 804 //finner.initIdle(1); 805 } else if (rh instanceof GBTransportMessEnd) { // should only happen from server 806 logger.info("received GBTransportMessEnd"); 807 goon = false; 808 //?? finner.initIdle(1); 809 break; 810 } else if (rh instanceof GBTransportMessPoly) { 811 // update pair list 812 red++; 813 GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh; 814 H = mpi.pol; 815 //senderId = mpi.threadId; 816 if (H != null) { 817 if (logger.isInfoEnabled()) { // debug 818 logger.info("H = " + H.leadingExpVector()); 819 } 820 if (!H.isZERO()) { 821 if (H.isONE()) { 822 polIndex = pairlist.putOne(); 823 //GenPolynomial<C> nn = 824 theList.putWait(Integer.valueOf(polIndex), H); 825 //goon = false; must wait for other clients 826 //finner.initIdle(1); 827 //break; 828 } else { 829 polIndex = pairlist.put(H); 830 // use putWait ? but still not all distributed 831 //GenPolynomial<C> nn = 832 theList.putWait(Integer.valueOf(polIndex), H); 833 } 834 } 835 } 836 } 837 // only after recording in pairlist ! 838 finner.initIdle(1); 839 try { 840 pairChannel.send(ackTag, new GBTransportMess()); 841 logger.debug("send acknowledgement"); 842 } catch (IOException e) { 843 e.printStackTrace(); 844 goon = false; 845 break; 846 } catch (MPIException e) { 847 e.printStackTrace(); 848 goon = false; 849 break; 850 } 851 } // end while 852 goon = false; 853 logger.info("terminated, received " + red + " reductions"); 854 } 855 856 857 /** 858 * Terminate. 859 */ 860 public void terminate() { 861 goon = false; 862 try { 863 this.join(); 864 //this.interrupt(); 865 } catch (InterruptedException e) { 866 // unfug Thread.currentThread().interrupt(); 867 } 868 logger.info("terminate end"); 869 } 870 871} 872 873 874/** 875 * Distributed clients reducing worker threads. 876 */ 877 878class HybridReducerClientMPI<C extends RingElem<C>> implements Runnable { 879 880 881 private static final Logger logger = LogManager.getLogger(HybridReducerClientMPI.class); 882 883 884 public final boolean debug = logger.isDebugEnabled(); 885 886 887 private final MPIChannel pairChannel; 888 889 890 private final DistHashTableMPI<Integer, GenPolynomial<C>> theList; 891 892 893 private final ReductionPar<C> red; 894 895 896 //private final int threadsPerNode; 897 898 899 /* 900 * Identification number for this thread. 901 */ 902 //public final Integer threadId; // obsolete 903 904 905 /** 906 * Message tag for pairs. 907 */ 908 public static final int pairTag = GroebnerBaseDistributedHybridMPI.pairTag; 909 910 911 /** 912 * Message tag for results. 913 */ 914 public static final int resultTag = GroebnerBaseDistributedHybridMPI.resultTag; 915 916 917 /** 918 * Message tag for acknowledgments. 919 */ 920 public static final int ackTag = GroebnerBaseDistributedHybridMPI.ackTag; 921 922 923 /** 924 * Constructor. 925 * @param tc tagged socket channel 926 * @param dl distributed hash table 927 */ 928 HybridReducerClientMPI(MPIChannel tc, DistHashTableMPI<Integer, GenPolynomial<C>> dl) { 929 //this.threadsPerNode = tpn; 930 pairChannel = tc; 931 //threadId = 100 + tid; // keep distinct from other tags 932 theList = dl; 933 red = new ReductionPar<C>(); 934 } 935 936 937 /** 938 * Work loop. 939 * @see java.lang.Runnable#run() 940 */ 941 @Override 942 @SuppressWarnings("unchecked") 943 public void run() { 944 if (debug) { 945 logger.info("pairChannel = " + pairChannel + " reducer client running"); 946 } 947 Pair<C> pair = null; 948 GenPolynomial<C> pi, pj, ps; 949 GenPolynomial<C> S; 950 GenPolynomial<C> H = null; 951 //boolean set = false; 952 boolean goon = true; 953 boolean doEnd = true; 954 int reduction = 0; 955 //int sleeps = 0; 956 Integer pix, pjx, psx; 957 958 while (goon) { 959 /* protocol: 960 * request pair, process pair, send result, receive acknowledgment 961 */ 962 // pair = (Pair) pairlist.removeNext(); 963 Object req = new GBTransportMessReq(); 964 logger.debug("send request = " + req); 965 try { 966 pairChannel.send(pairTag, req); 967 } catch (IOException e) { 968 goon = false; 969 if (debug) { 970 e.printStackTrace(); 971 } 972 logger.info("receive pair, exception "); 973 break; 974 } catch (MPIException e) { 975 goon = false; 976 if (debug) { 977 e.printStackTrace(); 978 } 979 logger.info("receive pair, exception "); 980 break; 981 } 982 logger.debug("receive pair, goon = " + goon); 983 doEnd = true; 984 Object pp = null; 985 try { 986 pp = pairChannel.receive(pairTag); 987 //} catch (InterruptedException e) { 988 //goon = false; 989 //e.printStackTrace(); 990 } catch (IOException e) { 991 goon = false; 992 if (debug) { 993 e.printStackTrace(); 994 } 995 break; 996 } catch (MPIException e) { 997 goon = false; 998 if (debug) { 999 e.printStackTrace(); 1000 } 1001 break; 1002 } catch (ClassNotFoundException e) { 1003 goon = false; 1004 e.printStackTrace(); 1005 } 1006 if (debug) { 1007 logger.info("received pair = " + pp); 1008 } 1009 H = null; 1010 if (pp == null) { // should not happen 1011 continue; 1012 } 1013 if (pp instanceof GBTransportMessEnd) { 1014 goon = false; 1015 //doEnd = false; // bug 1016 continue; 1017 } 1018 if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) { 1019 pi = pj = ps = null; 1020 if (pp instanceof GBTransportMessPair) { 1021 pair = ((GBTransportMessPair<C>) pp).pair; 1022 if (pair != null) { 1023 pi = pair.pi; 1024 pj = pair.pj; 1025 //logger.debug("pair: pix = " + pair.i 1026 // + ", pjx = " + pair.j); 1027 } 1028 } 1029 if (pp instanceof GBTransportMessPairIndex) { 1030 pix = ((GBTransportMessPairIndex) pp).i; 1031 pjx = ((GBTransportMessPairIndex) pp).j; 1032 psx = ((GBTransportMessPairIndex) pp).s; 1033 pi = theList.getWait(pix); 1034 pj = theList.getWait(pjx); 1035 ps = theList.getWait(psx); 1036 //logger.info("pix = " + pix + ", pjx = " +pjx + ", psx = " +psx); 1037 } 1038 1039 if (pi != null && pj != null) { 1040 S = red.SPolynomial(pi, pj); 1041 //System.out.println("S = " + S); 1042 logger.info("ht(S) = " + S.leadingExpVector()); 1043 if (S.isZERO()) { 1044 // pair.setZero(); does not work in dist 1045 H = S; 1046 } else { 1047 if (debug) { 1048 logger.debug("ht(S) = " + S.leadingExpVector()); 1049 } 1050 H = red.normalform(theList, S); 1051 reduction++; 1052 if (H.isZERO()) { 1053 // pair.setZero(); does not work in dist 1054 } else { 1055 H = H.monic(); 1056 if (logger.isInfoEnabled()) { 1057 logger.info("ht(H) = " + H.leadingExpVector()); 1058 } 1059 } 1060 } 1061 } else { 1062 logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps); 1063 } 1064 } 1065 if (pp instanceof GBTransportMess) { 1066 logger.debug("null pair results in null H poly"); 1067 } 1068 1069 // send H or must send null, if not at end 1070 if (debug) { 1071 logger.debug("#distributed list = " + theList.size()); 1072 logger.debug("send H polynomial = " + H); 1073 } 1074 try { 1075 pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId)); 1076 doEnd = false; 1077 } catch (IOException e) { 1078 goon = false; 1079 e.printStackTrace(); 1080 } catch (MPIException e) { 1081 goon = false; 1082 e.printStackTrace(); 1083 } 1084 logger.debug("done send poly message of " + pp); 1085 try { 1086 pp = pairChannel.receive(ackTag); 1087 //} catch (InterruptedException e) { 1088 //goon = false; 1089 //e.printStackTrace(); 1090 } catch (IOException e) { 1091 goon = false; 1092 if (debug) { 1093 e.printStackTrace(); 1094 } 1095 break; 1096 } catch (MPIException e) { 1097 goon = false; 1098 if (debug) { 1099 e.printStackTrace(); 1100 } 1101 break; 1102 } catch (ClassNotFoundException e) { 1103 goon = false; 1104 e.printStackTrace(); 1105 } 1106 if (!(pp instanceof GBTransportMess)) { 1107 logger.error("invalid acknowledgement " + pp); 1108 } 1109 logger.debug("received acknowledgment " + pp); 1110 } 1111 logger.info("terminated, done " + reduction + " reductions"); 1112 if (doEnd) { 1113 try { 1114 pairChannel.send(resultTag, new GBTransportMessEnd()); 1115 } catch (IOException e) { 1116 //e.printStackTrace(); 1117 } catch (MPIException e) { 1118 //e.printStackTrace(); 1119 } 1120 logger.info("terminated, send done"); 1121 } 1122 } 1123}