Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stream): add KeyRangeGen for XRead and XReadGroup #2657

Merged
merged 9 commits into from
Nov 15, 2024
29 changes: 27 additions & 2 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@
#include "types/redis_stream.h"

namespace redis {
namespace {
// for XRead and XReadGroup stream range parse.
CommandKeyRange ParseStreamReadRange(const std::vector<std::string> &args, uint32_t start_offset) {
// assert here we must have a stream in args since it has been parsed.
auto stream_keyword_iter = std::find_if(std::next(args.cbegin(), start_offset), args.cend(),
[](const std::string &arg) { return util::EqualICase(arg, "streams"); });
int stream_pos = static_cast<int>(std::distance(args.cbegin(), stream_keyword_iter));
int stream_size = static_cast<int>(args.size() - stream_pos) / 2;

CommandKeyRange range;
range.first_key = stream_pos + 1;
range.key_step = 1;
range.last_key = range.first_key + stream_size - 1;
return range;
}
} // namespace

class CommandXAck : public Commander {
public:
Expand Down Expand Up @@ -1404,6 +1420,10 @@ class CommandXRead : public Commander,
bufferevent_enable(bev, EV_READ);
}

static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
return ParseStreamReadRange(args, 0);
};

private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
Expand Down Expand Up @@ -1715,6 +1735,10 @@ class CommandXReadGroup : public Commander,
bufferevent_enable(bev, EV_READ);
}

static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
return ParseStreamReadRange(args, 4);
};

private:
std::vector<std::string> streams_;
std::vector<StreamEntryID> ids_;
Expand Down Expand Up @@ -1896,8 +1920,9 @@ REDIS_REGISTER_COMMANDS(Stream, MakeCmdAttr<CommandXAck>("xack", -4, "write no-d
MakeCmdAttr<CommandXPending>("xpending", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRevRange>("xrevrange", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only blocking", NO_KEY),
MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, "write blocking", NO_KEY),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only blocking", CommandXRead::keyRangeGen),
MakeCmdAttr<CommandXReadGroup>("xreadgroup", -7, "write blocking",
CommandXReadGroup::keyRangeGen),
MakeCmdAttr<CommandXTrim>("xtrim", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXSetId>("xsetid", -3, "write", 1, 1, 1))

Expand Down
Loading