Skip to content

Commit

Permalink
LPOP/RPOP support pop multi elements (#594)
Browse files Browse the repository at this point in the history
Before this commit, LPOP/RPOP of kvrocks only support pop one element once,
now, like redis, LPOP/RPOP support 'count' parameter to pop multi elements.
  • Loading branch information
torwig authored May 25, 2022
1 parent 98e1e79 commit 834a164
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 43 deletions.
59 changes: 48 additions & 11 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const char *errInvalidExpireTime = "invalid expire time";
const char *errWrongNumOfArguments = "wrong number of arguments";
const char *errValueNotInterger = "value is not an integer or out of range";
const char *errAdministorPermissionRequired = "administor permission required to perform the command";
const char *errValueMustBePositive = "value is out of range, must be positive";

class CommandAuth : public Commander {
public:
Expand Down Expand Up @@ -1448,23 +1449,59 @@ class CommandRPushX : public CommandPush {
class CommandPop : public Commander {
public:
explicit CommandPop(bool left) { left_ = left; }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() > 3) {
return Status(Status::RedisParseErr, errWrongNumOfArguments);
}
if (args.size() == 2) {
return Status::OK();
}
try {
int32_t v = std::stol(args[2]);
if (v < 0) {
return Status(Status::RedisParseErr, errValueMustBePositive);
}
count_ = v;
with_count_ = true;
} catch (const std::exception& ) {
return Status(Status::RedisParseErr, errValueNotInterger);
}
return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::List list_db(svr->storage_, conn->GetNamespace());
std::string elem;
rocksdb::Status s = list_db.Pop(args_[1], &elem, left_);
if (!s.ok() && !s.IsNotFound()) {
return Status(Status::RedisExecErr, s.ToString());
}
if (s.IsNotFound()) {
*output = Redis::NilString();
if (with_count_) {
std::vector<std::string> elems;
rocksdb::Status s = list_db.PopMulti(args_[1], left_, count_, &elems);
if (!s.ok() && !s.IsNotFound()) {
return Status(Status::RedisExecErr, s.ToString());
}
if (s.IsNotFound()) {
*output = Redis::MultiLen(-1);
} else {
*output = Redis::MultiBulkString(elems);
}
} else {
*output = Redis::BulkString(elem);
std::string elem;
rocksdb::Status s = list_db.Pop(args_[1], left_, &elem);
if (!s.ok() && !s.IsNotFound()) {
return Status(Status::RedisExecErr, s.ToString());
}
if (s.IsNotFound()) {
*output = Redis::NilString();
} else {
*output = Redis::BulkString(elem);
}
}

return Status::OK();
}

private:
bool left_;
bool with_count_ = false;
uint32_t count_ = 1;
};

class CommandLPop : public CommandPop {
Expand Down Expand Up @@ -1533,7 +1570,7 @@ class CommandBPop : public Commander {
rocksdb::Status s;
for (const auto &key : keys_) {
last_key = key;
s = list_db.Pop(key, &elem, left_);
s = list_db.Pop(key, left_, &elem);
if (s.ok() || !s.IsNotFound()) {
break;
}
Expand Down Expand Up @@ -4766,8 +4803,8 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("rpush", -3, "write", 1, 1, 1, CommandRPush),
ADD_CMD("lpushx", -3, "write", 1, 1, 1, CommandLPushX),
ADD_CMD("rpushx", -3, "write", 1, 1, 1, CommandRPushX),
ADD_CMD("lpop", 2, "write", 1, 1, 1, CommandLPop),
ADD_CMD("rpop", 2, "write", 1, 1, 1, CommandRPop),
ADD_CMD("lpop", -2, "write", 1, 1, 1, CommandLPop),
ADD_CMD("rpop", -2, "write", 1, 1, 1, CommandRPop),
ADD_CMD("blpop", -3, "write no-script", 1, -2, 1, CommandBLPop),
ADD_CMD("brpop", -3, "write no-script", 1, -2, 1, CommandBRPop),
ADD_CMD("lrem", 4, "write", 1, 1, 1, CommandLRem),
Expand Down
53 changes: 37 additions & 16 deletions src/redis_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,21 @@ rocksdb::Status List::push(const Slice &user_key,
return storage_->Write(rocksdb::WriteOptions(), &batch);
}

rocksdb::Status List::Pop(const Slice &user_key, std::string *elem, bool left) {
rocksdb::Status List::Pop(const Slice &user_key, bool left, std::string *elem) {
elem->clear();

std::vector<std::string> elems;
auto s = PopMulti(user_key, left, 1, &elems);
if (!s.ok()) return s;

*elem = elems[0];
return rocksdb::Status::OK();
}

rocksdb::Status List::PopMulti(const rocksdb::Slice &user_key, bool left, uint32_t count,
std::vector<std::string> *elems) {
elems->clear();

std::string ns_key;
AppendNamespacePrefix(user_key, &ns_key);

Expand All @@ -98,30 +110,39 @@ rocksdb::Status List::Pop(const Slice &user_key, std::string *elem, bool left) {
rocksdb::Status s = GetMetadata(ns_key, &metadata);
if (!s.ok()) return s;

uint64_t index = left ? metadata.head : metadata.tail - 1;
std::string buf;
PutFixed64(&buf, index);
std::string sub_key;
InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode(&sub_key);
s = db_->Get(rocksdb::ReadOptions(), sub_key, elem);
if (!s.ok()) {
// FIXME: should be always exists??
return s;
}
rocksdb::WriteBatch batch;
RedisCommand cmd = left ? kRedisCmdLPop : kRedisCmdRPop;
WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)});
batch.PutLogData(log_data.Encode());
batch.Delete(sub_key);
if (metadata.size == 1) {

while (metadata.size > 0 && count > 0) {
uint64_t index = left ? metadata.head : metadata.tail - 1;
std::string buf;
PutFixed64(&buf, index);
std::string sub_key;
InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode(&sub_key);
std::string elem;
s = db_->Get(rocksdb::ReadOptions(), sub_key, &elem);
if (!s.ok()) {
// FIXME: should be always exists??
return s;
}

elems->push_back(elem);
batch.Delete(sub_key);
metadata.size -= 1;
left ? ++metadata.head : --metadata.tail;
--count;
}

if (metadata.size == 0) {
batch.Delete(metadata_cf_handle_, ns_key);
} else {
std::string bytes;
metadata.size -= 1;
left ? ++metadata.head : --metadata.tail;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);
}

return storage_->Write(rocksdb::WriteOptions(), &batch);
}

Expand Down Expand Up @@ -438,7 +459,7 @@ rocksdb::Status List::RPopLPush(const Slice &src, const Slice &dst, std::string
return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
}

s = Pop(src, elem, false);
s = Pop(src, false, elem);
if (!s.ok()) return s;

int ret;
Expand Down
3 changes: 2 additions & 1 deletion src/redis_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class List : public Database {
rocksdb::Status Trim(const Slice &user_key, int start, int stop);
rocksdb::Status Set(const Slice &user_key, int index, Slice elem);
rocksdb::Status Insert(const Slice &user_key, const Slice &pivot, const Slice &elem, bool before, int *ret);
rocksdb::Status Pop(const Slice &user_key, std::string *elem, bool left);
rocksdb::Status Pop(const Slice &user_key, bool left, std::string *elem);
rocksdb::Status PopMulti(const Slice &user_key, bool left, uint32_t count, std::vector<std::string> *elems);
rocksdb::Status Rem(const Slice &user_key, int count, const Slice &elem, int *ret);
rocksdb::Status Index(const Slice &user_key, int index, std::string *elem);
rocksdb::Status RPopLPush(const Slice &src, const Slice &dst, std::string *elem);
Expand Down
Loading

0 comments on commit 834a164

Please sign in to comment.