Skip to content

Commit

Permalink
Java: Add commands XGROUP CREATECONSUMER and XGROUP DELCONSUMER (#…
Browse files Browse the repository at this point in the history
…360)

* Add XGROUP CreateConsumer, DelConsumer

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>

* Update Xgroup createconsumer docs

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>

---------

Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
  • Loading branch information
acarbonetto committed Jun 18, 2024
1 parent 9a51338 commit ed46eba
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 3 deletions.
18 changes: 15 additions & 3 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,21 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
}),
}),
b"INCRBYFLOAT" | b"HINCRBYFLOAT" | b"ZINCRBY" => Some(ExpectedReturnType::Double),
b"HEXISTS" | b"HSETNX" | b"EXPIRE" | b"EXPIREAT" | b"PEXPIRE" | b"PEXPIREAT"
| b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY"
| b"XGROUP DESTROY" | b"MSETNX" => Some(ExpectedReturnType::Boolean),
b"HEXISTS"
| b"HSETNX"
| b"EXPIRE"
| b"EXPIREAT"
| b"PEXPIRE"
| b"PEXPIREAT"
| b"SISMEMBER"
| b"PERSIST"
| b"SMOVE"
| b"RENAMENX"
| b"MOVE"
| b"COPY"
| b"MSETNX"
| b"XGROUP DESTROY"
| b"XGROUP CREATECONSUMER" => Some(ExpectedReturnType::Boolean),
b"SMISMEMBER" => Some(ExpectedReturnType::ArrayOfBools),
b"SMEMBERS" | b"SINTER" | b"SDIFF" | b"SUNION" => Some(ExpectedReturnType::Set),
b"ZSCORE" | b"GEODIST" => Some(ExpectedReturnType::DoubleOrNull),
Expand Down
2 changes: 2 additions & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enum RequestType {
UnWatch = 184;
GeoSearchStore = 185;
SUnion = 186;
XGroupCreateConsumer = 188;
XGroupDelConsumer = 189;
}

