001/* 002 * $Id$ 003 */ 004 005package edu.jas.kern; 006 007 008import java.io.IOException; 009import java.util.ArrayList; 010import java.util.Arrays; 011 012import mpi.Comm; 013import mpi.Intracomm; 014import mpi.MPI; 015import mpi.MPIException; 016import mpi.Status; 017 018import org.apache.logging.log4j.Logger; 019import org.apache.logging.log4j.LogManager; 020 021 022/** 023 * MPI engine, provides global MPI service. <b>Note:</b> could eventually be 024 * done directly with MPI, but provides logging. <b>Usage:</b> To obtain a 025 * reference to the MPI service communicator use 026 * <code>MPJEngine.getComminicator()</code>. Once an engine has been created it 027 * must be shutdown to exit JAS with <code>MPJEngine.terminate()</code>. 028 * @author Heinz Kredel 029 */ 030 031public final class MPJEngine { 032 033 034 private static final Logger logger = LogManager.getLogger(MPJEngine.class); 035 036 037 private static final boolean debug = logger.isDebugEnabled(); 038 039 040 /** 041 * Command line arguments. Required for MPI runtime system. 042 */ 043 protected static String[] cmdline; 044 045 046 /** 047 * Hostnames of MPI partners. 048 */ 049 public static ArrayList<String> hostNames = new ArrayList<String>(); 050 051 052 /** 053 * Flag for MPI usage. <b>Note:</b> Only introduced because Google app 054 * engine does not support MPI. 055 */ 056 public static boolean NO_MPI = false; 057 058 059 /** 060 * Number of processors. 061 */ 062 public static final int N_CPUS = Runtime.getRuntime().availableProcessors(); 063 064 065 /* 066 * Core number of threads. 067 * N_CPUS x 1.5, x 2, x 2.5, min 3, ?. 068 */ 069 public static final int N_THREADS = (N_CPUS < 3 ? 3 : N_CPUS + N_CPUS / 2); 070 071 072 /** 073 * MPI communicator engine. 074 */ 075 static Intracomm mpiComm; 076 077 078 /** 079 * MPI engine base tag number. 080 */ 081 public static final int TAG = 11; 082 083 084 /** 085 * Hostname suffix. 086 */ 087 public static final String hostSuf = "-ib"; 088 089 090 // /* 091 // * Send locks per tag. 092 // */ 093 // private static SortedMap<Integer,Object> sendLocks = new TreeMap<Integer,Object>(); 094 095 096 // /* 097 // * receive locks per tag. 098 // */ 099 // private static SortedMap<Integer,Object> recvLocks = new TreeMap<Integer,Object>(); 100 101 102 /** 103 * No public constructor. 104 */ 105 private MPJEngine() { 106 } 107 108 109 /** 110 * Set the commandline. 111 * @param args the command line to use for the MPI runtime system. 112 */ 113 public static synchronized void setCommandLine(String[] args) { 114 cmdline = args; 115 } 116 117 118 /** 119 * Test if a pool is running. 120 * @return true if a thread pool has been started or is running, else false. 121 */ 122 public static synchronized boolean isRunning() { 123 if (mpiComm == null) { 124 return false; 125 } 126 //if (MPI.Finalized()) { // FMPJ only 127 // return false; 128 //} 129 return true; 130 } 131 132 133 /** 134 * Get the MPI communicator. 135 * @return a Communicator constructed for cmdline. 136 */ 137 public static synchronized Comm getCommunicator() throws IOException { 138 if (cmdline == null) { 139 throw new IllegalArgumentException("command line not set"); 140 } 141 return getCommunicator(cmdline); 142 } 143 144 145 /** 146 * Get the MPI communicator. 147 * @param args the command line to use for the MPI runtime system. 148 * @return a Communicator. 149 */ 150 public static synchronized Comm getCommunicator(String[] args) throws IOException { 151 if (NO_MPI) { 152 return null; 153 } 154 if (mpiComm == null) { 155 //String[] args = new String[] { }; //"-np " + N_THREADS }; 156 if (!MPI.Initialized()) { 157 if (args == null) { 158 throw new IllegalArgumentException("command line is null"); 159 } 160 cmdline = args; 161 args = MPI.Init(args); 162 //int tl = MPI.Init_thread(args,MPI.THREAD_MULTIPLE); 163 logger.info("MPI initialized on " + MPI.Get_processor_name()); 164 //logger.info("thread level MPI.THREAD_MULTIPLE: " + MPI.THREAD_MULTIPLE 165 // + ", provided: " + tl); 166 if (debug) { 167 logger.debug("remaining args: " + Arrays.toString(args)); 168 } 169 } 170 mpiComm = MPI.COMM_WORLD; 171 int size = mpiComm.Size(); 172 int rank = mpiComm.Rank(); 173 logger.info("MPI size = " + size + ", rank = " + rank); 174 // maintain list of hostnames of partners 175 hostNames.ensureCapacity(size); 176 for (int i = 0; i < size; i++) { 177 hostNames.add(""); 178 } 179 String myhost = MPI.Get_processor_name(); 180 if (myhost.matches("\\An\\d*")) { // bwGRiD node names n010207 181 myhost += hostSuf; 182 } 183 if (myhost.matches("kredel.*")) { 184 myhost = "localhost"; 185 } 186 hostNames.set(rank, myhost); 187 if (rank == 0) { 188 String[] va = new String[1]; 189 va[0] = hostNames.get(0); 190 mpiComm.Bcast(va, 0, va.length, MPI.OBJECT, 0); 191 for (int i = 1; i < size; i++) { 192 Status stat = mpiComm.Recv(va, 0, va.length, MPI.OBJECT, i, TAG); 193 if (stat == null) { 194 throw new IOException("no Status received"); 195 //throw new MPIException("no Status received"); 196 } 197 int cnt = stat.Get_count(MPI.OBJECT); 198 if (cnt == 0) { 199 throw new IOException("no Object received"); 200 //throw new MPIException("no object received"); 201 } 202 String v = va[0]; 203 hostNames.set(i, v); 204 } 205 logger.info("MPI partner host names = " + hostNames); 206 } else { 207 String[] va = new String[1]; 208 mpiComm.Bcast(va, 0, va.length, MPI.OBJECT, 0); 209 hostNames.set(0, va[0]); 210 va[0] = hostNames.get(rank); 211 mpiComm.Send(va, 0, va.length, MPI.OBJECT, 0, TAG); 212 } 213 } 214 return mpiComm; 215 } 216 217 218 /** 219 * Stop execution. 220 */ 221 public static synchronized void terminate() { 222 if (mpiComm == null) { 223 return; 224 } 225 //if (MPI.Finalized()) { // FMPJ only 226 // return; 227 //} 228 try { 229 logger.info("terminating MPI on rank = " + mpiComm.Rank()); 230 mpiComm = null; 231 MPI.Finalize(); 232 } catch (MPIException e) { 233 e.printStackTrace(); 234 } 235 } 236 237 238 /** 239 * Set no MPI usage. 240 */ 241 public static synchronized void setNoMPI() { 242 NO_MPI = true; 243 terminate(); 244 } 245 246 247 /** 248 * Set MPI usage. 249 */ 250 public static synchronized void setMPI() { 251 NO_MPI = false; 252 } 253 254 255 // /* 256 // * Get send lock per tag. 257 // * @param tag message tag. 258 // * @return a lock for sends. 259 // */ 260 // public static synchronized Object getSendLock(int tag) { 261 // tag = 11; // one global lock 262 // Object lock = sendLocks.get(tag); 263 // if ( lock == null ) { 264 // lock = new Object(); 265 // sendLocks.put(tag,lock); 266 // } 267 // return lock; 268 // } 269 270 271 // /* 272 // * Get receive lock per tag. 273 // * @param tag message tag. 274 // * @return a lock for receives. 275 // */ 276 // public static synchronized Object getRecvLock(int tag) { 277 // Object lock = recvLocks.get(tag); 278 // if ( lock == null ) { 279 // lock = new Object(); 280 // recvLocks.put(tag,lock); 281 // } 282 // return lock; 283 // } 284 285 286 // /* 287 // * Wait for termination of a mpj Request. 288 // * @param req a Request. 289 // * @return a Status after termination of req.Wait(). 290 // */ 291 // public static Status waitRequest(final Request req) { 292 // if ( req == null ) { 293 // throw new IllegalArgumentException("null request"); 294 // } 295 // int delay = 10; 296 // int delcnt = 0; 297 // Status stat = null; 298 // while (true) { 299 // synchronized (MPJEngine.class) { // global static lock 300 // stat = req.Get_status(); // should be non destructive, but is not 301 // if ( stat != null ) { 302 // return req.Wait(); // should terminate immediately 303 // } 304 // } 305 // try { 306 // Thread.currentThread().sleep(delay); // varied a bit 307 // } catch (InterruptedException e) { 308 // logger.info("sleep interrupted"); 309 // e.printStackTrace(); 310 // } 311 // delcnt++; 312 // if ( delcnt % 7 == 0 ) { 313 // delay++; 314 // System.out.println("delay(" + delay + "): " + Thread.currentThread().toString()); 315 // } 316 // } 317 // } 318 319}