Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add commands XGROUP CREATECONSUMER and XGROUP DELCONSUMER #360

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,9 +860,21 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
}),
}),
b"INCRBYFLOAT" | b"HINCRBYFLOAT" | b"ZINCRBY" => Some(ExpectedReturnType::Double),
b"HEXISTS" | b"HSETNX" | b"EXPIRE" | b"EXPIREAT" | b"PEXPIRE" | b"PEXPIREAT"
| b"SISMEMBER" | b"PERSIST" | b"SMOVE" | b"RENAMENX" | b"MOVE" | b"COPY"
| b"XGROUP DESTROY" | b"MSETNX" => Some(ExpectedReturnType::Boolean),
b"HEXISTS"
| b"HSETNX"
| b"EXPIRE"
| b"EXPIREAT"
| b"PEXPIRE"
| b"PEXPIREAT"
| b"SISMEMBER"
| b"PERSIST"
| b"SMOVE"
| b"RENAMENX"
| b"MOVE"
| b"COPY"
| b"MSETNX"
| b"XGROUP DESTROY"
| b"XGROUP CREATECONSUMER" => Some(ExpectedReturnType::Boolean),
b"SMISMEMBER" => Some(ExpectedReturnType::ArrayOfBools),
b"SMEMBERS" | b"SINTER" | b"SDIFF" => Some(ExpectedReturnType::Set),
b"ZSCORE" | b"GEODIST" => Some(ExpectedReturnType::DoubleOrNull),
Expand Down
2 changes: 2 additions & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ enum RequestType {
GeoSearch = 182;
Watch = 183;
UnWatch = 184;
XGroupCreateConsumer = 185;
XGroupDelConsumer = 186;
}

message Command {
Expand Down
8 changes: 8 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ pub enum RequestType {
GeoSearch = 182,
Watch = 183,
UnWatch = 184,
XGroupCreateConsumer = 185,
XGroupDelConsumer = 186,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -387,6 +389,8 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::GeoSearch => RequestType::GeoSearch,
ProtobufRequestType::Watch => RequestType::Watch,
ProtobufRequestType::UnWatch => RequestType::UnWatch,
ProtobufRequestType::XGroupCreateConsumer => RequestType::XGroupCreateConsumer,
ProtobufRequestType::XGroupDelConsumer => RequestType::XGroupDelConsumer,
}
}
}
Expand Down Expand Up @@ -578,6 +582,10 @@ impl RequestType {
RequestType::GeoSearch => Some(cmd("GEOSEARCH")),
RequestType::Watch => Some(cmd("WATCH")),
RequestType::UnWatch => Some(cmd("UNWATCH")),
RequestType::XGroupCreateConsumer => {
Some(get_two_word_command("XGROUP", "CREATECONSUMER"))
}
RequestType::XGroupDelConsumer => Some(get_two_word_command("XGROUP", "DELCONSUMER")),
}
}
}
16 changes: 16 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -1440,6 +1442,20 @@ public CompletableFuture<Boolean> xgroupDestroy(@NonNull String key, @NonNull St
XGroupDestroy, new String[] {key, groupname}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Boolean> xgroupCreateConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
return commandManager.submitNewCommand(
XGroupCreateConsumer, new String[] {key, group, consumer}, this::handleBooleanResponse);
}

@Override
public CompletableFuture<Long> xgroupDelConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
return commandManager.submitNewCommand(
XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,39 @@ CompletableFuture<String> xgroupCreate(
* }</pre>
*/
CompletableFuture<Boolean> xgroupDestroy(String key, String groupname);

/**
* Creates a consumer named <code>consumer</code> in the consumer group <code>group</code> for the
* stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-createconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return <code>true</code> if the consumer is created. Otherwise, <code>false</code>.
* @example
* <pre>{@code
* // Creates the consumer "myconsumer" in consumer group "mygroup"
* assert client.xgroupDestroy("mystream", "mygroup", "myconsumer").get();
* }</pre>
*/
CompletableFuture<Boolean> xgroupCreateConsumer(String key, String group, String consumer);

/**
* Deletes a consumer named <code>consumer</code> in the consumer group <code>group</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-delconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return the number of pending messages the <code>consumer</code> had before it was deleted.
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* @example
* <pre>{@code
* // Deletes the consumer "myconsumer" in consumer group "mygroup"
* Long pendingMsgCount = client.xgroupDestroy("mystream", "mygroup", "myconsumer").get();
* System.out.println("Consumer 'myconsumer' had " +
* + pendingMsgCount + " pending messages unclaimed.");
* }</pre>
*/
CompletableFuture<Long> xgroupDelConsumer(String key, String group, String consumer);
}
42 changes: 42 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -3009,6 +3011,46 @@ public T xgroupDestroy(@NonNull String key, @NonNull String groupname) {
return getThis();
}

/**
* Creates a consumer named <code>consumer</code> in the consumer group <code>group</code> for the
* stream stored at <code>key</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-createconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return Command Response - <code>true</code> if the consumer is created. Otherwise, <code>false
* </code>.
*/
public T xgroupCreateConsumer(
@NonNull String key, @NonNull String group, @NonNull String consumer) {
protobufTransaction.addCommands(
buildCommand(XGroupCreateConsumer, buildArgs(key, group, consumer)));
return getThis();
}