message Command {
Expand Down
8 changes: 8 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ pub enum RequestType {
UnWatch = 184,
GeoSearchStore = 185,
SUnion = 186,
XGroupCreateConsumer = 188,
XGroupDelConsumer = 189,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -391,6 +393,8 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::Watch => RequestType::Watch,
ProtobufRequestType::UnWatch => RequestType::UnWatch,
ProtobufRequestType::GeoSearchStore => RequestType::GeoSearchStore,
ProtobufRequestType::XGroupCreateConsumer => RequestType::XGroupCreateConsumer,
ProtobufRequestType::XGroupDelConsumer => RequestType::XGroupDelConsumer,
}
}
}
Expand Down Expand Up @@ -584,6 +588,10 @@ impl RequestType {
RequestType::Watch => Some(cmd("WATCH")),
RequestType::UnWatch => Some(cmd("UNWATCH")),
RequestType::GeoSearchStore => Some(cmd("GEOSEARCHSTORE")),
RequestType::XGroupCreateConsumer => {
Some(get_two_word_command("XGROUP", "CREATECONSUMER"))
}
RequestType::XGroupDelConsumer => Some(get_two_word_command("XGROUP", "DELCONSUMER")),
}
}
}
16 changes: 16 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -1413,6 +1415,20 @@ public CompletableFuture<Boolean> xgroupDestroy(@NonNull String key, @NonNull St
XGroupDestroy, new String[] {key, groupname}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Boolean> xgroupCreateConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
return commandManager.submitNewCommand(
XGroupCreateConsumer, new String[] {key, group, consumer}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Long> xgroupDelConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
return commandManager.submitNewCommand(
XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,39 @@ CompletableFuture<String> xgroupCreate(
* }</pre>
*/
CompletableFuture<Boolean> xgroupDestroy(String key, String groupname);

/**
* Creates a consumer named <code>consumer</code> in the consumer group <code>group</code> for the
* stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-createconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return <code>true</code> if the consumer is created. Otherwise, <code>false</code>.
* @example
* <pre>{@code
* // Creates the consumer "myconsumer" in consumer group "mygroup"
* assert client.xgroupDestroy("mystream", "mygroup", "myconsumer").get();
* }</pre>
*/
CompletableFuture<Boolean> xgroupCreateConsumer(String key, String group, String consumer);

/**
* Deletes a consumer named <code>consumer</code> in the consumer group <code>group</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-delconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return The number of pending messages the <code>consumer</code> had before it was deleted.
* @example
* <pre>{@code
* // Deletes the consumer "myconsumer" in consumer group "mygroup"
* Long pendingMsgCount = client.xgroupDestroy("mystream", "mygroup", "myconsumer").get();
* System.out.println("Consumer 'myconsumer' had " +
* + pendingMsgCount + " pending messages unclaimed.");
* }</pre>
*/
CompletableFuture<Long> xgroupDelConsumer(String key, String group, String consumer);
}
36 changes: 36 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -3009,6 +3011,40 @@ public T xgroupDestroy(@NonNull String key, @NonNull String groupname) {
return getThis();
}

/**
* Creates a consumer named <code>consumer</code> in the consumer group <code>group</code> for the
* stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-createconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return Command Response - <code>true</code> if the consumer is created. Otherwise, <code>false
* </code>.
*/
public T xgroupCreateConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
protobufTransaction.addCommands(
buildCommand(XGroupCreateConsumer, buildArgs(key, group, consumer)));
return getThis();
}

/**
* Deletes a consumer named <code>consumer</code> in the consumer group <code>group</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-delconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return Command Response - The number of pending messages the <code>consumer</code> had before
* it was deleted.
*/
public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull String consumer) {
protobufTransaction.addCommands(
buildCommand(XGroupDelConsumer, buildArgs(key, group, consumer)));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
54 changes: 54 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -4454,6 +4456,58 @@ public void xgroupDestroy() {
assertEquals(Boolean.TRUE, payload);
}

@SneakyThrows
@Test
public void xgroupCreateConsumer() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String consumerName = "testConsumerName";
String[] arguments = new String[] {key, groupName, consumerName};

CompletableFuture<Boolean> testResponse = new CompletableFuture<>();
testResponse.complete(Boolean.TRUE);

// match on protobuf request
when(commandManager.<Boolean>submitNewCommand(eq(XGroupCreateConsumer), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Boolean> response =
service.xgroupCreateConsumer(key, groupName, consumerName);
Boolean payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(Boolean.TRUE, payload);
}

@SneakyThrows
@Test
public void xgroupDelConsumer() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String consumerName = "testConsumerName";
String[] arguments = new String[] {key, groupName, consumerName};
Long result = 28L;

CompletableFuture<Long> testResponse = new CompletableFuture<>();
testResponse.complete(result);

// match on protobuf request
when(commandManager.<Long>submitNewCommand(eq(XGroupDelConsumer), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Long> response = service.xgroupDelConsumer(key, groupName, consumerName);
Long payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void type_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -775,6 +777,12 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
transaction.xgroupDestroy("key", "group");
results.add(Pair.of(XGroupDestroy, buildArgs("key", "group")));

transaction.xgroupCreateConsumer("key", "group", "consumer");
results.add(Pair.of(XGroupCreateConsumer, buildArgs("key", "group", "consumer")));

transaction.xgroupDelConsumer("key", "group", "consumer");
results.add(Pair.of(XGroupDelConsumer, buildArgs("key", "group", "consumer")));

transaction.time();
results.add(Pair.of(Time, buildArgs()));

Expand Down
51 changes: 51 additions & 0 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3477,6 +3477,57 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) {
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) {
String key = UUID.randomUUID().toString();
String stringKey = UUID.randomUUID().toString();
String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// create group and consumer for the group
assertEquals(
OK,
client
.xgroupCreate(
key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
.get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get());

// create consumer for group that does not exist results in a NOGROUP request error
ExecutionException executionException =
assertThrows(
ExecutionException.class,
() -> client.xgroupCreateConsumer(key, "not_a_group", consumerName).get());
assertInstanceOf(RequestException.class, executionException.getCause());
assertTrue(executionException.getMessage().contains("NOGROUP"));

// create consumer for group again
assertFalse(client.xgroupCreateConsumer(key, groupName, consumerName).get());

// Deletes a consumer that is not created yet returns 0
assertEquals(0L, client.xgroupDelConsumer(key, groupName, "not_a_consumer").get());

// TODO use XREADGROUP to mark pending messages for the consumer so that we get non-zero return
assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get());

// key is a string and cannot be created as a stream
assertEquals(OK, client.set(stringKey, "not_a_stream").get());
executionException =
assertThrows(
ExecutionException.class,
() -> client.xgroupCreateConsumer(stringKey, groupName, consumerName).get());
assertInstanceOf(RequestException.class, executionException.getCause());

executionException =
assertThrows(
ExecutionException.class,
() -> client.xgroupDelConsumer(stringKey, groupName, consumerName).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
final String streamKey1 = "{streamKey}-1-" + UUID.randomUUID();
final String groupName1 = "{groupName}-1-" + UUID.randomUUID();
final String groupName2 = "{groupName}-2-" + UUID.randomUUID();
final String consumer1 = "{consumer}-1-" + UUID.randomUUID();
final String consumer2 = "{consumer}-2-" + UUID.randomUUID();

transaction
.xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build())
Expand All @@ -733,6 +735,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
.xgroupCreate(streamKey1, groupName1, "0-0")
.xgroupCreate(
streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build())
.xgroupCreateConsumer(streamKey1, groupName1, consumer1)
.xgroupDelConsumer(streamKey1, groupName1, consumer1)
.xgroupDestroy(streamKey1, groupName1)
.xgroupDestroy(streamKey1, groupName2)
.xdel(streamKey1, new String[] {"0-3", "0-5"});
Expand All @@ -753,6 +757,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
1L, // xtrim(streamKey1, new MinId(true, "0-2"))
OK, // xgroupCreate(streamKey1, groupName1, "0-0")
OK, // xgroupCreate(streamKey1, groupName1, "0-0", options)
true, // xgroupCreateConsumer(streamKey1, groupName1, consumer1)
0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
true, // xgroupDestroy(streamKey1, groupName1)
true, // xgroupDestroy(streamKey1, groupName2)
1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});
Expand Down

0 comments on commit ed46eba

Please sign in to comment.