From c8d93085f25ec299d139d4d82ebf2fa27d828e58 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Thu, 28 Mar 2024 23:59:41 +0600 Subject: [PATCH 1/2] Consider null values in empty StreamPendingSummary --- .../redis/clients/jedis/BuilderFactory.java | 23 ++++--- .../commands/jedis/StreamsCommandsTest.java | 64 ++++++++++++------- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index 83e552de83..e5695e3df0 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1812,15 +1812,20 @@ public StreamPendingSummary build(Object data) { } List objectList = (List) data; - long total = BuilderFactory.LONG.build(objectList.get(0)); - String minId = SafeEncoder.encode((byte[]) objectList.get(1)); - String maxId = SafeEncoder.encode((byte[]) objectList.get(2)); - List> consumerObjList = (List>) objectList.get(3); - Map map = new HashMap<>(consumerObjList.size()); - for (List consumerObj : consumerObjList) { - map.put(SafeEncoder.encode((byte[]) consumerObj.get(0)), Long.parseLong(SafeEncoder.encode((byte[]) consumerObj.get(1)))); - } - return new StreamPendingSummary(total, new StreamEntryID(minId), new StreamEntryID(maxId), map); + long total = LONG.build(objectList.get(0)); + StreamEntryID minId = STREAM_ENTRY_ID.build(objectList.get(1)); + StreamEntryID maxId = STREAM_ENTRY_ID.build(objectList.get(2)); + Map map; + if (objectList.get(3) == null) { + map = null; + } else { + List> consumerObjList = (List>) objectList.get(3); + map = new HashMap<>(consumerObjList.size()); + for (List consumerObj : consumerObjList) { + map.put(STRING.build(consumerObj.get(0)), Long.parseLong(STRING.build(consumerObj.get(1)))); + } + } + return new StreamPendingSummary(total, minId, maxId, map); } @Override diff --git a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java index 552c8ac1f6..0ac12f9d05 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java @@ -599,13 +599,22 @@ public void xack() { @Test public void xpendingWithParams() { + final String stream = "xpendeing-stream"; + + assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, true)); + + // Get the summary from empty stream + StreamPendingSummary emptySummary = jedis.xpending(stream, "xpendeing-group"); + assertEquals(0, emptySummary.getTotal()); + assertNull(emptySummary.getMinId()); + assertNull(emptySummary.getMaxId()); + assertNull(emptySummary.getConsumerMessageCount()); + Map map = new HashMap<>(); map.put("f1", "v1"); - StreamEntryID id1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + StreamEntryID id1 = jedis.xadd(stream, (StreamEntryID) null, map); - assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); - - Map streamQeury1 = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + Map streamQeury1 = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); // Read the event from Stream put it on pending List>> range = jedis.xreadGroup("xpendeing-group", @@ -614,8 +623,14 @@ public void xpendingWithParams() { assertEquals(1, range.get(0).getValue().size()); assertEquals(map, range.get(0).getValue().get(0).getFields()); + // Get the summary about the pending messages + StreamPendingSummary pendingSummary = jedis.xpending(stream, "xpendeing-group"); + assertEquals(1, pendingSummary.getTotal()); + assertEquals(id1, pendingSummary.getMinId()); + assertEquals(1l, pendingSummary.getConsumerMessageCount().get("xpendeing-consumer").longValue()); + // Get the pending event - List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + List pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3).consumer("xpendeing-consumer")); assertEquals(1, pendingRange.size()); assertEquals(id1, pendingRange.get(0).getID()); @@ -624,32 +639,33 @@ public void xpendingWithParams() { assertTrue(pendingRange.get(0).toString().contains("xpendeing-consumer")); // Without consumer - pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", new XPendingParams().count(3)); + pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().count(3)); assertEquals(1, pendingRange.size()); assertEquals(id1, pendingRange.get(0).getID()); assertEquals(1, pendingRange.get(0).getDeliveredTimes()); assertEquals("xpendeing-consumer", pendingRange.get(0).getConsumerName()); // with idle - pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + pendingRange = jedis.xpending(stream, "xpendeing-group", new XPendingParams().idle(Duration.ofMinutes(1).toMillis()).count(3)); assertEquals(0, pendingRange.size()); } @Test public void xpendingRange() { + final String stream = "xpendeing-stream"; Map map = new HashMap<>(); map.put("foo", "bar"); - StreamEntryID m1 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); - StreamEntryID m2 = jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); - jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false); + StreamEntryID m1 = jedis.xadd(stream, (StreamEntryID) null, map); + StreamEntryID m2 = jedis.xadd(stream, (StreamEntryID) null, map); + jedis.xgroupCreate(stream, "xpendeing-group", null, false); // read 1 message from the group with each consumer - Map streamQeury = singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); + Map streamQeury = singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY); jedis.xreadGroup("xpendeing-group", "consumer1", XReadGroupParams.xReadGroupParams().count(1), streamQeury); jedis.xreadGroup("xpendeing-group", "consumer2", XReadGroupParams.xReadGroupParams().count(1), streamQeury); - List response = jedis.xpending("xpendeing-stream", "xpendeing-group", + List response = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams("(0", "+", 5)); assertEquals(2, response.size()); assertEquals(m1, response.get(0).getID()); @@ -657,7 +673,7 @@ public void xpendingRange() { assertEquals(m2, response.get(1).getID()); assertEquals("consumer2", response.get(1).getConsumerName()); - response = jedis.xpending("xpendeing-stream", "xpendeing-group", + response = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams(StreamEntryID.MINIMUM_ID, StreamEntryID.MAXIMUM_ID, 5)); assertEquals(2, response.size()); assertEquals(m1, response.get(0).getID()); @@ -668,18 +684,19 @@ public void xpendingRange() { @Test public void xclaimWithParams() { + final String stream = "xpendeing-stream"; Map map = new HashMap<>(); map.put("f1", "v1"); - jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + jedis.xadd(stream, (StreamEntryID) null, map); - assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); + assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false)); // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); + singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event - List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + List pendingRange = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer")); // Sleep for 100ms so we can claim events pending for more than 50ms @@ -689,7 +706,7 @@ public void xclaimWithParams() { e.printStackTrace(); } - List streamEntrys = jedis.xclaim("xpendeing-stream", "xpendeing-group", + List streamEntrys = jedis.xclaim(stream, "xpendeing-group", "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0), pendingRange.get(0).getID()); assertEquals(1, streamEntrys.size()); @@ -699,18 +716,19 @@ public void xclaimWithParams() { @Test public void xclaimJustId() { + final String stream = "xpendeing-stream"; Map map = new HashMap<>(); map.put("f1", "v1"); - jedis.xadd("xpendeing-stream", (StreamEntryID) null, map); + jedis.xadd(stream, (StreamEntryID) null, map); - assertEquals("OK", jedis.xgroupCreate("xpendeing-stream", "xpendeing-group", null, false)); + assertEquals("OK", jedis.xgroupCreate(stream, "xpendeing-group", null, false)); // Read the event from Stream put it on pending jedis.xreadGroup("xpendeing-group", "xpendeing-consumer", XReadGroupParams.xReadGroupParams().count(1).block(1), - singletonMap("xpendeing-stream", StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); + singletonMap(stream, StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY)); // Get the pending event - List pendingRange = jedis.xpending("xpendeing-stream", "xpendeing-group", + List pendingRange = jedis.xpending(stream, "xpendeing-group", XPendingParams.xPendingParams().count(3).consumer("xpendeing-consumer")); // Sleep for 100ms so we can claim events pending for more than 50ms try { @@ -719,7 +737,7 @@ public void xclaimJustId() { e.printStackTrace(); } - List streamEntryIDS = jedis.xclaimJustId("xpendeing-stream", "xpendeing-group", + List streamEntryIDS = jedis.xclaimJustId(stream, "xpendeing-group", "xpendeing-consumer2", 50, XClaimParams.xClaimParams().idle(0).retryCount(0), pendingRange.get(0).getID()); assertEquals(1, streamEntryIDS.size()); From 7d78ce15f143299c18b4033fccf72975e5cb8b82 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Fri, 29 Mar 2024 00:15:46 +0600 Subject: [PATCH 2/2] java stream --- .../java/redis/clients/jedis/BuilderFactory.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BuilderFactory.java b/src/main/java/redis/clients/jedis/BuilderFactory.java index e5695e3df0..26d5b5c384 100644 --- a/src/main/java/redis/clients/jedis/BuilderFactory.java +++ b/src/main/java/redis/clients/jedis/BuilderFactory.java @@ -1815,16 +1815,10 @@ public StreamPendingSummary build(Object data) { long total = LONG.build(objectList.get(0)); StreamEntryID minId = STREAM_ENTRY_ID.build(objectList.get(1)); StreamEntryID maxId = STREAM_ENTRY_ID.build(objectList.get(2)); - Map map; - if (objectList.get(3) == null) { - map = null; - } else { - List> consumerObjList = (List>) objectList.get(3); - map = new HashMap<>(consumerObjList.size()); - for (List consumerObj : consumerObjList) { - map.put(STRING.build(consumerObj.get(0)), Long.parseLong(STRING.build(consumerObj.get(1)))); - } - } + Map map = objectList.get(3) == null ? null + : ((List>) objectList.get(3)).stream().collect( + Collectors.toMap(pair -> STRING.build(pair.get(0)), + pair -> Long.parseLong(STRING.build(pair.get(1))))); return new StreamPendingSummary(total, minId, maxId, map); }