Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): add KeyRangeGen for XRead and XReadGroup #2657

Merged
merged 9 commits into from
Nov 15, 2024
25 changes: 23 additions & 2 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@
#include "types/redis_stream.h"

namespace redis {
namespace {
// for XRead and XReadGroup stream range parse.
CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args) {
// assert here we must have a stream in args since it has been parsed.
auto stream_keyword_iter =
std::find_if(args.cbegin(), args.cend(), [](const std::string &arg) { return util::EqualICase(arg, "streams"); });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For XREADGROUP, I think we need to find from begin + 4, to avoid a group or consumer named streams.

refer to https://redis.io/docs/latest/commands/xreadgroup/ .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @PragmaTwice ,

Thanks very much for your review! 😊

I have updated the ParseStreamReadRange with a begin offset.

Best Regards,
Edward

int stream_pos = static_cast<int>(std::distance(args.cbegin(), stream_keyword_iter));
int stream_size = static_cast<int>(args.size() - stream_pos) / 2;

CommandKeyRange range;
range.first_key = stream_pos + 1;
range.key_step = 1;
range.last_key = range.first_key + stream_size - 1;
return range;
}
} // namespace

class CommandXAck : public Commander {
public:
Expand Down Expand Up @@ -1404,6 +1420,8 @@ class CommandXRead : public Commander,
bufferevent_enable(bev, EV_READ);
}

static const inline CommandKeyRangeGen keyRangeGen = ParseStreamReadRange;

private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
Expand Down Expand Up @@ -1715,6 +1733,8 @@ class CommandXReadGroup : public Commander,
bufferevent_enable(bev, EV_READ);
}

static const inline CommandKeyRangeGen keyRangeGen = ParseStreamReadRange;

private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
Expand Down Expand Up @@ -1896,8 +1916,9 @@ REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-d
MakeCmdAttr<CommandXPending>("xpending", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRevRange>("xrevrange", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only blocking", NO_KEY),
MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, "write blocking", NO_KEY),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only blocking", CommandXRead::keyRangeGen),
MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, "write blocking",
CommandXReadGroup::keyRangeGen),
MakeCmdAttr<CommandXTrim>("xtrim", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1, 1, 1))

Expand Down