Skip to content

Commit

Permalink
[proxima-direct-cassandra] reinitialize cluster after failed session …
Browse files Browse the repository at this point in the history
…creation
  • Loading branch information
je-ik committed Sep 23, 2024
1 parent 2b7a371 commit a9c25dd
Showing 1 changed file with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import cz.o2.proxima.core.storage.AbstractStorage.SerializableAbstractStorage;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.util.Classpath;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.DataAccessor;
Expand All @@ -49,6 +50,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -232,18 +234,26 @@ private Cluster getCluster(URI uri) {
}

private Cluster getCluster(String authority, @Nullable String username) {
final String clusterCachedKey = (username != null ? username + "@" : "") + authority;
final String clusterCachedKey = computeClusterKey(authority, username);

synchronized (CLUSTER_MAP) {
Cluster cluster = CLUSTER_MAP.get(clusterCachedKey);
if (cluster == null) {
cluster = createCluster(authority);
CLUSTER_MAP.put(clusterCachedKey, cluster);
}
Cluster cluster =
CLUSTER_MAP.computeIfAbsent(clusterCachedKey, k -> createCluster(authority));
return Objects.requireNonNull(cluster);
}
}

private void removeCluster(URI uri) {
synchronized (CLUSTER_MAP) {
Optional.ofNullable(CLUSTER_MAP.remove(computeClusterKey(uri.getAuthority(), this.username)))
.ifPresent(Cluster::close);
}
}

private static String computeClusterKey(String authority, @Nullable String username) {
return (username != null ? username + "@" : "") + authority;
}

@VisibleForTesting
Cluster createCluster(String authority) {
log.info("Creating cluster for authority {} in accessor {}", authority, this);
Expand All @@ -252,7 +262,6 @@ Cluster createCluster(String authority) {

@VisibleForTesting
Builder configureClusterBuilder(Builder builder, String authority) {

builder.addContactPointsWithPorts(
Arrays.stream(authority.split(","))
.map(CassandraDBAccessor::getAddress)
Expand All @@ -273,10 +282,27 @@ static InetSocketAddress getAddress(String p) {
}

Session ensureSession() {
return ensureSessionInternal(0);
}

private Session ensureSessionInternal(int retry) {
Cluster cluster = getCluster(getUri());
Preconditions.checkState(cluster != null);
/** Session we are connected to. */
Session session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect);
/* Session we are connected to. */
Session session;
try {
session = CLUSTER_SESSIONS.computeIfAbsent(cluster, Cluster::connect);
} catch (Exception ex) {
if (retry < 3) {
ExceptionUtils.ignoringInterrupted(
() -> TimeUnit.MILLISECONDS.sleep((int) (Math.pow(2, retry) * 100)));
} else {
throw ex;
}
log.warn("Exception while creating session from cluster. Retry {}.", retry, ex);
removeCluster(getUri());
return ensureSessionInternal(retry + 1);
}
if (session.isClosed()) {
synchronized (this) {
session = CLUSTER_SESSIONS.get(cluster);
Expand Down

0 comments on commit a9c25dd

Please sign in to comment.