diff --git a/redis/commands.py b/redis/commands.py index 003b0f1bcc..8c892bece5 100644 --- a/redis/commands.py +++ b/redis/commands.py @@ -1807,6 +1807,18 @@ def xgroup_destroy(self, name, groupname): """ return self.execute_command('XGROUP DESTROY', name, groupname) + def xgroup_createconsumer(self, name, groupname, consumername): + """ + Consumers in a consumer group are auto-created every time a new + consumer name is mentioned by some command. + They can be explicitly created by using this command. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to create. + """ + return self.execute_command('XGROUP CREATECONSUMER', name, groupname, + consumername) + def xgroup_setid(self, name, groupname, id): """ Set the consumer group last delivered ID to something else. diff --git a/tests/test_commands.py b/tests/test_commands.py index 3c87dd9dd6..4a6d019f9e 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -2598,6 +2598,22 @@ def test_xgroup_delconsumer(self, r): # deleting the consumer should return 2 pending messages assert r.xgroup_delconsumer(stream, group, consumer) == 2 + @skip_if_server_version_lt('6.2.0') + def test_xgroup_createconsumer(self, r): + stream = 'stream' + group = 'group' + consumer = 'consumer' + r.xadd(stream, {'foo': 'bar'}) + r.xadd(stream, {'foo': 'bar'}) + r.xgroup_create(stream, group, 0) + assert r.xgroup_createconsumer(stream, group, consumer) == 1 + + # read all messages from the group + r.xreadgroup(group, consumer, streams={stream: '>'}) + + # deleting the consumer should return 2 pending messages + assert r.xgroup_delconsumer(stream, group, consumer) == 2 + @skip_if_server_version_lt('5.0.0') def test_xgroup_destroy(self, r): stream = 'stream'