Skip to content

Commit

Permalink
Add the support of the BLMPOP command (#1774)
Browse files Browse the repository at this point in the history
  • Loading branch information
HolyLow authored Sep 22, 2023
1 parent 723829f commit 711e41b
Show file tree
Hide file tree
Showing 3 changed files with 444 additions and 0 deletions.
124 changes: 124 additions & 0 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,129 @@ class CommandBRPop : public CommandBPop {
CommandBRPop() : CommandBPop(false) {}
};

class CommandBLMPop : public BlockingCommander {
public:
CommandBLMPop() = default;
CommandBLMPop(const CommandBLMPop &) = delete;
CommandBLMPop &operator=(const CommandBLMPop &) = delete;

~CommandBLMPop() override = default;

// format: BLMPOP timeout #numkeys key0 [key1 ...] <LEFT | RIGHT> [COUNT count]
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);

auto timeout = GET_OR_RET(parser.TakeFloat());
timeout_ = static_cast<int64_t>(timeout * 1000 * 1000);

auto num_keys = GET_OR_RET(parser.TakeInt<uint32_t>());
keys_.clear();
keys_.reserve(num_keys);
for (uint32_t i = 0; i < num_keys; ++i) {
keys_.emplace_back(GET_OR_RET(parser.TakeStr()));
}

auto left_or_right = util::ToLower(GET_OR_RET(parser.TakeStr()));
if (left_or_right == "left") {
left_ = true;
} else if (left_or_right == "right") {
left_ = false;
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}

while (parser.Good()) {
if (parser.EatEqICase("count") && count_ == static_cast<uint32_t>(-1)) {
count_ = GET_OR_RET(parser.TakeInt<uint32_t>());
} else {
return parser.InvalidSyntax();
}
}
if (count_ == static_cast<uint32_t>(-1)) {
count_ = 1;
}

return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
InitConnection(conn);

auto s = ExecuteUnblocked();
if (s.ok() || !s.IsNotFound()) {
return Status::OK(); // error has already been output
}

return StartBlocking(timeout_, output);
}

rocksdb::Status ExecuteUnblocked() {
redis::List list_db(svr_->storage, conn_->GetNamespace());
std::vector<std::string> elems;
std::string chosen_key;
rocksdb::Status s;
for (const auto &key : keys_) {
s = list_db.PopMulti(key, left_, count_, &elems);
if (s.ok() && !elems.empty()) {
chosen_key = key;
break;
}
if (!s.IsNotFound()) {
break;
}
}

if (s.ok()) {
if (!elems.empty()) {
conn_->GetServer()->UpdateWatchedKeysManually({chosen_key});
std::string elems_bulk = redis::MultiBulkString(elems);
conn_->Reply(redis::Array({redis::BulkString(chosen_key), std::move(elems_bulk)}));
}
} else if (!s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
}

return s;
}

void BlockKeys() override {
for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}
}

void UnblockKeys() override {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}
}

bool OnBlockingWrite() override {
auto s = ExecuteUnblocked();
return !s.IsNotFound();
}

std::string NoopReply() override { return redis::NilString(); }

static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
CommandKeyRange range;
range.first_key = 3;
range.key_step = 1;
// This parsing would always succeed as this cmd has been parsed before.
auto num_key = *ParseInt<int32_t>(args[2], 10);
range.last_key = range.first_key + num_key - 1;
return range;
};

private:
bool left_;
uint32_t count_ = -1;
int64_t timeout_ = 0; // microseconds
std::vector<std::string> keys_;
Server *svr_ = nullptr;
};

class CommandLRem : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -727,6 +850,7 @@ class CommandLPos : public Commander {

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBLPop>("blpop", -3, "write no-script", 1, -2, 1),
MakeCmdAttr<CommandBRPop>("brpop", -3, "write no-script", 1, -2, 1),
MakeCmdAttr<CommandBLMPop>("blmpop", -5, "write no-script", CommandBLMPop::keyRangeGen),
MakeCmdAttr<CommandLIndex>("lindex", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandLInsert>("linsert", 5, "write", 1, 1, 1),
MakeCmdAttr<CommandLLen>("llen", 2, "read-only", 1, 1, 1),
Expand Down
Loading

0 comments on commit 711e41b

Please sign in to comment.