Skip to content

Commit

Permalink
Merge pull request #406 from aicis/sane-connections
Browse files Browse the repository at this point in the history
Improve connection handling on Exceptions
  • Loading branch information
quackzar authored Feb 27, 2023
2 parents 859f17a + 8b9ddb2 commit 8760b22
Show file tree
Hide file tree
Showing 30 changed files with 135 additions and 121 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>master-pom</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<!-- ================ FRESCO BUILD ========================== -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,20 @@

import dk.alexandra.fresco.framework.Party;
import dk.alexandra.fresco.framework.configuration.NetworkConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;

public class Connector implements NetworkConnector {

Expand Down Expand Up @@ -73,35 +67,45 @@ public Map<Integer, Socket> getSocketMap() {
*/
private Map<Integer, Socket> connectNetwork(final NetworkConfiguration conf,
final Duration timeout) {

Map<Integer, Socket> socketMap = new HashMap<>(conf.noOfParties());

// We use two threads. One for the client connections and one for the server connections.
final int connectionThreads = 2;
ExecutorService connectionExecutor = Executors.newFixedThreadPool(connectionThreads);

// If either the client or the server thread fails we would like cancel the other as soon as
// possible. For this purpose we use a CompletionService.
CompletionService<Map<Integer, Socket>> connectionService =
new ExecutorCompletionService<>(connectionExecutor);
connectionService.submit(() -> connectClient(conf));
connectionService.submit(() -> connectServer(conf));
Duration remainingTime = timeout;
try {
Instant start = Instant.now();
for (int i = 0; i < connectionThreads; i++) {
remainingTime = remainingTime.minus(Duration.between(start, Instant.now()));
Future<Map<Integer, Socket>> completed =
connectionService.poll(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
if (completed == null) {
throw new TimeoutException("Timed out waiting for client connections");

try (ServerSocket server = serverFactory.createServerSocket(conf.getMe().getPort())) {
Future<Map<Integer, Socket>> clients = connectionExecutor.submit(() -> connectClient(conf));
Future<Map<Integer, Socket>> servers = connectionExecutor.submit(() -> connectServer(server, conf));
connectionExecutor.shutdown();
boolean termination = connectionExecutor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!termination) {
throw new RuntimeException("Timed out waiting for client connections");
}
socketMap.putAll(servers.get());
socketMap.putAll(clients.get());
} catch (ExecutionException | IOException | InterruptedException e) {
for (Map.Entry<?, Socket> entry : socketMap.entrySet()) {
// In case the first completed successfully but the other did not.
try {
entry.getValue().close();
}
catch (IOException ignored){
// ignore, we don't care that it is already closed.
}
}
// These two exceptions are expected in this form for some tests, they might break things if they change (maybe).
if (e instanceof ExecutionException || e instanceof IOException) {
// HandshakeExceptions need to be explicit.
if (e.getCause() instanceof SSLHandshakeException) {
throw new RuntimeException("Failed to connect network", e.getCause());
} else {
// Below, will either collect the connections made by the completed thread, or throw an
// ExecutionException, if the thread failed while trying to make the required connections.
socketMap.putAll(completed.get());
throw new RuntimeException("Failed to connect network", e);
}
}
} catch (ExecutionException e) {
throw new RuntimeException("Failed to connect network", e.getCause());
} catch (Exception e) {
throw new RuntimeException("Failed to connect network", e);
} finally {
connectionExecutor.shutdownNow();
}
Expand All @@ -117,50 +121,60 @@ private Map<Integer, Socket> connectNetwork(final NetworkConfiguration conf,
private Map<Integer, Socket> connectClient(final NetworkConfiguration conf)
throws InterruptedException, IOException {
Map<Integer, Socket> socketMap = new HashMap<>(conf.noOfParties() - conf.getMyId());
for (int i = conf.getMyId() + 1; i <= conf.noOfParties(); i++) {
Party p = conf.getParty(i);
boolean connectionMade = false;
int attempts = 0;
while (!connectionMade) {
try {
Socket sock = socketFactory.createSocket(p.getHostname(), p.getPort());
for (int j = 0; j < PARTY_ID_BYTES; j++) {
byte b = (byte) (conf.getMyId() >>> j * Byte.SIZE);
sock.getOutputStream().write(b);
try {
for (int i = conf.getMyId() + 1; i <= conf.noOfParties(); i++) {
// TODO: Split this up into N async tasks instead
Party p = conf.getParty(i);
boolean connectionMade = false;
int attempts = 0;
while (!connectionMade) {
try {
Socket sock = socketFactory.createSocket(p.getHostname(), p.getPort());
for (int j = 0; j < PARTY_ID_BYTES; j++) {
byte b = (byte) (conf.getMyId() >>> j * Byte.SIZE);
sock.getOutputStream().write(b);
}
connectionMade = true;
socketMap.put(i, sock);
logger.info("P{}: connected to {}", conf.getMyId(), p);
} catch (ConnectException e) {
// A ConnectionException is expected if the opposing side is not listening for our
// connection attempt yet. We ignore this and try again.
Thread.sleep(1L << ++attempts);
// This should probably not busy-wait for each party
}
connectionMade = true;
socketMap.put(i, sock);
logger.info("P{}: connected to {}", conf.getMyId(), p);
} catch (ConnectException e) {
// A connect exception is expected if the opposing side is not listening for our
// connection attempt yet. We ignore this and try again.
Thread.sleep(1 << ++attempts);
}
}
}
catch (Exception e) {
for (Map.Entry<?, Socket> entry : socketMap.entrySet()) {
entry.getValue().close();
}
throw e;
}
return socketMap;
}

/**
* Listens for connections from the opposing parties with lower id's.
*
* @throws IOException thrown if an {@link IOException} occurs while listening.
*/
private Map<Integer, Socket> connectServer(final NetworkConfiguration conf) throws IOException {
private Map<Integer, Socket> connectServer(ServerSocket server, final NetworkConfiguration conf) {
Map<Integer, Socket> socketMap = new HashMap<>(conf.getMyId() - 1);
if (conf.getMyId() > 1) {
try (ServerSocket server = serverFactory.createServerSocket(conf.getMe().getPort())) {
try {
logger.info("P{}: bound at port {}", conf.getMyId(), conf.getMe().getPort());
for (int i = 1; i < conf.getMyId(); i++) {
Socket sock = server.accept();
int id = 0;
for (int j = 0; j < PARTY_ID_BYTES; j++) {
id ^= sock.getInputStream().read() << j * Byte.SIZE;
}
socketMap.put(id, sock);
logger.info("P{}: accepted connection from P{}", conf.getMyId(), id);
socketMap.put(id, sock);
for (int i = 1; i < conf.getMyId(); i++) {
Socket sock = server.accept();
int id = 0;
for (int j = 0; j < PARTY_ID_BYTES; j++) {
id ^= sock.getInputStream().read() << j * Byte.SIZE;
}
socketMap.put(id, sock);
logger.info("P{}: accepted connection from P{}", conf.getMyId(), id);
socketMap.put(id, sock);
}
} catch (IOException e) {
logger.info(e.getMessage());
}
}
return socketMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@
import dk.alexandra.fresco.framework.configuration.NetworkConfiguration;
import dk.alexandra.fresco.framework.network.CloseableNetwork;
import dk.alexandra.fresco.framework.util.ExceptionConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.Socket;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link CloseableNetwork} implementation based on regular the {@link Socket} interface (i.e.,
Expand Down Expand Up @@ -194,10 +191,10 @@ private void closeCommunication() {
r.stop();
}
for (Socket sock : sockets) {
ExceptionConverter.safe(() -> {
try {
sock.close();
return null;
}, "Unable to properly close socket");
} catch (IOException ignored) {
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
package dk.alexandra.fresco.framework.network.socket;

import dk.alexandra.fresco.framework.configuration.NetworkConfiguration;
import dk.alexandra.fresco.framework.configuration.NetworkUtil;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import dk.alexandra.fresco.framework.configuration.NetworkUtil;
import org.junit.Test;

/**
* Note: this test simply covers code not tested in {@link TestSocketNetwork}.
*/
public class TestConnector {

@Test(expected = InterruptedException.class)
@Ignore("There is no reason for this to fail anymore")
public void testInterruptWhileConnecting() throws Throwable {
ExecutorService es = Executors.newFixedThreadPool(2);
Map<Integer, NetworkConfiguration> confs = NetworkUtil.getNetworkConfigurations(2);
Expand Down
4 changes: 2 additions & 2 deletions demos/aes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>demos</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand All @@ -19,7 +19,7 @@
<dependency>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>bristol</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion demos/aggregation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>demos</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion demos/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>demos</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion demos/distance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>demos</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion demos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>master-pom</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
4 changes: 2 additions & 2 deletions demos/psi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>demos</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand All @@ -19,7 +19,7 @@
<dependency>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>bristol</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion demos/sum/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>dk.alexandra.fresco</groupId>
<artifactId>demos</artifactId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion lib/bristol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@
<parent>
<artifactId>fresco-lib-pom</artifactId>
<groupId>dk.alexandra.fresco</groupId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>
</project>
2 changes: 1 addition & 1 deletion lib/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>fresco-lib-pom</artifactId>
<groupId>dk.alexandra.fresco</groupId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion lib/dea/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<parent>
<artifactId>fresco-lib-pom</artifactId>
<groupId>dk.alexandra.fresco</groupId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>

</project>
2 changes: 1 addition & 1 deletion lib/debug/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>fresco-lib-pom</artifactId>
<groupId>dk.alexandra.fresco</groupId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion lib/fixed/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>fresco-lib-pom</artifactId>
<groupId>dk.alexandra.fresco</groupId>
<version>1.3.4-SNAPSHOT</version>
<version>1.3.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Loading

0 comments on commit 8760b22

Please sign in to comment.