Skip to content

Commit

Permalink
Java: Adding command WAIT (valkey-io#1707)
Browse files Browse the repository at this point in the history
* Java: Adding command WAIT

Java: Adding command WAIT

* addressing comments

* fixing timeout_idx in get_timeout_from_cmd_args call

* update timeout check

* fixing rust test

* adding special case for WAIT

* rust linter

* remove special case in get_timeout_from_cmd_args

* adding description for timeout 0

* rust linter

* updating timeout test

* changing transaction documentation

---------

Co-authored-by: TJ Zhang <tj.zhang@improving.com>
  • Loading branch information
tjzhang-BQ and TJ Zhang authored Jun 30, 2024
1 parent 5ce7a32 commit 40c3a55
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 0 deletions.
12 changes: 12 additions & 0 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult<Opti
.position(b"BLOCK")
.map(|idx| get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds))
.unwrap_or(Ok(RequestTimeoutOption::ClientConfig)),
b"WAIT" => get_timeout_from_cmd_arg(cmd, 2, TimeUnit::Milliseconds),
_ => Ok(RequestTimeoutOption::ClientConfig),
}?;

Expand Down Expand Up @@ -736,6 +737,17 @@ mod tests {
0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("WAIT").arg(1).arg("500");
let result = get_request_timeout(&cmd, Duration::from_millis(500));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ enum RequestType {
SScan = 200;
ZScan = 201;
HScan = 202;
Wait = 208;
}

message Command {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ pub enum RequestType {
SScan = 200,
ZScan = 201,
HScan = 202,
Wait = 208,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -425,6 +426,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::SScan => RequestType::SScan,
ProtobufRequestType::ZScan => RequestType::ZScan,
ProtobufRequestType::HScan => RequestType::HScan,
ProtobufRequestType::Wait => RequestType::Wait,
}
}
}
Expand Down Expand Up @@ -637,6 +639,7 @@ impl RequestType {
RequestType::SScan => Some(cmd("SSCAN")),
RequestType::ZScan => Some(cmd("ZSCAN")),
RequestType::HScan => Some(cmd("HSCAN")),
RequestType::Wait => Some(cmd("WAIT")),
}
}
}
9 changes: 9 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
Expand Down Expand Up @@ -2950,4 +2951,12 @@ public CompletableFuture<Object[]> hscan(
String[] arguments = concatenateArrays(new String[] {key, cursor}, hScanOptions.toArgs());
return commandManager.submitNewCommand(HScan, arguments, this::handleArrayResponse);
}

