001/*
002 * $Id$
003 */
004
005package edu.jas.kern;
006
007
008import java.io.IOException;
009import java.util.ArrayList;
010import java.util.Arrays;
011
012import mpi.Comm;
013import mpi.Intracomm;
014import mpi.MPI;
015import mpi.MPIException;
016import mpi.Status;
017
018import org.apache.logging.log4j.Logger;
019import org.apache.logging.log4j.LogManager;
020
021
022/**
023 * MPI engine, provides global MPI service. <b>Note:</b> could eventually be
024 * done directly with MPI, but provides logging. <b>Usage:</b> To obtain a
025 * reference to the MPI service communicator use
026 * <code>MPJEngine.getComminicator()</code>. Once an engine has been created it
027 * must be shutdown to exit JAS with <code>MPJEngine.terminate()</code>.
028 * @author Heinz Kredel
029 */
030
031public final class MPJEngine {
032
033
034    private static final Logger logger = LogManager.getLogger(MPJEngine.class);
035
036
037    private static final boolean debug = logger.isDebugEnabled();
038
039
040    /**
041     * Command line arguments. Required for MPI runtime system.
042     */
043    protected static String[] cmdline;
044
045
046    /**
047     * Hostnames of MPI partners.
048     */
049    public static ArrayList<String> hostNames = new ArrayList<String>();
050
051
052    /**
053     * Flag for MPI usage. <b>Note:</b> Only introduced because Google app
054     * engine does not support MPI.
055     */
056    public static boolean NO_MPI = false;
057
058
059    /**
060     * Number of processors.
061     */
062    public static final int N_CPUS = Runtime.getRuntime().availableProcessors();
063
064
065    /*
066     * Core number of threads.
067     * N_CPUS x 1.5, x 2, x 2.5, min 3, ?.
068     */
069    public static final int N_THREADS = (N_CPUS < 3 ? 3 : N_CPUS + N_CPUS / 2);
070
071
072    /**
073     * MPI communicator engine.
074     */
075    static Intracomm mpiComm;
076
077
078    /**
079     * MPI engine base tag number.
080     */
081    public static final int TAG = 11;
082
083
084    /**
085     * Hostname suffix.
086     */
087    public static final String hostSuf = "-ib";
088
089
090    // /*
091    //  * Send locks per tag.
092    //  */
093    // private static SortedMap<Integer,Object> sendLocks = new TreeMap<Integer,Object>();
094
095
096    // /*
097    //  * receive locks per tag.
098    //  */
099    // private static SortedMap<Integer,Object> recvLocks = new TreeMap<Integer,Object>();
100
101
102    /**
103     * No public constructor.
104     */
105    private MPJEngine() {
106    }
107
108
109    /**
110     * Set the commandline.
111     * @param args the command line to use for the MPI runtime system.
112     */
113    public static synchronized void setCommandLine(String[] args) {
114        cmdline = args;
115    }
116
117
118    /**
119     * Test if a pool is running.
120     * @return true if a thread pool has been started or is running, else false.
121     */
122    public static synchronized boolean isRunning() {
123        if (mpiComm == null) {
124            return false;
125        }
126        //if (MPI.Finalized()) { // FMPJ only
127        //    return false;
128        //}
129        return true;
130    }
131
132
133    /**
134     * Get the MPI communicator.
135     * @return a Communicator constructed for cmdline.
136     */
137    public static synchronized Comm getCommunicator() throws IOException {
138        if (cmdline == null) {
139            throw new IllegalArgumentException("command line not set");
140        }
141        return getCommunicator(cmdline);
142    }
143
144
145    /**
146     * Get the MPI communicator.
147     * @param args the command line to use for the MPI runtime system.
148     * @return a Communicator.
149     */
150    public static synchronized Comm getCommunicator(String[] args) throws IOException {
151        if (NO_MPI) {
152            return null;
153        }
154        if (mpiComm == null) {
155            //String[] args = new String[] { }; //"-np " + N_THREADS };
156            if (!MPI.Initialized()) {
157                if (args == null) {
158                    throw new IllegalArgumentException("command line is null");
159                }
160                cmdline = args;
161                args = MPI.Init(args);
162                //int tl = MPI.Init_thread(args,MPI.THREAD_MULTIPLE);
163                logger.info("MPI initialized on " + MPI.Get_processor_name());
164                //logger.info("thread level MPI.THREAD_MULTIPLE: " + MPI.THREAD_MULTIPLE 
165                //            + ", provided: " + tl);
166                if (debug) {
167                    logger.debug("remaining args: " + Arrays.toString(args));
168                }
169            }
170            mpiComm = MPI.COMM_WORLD;
171            int size = mpiComm.Size();
172            int rank = mpiComm.Rank();
173            logger.info("MPI size = " + size + ", rank = " + rank);
174            // maintain list of hostnames of partners
175            hostNames.ensureCapacity(size);
176            for (int i = 0; i < size; i++) {
177                hostNames.add("");
178            }
179            String myhost = MPI.Get_processor_name();
180            if (myhost.matches("\\An\\d*")) { // bwGRiD node names n010207
181                myhost += hostSuf;
182            }
183            if (myhost.matches("kredel.*")) {
184                myhost = "localhost";
185            }
186            hostNames.set(rank, myhost);
187            if (rank == 0) {
188                String[] va = new String[1];
189                va[0] = hostNames.get(0);
190                mpiComm.Bcast(va, 0, va.length, MPI.OBJECT, 0);
191                for (int i = 1; i < size; i++) {
192                    Status stat = mpiComm.Recv(va, 0, va.length, MPI.OBJECT, i, TAG);
193                    if (stat == null) {
194                        throw new IOException("no Status received");
195                        //throw new MPIException("no Status received");
196                    }
197                    int cnt = stat.Get_count(MPI.OBJECT);
198                    if (cnt == 0) {
199                        throw new IOException("no Object received");
200                        //throw new MPIException("no object received");
201                    }
202                    String v = va[0];
203                    hostNames.set(i, v);
204                }
205                logger.info("MPI partner host names = " + hostNames);
206            } else {
207                String[] va = new String[1];
208                mpiComm.Bcast(va, 0, va.length, MPI.OBJECT, 0);
209                hostNames.set(0, va[0]);
210                va[0] = hostNames.get(rank);
211                mpiComm.Send(va, 0, va.length, MPI.OBJECT, 0, TAG);
212            }
213        }
214        return mpiComm;
215    }
216
217
218    /**
219     * Stop execution.
220     */
221    public static synchronized void terminate() {
222        if (mpiComm == null) {
223            return;
224        }
225        //if (MPI.Finalized()) { // FMPJ only
226        //    return;
227        //}
228        try {
229            logger.info("terminating MPI on rank = " + mpiComm.Rank());
230            mpiComm = null;
231            MPI.Finalize();
232        } catch (MPIException e) {
233            e.printStackTrace();
234        }
235    }
236
237
238    /**
239     * Set no MPI usage.
240     */
241    public static synchronized void setNoMPI() {
242        NO_MPI = true;
243        terminate();
244    }
245
246
247    /**
248     * Set MPI usage.
249     */
250    public static synchronized void setMPI() {
251        NO_MPI = false;
252    }
253
254
255    // /*
256    //  * Get send lock per tag.
257    //  * @param tag message tag.
258    //  * @return a lock for sends.
259    //  */
260    // public static synchronized Object getSendLock(int tag) {
261    //     tag = 11; // one global lock
262    //     Object lock = sendLocks.get(tag);
263    //     if ( lock == null ) {
264    //         lock = new Object();
265    //         sendLocks.put(tag,lock);
266    //     }
267    //     return lock;
268    // }
269
270
271    // /*
272    //  * Get receive lock per tag.
273    //  * @param tag message tag.
274    //  * @return a lock for receives.
275    //  */
276    // public static synchronized Object getRecvLock(int tag) {
277    //     Object lock = recvLocks.get(tag);
278    //     if ( lock == null ) {
279    //         lock = new Object();
280    //         recvLocks.put(tag,lock);
281    //     }
282    //     return lock;
283    // }
284
285
286    // /*
287    //  * Wait for termination of a mpj Request.
288    //  * @param req a Request.
289    //  * @return a Status after termination of req.Wait().
290    //  */
291    // public static Status waitRequest(final Request req) {
292    //     if ( req == null ) {
293    //         throw new IllegalArgumentException("null request");
294    //     }
295    //     int delay = 10;
296    //     int delcnt = 0;
297    //     Status stat = null;
298    //     while (true) {
299    //         synchronized (MPJEngine.class) { // global static lock
300    //             stat = req.Get_status(); // should be non destructive, but is not
301    //             if ( stat != null ) {
302    //                 return req.Wait(); // should terminate immediately
303    //             }
304    //         }
305    //         try {
306    //             Thread.currentThread().sleep(delay); // varied a bit
307    //         } catch (InterruptedException e) {
308    //             logger.info("sleep interrupted");
309    //             e.printStackTrace();
310    //         }
311    //         delcnt++; 
312    //         if ( delcnt % 7 == 0 ) {
313    //             delay++;
314    //             System.out.println("delay(" + delay + "): " + Thread.currentThread().toString());
315    //         } 
316    //     }
317    // }
318
319}