diff --git a/core/src/main/java/dk/alexandra/fresco/framework/network/socket/Connector.java b/core/src/main/java/dk/alexandra/fresco/framework/network/socket/Connector.java index c0fafda10..bb30b1056 100644 --- a/core/src/main/java/dk/alexandra/fresco/framework/network/socket/Connector.java +++ b/core/src/main/java/dk/alexandra/fresco/framework/network/socket/Connector.java @@ -78,25 +78,18 @@ private Map connectNetwork(final NetworkConfiguration conf, // possible. For this purpose we use a CompletionService. try (ServerSocket server = serverFactory.createServerSocket(conf.getMe().getPort())) { + server.setSoTimeout((int) timeout.toMillis()); Future> clients = connectionExecutor.submit(() -> connectClient(conf)); Future> servers = connectionExecutor.submit(() -> connectServer(server, conf)); connectionExecutor.shutdown(); boolean termination = connectionExecutor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); if (!termination) { - throw new RuntimeException("Timed out waiting for client connections"); + throw new IOException("Timed out waiting for client connections"); } socketMap.putAll(servers.get()); socketMap.putAll(clients.get()); } catch (ExecutionException | IOException | InterruptedException e) { - for (Map.Entry entry : socketMap.entrySet()) { - // In case the first completed successfully but the other did not. - try { - entry.getValue().close(); - } - catch (IOException ignored){ - // ignore, we don't care that it is already closed. - } - } + closeSocketMap(socketMap); // These two exceptions are expected in this form for some tests, they might break things if they change (maybe). if (e instanceof ExecutionException || e instanceof IOException) { // HandshakeExceptions need to be explicit. @@ -123,6 +116,10 @@ private Map connectClient(final NetworkConfiguration conf) Map socketMap = new HashMap<>(conf.noOfParties() - conf.getMyId()); try { for (int i = conf.getMyId() + 1; i <= conf.noOfParties(); i++) { + if (Thread.interrupted()) { + closeSocketMap(socketMap); + break; + } // TODO: Split this up into N async tasks instead Party p = conf.getParty(i); boolean connectionMade = false; @@ -147,9 +144,7 @@ private Map connectClient(final NetworkConfiguration conf) } } catch (Exception e) { - for (Map.Entry entry : socketMap.entrySet()) { - entry.getValue().close(); - } + closeSocketMap(socketMap); throw e; } return socketMap; @@ -163,16 +158,20 @@ private Map connectServer(ServerSocket server, final NetworkCon if (conf.getMyId() > 1) { try { logger.info("P{}: bound at port {}", conf.getMyId(), conf.getMe().getPort()); - for (int i = 1; i < conf.getMyId(); i++) { - Socket sock = server.accept(); - int id = 0; - for (int j = 0; j < PARTY_ID_BYTES; j++) { - id ^= sock.getInputStream().read() << j * Byte.SIZE; + for (int i = 1; i < conf.getMyId(); i++) { + if (Thread.interrupted()) { + closeSocketMap(socketMap); + break; + } + Socket sock = server.accept(); + int id = 0; + for (int j = 0; j < PARTY_ID_BYTES; j++) { + id ^= sock.getInputStream().read() << j * Byte.SIZE; + } + socketMap.put(id, sock); + logger.info("P{}: accepted connection from P{}", conf.getMyId(), id); + socketMap.put(id, sock); } - socketMap.put(id, sock); - logger.info("P{}: accepted connection from P{}", conf.getMyId(), id); - socketMap.put(id, sock); - } } catch (IOException e) { logger.info(e.getMessage()); } @@ -180,4 +179,11 @@ private Map connectServer(ServerSocket server, final NetworkCon return socketMap; } + static void closeSocketMap(Map map) { + for (Socket s : map.values()) { + try { + s.close(); + } catch (IOException ignored) {} + } + } }