@Override
public CompletableFuture<Long> wait(long numreplicas, long timeout) {
return commandManager.submitNewCommand(
Wait,
new String[] {Long.toString(numreplicas), Long.toString(timeout)},
this::handleLongResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1207,4 +1207,22 @@ CompletableFuture<String> restore(
* }</pre>
*/
CompletableFuture<Long> sortStore(String key, String destination);

/**
* Blocks the current client until all the previous write commands are successfully transferred
* and acknowledged by at least <code>numreplicas</code> of replicas. If <code>timeout</code> is
* reached, the command returns even if the specified number of replicas were not yet reached.
*
* @param numreplicas The number of replicas to reach.
* @param timeout The timeout value specified in milliseconds. A value of <code>0</code> will
* block indefinitely.
* @return The number of replicas reached by all the writes performed in the context of the
* current connection.
* @example
* <pre>{@code
* client.set("key", "value).get();
* assert client.wait(1L, 1000L).get() == 1L;
* }</pre>
*/
CompletableFuture<Long> wait(long numreplicas, long timeout);
}
17 changes: 17 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
Expand Down Expand Up @@ -5623,6 +5624,22 @@ public T hscan(@NonNull String key, @NonNull String cursor, @NonNull HScanOption
return getThis();
}

/**
* Returns the number of replicas that acknowledged the write commands sent by the current client
* before this command, both in the case where the specified number of replicas are reached, or
* when the timeout is reached.
*
* @param numreplicas The number of replicas to reach.
* @param timeout The timeout value specified in milliseconds.
* @return Command Response - The number of replicas reached by all the writes performed in the
* context of the current connection.
*/
public T wait(long numreplicas, long timeout) {
String[] args = buildArgs(Long.toString(numreplicas), Long.toString(timeout));
protobufTransaction.addCommands(buildCommand(Wait, args));
return getThis();
}

/** Build protobuf {@link Command} object for given command and arguments. */
protected Command buildCommand(RequestType requestType) {
// An empty args array is still needed for parameter-less commands.
Expand Down
25 changes: 25 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
Expand Down Expand Up @@ -9002,6 +9003,30 @@ public void sortStore_with_options_returns_success() {
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void wait_returns_success() {
// setup
long numreplicas = 1L;
long timeout = 1000L;
Long result = 5L;
String[] args = new String[] {"1", "1000"};

CompletableFuture<Long> testResponse = new CompletableFuture<>();
testResponse.complete(result);

// match on protobuf request
when(commandManager.<Long>submitNewCommand(eq(Wait), eq(args), any())).thenReturn(testResponse);

// exercise
CompletableFuture<Long> response = service.wait(numreplicas, timeout);
Long payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void sscan_with_options_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
Expand Down Expand Up @@ -1387,6 +1388,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
HScanOptions.COUNT_OPTION_STRING,
"10")));

transaction.wait(1L, 1000L);
results.add(Pair.of(Wait, buildArgs("1", "1000")));

var protobufTransaction = transaction.getProtobufTransaction().build();

for (int idx = 0; idx < protobufTransaction.getCommandsCount(); idx++) {
Expand Down
43 changes: 43 additions & 0 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -8054,4 +8054,47 @@ public void hscan(BaseClient client) {
() -> client.hscan(key1, "-1", HScanOptions.builder().count(-1L).build()).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void waitTest(BaseClient client) {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;

// assert that wait returns 0 under standalone and 1 under cluster mode.
assertEquals(OK, client.set(key, "value").get());
assertTrue(client.wait(numreplicas, timeout).get() >= (client instanceof RedisClient ? 0 : 1));

// command should fail on a negative timeout value
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> client.wait(1L, -1L).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void wait_timeout_check(BaseClient client) {
String key = UUID.randomUUID().toString();
// create new client with default request timeout (250 millis)
try (var testClient =
client instanceof RedisClient
? RedisClient.CreateClient(commonClientConfig().build()).get()
: RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) {

// ensure that commands do not time out, even if timeout > request timeout
assertEquals(OK, testClient.set(key, "value").get());
assertEquals((client instanceof RedisClient ? 0 : 1), testClient.wait(1L, 1000L).get());

// with 0 timeout (no timeout) wait should block indefinitely,
// but we wrap the test with timeout to avoid test failing or being stuck forever
assertEquals(OK, testClient.set(key, "value2").get());
assertThrows(
TimeoutException.class, // <- future timeout, not command timeout
() -> testClient.wait(100L, 0L).get(1000, TimeUnit.MILLISECONDS));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,24 @@ public void sort() {

assertDeepEquals(expectedResult, results);
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;
ClusterTransaction transaction = new ClusterTransaction();

transaction.set(key, "value").wait(numreplicas, timeout);
Object[] results = clusterClient.exec(transaction).get();
Object[] expectedResult =
new Object[] {
OK, // set(key, "value")
0L, // wait(numreplicas, timeout)
};
assertEquals(expectedResult[0], results[0]);
assertTrue((Long) expectedResult[1] <= (Long) results[1]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,26 @@ public void sort_and_sortReadOnly() {
assertArrayEquals(expectedResults, client.exec(transaction2).get());
}
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;
Transaction transaction = new Transaction();

transaction.set(key, "value");
transaction.wait(numreplicas, timeout);

Object[] results = client.exec(transaction).get();
Object[] expectedResult =
new Object[] {
OK, // set(key, "value")
0L, // wait(numreplicas, timeout)
};
assertEquals(expectedResult[0], results[0]);
assertTrue((long) expectedResult[1] <= (long) results[1]);
}
}

0 comments on commit 40c3a55

Please sign in to comment.