Skip to content

Commit

Permalink
Merge pull request #407 from aicis/sane-connections
Browse files Browse the repository at this point in the history
Close connections on Interrupt
  • Loading branch information
quackzar authored Feb 27, 2023
2 parents 8760b22 + 26457c6 commit 3467d79
Showing 1 changed file with 28 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,18 @@ private Map<Integer, Socket> connectNetwork(final NetworkConfiguration conf,
// possible. For this purpose we use a CompletionService.

try (ServerSocket server = serverFactory.createServerSocket(conf.getMe().getPort())) {
server.setSoTimeout((int) timeout.toMillis());
Future<Map<Integer, Socket>> clients = connectionExecutor.submit(() -> connectClient(conf));
Future<Map<Integer, Socket>> servers = connectionExecutor.submit(() -> connectServer(server, conf));
connectionExecutor.shutdown();
boolean termination = connectionExecutor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!termination) {
throw new RuntimeException("Timed out waiting for client connections");
throw new IOException("Timed out waiting for client connections");
}
socketMap.putAll(servers.get());
socketMap.putAll(clients.get());
} catch (ExecutionException | IOException | InterruptedException e) {
for (Map.Entry<?, Socket> entry : socketMap.entrySet()) {
// In case the first completed successfully but the other did not.
try {
entry.getValue().close();
}
catch (IOException ignored){
// ignore, we don't care that it is already closed.
}
}
closeSocketMap(socketMap);
// These two exceptions are expected in this form for some tests, they might break things if they change (maybe).
if (e instanceof ExecutionException || e instanceof IOException) {
// HandshakeExceptions need to be explicit.
Expand All @@ -123,6 +116,10 @@ private Map<Integer, Socket> connectClient(final NetworkConfiguration conf)
Map<Integer, Socket> socketMap = new HashMap<>(conf.noOfParties() - conf.getMyId());
try {
for (int i = conf.getMyId() + 1; i <= conf.noOfParties(); i++) {
if (Thread.interrupted()) {
closeSocketMap(socketMap);
break;
}
// TODO: Split this up into N async tasks instead
Party p = conf.getParty(i);
boolean connectionMade = false;
Expand All @@ -147,9 +144,7 @@ private Map<Integer, Socket> connectClient(final NetworkConfiguration conf)
}
}
catch (Exception e) {
for (Map.Entry<?, Socket> entry : socketMap.entrySet()) {
entry.getValue().close();
}
closeSocketMap(socketMap);
throw e;
}
return socketMap;
Expand All @@ -163,21 +158,32 @@ private Map<Integer, Socket> connectServer(ServerSocket server, final NetworkCon
if (conf.getMyId() > 1) {
try {
logger.info("P{}: bound at port {}", conf.getMyId(), conf.getMe().getPort());
for (int i = 1; i < conf.getMyId(); i++) {
Socket sock = server.accept();
int id = 0;
for (int j = 0; j < PARTY_ID_BYTES; j++) {
id ^= sock.getInputStream().read() << j * Byte.SIZE;
for (int i = 1; i < conf.getMyId(); i++) {
if (Thread.interrupted()) {
closeSocketMap(socketMap);
break;
}
Socket sock = server.accept();
int id = 0;
for (int j = 0; j < PARTY_ID_BYTES; j++) {
id ^= sock.getInputStream().read() << j * Byte.SIZE;
}
socketMap.put(id, sock);
logger.info("P{}: accepted connection from P{}", conf.getMyId(), id);
socketMap.put(id, sock);
}
socketMap.put(id, sock);
logger.info("P{}: accepted connection from P{}", conf.getMyId(), id);
socketMap.put(id, sock);
}
} catch (IOException e) {
logger.info(e.getMessage());
}
}
return socketMap;
}

static void closeSocketMap(Map<Integer, Socket> map) {
for (Socket s : map.values()) {
try {
s.close();
} catch (IOException ignored) {}
}
}
}

0 comments on commit 3467d79

Please sign in to comment.