001/* 002 * $Id: GroebnerBaseDistributedHybridMPJ.java 4952 2014-10-12 19:41:46Z axelclk 003 * $ 004 */ 005 006package edu.jas.gb; 007 008 009import java.io.IOException; 010import java.util.ArrayList; 011import java.util.Collections; 012import java.util.List; 013import java.util.ListIterator; 014import java.util.concurrent.atomic.AtomicInteger; 015 016import mpi.Comm; 017 018import org.apache.logging.log4j.Logger; 019import org.apache.logging.log4j.LogManager; 020 021import edu.jas.kern.MPJEngine; 022import edu.jas.poly.ExpVector; 023import edu.jas.poly.GenPolynomial; 024import edu.jas.poly.GenPolynomialRing; 025import edu.jas.poly.PolyUtil; 026import edu.jas.structure.RingElem; 027import edu.jas.util.DistHashTableMPJ; 028import edu.jas.util.MPJChannel; 029import edu.jas.util.Terminator; 030import edu.jas.util.ThreadPool; 031 032 033/** 034 * Groebner Base distributed hybrid algorithm with MPJ. Implements a distributed 035 * memory with multi-core CPUs parallel version of Groebner bases with MPJ. 036 * Using pairlist class, distributed multi-threaded tasks do reduction, one 037 * communication channel per remote node. 038 * @param <C> coefficient type 039 * @author Heinz Kredel 040 */ 041 042public class GroebnerBaseDistributedHybridMPJ<C extends RingElem<C>> extends GroebnerBaseAbstract<C> { 043 044 045 private static final Logger logger = LogManager.getLogger(GroebnerBaseDistributedHybridMPJ.class); 046 047 048 public final boolean debug = logger.isDebugEnabled(); 049 050 051 /** 052 * Number of threads to use. 053 */ 054 protected final int threads; 055 056 057 /** 058 * Default number of threads. 059 */ 060 protected static final int DEFAULT_THREADS = 2; 061 062 063 /** 064 * Number of threads per node to use. 065 */ 066 protected final int threadsPerNode; 067 068 069 /** 070 * Default number of threads per compute node. 071 */ 072 protected static final int DEFAULT_THREADS_PER_NODE = 1; 073 074 075 /** 076 * Pool of threads to use. 077 */ 078 //protected final ExecutorService pool; // not for single node tests 079 protected transient final ThreadPool pool; 080 081 082 /* 083 * Underlying MPJ engine. 084 */ 085 protected transient final Comm engine; 086 087 088 /** 089 * Message tag for pairs. 090 */ 091 public static final int pairTag = GroebnerBaseDistributedHybridEC.pairTag.intValue(); 092 093 094 /** 095 * Message tag for results. 096 */ 097 public static final int resultTag = GroebnerBaseDistributedHybridEC.resultTag.intValue(); 098 099 100 /** 101 * Message tag for acknowledgments. 102 */ 103 public static final int ackTag = GroebnerBaseDistributedHybridEC.ackTag.intValue(); 104 105 106 /** 107 * Constructor. 108 */ 109 public GroebnerBaseDistributedHybridMPJ() throws IOException { 110 this(DEFAULT_THREADS); 111 } 112 113 114 /** 115 * Constructor. 116 * @param threads number of threads to use. 117 */ 118 public GroebnerBaseDistributedHybridMPJ(int threads) throws IOException { 119 this(threads, new ThreadPool(threads)); 120 } 121 122 123 /** 124 * Constructor. 125 * @param threads number of threads to use. 126 * @param threadsPerNode threads per node to use. 127 */ 128 public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode) throws IOException { 129 this(threads, threadsPerNode, new ThreadPool(threads)); 130 } 131 132 133 /** 134 * Constructor. 135 * @param threads number of threads to use. 136 * @param pool ThreadPool to use. 137 */ 138 public GroebnerBaseDistributedHybridMPJ(int threads, ThreadPool pool) throws IOException { 139 this(threads, DEFAULT_THREADS_PER_NODE, pool); 140 } 141 142 143 /** 144 * Constructor. 145 * @param threads number of threads to use. 146 * @param threadsPerNode threads per node to use. 147 * @param pl pair selection strategy 148 */ 149 public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode, PairList<C> pl) 150 throws IOException { 151 this(threads, threadsPerNode, new ThreadPool(threads), pl); 152 } 153 154 155 /** 156 * Constructor. 157 * @param threads number of threads to use. 158 * @param threadsPerNode threads per node to use. 159 */ 160 public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode, ThreadPool pool) 161 throws IOException { 162 this(threads, threadsPerNode, pool, new OrderedPairlist<C>()); 163 } 164 165 166 /** 167 * Constructor. 168 * @param threads number of threads to use. 169 * @param threadsPerNode threads per node to use. 170 * @param pool ThreadPool to use. 171 * @param pl pair selection strategy 172 */ 173 public GroebnerBaseDistributedHybridMPJ(int threads, int threadsPerNode, ThreadPool pool, PairList<C> pl) 174 throws IOException { 175 super(new ReductionPar<C>(), pl); 176 this.engine = MPJEngine.getCommunicator(); 177 int size = engine.Size(); 178 if (size < 2) { 179 throw new IllegalArgumentException("Minimal 2 MPJ processes required, not " + size); 180 } 181 if (threads != size || pool.getNumber() != size) { 182 throw new IllegalArgumentException("threads != size: " + threads + " != " + size + ", #pool " 183 + pool.getNumber()); 184 } 185 this.threads = threads; 186 this.pool = pool; 187 this.threadsPerNode = threadsPerNode; 188 //logger.info("generated pool: " + pool); 189 } 190 191 192 /** 193 * Cleanup and terminate. 194 */ 195 @Override 196 public void terminate() { 197 if (pool == null) { 198 return; 199 } 200 //pool.terminate(); 201 pool.cancel(); 202 } 203 204 205 /** 206 * Distributed Groebner base. 207 * @param modv number of module variables. 208 * @param F polynomial list. 209 * @return GB(F) a Groebner base of F or null, if a IOException occurs or on 210 * MPJ client part. 211 */ 212 public List<GenPolynomial<C>> GB(int modv, List<GenPolynomial<C>> F) { 213 try { 214 if (engine.Rank() == 0) { 215 return GBmaster(modv, F); 216 } 217 } catch (IOException e) { 218 logger.info("GBmaster: " + e); 219 e.printStackTrace(); 220 return null; 221 } 222 pool.terminate(); // not used on clients 223 try { 224 clientPart(0); // only 0 225 } catch (IOException e) { 226 logger.info("clientPart: " + e); 227 e.printStackTrace(); 228 } 229 return null; 230 } 231 232 233 /** 234 * Distributed hybrid Groebner base. 235 * @param modv number of module variables. 236 * @param F polynomial list. 237 * @return GB(F) a Groebner base of F or null, if a IOException occurs. 238 */ 239 List<GenPolynomial<C>> GBmaster(int modv, List<GenPolynomial<C>> F) throws IOException { 240 long t = System.currentTimeMillis(); 241 242 List<GenPolynomial<C>> G = normalizeZerosOnes(F); 243 G = PolyUtil.<C> monic(G); 244 if (G.size() <= 1) { 245 //return G; 246 } 247 if (G.isEmpty()) { 248 throw new IllegalArgumentException("empty F / zero ideal not allowed"); 249 } 250 GenPolynomialRing<C> ring = G.get(0).ring; 251 if (!ring.coFac.isField()) { 252 throw new IllegalArgumentException("coefficients not from a field"); 253 } 254 PairList<C> pairlist = strategy.create(modv, ring); 255 pairlist.put(G); 256 257 /* 258 GenPolynomial<C> p; 259 List<GenPolynomial<C>> G = new ArrayList<GenPolynomial<C>>(); 260 PairList<C> pairlist = null; 261 boolean oneInGB = false; 262 int l = F.size(); 263 int unused = 0; 264 ListIterator<GenPolynomial<C>> it = F.listIterator(); 265 while (it.hasNext()) { 266 p = it.next(); 267 if (p.length() > 0) { 268 p = p.monic(); 269 if (p.isONE()) { 270 oneInGB = true; 271 G.clear(); 272 G.add(p); 273 //return G; must signal termination to others 274 } 275 if (!oneInGB) { 276 G.add(p); 277 } 278 if (pairlist == null) { 279 //pairlist = new OrderedPairlist<C>(modv, p.ring); 280 pairlist = strategy.create(modv, p.ring); 281 if (!p.ring.coFac.isField()) { 282 throw new IllegalArgumentException("coefficients not from a field"); 283 } 284 } 285 // theList not updated here 286 if (p.isONE()) { 287 unused = pairlist.putOne(); 288 } else { 289 unused = pairlist.put(p); 290 } 291 } else { 292 l--; 293 } 294 } 295 //if (l <= 1) { 296 //return G; must signal termination to others 297 //} 298 */ 299 logger.info("pairlist " + pairlist); 300 301 logger.debug("looking for clients"); 302 DistHashTableMPJ<Integer, GenPolynomial<C>> theList = new DistHashTableMPJ<Integer, GenPolynomial<C>>( 303 engine); 304 theList.init(); 305 306 List<GenPolynomial<C>> al = pairlist.getList(); 307 for (int i = 0; i < al.size(); i++) { 308 // no wait required 309 GenPolynomial<C> nn = theList.put(Integer.valueOf(i), al.get(i)); 310 if (nn != null) { 311 logger.info("double polynomials " + i + ", nn = " + nn + ", al(i) = " + al.get(i)); 312 } 313 } 314 315 Terminator finner = new Terminator((threads - 1) * threadsPerNode); 316 HybridReducerServerMPJ<C> R; 317 logger.info("using pool = " + pool); 318 for (int i = 1; i < threads; i++) { 319 MPJChannel chan = new MPJChannel(engine, i); // closed in server 320 R = new HybridReducerServerMPJ<C>(i, threadsPerNode, finner, chan, theList, pairlist); 321 pool.addJob(R); 322 //logger.info("server submitted " + R); 323 } 324 logger.info("main loop waiting " + finner); 325 finner.waitDone(); 326 int ps = theList.size(); 327 logger.info("#distributed list = " + ps); 328 // make sure all polynomials arrived: not needed in master 329 G = pairlist.getList(); 330 if (ps != G.size()) { 331 logger.info("#distributed list = " + theList.size() + " #pairlist list = " + G.size()); 332 } 333 for (GenPolynomial<C> q : theList.getValueList()) { 334 if (q != null && !q.isZERO()) { 335 logger.debug("final q = " + q.leadingExpVector()); 336 } 337 } 338 logger.debug("distributed list end"); 339 long time = System.currentTimeMillis(); 340 List<GenPolynomial<C>> Gp; 341 Gp = minimalGB(G); // not jet distributed but threaded 342 time = System.currentTimeMillis() - time; 343 logger.debug("parallel gbmi time = " + time); 344 G = Gp; 345 logger.info("server theList.terminate() " + theList.size()); 346 theList.terminate(); 347 t = System.currentTimeMillis() - t; 348 logger.info("server GB end, time = " + t + ", " + pairlist.toString()); 349 return G; 350 } 351 352 353 /** 354 * GB distributed client. 355 * @param rank of the MPJ where the server runs on. 356 * @throws IOException 357 */ 358 public void clientPart(int rank) throws IOException { 359 if (rank != 0) { 360 throw new UnsupportedOperationException("only master at rank 0 implemented: " + rank); 361 } 362 Comm engine = MPJEngine.getCommunicator(); 363 364 DistHashTableMPJ<Integer, GenPolynomial<C>> theList = new DistHashTableMPJ<Integer, GenPolynomial<C>>(); 365 theList.init(); 366 367 MPJChannel chan = new MPJChannel(engine, rank); 368 369 ThreadPool pool = new ThreadPool(threadsPerNode); 370 logger.info("client using pool = " + pool); 371 for (int i = 0; i < threadsPerNode; i++) { 372 HybridReducerClientMPJ<C> Rr = new HybridReducerClientMPJ<C>(chan, theList); // i 373 pool.addJob(Rr); 374 } 375 if (debug) { 376 logger.info("clients submitted"); 377 } 378 pool.terminate(); 379 logger.info("client pool.terminate()"); 380 381 chan.close(); 382 theList.terminate(); 383 return; 384 } 385 386 387 /** 388 * Minimal ordered groebner basis. 389 * @param Fp a Groebner base. 390 * @return a reduced Groebner base of Fp. 391 */ 392 @SuppressWarnings("unchecked") 393 @Override 394 public List<GenPolynomial<C>> minimalGB(List<GenPolynomial<C>> Fp) { 395 GenPolynomial<C> a; 396 ArrayList<GenPolynomial<C>> G; 397 G = new ArrayList<GenPolynomial<C>>(Fp.size()); 398 ListIterator<GenPolynomial<C>> it = Fp.listIterator(); 399 while (it.hasNext()) { 400 a = it.next(); 401 if (a.length() != 0) { // always true 402 // already monic a = a.monic(); 403 G.add(a); 404 } 405 } 406 if (G.size() <= 1) { 407 return G; 408 } 409 410 ExpVector e; 411 ExpVector f; 412 GenPolynomial<C> p; 413 ArrayList<GenPolynomial<C>> F; 414 F = new ArrayList<GenPolynomial<C>>(G.size()); 415 boolean mt; 416 417 while (G.size() > 0) { 418 a = G.remove(0); 419 e = a.leadingExpVector(); 420 421 it = G.listIterator(); 422 mt = false; 423 while (it.hasNext() && !mt) { 424 p = it.next(); 425 f = p.leadingExpVector(); 426 mt = e.multipleOf(f); 427 } 428 it = F.listIterator(); 429 while (it.hasNext() && !mt) { 430 p = it.next(); 431 f = p.leadingExpVector(); 432 mt = e.multipleOf(f); 433 } 434 if (!mt) { 435 F.add(a); 436 } else { 437 // System.out.println("dropped " + a.length()); 438 } 439 } 440 G = F; 441 if (G.size() <= 1) { 442 return G; 443 } 444 Collections.reverse(G); // important for lex GB 445 446 MiMPJReducerServer<C>[] mirs = (MiMPJReducerServer<C>[]) new MiMPJReducerServer[G.size()]; 447 int i = 0; 448 F = new ArrayList<GenPolynomial<C>>(G.size()); 449 while (G.size() > 0) { 450 a = G.remove(0); 451 // System.out.println("doing " + a.length()); 452 List<GenPolynomial<C>> R = new ArrayList<GenPolynomial<C>>(G.size() + F.size()); 453 R.addAll(G); 454 R.addAll(F); 455 mirs[i] = new MiMPJReducerServer<C>(R, a); 456 pool.addJob(mirs[i]); 457 i++; 458 F.add(a); 459 } 460 G = F; 461 F = new ArrayList<GenPolynomial<C>>(G.size()); 462 for (i = 0; i < mirs.length; i++) { 463 a = mirs[i].getNF(); 464 F.add(a); 465 } 466 return F; 467 } 468 469} 470 471 472/** 473 * Distributed server reducing worker proxy threads. 474 * @param <C> coefficient type 475 */ 476 477class HybridReducerServerMPJ<C extends RingElem<C>> implements Runnable { 478 479 480 private static final Logger logger = LogManager.getLogger(HybridReducerServerMPJ.class); 481 482 483 public final boolean debug = logger.isDebugEnabled(); 484 485 486 private final Terminator finner; 487 488 489 private final MPJChannel pairChannel; 490 491 492 //protected transient final Comm engine; 493 494 495 private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList; 496 497 498 private final PairList<C> pairlist; 499 500 501 private final int threadsPerNode; 502 503 504 final int rank; 505 506 507 /** 508 * Message tag for pairs. 509 */ 510 public static final int pairTag = GroebnerBaseDistributedHybridMPJ.pairTag; 511 512 513 /** 514 * Constructor. 515 * @param r MPJ rank of partner. 516 * @param tpn number of threads per node 517 * @param fin terminator 518 * @param chan MPJ channel 519 * @param dl distributed hash table 520 * @param L ordered pair list 521 */ 522 HybridReducerServerMPJ(int r, int tpn, Terminator fin, MPJChannel chan, 523 DistHashTableMPJ<Integer, GenPolynomial<C>> dl, PairList<C> L) { 524 rank = r; 525 threadsPerNode = tpn; 526 finner = fin; 527 this.pairChannel = chan; 528 theList = dl; 529 pairlist = L; 530 //logger.info("reducer server created " + this); 531 } 532 533 534 /** 535 * Work loop. 536 * @see java.lang.Runnable#run() 537 */ 538 @Override 539 @SuppressWarnings("unchecked") 540 public void run() { 541 //logger.info("reducer server running with " + engine); 542 // try { 543 // pairChannel = new MPJChannel(engine, rank); //,pairTag 544 // } catch (IOException e) { 545 // e.printStackTrace(); 546 // return; 547 // } 548 if (logger.isInfoEnabled()) { 549 logger.info("reducer server running: pairChannel = " + pairChannel); 550 } 551 // record idle remote workers (minus one?) 552 //finner.beIdle(threadsPerNode-1); 553 finner.initIdle(threadsPerNode); 554 AtomicInteger active = new AtomicInteger(0); 555 556 // start receiver 557 HybridReducerReceiverMPJ<C> receiver = new HybridReducerReceiverMPJ<C>(rank, finner, active, 558 pairChannel, theList, pairlist); 559 receiver.start(); 560 561 Pair<C> pair; 562 //boolean set = false; 563 boolean goon = true; 564 //int polIndex = -1; 565 int red = 0; 566 int sleeps = 0; 567 568 // while more requests 569 while (goon) { 570 // receive request if thread is reported incactive 571 logger.debug("receive request"); 572 Object req = null; 573 try { 574 req = pairChannel.receive(pairTag); 575 //} catch (InterruptedException e) { 576 //goon = false; 577 //e.printStackTrace(); 578 } catch (IOException e) { 579 goon = false; 580 e.printStackTrace(); 581 } catch (ClassNotFoundException e) { 582 goon = false; 583 e.printStackTrace(); 584 } 585 logger.debug("received request, req = " + req); 586 if (req == null) { 587 goon = false; 588 break; 589 } 590 if (!(req instanceof GBTransportMessReq)) { 591 goon = false; 592 break; 593 } 594 595 // find pair and manage termination status 596 logger.debug("find pair"); 597 while (!pairlist.hasNext()) { // wait 598 if (!finner.hasJobs() && !pairlist.hasNext()) { 599 goon = false; 600 break; 601 } 602 try { 603 sleeps++; 604 if (sleeps % 3 == 0) { 605 logger.info("waiting for reducers, remaining = " + finner.getJobs()); 606 } 607 Thread.sleep(100); 608 } catch (InterruptedException e) { 609 goon = false; 610 break; 611 } 612 } 613 if (!pairlist.hasNext() && !finner.hasJobs()) { 614 logger.info("termination detection: no pairs and no jobs left"); 615 goon = false; 616 break; //continue; //break? 617 } 618 finner.notIdle(); // before pairlist get!! 619 pair = pairlist.removeNext(); 620 // send pair to client, even if null 621 if (debug) { 622 logger.info("active count = " + active.get()); 623 logger.info("send pair = " + pair); 624 } 625 GBTransportMess msg = null; 626 if (pair != null) { 627 msg = new GBTransportMessPairIndex(pair); 628 } else { 629 msg = new GBTransportMess(); //not End(); at this time 630 // goon ?= false; 631 } 632 try { 633 red++; 634 pairChannel.send(pairTag, msg); 635 @SuppressWarnings("unused") 636 int a = active.getAndIncrement(); 637 } catch (IOException e) { 638 e.printStackTrace(); 639 goon = false; 640 break; 641 } 642 //logger.debug("#distributed list = " + theList.size()); 643 } 644 logger.info("terminated, send " + red + " reduction pairs"); 645 646 /* 647 * send end mark to clients 648 */ 649 logger.debug("send end"); 650 try { 651 for (int i = 0; i < threadsPerNode; i++) { // -1 652 pairChannel.send(pairTag, new GBTransportMessEnd()); 653 } 654 logger.info("sent end to clients"); 655 // send also end to receiver, no more 656 //pairChannel.send(resultTag, new GBTransportMessEnd(), engine.Rank()); 657 } catch (IOException e) { 658 if (logger.isDebugEnabled()) { 659 e.printStackTrace(); 660 } 661 } 662 int d = active.get(); 663 if (d > 0) { 664 logger.info("remaining active tasks = " + d); 665 } 666 receiver.terminate(); 667 //logger.info("terminated, send " + red + " reduction pairs"); 668 pairChannel.close(); 669 logger.info("redServ pairChannel.close()"); 670 finner.release(); 671 } 672} 673 674 675/** 676 * Distributed server receiving worker thread. 677 * @param <C> coefficient type 678 */ 679 680class HybridReducerReceiverMPJ<C extends RingElem<C>> extends Thread { 681 682 683 private static final Logger logger = LogManager.getLogger(HybridReducerReceiverMPJ.class); 684 685 686 public final boolean debug = logger.isDebugEnabled(); 687 688 689 private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList; 690 691 692 private final PairList<C> pairlist; 693 694 695 private final MPJChannel pairChannel; 696 697 698 final int rank; 699 700 701 private final Terminator finner; 702 703 704 //private final int threadsPerNode; 705 706 707 private final AtomicInteger active; 708 709 710 private volatile boolean goon; 711 712 713 /** 714 * Message tag for results. 715 */ 716 public static final int resultTag = GroebnerBaseDistributedHybridMPJ.resultTag; 717 718 719 /** 720 * Message tag for acknowledgments. 721 */ 722 public static final int ackTag = GroebnerBaseDistributedHybridMPJ.ackTag; 723 724 725 /** 726 * Constructor. 727 * @param r MPJ rank of partner. 728 * @param fin terminator 729 * @param a active remote tasks count 730 * @param pc tagged socket channel 731 * @param dl distributed hash table 732 * @param L ordered pair list 733 */ 734 HybridReducerReceiverMPJ(int r, Terminator fin, AtomicInteger a, MPJChannel pc, 735 DistHashTableMPJ<Integer, GenPolynomial<C>> dl, PairList<C> L) { 736 rank = r; 737 active = a; 738 //threadsPerNode = tpn; 739 finner = fin; 740 pairChannel = pc; 741 theList = dl; 742 pairlist = L; 743 goon = true; 744 //logger.info("reducer server created " + this); 745 } 746 747 748 /** 749 * Work loop. 750 * @see java.lang.Thread#run() 751 */ 752 @Override 753 @SuppressWarnings("unchecked") 754 public void run() { 755 //Pair<C> pair = null; 756 GenPolynomial<C> H = null; 757 int red = 0; 758 int polIndex = -1; 759 //Integer senderId; // obsolete 760 761 // while more requests 762 while (goon) { 763 // receive request 764 logger.debug("receive result"); 765 //senderId = null; 766 Object rh = null; 767 try { 768 rh = pairChannel.receive(resultTag); 769 @SuppressWarnings("unused") 770 int i = active.getAndDecrement(); 771 //} catch (InterruptedException e) { 772 //goon = false; 773 ////e.printStackTrace(); 774 ////?? finner.initIdle(1); 775 //break; 776 } catch (IOException e) { 777 e.printStackTrace(); 778 goon = false; 779 finner.initIdle(1); 780 break; 781 } catch (ClassNotFoundException e) { 782 e.printStackTrace(); 783 goon = false; 784 finner.initIdle(1); 785 break; 786 } 787 logger.info("received result"); 788 if (rh == null) { 789 if (this.isInterrupted()) { 790 goon = false; 791 finner.initIdle(1); 792 break; 793 } 794 //finner.initIdle(1); 795 } else if (rh instanceof GBTransportMessEnd) { // should only happen from server 796 logger.info("received GBTransportMessEnd"); 797 goon = false; 798 //?? finner.initIdle(1); 799 break; 800 } else if (rh instanceof GBTransportMessPoly) { 801 // update pair list 802 red++; 803 GBTransportMessPoly<C> mpi = (GBTransportMessPoly<C>) rh; 804 H = mpi.pol; 805 //senderId = mpi.threadId; 806 if (H != null) { 807 if (logger.isInfoEnabled()) { // debug 808 logger.info("H = " + H.leadingExpVector()); 809 } 810 if (!H.isZERO()) { 811 if (H.isONE()) { 812 polIndex = pairlist.putOne(); 813 //GenPolynomial<C> nn = 814 theList.putWait(Integer.valueOf(polIndex), H); 815 //goon = false; must wait for other clients 816 //finner.initIdle(1); 817 //break; 818 } else { 819 polIndex = pairlist.put(H); 820 // use putWait ? but still not all distributed 821 //GenPolynomial<C> nn = 822 theList.putWait(Integer.valueOf(polIndex), H); 823 } 824 } 825 } 826 } 827 // only after recording in pairlist ! 828 finner.initIdle(1); 829 try { 830 pairChannel.send(ackTag, new GBTransportMess()); 831 logger.debug("send acknowledgement"); 832 } catch (IOException e) { 833 e.printStackTrace(); 834 goon = false; 835 break; 836 } 837 } // end while 838 goon = false; 839 logger.info("terminated, received " + red + " reductions"); 840 } 841 842 843 /** 844 * Terminate. 845 */ 846 public void terminate() { 847 goon = false; 848 try { 849 this.join(); 850 //this.interrupt(); 851 } catch (InterruptedException e) { 852 // unfug Thread.currentThread().interrupt(); 853 } 854 logger.info("terminate end"); 855 } 856 857} 858 859 860/** 861 * Distributed clients reducing worker threads. 862 */ 863 864class HybridReducerClientMPJ<C extends RingElem<C>> implements Runnable { 865 866 867 private static final Logger logger = LogManager.getLogger(HybridReducerClientMPJ.class); 868 869 870 public final boolean debug = logger.isDebugEnabled(); 871 872 873 private final MPJChannel pairChannel; 874 875 876 private final DistHashTableMPJ<Integer, GenPolynomial<C>> theList; 877 878 879 private final ReductionPar<C> red; 880 881 882 //private final int threadsPerNode; 883 884 885 /* 886 * Identification number for this thread. 887 */ 888 //public final Integer threadId; // obsolete 889 890 891 /** 892 * Message tag for pairs. 893 */ 894 public static final int pairTag = GroebnerBaseDistributedHybridMPJ.pairTag; 895 896 897 /** 898 * Message tag for results. 899 */ 900 public static final int resultTag = GroebnerBaseDistributedHybridMPJ.resultTag; 901 902 903 /** 904 * Message tag for acknowledgments. 905 */ 906 public static final int ackTag = GroebnerBaseDistributedHybridMPJ.ackTag; 907 908 909 /** 910 * Constructor. 911 * @param tc tagged socket channel 912 * @param dl distributed hash table 913 */ 914 HybridReducerClientMPJ(MPJChannel tc, DistHashTableMPJ<Integer, GenPolynomial<C>> dl) { 915 //this.threadsPerNode = tpn; 916 pairChannel = tc; 917 //threadId = 100 + tid; // keep distinct from other tags 918 theList = dl; 919 red = new ReductionPar<C>(); 920 } 921 922 923 /** 924 * Work loop. 925 * @see java.lang.Runnable#run() 926 */ 927 @Override 928 @SuppressWarnings("unchecked") 929 public void run() { 930 if (debug) { 931 logger.info("pairChannel = " + pairChannel + " reducer client running"); 932 } 933 Pair<C> pair = null; 934 GenPolynomial<C> pi, pj, ps; 935 GenPolynomial<C> S; 936 GenPolynomial<C> H = null; 937 //boolean set = false; 938 boolean goon = true; 939 boolean doEnd = true; 940 int reduction = 0; 941 //int sleeps = 0; 942 Integer pix, pjx, psx; 943 944 while (goon) { 945 /* protocol: 946 * request pair, process pair, send result, receive acknowledgment 947 */ 948 // pair = (Pair) pairlist.removeNext(); 949 Object req = new GBTransportMessReq(); 950 logger.debug("send request = " + req); 951 try { 952 pairChannel.send(pairTag, req); 953 } catch (IOException e) { 954 goon = false; 955 if (debug) { 956 e.printStackTrace(); 957 } 958 logger.info("receive pair, exception "); 959 break; 960 } 961 logger.debug("receive pair, goon = " + goon); 962 doEnd = true; 963 Object pp = null; 964 try { 965 pp = pairChannel.receive(pairTag); 966 //} catch (InterruptedException e) { 967 //goon = false; 968 //e.printStackTrace(); 969 } catch (IOException e) { 970 goon = false; 971 if (debug) { 972 e.printStackTrace(); 973 } 974 break; 975 } catch (ClassNotFoundException e) { 976 goon = false; 977 e.printStackTrace(); 978 } 979 if (debug) { 980 logger.info("received pair = " + pp); 981 } 982 H = null; 983 if (pp == null) { // should not happen 984 continue; 985 } 986 if (pp instanceof GBTransportMessEnd) { 987 goon = false; 988 //doEnd = false; // bug 989 continue; 990 } 991 if (pp instanceof GBTransportMessPair || pp instanceof GBTransportMessPairIndex) { 992 pi = pj = ps = null; 993 if (pp instanceof GBTransportMessPair) { 994 pair = ((GBTransportMessPair<C>) pp).pair; 995 if (pair != null) { 996 pi = pair.pi; 997 pj = pair.pj; 998 //logger.debug("pair: pix = " + pair.i 999 // + ", pjx = " + pair.j); 1000 } 1001 } 1002 if (pp instanceof GBTransportMessPairIndex) { 1003 pix = ((GBTransportMessPairIndex) pp).i; 1004 pjx = ((GBTransportMessPairIndex) pp).j; 1005 psx = ((GBTransportMessPairIndex) pp).s; 1006 pi = theList.getWait(pix); 1007 pj = theList.getWait(pjx); 1008 ps = theList.getWait(psx); 1009 //logger.info("pix = " + pix + ", pjx = " +pjx + ", psx = " +psx); 1010 } 1011 1012 if (pi != null && pj != null) { 1013 S = red.SPolynomial(pi, pj); 1014 //System.out.println("S = " + S); 1015 logger.info("ht(S) = " + S.leadingExpVector()); 1016 if (S.isZERO()) { 1017 // pair.setZero(); does not work in dist 1018 H = S; 1019 } else { 1020 if (debug) { 1021 logger.debug("ht(S) = " + S.leadingExpVector()); 1022 } 1023 H = red.normalform(theList, S); 1024 reduction++; 1025 if (H.isZERO()) { 1026 // pair.setZero(); does not work in dist 1027 } else { 1028 H = H.monic(); 1029 if (logger.isInfoEnabled()) { 1030 logger.info("ht(H) = " + H.leadingExpVector()); 1031 } 1032 } 1033 } 1034 } else { 1035 logger.info("pi = " + pi + ", pj = " + pj + ", ps = " + ps); 1036 } 1037 } 1038 if (pp instanceof GBTransportMess) { 1039 logger.debug("null pair results in null H poly"); 1040 } 1041 1042 // send H or must send null, if not at end 1043 if (debug) { 1044 logger.debug("#distributed list = " + theList.size()); 1045 logger.debug("send H polynomial = " + H); 1046 } 1047 try { 1048 pairChannel.send(resultTag, new GBTransportMessPoly<C>(H)); //,threadId)); 1049 doEnd = false; 1050 } catch (IOException e) { 1051 goon = false; 1052 e.printStackTrace(); 1053 } 1054 logger.debug("done send poly message of " + pp); 1055 try { 1056 pp = pairChannel.receive(ackTag); 1057 //} catch (InterruptedException e) { 1058 //goon = false; 1059 //e.printStackTrace(); 1060 } catch (IOException e) { 1061 goon = false; 1062 if (debug) { 1063 e.printStackTrace(); 1064 } 1065 break; 1066 } catch (ClassNotFoundException e) { 1067 goon = false; 1068 e.printStackTrace(); 1069 } 1070 if (!(pp instanceof GBTransportMess)) { 1071 logger.error("invalid acknowledgement " + pp); 1072 } 1073 logger.debug("received acknowledgment " + pp); 1074 } 1075 logger.info("terminated, done " + reduction + " reductions"); 1076 if (doEnd) { 1077 try { 1078 pairChannel.send(resultTag, new GBTransportMessEnd()); 1079 } catch (IOException e) { 1080 //e.printStackTrace(); 1081 } 1082 logger.info("terminated, send done"); 1083 } 1084 } 1085}