Skip to content

Commit

Permalink
More aggressively close connections on errors
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed Feb 27, 2023
1 parent 06f9578 commit 91326a1
Showing 1 changed file with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ private Map<Integer, Socket> connectNetwork(final NetworkConfiguration conf,
if (!termination) {
throw new RuntimeException("Timed out waiting for client connections");
}
socketMap.putAll(clients.get());
socketMap.putAll(servers.get());
socketMap.putAll(clients.get());
} catch (ExecutionException | IOException | InterruptedException e) {
for (Map.Entry<?, Socket> entry : socketMap.entrySet()) {
// In case the first completed successfully but the other did not.
try {
entry.getValue().close();
}
Expand Down Expand Up @@ -120,29 +121,37 @@ private Map<Integer, Socket> connectNetwork(final NetworkConfiguration conf,
private Map<Integer, Socket> connectClient(final NetworkConfiguration conf)
throws InterruptedException, IOException {
Map<Integer, Socket> socketMap = new HashMap<>(conf.noOfParties() - conf.getMyId());
for (int i = conf.getMyId() + 1; i <= conf.noOfParties(); i++) {
// TODO: Split this up into N async tasks instead
Party p = conf.getParty(i);
boolean connectionMade = false;
int attempts = 0;
while (!connectionMade) {
try {
Socket sock = socketFactory.createSocket(p.getHostname(), p.getPort());
for (int j = 0; j < PARTY_ID_BYTES; j++) {
byte b = (byte) (conf.getMyId() >>> j * Byte.SIZE);
sock.getOutputStream().write(b);
try {
for (int i = conf.getMyId() + 1; i <= conf.noOfParties(); i++) {
// TODO: Split this up into N async tasks instead
Party p = conf.getParty(i);
boolean connectionMade = false;
int attempts = 0;
while (!connectionMade) {
try {
Socket sock = socketFactory.createSocket(p.getHostname(), p.getPort());
for (int j = 0; j < PARTY_ID_BYTES; j++) {
byte b = (byte) (conf.getMyId() >>> j * Byte.SIZE);
sock.getOutputStream().write(b);
}
connectionMade = true;
socketMap.put(i, sock);
logger.info("P{}: connected to {}", conf.getMyId(), p);
} catch (ConnectException e) {
// A ConnectionException is expected if the opposing side is not listening for our
// connection attempt yet. We ignore this and try again.
Thread.sleep(1L << ++attempts);
// This should probably not busy-wait for each party
}
connectionMade = true;
socketMap.put(i, sock);
logger.info("P{}: connected to {}", conf.getMyId(), p);
} catch (ConnectException e) {
// A ConnectionException is expected if the opposing side is not listening for our
// connection attempt yet. We ignore this and try again.
Thread.sleep(1L << ++attempts);
// This should probably not busy-wait for each party
}
}
}
catch (Exception e) {
for (Map.Entry<?, Socket> entry : socketMap.entrySet()) {
entry.getValue().close();
}
throw e;
}
return socketMap;
}

Expand Down

0 comments on commit 91326a1

Please sign in to comment.