/**
* Deletes a consumer named <code>consumer</code> in the consumer group <code>group</code>.
*
* @see <a href="https://valkey.io/commands/xgroup-delconsumer/">valkey.io</a> for details.
* @param key The key of the stream.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return the number of pending messages the <code>consumer</code> had before it was deleted.
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* @example
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* <pre>{@code
* // Deletes the consumer "myconsumer" in consumer group "mygroup"
* Long pendingMsgCount = client.xgroupDestroy("mystream", "mygroup", "myconsumer").get();
* System.out.println("Consumer 'myconsumer' had " +
* + pendingMsgCount + " pending messages unclaimed.");
* }</pre>
*/
public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull String consumer) {
protobufTransaction.addCommands(
buildCommand(XGroupDelConsumer, buildArgs(key, group, consumer)));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
54 changes: 54 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -4454,6 +4456,58 @@ public void xgroupDestroy() {
assertEquals(Boolean.TRUE, payload);
}

@SneakyThrows
@Test
public void xgroupCreateConsumer() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String consumerName = "testConsumerName";
String[] arguments = new String[] {key, groupName, consumerName};

CompletableFuture<Boolean> testResponse = new CompletableFuture<>();
testResponse.complete(Boolean.TRUE);

// match on protobuf request
when(commandManager.<Boolean>submitNewCommand(eq(XGroupCreateConsumer), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Boolean> response =
service.xgroupCreateConsumer(key, groupName, consumerName);
Boolean payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(Boolean.TRUE, payload);
}

@SneakyThrows
@Test
public void xgroupDelConsumer() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String consumerName = "testConsumerName";
String[] arguments = new String[] {key, groupName, consumerName};
Long result = 28L;

CompletableFuture<Long> testResponse = new CompletableFuture<>();
testResponse.complete(result);

// match on protobuf request
when(commandManager.<Long>submitNewCommand(eq(XGroupDelConsumer), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Long> response = service.xgroupDelConsumer(key, groupName, consumerName);
Long payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void type_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -775,6 +777,12 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
transaction.xgroupDestroy("key", "group");
results.add(Pair.of(XGroupDestroy, buildArgs("key", "group")));

transaction.xgroupCreateConsumer("key", "group", "consumer");
results.add(Pair.of(XGroupCreateConsumer, buildArgs("key", "group", "consumer")));

transaction.xgroupDelConsumer("key", "group", "consumer");
results.add(Pair.of(XGroupDelConsumer, buildArgs("key", "group", "consumer")));

transaction.time();
results.add(Pair.of(Time, buildArgs()));

Expand Down
56 changes: 56 additions & 0 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3488,6 +3488,62 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) {
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xgroupCreateConsumer_xgroupDelConsumer(BaseClient client) {
String key = UUID.randomUUID().toString();
String stringKey = UUID.randomUUID().toString();
String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// create group and consumer for the group
assertEquals(
OK,
client
.xgroupCreate(
key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
.get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get());

// create consumer for group that does not exist results in a NOGROUP request error
ExecutionException executionException =
assertThrows(
ExecutionException.class,
() -> client.xgroupCreateConsumer(key, "not_a_group", consumerName).get());
assertInstanceOf(RequestException.class, executionException.getCause());
assertTrue(executionException.getMessage().contains("NOGROUP"));

// create consumer for group again
assertFalse(client.xgroupCreateConsumer(key, groupName, consumerName).get());

// Deletes a consumer that is not created yet returns 0
assertEquals(0L, client.xgroupDelConsumer(key, groupName, "not_a_consumer").get());

// String streamid_1 = client.xadd(key, Map.of("field1", "value1")).get();
// assertNotNull(streamid_1);
// String streamid_2 = client.xadd(key, Map.of("field2", "value2")).get();
// assertNotNull(streamid_2);
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved

// TODO use XREADGROUP to mark pending messages for the consumer so that we get non-zero return
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get());

// key is a string and cannot be created as a stream
assertEquals(OK, client.set(stringKey, "not_a_stream").get());
executionException =
assertThrows(
ExecutionException.class,
() -> client.xgroupCreateConsumer(stringKey, groupName, consumerName).get());
assertInstanceOf(RequestException.class, executionException.getCause());

executionException =
assertThrows(
ExecutionException.class,
() -> client.xgroupDelConsumer(stringKey, groupName, consumerName).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
final String streamKey1 = "{streamKey}-1-" + UUID.randomUUID();
final String groupName1 = "{groupName}-1-" + UUID.randomUUID();
final String groupName2 = "{groupName}-2-" + UUID.randomUUID();
final String consumer1 = "{consumer}-1-" + UUID.randomUUID();
final String consumer2 = "{consumer}-2-" + UUID.randomUUID();

transaction
.xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build())
Expand All @@ -733,6 +735,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
.xgroupCreate(streamKey1, groupName1, "0-0")
.xgroupCreate(
streamKey1, groupName2, "0-0", StreamGroupOptions.builder().makeStream().build())
.xgroupCreateConsumer(streamKey1, groupName1, consumer1)
.xgroupDelConsumer(streamKey1, groupName1, consumer1)
.xgroupDestroy(streamKey1, groupName1)
.xgroupDestroy(streamKey1, groupName2)
.xdel(streamKey1, new String[] {"0-3", "0-5"});
Expand All @@ -753,6 +757,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
1L, // xtrim(streamKey1, new MinId(true, "0-2"))
OK, // xgroupCreate(streamKey1, groupName1, "0-0")
OK, // xgroupCreate(streamKey1, groupName1, "0-0", options)
true, // xgroupCreateConsumer(streamKey1, groupName1, consumer1)
0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
true, // xgroupDestroy(streamKey1, groupName1)
true, // xgroupDestroy(streamKey1, groupName2)
1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});
Expand Down
Loading