001/* 002 * $Id$ 003 */ 004 005package edu.jas.util; 006 007 008import java.io.BufferedReader; 009import java.io.FileNotFoundException; 010import java.io.FileInputStream; 011import java.io.InputStreamReader; 012import java.io.IOException; 013import java.nio.charset.Charset; 014import java.util.ArrayList; 015import java.util.List; 016 017import org.apache.logging.log4j.Logger; 018import org.apache.logging.log4j.LogManager; 019 020 021// import edu.unima.ky.parallel.ChannelFactory; 022// import edu.unima.ky.parallel.SocketChannel; 023 024/** 025 * ExecutableChannels used to receive and execute classes. 026 * @author Heinz Kredel 027 */ 028 029 030public class ExecutableChannels { 031 032 033 private static final Logger logger = LogManager.getLogger(ExecutableChannels.class); 034 035 036 /** 037 * default port. 038 */ 039 protected final static int DEFAULT_PORT = 7114; //ChannelFactory.DEFAULT_PORT; 040 041 042 /** 043 * default machine file. 044 */ 045 protected final static String DEFAULT_MFILE = "examples/machines.test"; 046 047 048 protected final ChannelFactory cf; 049 050 051 protected SocketChannel[] channels = null; 052 053 054 protected String[] servers = null; 055 056 057 protected int[] ports = null; 058 059 060 /** 061 * Internal constructor. 062 */ 063 protected ExecutableChannels() { 064 cf = new ChannelFactory(); 065 cf.init(); 066 } 067 068 069 /** 070 * Constructor from array of server:port strings. 071 * @param srvs A String array. 072 */ 073 public ExecutableChannels(String[] srvs) { 074 this(); 075 if (srvs == null) { 076 return; 077 } 078 servers = new String[srvs.length]; 079 ports = new int[srvs.length]; 080 for (int i = 0; i < srvs.length; i++) { 081 setServerPort(i, srvs[i]); 082 } 083 } 084 085 086 /** 087 * Constructor from machine file. 088 * @param mfile 089 * @throws FileNotFoundException. 090 */ 091 public ExecutableChannels(String mfile) throws FileNotFoundException { 092 this(); 093 if (mfile == null || mfile.length() == 0) { 094 mfile = DEFAULT_MFILE; 095 } 096 InputStreamReader isr = new InputStreamReader(new FileInputStream(mfile),Charset.forName("UTF8")); 097 BufferedReader in = new BufferedReader(isr); 098 String line = null; 099 List<String> list = new ArrayList<String>(); 100 int x; 101 try { 102 while (true) { 103 if (!in.ready()) { 104 break; 105 } 106 line = in.readLine(); 107 if (line == null) { 108 break; 109 } 110 x = line.indexOf("#"); 111 if (x >= 0) { 112 line = line.substring(0, x); 113 } 114 line = line.trim(); 115 if (line.length() == 0) { 116 continue; 117 } 118 list.add(line); 119 } 120 } catch (IOException ignored) { 121 } finally { 122 try { 123 in.close(); 124 isr.close(); 125 } catch (IOException ignore) { 126 } 127 } 128 logger.debug("list.size() in {} = {}", mfile, list.size()); 129 if (list.size() == 0) { 130 return; 131 } 132 servers = new String[list.size()]; 133 ports = new int[list.size()]; 134 for (int i = 0; i < servers.length; i++) { 135 setServerPort(i, list.get(i)); 136 } 137 } 138 139 140 /* 141 * internal method 142 */ 143 protected void setServerPort(int i, String srv) { 144 int x = srv.indexOf(":"); 145 ports[i] = DEFAULT_PORT; 146 if (x < 0) { 147 servers[i] = srv; 148 } else { 149 servers[i] = srv.substring(0, x); 150 String p = srv.substring(x + 1, srv.length()); 151 try { 152 ports[i] = Integer.parseInt(p); 153 } catch (NumberFormatException ignored) { 154 } 155 } 156 } 157 158 159 /** 160 * String representation. 161 */ 162 @Override 163 public String toString() { 164 StringBuffer s = new StringBuffer("ExecutableChannels("); 165 if (servers != null) { 166 for (int i = 0; i < servers.length; i++) { 167 s.append(servers[i] + ":" + ports[i]); 168 if (i < servers.length - 1) { 169 s.append(" "); 170 } 171 } 172 } 173 if (channels != null) { 174 s.append(" channels = "); 175 for (int i = 0; i < channels.length; i++) { 176 s.append(channels[i]); 177 if (i < channels.length - 1) { 178 s.append(" "); 179 } 180 } 181 } 182 s.append(")"); 183 return s.toString(); 184 } 185 186 187 /** 188 * number of servers. 189 */ 190 public int numServers() { 191 if (servers != null) { 192 return servers.length; 193 } 194 return -1; 195 } 196 197 198 /** 199 * get master host. 200 */ 201 public String getMasterHost() { 202 if (servers != null && servers.length > 0) { 203 return servers[0]; 204 } 205 return null; 206 } 207 208 209 /** 210 * get master port. 211 */ 212 public int getMasterPort() { 213 if (ports != null && ports.length > 0) { 214 return ports[0]; 215 } 216 return 0; 217 } 218 219 220 /** 221 * number of channels. 222 */ 223 public int numChannels() { 224 if (channels != null) { 225 return channels.length; 226 } 227 return -1; 228 } 229 230 231 /** 232 * open, setup of SocketChannels. 233 * @throws IOException. 234 */ 235 public void open() throws IOException { 236 logger.debug("opening {} channels", servers.length); 237 if (servers.length <= 1) { 238 throw new IOException("to few servers"); 239 } 240 channels = new SocketChannel[servers.length - 1]; 241 for (int i = 1; i < servers.length; i++) { 242 channels[i - 1] = cf.getChannel(servers[i], ports[i]); 243 } 244 } 245 246 247 /** 248 * open, setup of SocketChannels. If nc > servers.length open in round 249 * robin fashion. 250 * @param nc number of channels to open. 251 * @throws IOException. 252 */ 253 public void open(int nc) throws IOException { 254 logger.debug("opening {} channels", nc); 255 if (servers.length <= 1) { 256 throw new IOException("to few servers"); 257 } 258 channels = new SocketChannel[nc]; 259 int j = 1; // 0 is master 260 for (int i = 0; i < channels.length; i++) { 261 if (j >= servers.length) { // modulo #servers 262 j = 1; 263 } 264 channels[i] = cf.getChannel(servers[j], ports[j]); 265 j++; 266 } 267 } 268 269 270 /** 271 * close all channels and ChannelFactory. 272 */ 273 public void close() { 274 logger.debug("closing ExecutableChannels"); 275 if (cf != null) { 276 cf.terminate(); 277 } 278 if (channels != null) { 279 for (int i = 0; i < channels.length; i++) { 280 if (channels[i] != null) { 281 try { 282 channels[i].send(ExecutableServer.STOP); 283 } catch (IOException e) { 284 if (logger.isDebugEnabled()) { 285 e.printStackTrace(); 286 } 287 } finally { 288 channels[i].close(); 289 } 290 channels[i] = null; 291 } 292 } 293 channels = null; 294 } 295 logger.debug("ExecuteChannels closed"); 296 } 297 298 299 /** 300 * getChannel. 301 * @param i channel number. 302 */ 303 public SocketChannel getChannel(int i) { 304 if (channels != null && 0 <= i && i < channels.length) { 305 return channels[i]; 306 } 307 return null; 308 } 309 310 311 /** 312 * getChannels. 313 */ 314 /*package*/SocketChannel[] getChannels() { 315 return channels; 316 } 317 318 319 /** 320 * send on channel i. 321 * @param i channel number. 322 * @param o object to send. 323 */ 324 public void send(int i, Object o) throws IOException { 325 if (channels != null && 0 <= i && i < channels.length) { 326 channels[i].send(o); 327 } 328 } 329 330 331 /** 332 * receive on channel i. 333 * @param i channel number. 334 * @return object received. 335 */ 336 public Object receive(int i) throws IOException, ClassNotFoundException { 337 if (channels != null && 0 <= i && i < channels.length) { 338 return channels[i].receive(); 339 } 340 return null; 341 342 } 343 344}