diff --git a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java index cbc9aff00..721027ea4 100644 --- a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java +++ b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java @@ -31,6 +31,7 @@ import cz.o2.proxima.core.storage.AbstractStorage.SerializableAbstractStorage; import cz.o2.proxima.core.storage.Partition; import cz.o2.proxima.core.util.Classpath; +import cz.o2.proxima.core.util.ExceptionUtils; import cz.o2.proxima.direct.core.AttributeWriterBase; import cz.o2.proxima.direct.core.Context; import cz.o2.proxima.direct.core.DataAccessor; @@ -49,6 +50,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -232,18 +234,26 @@ private Cluster getCluster(URI uri) { } private Cluster getCluster(String authority, @Nullable String username) { - final String clusterCachedKey = (username != null ? username + "@" : "") + authority; + final String clusterCachedKey = computeClusterKey(authority, username); synchronized (CLUSTER_MAP) { - Cluster cluster = CLUSTER_MAP.get(clusterCachedKey); - if (cluster == null) { - cluster = createCluster(authority); - CLUSTER_MAP.put(clusterCachedKey, cluster); - } + Cluster cluster = + CLUSTER_MAP.computeIfAbsent(clusterCachedKey, k -> createCluster(authority)); return Objects.requireNonNull(cluster); } } + private void removeCluster(URI uri) { + synchronized (CLUSTER_MAP) { + Optional.ofNullable(CLUSTER_MAP.remove(computeClusterKey(uri.getAuthority(), this.username))) + .ifPresent(Cluster::close); + } + } + + private static String computeClusterKey(String authority, @Nullable String username) { + return (username != null ? username + "@" : "") + authority; + } + @VisibleForTesting Cluster createCluster(String authority) { log.info("Creating cluster for authority {} in accessor {}", authority, this); @@ -252,7 +262,6 @@ Cluster createCluster(String authority) { @VisibleForTesting Builder configureClusterBuilder(Builder builder, String authority) { - builder.addContactPointsWithPorts( Arrays.stream(authority.split(",")) .map(CassandraDBAccessor::getAddress) @@ -273,10 +282,27 @@ static InetSocketAddress getAddress(String p) { } Session ensureSession() { + return ensureSessionInternal(0); + } + + private Session ensureSessionInternal(int retry) { Cluster cluster = getCluster(getUri()); Preconditions.checkState(cluster != null); - /** Session we are connected to. */ - Session session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect); + /* Session we are connected to. */ + Session session; + try { + session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect); + } catch (Exception ex) { + if (retry < 3) { + ExceptionUtils.ignoringInterrupted( + () -> TimeUnit.MILLISECONDS.sleep((int) (Math.pow(2, retry) * 100))); + } else { + throw ex; + } + log.warn("Exception while creating session from cluster. Retry {}.", retry, ex); + removeCluster(getUri()); + return ensureSessionInternal(retry + 1); + } if (session.isClosed()) { synchronized (this) { session = CLUSTER_SESSIONS.get(cluster);