Fix how hash slot assignment is retrieved and stored
Previously, the `RedisClusterClient` obtained the hash slot
assignment as the first step of each `connect()` call. This is fairly
expensive and increases the load on the first endpoint in the list
(because we target the first endpoint when issuing `CLUSTER SLOTS`).

It is also unnecessary. Redis always sends a redirection when the node
to which a command is sent is not assigned the hash slot targeted
by the command. Until we observe such a redirection, the hash slot
assignment we observed before is still valid. Hence, we can store
the hash slot assignment in the `RedisClusterClient` and reuse it
for all `RedisClusterConnection` objects until a `MOVED` error
is seen. In that case, we reset the hash slot assignment so that
the next `connect()` call fetches it again.
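
In code, the fix is a simple cache-and-invalidate pattern. The sketch
below is a minimal illustration of the idea, not the committed
implementation; `fetchSlots()` is a hypothetical stand-in for the
endpoint-iterating lookup shown in the diff below:

  private final AtomicReference<Slots> slots = new AtomicReference<>();

  Future<Slots> getSlots() {
    Slots cached = slots.get();
    if (cached != null) {
      // no MOVED redirection observed yet, so the stored assignment is still valid
      return Future.succeededFuture(cached);
    }
    // fetchSlots() stands for issuing CLUSTER SLOTS against the configured
    // endpoints, one by one, until one of them answers
    return fetchSlots().onSuccess(slots::set);
  }

  // invoked by RedisClusterConnection when it observes a MOVED error
  void invalidateSlots() {
    slots.set(null); // the next connect() call fetches a fresh assignment
  }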
Ladicek committed Sep 20, 2023
1 parent 820ca0b commit c4303b5
Showing 2 changed files with 88 additions and 63 deletions.
144 changes: 82 additions & 62 deletions src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static io.vertx.redis.client.Command.*;
@@ -124,6 +125,8 @@ public static void addMasterOnlyCommand(Command command) {
private final RedisClusterConnectOptions connectOptions;
private final PoolOptions poolOptions;

private final AtomicReference<Slots> slots = new AtomicReference<>();

public RedisClusterClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions poolOptions, RedisClusterConnectOptions connectOptions, TracingPolicy tracingPolicy) {
super(vertx, tcpOptions, poolOptions, connectOptions, tracingPolicy);
this.connectOptions = connectOptions;
@@ -139,72 +142,47 @@ public RedisClusterClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions
@Override
public Future<RedisConnection> connect() {
final Promise<RedisConnection> promise = vertx.promise();
// attempt to load the slots from the first good endpoint
connect(connectOptions.getEndpoints(), 0, promise);
getSlots()
.onSuccess(slots -> connect(slots, promise))
.onFailure(promise::fail);
return promise.future();
}

private void connect(List<String> endpoints, int index, Handler<AsyncResult<RedisConnection>> onConnect) {
if (index >= endpoints.size()) {
// stop condition
onConnect.handle(Future.failedFuture("Cannot connect to any of the provided endpoints"));
private void connect(Slots slots, Handler<AsyncResult<RedisConnection>> onConnected) {
// validate if the pool config is valid
final int totalUniqueEndpoints = slots.endpoints().length;
if (poolOptions.getMaxSize() < totalUniqueEndpoints) {
// this isn't a valid setup, the connection pool will not accommodate all the required connections
onConnected.handle(Future.failedFuture("RedisOptions maxPoolSize < Cluster size(" + totalUniqueEndpoints + "): The pool is not able to hold all required connections!"));
return;
}

connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// failed try with the next endpoint
connect(endpoints, index + 1, onConnect);
})
.onSuccess(conn -> {
// fetch slots from the cluster immediately to ensure slots are correct
getSlots(endpoints.get(index), conn, getSlots -> {
if (getSlots.failed()) {
// the slots command failed.
conn.close().onFailure(LOG::warn);
// try with the next one
connect(endpoints, index + 1, onConnect);
return;
}

// slots are loaded (this connection isn't needed anymore)
conn.close().onFailure(LOG::warn);
// create a cluster connection
final Slots slots = getSlots.result();
final AtomicBoolean failed = new AtomicBoolean(false);
final AtomicInteger counter = new AtomicInteger();
final Map<String, PooledRedisConnection> connections = new HashMap<>();

// validate if the pool config is valid
final int totalUniqueEndpoints = slots.endpoints().length;
if (poolOptions.getMaxSize() < totalUniqueEndpoints) {
// this isn't a valid setup, the connection pool will not accommodate all the required connections
onConnect.handle(Future.failedFuture("RedisOptions maxPoolSize < Cluster size(" + totalUniqueEndpoints + "): The pool is not able to hold all required connections!"));
return;
}

for (String endpoint : slots.endpoints()) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// failed try with the next endpoint
failed.set(true);
connectionComplete(counter, slots, connections, failed, onConnect);
})
.onSuccess(cconn -> {
// there can be concurrent access to the connection map
// since this is a one time operation we can pay the penalty of
// synchronizing on each write (hopefully is only a few writes)
synchronized (connections) {
connections.put(endpoint, cconn);
}
connectionComplete(counter, slots, connections, failed, onConnect);
});
// create a cluster connection
final AtomicBoolean failed = new AtomicBoolean(false);
final AtomicInteger counter = new AtomicInteger();
final Map<String, PooledRedisConnection> connections = new HashMap<>();

for (String endpoint : slots.endpoints()) {
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// failed try with the next endpoint
failed.set(true);
connectionComplete(counter, slots, connections, failed, onConnected);
})
.onSuccess(cconn -> {
// there can be concurrent access to the connection map
// since this is a one time operation we can pay the penalty of
// synchronizing on each write (hopefully is only a few writes)
synchronized (connections) {
connections.put(endpoint, cconn);
}
connectionComplete(counter, slots, connections, failed, onConnected);
});
});
}
}

private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections, AtomicBoolean failed, Handler<AsyncResult<RedisConnection>> onConnect) {
private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, PooledRedisConnection> connections,
AtomicBoolean failed, Handler<AsyncResult<RedisConnection>> onConnected) {
if (counter.incrementAndGet() == slots.endpoints().length) {
// end condition
if (failed.get()) {
@@ -220,31 +198,73 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map<String,
}
}
// return
onConnect.handle(Future.failedFuture("Failed to connect to all nodes of the cluster"));
onConnected.handle(Future.failedFuture("Failed to connect to all nodes of the cluster"));
} else {
onConnect.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectOptions, slots, connections)));
onConnected.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectOptions, slots,
() -> this.slots.set(null), connections)));
}
}
}

private void getSlots(String endpoint, RedisConnection conn, Handler<AsyncResult<Slots>> onGetSlots) {
private Future<Slots> getSlots() {
Slots slots = this.slots.get();
if (slots != null) {
return Future.succeededFuture(slots);
}

Promise<Slots> promise = vertx.promise();
// attempt to load the slots from the first good endpoint
getSlots(connectOptions.getEndpoints(), 0, promise);
return promise.future();
}

private void getSlots(List<String> endpoints, int index, Handler<AsyncResult<Slots>> onGotSlots) {
if (index >= endpoints.size()) {
// stop condition
onGotSlots.handle(Future.failedFuture("Cannot connect to any of the provided endpoints"));
return;
}

connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
.onFailure(err -> {
// try with the next endpoint
getSlots(endpoints, index + 1, onGotSlots);
})
.onSuccess(conn -> {
getSlots(endpoints.get(index), conn, result -> {
// the connection is not needed anymore, regardless of success or failure
// (on success, we just finish, on failure, we'll try another endpoint)
conn.close().onFailure(LOG::warn);

if (result.failed()) {
// the slots command failed, try with next endpoint
getSlots(endpoints, index + 1, onGotSlots);
} else {
Slots slots = result.result();
this.slots.set(slots);
onGotSlots.handle(Future.succeededFuture(slots));
}
});
});
}

private void getSlots(String endpoint, RedisConnection conn, Handler<AsyncResult<Slots>> onGotSlots) {
conn.send(cmd(CLUSTER).arg("SLOTS"), send -> {
if (send.failed()) {
// failed to load the slots from this connection
onGetSlots.handle(Future.failedFuture(send.cause()));
onGotSlots.handle(Future.failedFuture(send.cause()));
return;
}

final Response reply = send.result();

if (reply == null || reply.size() == 0) {
// no slots available we can't really proceed
onGetSlots.handle(Future.failedFuture("SLOTS No slots available in the cluster."));
onGotSlots.handle(Future.failedFuture("SLOTS No slots available in the cluster."));
return;
}

onGetSlots.handle(Future.succeededFuture(new Slots(endpoint, reply)));
onGotSlots.handle(Future.succeededFuture(new Slots(endpoint, reply)));
});
}
}
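
From the caller's perspective the API is unchanged; the difference is
that repeated `connect()` calls on one client now reuse the cached
assignment instead of issuing `CLUSTER SLOTS` every time. A usage
sketch (the endpoint URL is illustrative):

  import io.vertx.redis.client.*;

  Redis client = Redis.createClient(vertx, new RedisOptions()
    .setType(RedisClientType.CLUSTER)
    .addConnectionString("redis://127.0.0.1:7000")); // illustrative endpoint

  // the first connect() fetches CLUSTER SLOTS and caches the result
  client.connect().onSuccess(conn ->
    conn.send(Request.cmd(Command.GET).arg("key"))
      .onComplete(res -> conn.close()));

  // later connect() calls reuse the cached assignment
  // until a MOVED error invalidates it
  client.connect().onSuccess(RedisConnection::close);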
7 changes: 6 additions & 1 deletion src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
@@ -43,12 +43,15 @@ public static void addMasterOnlyCommand(Command command) {
private final VertxInternal vertx;
private final RedisClusterConnectOptions connectOptions;
private final Slots slots;
private final Runnable onMoved;
private final Map<String, PooledRedisConnection> connections;

RedisClusterConnection(Vertx vertx, RedisClusterConnectOptions connectOptions, Slots slots, Map<String, PooledRedisConnection> connections) {
RedisClusterConnection(Vertx vertx, RedisClusterConnectOptions connectOptions, Slots slots, Runnable onMoved,
Map<String, PooledRedisConnection> connections) {
this.vertx = (VertxInternal) vertx;
this.connectOptions = connectOptions;
this.slots = slots;
this.onMoved = onMoved;
this.connections = connections;
}

@@ -261,6 +264,7 @@ private void send(String endpoint, int retries, Request command, Handler<AsyncRe
final ErrorType cause = (ErrorType) send.cause();

if (cause.is("MOVED")) {
this.onMoved.run();
// cluster is unbalanced, need to reconnect
handler.handle(Future.failedFuture(cause));
return;
@@ -405,6 +409,7 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
final ErrorType cause = (ErrorType) send.cause();

if (cause.is("MOVED")) {
this.onMoved.run();
// cluster is unbalanced, need to reconnect
handler.handle(Future.failedFuture(cause));
return;
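
For context: a MOVED reply names the slot and its new owner, e.g.
`MOVED 3999 127.0.0.1:6381`. The connection only invalidates the
client's cached assignment (via `onMoved`) and fails the command;
retrying is left to the caller. A sketch of such a retry, not part of
this commit (connection cleanup omitted for brevity):

  client.connect().compose(conn ->
    conn.send(request).recover(err -> {
      if (err instanceof ErrorType && ((ErrorType) err).is("MOVED")) {
        // the cached slots were just reset; a fresh connect() fetches them again
        return client.connect().compose(fresh -> fresh.send(request));
      }
      return Future.failedFuture(err);
    }));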
