import java.io.*; import java.net.*; import java.util.Scanner; /** * This class is part of a demonstration of distributed computing. * It is to be used with CLMandelbrotTask.java on the worker computer. * CLMandelbrotWorker should be run as a command-line program on each * worker computer involved in the distributed computation. When it is * running, it listens for a connection from the master computer. The * listening port number can be specified on the command line; if none is * specified, a default port number is used. It is possible to run several * copies of this program on the same computer, as long as they listen * on different ports. * * After receiving a connection request from the master program, this * program expects to receive a sequence of tasks (of type CLMandelbrotTask) * from the master program. It computes each task that it receives and * sends the results back to the master. The process ends when this * program receives a CLOSE_CONNECTION_COMMAND from the master program. * It then goes back to listening for another connection. (The connection * can also be terminated by a SHUT_DOWN_COMMAND. When this command is * received, this program shuts down. However, this feature is not currently * used. Since graceful shutdown is not implemented, you can stop the * worker program using CONTROL-C.) * * Note that data sent over the network is encoded as text. The first * word on a line of text identifies the type of data. */ public class CLMandelbrotWorker { /** * Default listening port, if none is specified on the command line. */ private static final int DEFAULT_PORT = 13572; /** * The first and only word on a message representing a close command. */ private static final String CLOSE_CONNECTION_COMMAND = "close"; /** * The first and only word on a message representing a shutdown command. */ private static final String SHUT_DOWN_COMMAND = "shutdown"; /** * The first word on a message representing a CLMandelbrotTask. This * is followed by the incoming data (id, maxIterations, y, xmin, dx, and * count) for the task. Items on the line are separated by spaces. */ private static final String TASK_COMMAND = "task"; /** * The first word on a message representing the results from a * CLMandelbrotTask. This is followed by the task id, the number * of items in the results, and then the results. Items on the * line are separated by spaces. */ private static final String RESULT_COMMAND = "result"; private static boolean shutdownCommandReceived; /** * The main program listens for connections from the master program * and does all the communication over the connection. Note that this * worker program does not use threads (except for the thread in which * the main program runs). */ public static void main(String[] args) { /* Get the port number from the command line, if present. */ int port = DEFAULT_PORT; if (args.length > 0) { try { port = Integer.parseInt(args[0]); if (port < 0 || port > 65535) throw new NumberFormatException(); } catch (NumberFormatException e) { port = DEFAULT_PORT; } } System.out.println("Starting with listening port number " + port); while (shutdownCommandReceived == false) { /* Listen for a connection request from the master program. */ ServerSocket listener = null; try { listener = new ServerSocket(port); } catch (Exception e) { System.out.println("ERROR: Can't create listening socket on port " + port); System.exit(1); } /* Process the connection. Note that the listener socket is closed as long as the connection remains open, since this program can only deal with one connection at a time. A new listener is created after the connection closes. */ try { Socket connection = listener.accept(); listener.close(); System.out.println("Accepted connection from " + connection.getInetAddress()); handleConnection(connection); } catch (Exception e) { System.out.println("ERROR: Server shut down with error:"); System.out.println(e); System.exit(2); } } System.out.println("Shutting down normally."); } // end main() /** * Decode a message that was received from the server and that represents * a CLMandelbrotTask. * @param taskData the message that represents the task. It is already known * that the first word of the message is the TASK_COMMAND. * @return the task represented by this message. * @throws IOException if the data in the message is not in the correct form. */ private static CLMandelbrotTask readTask(String taskData) throws IOException { try { Scanner scanner = new Scanner(taskData); CLMandelbrotTask task = new CLMandelbrotTask(); scanner.next(); // skip the command at the start of the line. task.id = scanner.nextInt(); task.maxIterations = scanner.nextInt(); task.y = scanner.nextDouble(); task.xmin = scanner.nextDouble(); task.dx = scanner.nextDouble(); task.count = scanner.nextInt(); return task; } catch (Exception e) { throw new IOException("Illegal data found while reading task information."); } } /** * Encode the result of a task into String form, so that it can be sent * as a message back to the master program. * @param task the task to be encoded. Its compute() method has already * been executed. */ private static String writeResults(CLMandelbrotTask task) { StringBuffer buffer = new StringBuffer(); buffer.append(RESULT_COMMAND); buffer.append(' '); buffer.append(task.id); buffer.append(' '); buffer.append(task.count); for (int i = 0; i < task.count; i++) { buffer.append(' '); buffer.append(task.results[i]); } return buffer.toString(); } /** * Handle communication over a connection to the master program. Accept and * process CLMandelbrotTasks until a close or shutdown message is received * (or an error occurs). * @param connection an already-connected socket for the connection. */ private static void handleConnection(Socket connection) { try { BufferedReader in = new BufferedReader( new InputStreamReader( connection.getInputStream()) ); PrintWriter out = new PrintWriter(connection.getOutputStream()); while (true) { String line = in.readLine(); // Message from the master. if (line == null) { // End-of-stream encountered -- should not happen. throw new Exception("Connection closed unexpectedly."); } if (line.startsWith(CLOSE_CONNECTION_COMMAND)) { // Represents the normal termination of the connection. System.out.println("Received close command."); break; } else if (line.startsWith(SHUT_DOWN_COMMAND)) { // Represents the normal termination of the connection // and also tells this worker to shut down. System.out.println("Received shutdown command."); shutdownCommandReceived = true; break; } else if (line.startsWith(TASK_COMMAND)) { // Represents a CLMandelbrotTask that this worker is // supposed to perform. CLMandelbrotTask task = readTask(line); // Decode the message. task.compute(); // Perform the task. out.println(writeResults(task)); // Send back the results. out.flush(); } else { // No other messages are part of the protocol. throw new Exception("Illegal copmmand received."); } } } catch (Exception e) { System.out.println("Client connection closed with error " + e); } finally { try { connection.close(); // Make sure the socket is closed. } catch (Exception e) { } } } }