diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index b757e651f10..607ecd51a68 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -65,6 +65,7 @@ const char *errInvalidExpireTime = "invalid expire time"; const char *errWrongNumOfArguments = "wrong number of arguments"; const char *errValueNotInterger = "value is not an integer or out of range"; const char *errAdministorPermissionRequired = "administor permission required to perform the command"; +const char *errValueMustBePositive = "value is out of range, must be positive"; class CommandAuth : public Commander { public: @@ -1448,23 +1449,59 @@ class CommandRPushX : public CommandPush { class CommandPop : public Commander { public: explicit CommandPop(bool left) { left_ = left; } + Status Parse(const std::vector &args) override { + if (args.size() > 3) { + return Status(Status::RedisParseErr, errWrongNumOfArguments); + } + if (args.size() == 2) { + return Status::OK(); + } + try { + int32_t v = std::stol(args[2]); + if (v < 0) { + return Status(Status::RedisParseErr, errValueMustBePositive); + } + count_ = v; + with_count_ = true; + } catch (const std::exception& ) { + return Status(Status::RedisParseErr, errValueNotInterger); + } + return Status::OK(); + } + Status Execute(Server *svr, Connection *conn, std::string *output) override { Redis::List list_db(svr->storage_, conn->GetNamespace()); - std::string elem; - rocksdb::Status s = list_db.Pop(args_[1], &elem, left_); - if (!s.ok() && !s.IsNotFound()) { - return Status(Status::RedisExecErr, s.ToString()); - } - if (s.IsNotFound()) { - *output = Redis::NilString(); + if (with_count_) { + std::vector elems; + rocksdb::Status s = list_db.PopMulti(args_[1], left_, count_, &elems); + if (!s.ok() && !s.IsNotFound()) { + return Status(Status::RedisExecErr, s.ToString()); + } + if (s.IsNotFound()) { + *output = Redis::MultiLen(-1); + } else { + *output = Redis::MultiBulkString(elems); + } } else { - *output = Redis::BulkString(elem); + std::string elem; + rocksdb::Status s = list_db.Pop(args_[1], left_, &elem); + if (!s.ok() && !s.IsNotFound()) { + return Status(Status::RedisExecErr, s.ToString()); + } + if (s.IsNotFound()) { + *output = Redis::NilString(); + } else { + *output = Redis::BulkString(elem); + } } + return Status::OK(); } private: bool left_; + bool with_count_ = false; + uint32_t count_ = 1; }; class CommandLPop : public CommandPop { @@ -1533,7 +1570,7 @@ class CommandBPop : public Commander { rocksdb::Status s; for (const auto &key : keys_) { last_key = key; - s = list_db.Pop(key, &elem, left_); + s = list_db.Pop(key, left_, &elem); if (s.ok() || !s.IsNotFound()) { break; } @@ -4766,8 +4803,8 @@ CommandAttributes redisCommandTable[] = { ADD_CMD("rpush", -3, "write", 1, 1, 1, CommandRPush), ADD_CMD("lpushx", -3, "write", 1, 1, 1, CommandLPushX), ADD_CMD("rpushx", -3, "write", 1, 1, 1, CommandRPushX), - ADD_CMD("lpop", 2, "write", 1, 1, 1, CommandLPop), - ADD_CMD("rpop", 2, "write", 1, 1, 1, CommandRPop), + ADD_CMD("lpop", -2, "write", 1, 1, 1, CommandLPop), + ADD_CMD("rpop", -2, "write", 1, 1, 1, CommandRPop), ADD_CMD("blpop", -3, "write no-script", 1, -2, 1, CommandBLPop), ADD_CMD("brpop", -3, "write no-script", 1, -2, 1, CommandBRPop), ADD_CMD("lrem", 4, "write", 1, 1, 1, CommandLRem), diff --git a/src/redis_list.cc b/src/redis_list.cc index c89d8e67b8e..9792ed0540a 100644 --- a/src/redis_list.cc +++ b/src/redis_list.cc @@ -87,9 +87,21 @@ rocksdb::Status List::push(const Slice &user_key, return storage_->Write(rocksdb::WriteOptions(), &batch); } -rocksdb::Status List::Pop(const Slice &user_key, std::string *elem, bool left) { +rocksdb::Status List::Pop(const Slice &user_key, bool left, std::string *elem) { elem->clear(); + std::vector elems; + auto s = PopMulti(user_key, left, 1, &elems); + if (!s.ok()) return s; + + *elem = elems[0]; + return rocksdb::Status::OK(); +} + +rocksdb::Status List::PopMulti(const rocksdb::Slice &user_key, bool left, uint32_t count, + std::vector *elems) { + elems->clear(); + std::string ns_key; AppendNamespacePrefix(user_key, &ns_key); @@ -98,30 +110,39 @@ rocksdb::Status List::Pop(const Slice &user_key, std::string *elem, bool left) { rocksdb::Status s = GetMetadata(ns_key, &metadata); if (!s.ok()) return s; - uint64_t index = left ? metadata.head : metadata.tail - 1; - std::string buf; - PutFixed64(&buf, index); - std::string sub_key; - InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode(&sub_key); - s = db_->Get(rocksdb::ReadOptions(), sub_key, elem); - if (!s.ok()) { - // FIXME: should be always exists?? - return s; - } rocksdb::WriteBatch batch; RedisCommand cmd = left ? kRedisCmdLPop : kRedisCmdRPop; WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)}); batch.PutLogData(log_data.Encode()); - batch.Delete(sub_key); - if (metadata.size == 1) { + + while (metadata.size > 0 && count > 0) { + uint64_t index = left ? metadata.head : metadata.tail - 1; + std::string buf; + PutFixed64(&buf, index); + std::string sub_key; + InternalKey(ns_key, buf, metadata.version, storage_->IsSlotIdEncoded()).Encode(&sub_key); + std::string elem; + s = db_->Get(rocksdb::ReadOptions(), sub_key, &elem); + if (!s.ok()) { + // FIXME: should be always exists?? + return s; + } + + elems->push_back(elem); + batch.Delete(sub_key); + metadata.size -= 1; + left ? ++metadata.head : --metadata.tail; + --count; + } + + if (metadata.size == 0) { batch.Delete(metadata_cf_handle_, ns_key); } else { std::string bytes; - metadata.size -= 1; - left ? ++metadata.head : --metadata.tail; metadata.Encode(&bytes); batch.Put(metadata_cf_handle_, ns_key, bytes); } + return storage_->Write(rocksdb::WriteOptions(), &batch); } @@ -438,7 +459,7 @@ rocksdb::Status List::RPopLPush(const Slice &src, const Slice &dst, std::string return rocksdb::Status::InvalidArgument(kErrMsgWrongType); } - s = Pop(src, elem, false); + s = Pop(src, false, elem); if (!s.ok()) return s; int ret; diff --git a/src/redis_list.h b/src/redis_list.h index f199c4a4ecf..a180212cfd1 100644 --- a/src/redis_list.h +++ b/src/redis_list.h @@ -36,7 +36,8 @@ class List : public Database { rocksdb::Status Trim(const Slice &user_key, int start, int stop); rocksdb::Status Set(const Slice &user_key, int index, Slice elem); rocksdb::Status Insert(const Slice &user_key, const Slice &pivot, const Slice &elem, bool before, int *ret); - rocksdb::Status Pop(const Slice &user_key, std::string *elem, bool left); + rocksdb::Status Pop(const Slice &user_key, bool left, std::string *elem); + rocksdb::Status PopMulti(const Slice &user_key, bool left, uint32_t count, std::vector *elems); rocksdb::Status Rem(const Slice &user_key, int count, const Slice &elem, int *ret); rocksdb::Status Index(const Slice &user_key, int index, std::string *elem); rocksdb::Status RPopLPush(const Slice &src, const Slice &dst, std::string *elem); diff --git a/tests/t_list_test.cc b/tests/t_list_test.cc index 4fda8bf88ea..7a5a2e903a5 100644 --- a/tests/t_list_test.cc +++ b/tests/t_list_test.cc @@ -89,14 +89,14 @@ TEST_F(RedisListTest, PushAndPop) { EXPECT_EQ(fields_.size(), ret); for (size_t i = 0; i < fields_.size(); i++) { std::string elem; - list->Pop(key_, &elem, false); + list->Pop(key_, false, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } list->Push(key_, fields_, false, &ret); EXPECT_EQ(fields_.size(), ret); for (size_t i = 0; i < fields_.size(); i++) { std::string elem; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } list->Del(key_); @@ -124,7 +124,7 @@ TEST_F(RedisListTest, Index) { EXPECT_EQ(fields_[i].ToString(), elem); } for (size_t i = 0; i < fields_.size(); i++) { - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } rocksdb::Status s = list->Index(key_,-1, &elem); @@ -142,7 +142,7 @@ TEST_F(RedisListTest, Set) { list->Index(key_, -1, &elem); EXPECT_EQ(new_elem.ToString(), elem); for (size_t i = 0; i < fields_.size(); i++) { - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); } list->Del(key_); } @@ -159,7 +159,7 @@ TEST_F(RedisListTest, Range) { } for (size_t i = 0; i < fields_.size(); i++) { std::string elem; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } list->Del(key_); @@ -178,7 +178,7 @@ TEST_F(RedisListTest, Rem) { EXPECT_EQ(fields_.size()-1, len); for (size_t i = 1; i < fields_.size(); i++) { std::string elem; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } // lrem key_ 0 list-test-key-1 @@ -191,7 +191,7 @@ TEST_F(RedisListTest, Rem) { for (size_t i = 0; i < fields_.size(); i++) { std::string elem; if (fields_[i] == del_elem) continue; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } // lrem key_ 1 nosuchelement @@ -204,7 +204,7 @@ TEST_F(RedisListTest, Rem) { EXPECT_EQ(fields_.size(), len); for (size_t i = 0; i < fields_.size(); i++) { std::string elem; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } // lrem key_ -1 list-test-key-1 @@ -219,7 +219,7 @@ TEST_F(RedisListTest, Rem) { if (fields_[i] == del_elem) { if (++cnt > 3) continue; } - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } // lrem key_ -5 list-test-key-1 @@ -232,7 +232,7 @@ TEST_F(RedisListTest, Rem) { for (size_t i = 0; i < fields_.size(); i++) { std::string elem; if (fields_[i] == del_elem) continue; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } list->Del(key_); @@ -255,7 +255,7 @@ TEST_F(RedisListSpecificTest, Rem) { if (++cnt <= 1) continue; } std::string elem; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } // lrem key_ -2 9 @@ -270,7 +270,7 @@ TEST_F(RedisListSpecificTest, Rem) { if (++cnt <= 2) continue; } std::string elem; - list->Pop(key_, &elem, false); + list->Pop(key_, false, &elem); EXPECT_EQ(elem, fields_[i-1].ToString()); } list->Del(key_); @@ -286,7 +286,7 @@ TEST_F(RedisListTest, Trim) { EXPECT_EQ(fields_.size()-1, len); for (size_t i = 1; i < fields_.size(); i++) { std::string elem; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } list->Del(key_); @@ -310,7 +310,7 @@ TEST_F(RedisListSpecificTest, Trim) { for (size_t i = 3; i < fields_.size()-2; i++) { if (fields_[i] == del_elem) continue; std::string elem; - list->Pop(key_, &elem, true); + list->Pop(key_, true, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } list->Del(key_); @@ -328,7 +328,7 @@ TEST_F(RedisListTest, RPopLPush) { } for (size_t i = 0; i < fields_.size(); i++) { std::string elem; - list->Pop(dst, &elem, false); + list->Pop(dst, false, &elem); EXPECT_EQ(elem, fields_[i].ToString()); } list->Del(key_); @@ -444,3 +444,113 @@ TEST_F(RedisListLMoveTest, LMoveSrcRightDstRight) { listElementsAreEqualTo(key_, 0, fields_.size(), {fields_[0], fields_[1], fields_[2]}); listElementsAreEqualTo(dst_key_, 0, dst_fields_.size()+1, {dst_fields_[0], dst_fields_[1], dst_fields_[2], dst_fields_[3], fields_[3]}); } + +TEST_F(RedisListTest, LPopEmptyList) { + std::string non_existing_key{"non-existing-key"}; + list->Del(non_existing_key); + std::string elem; + auto s = list->Pop(non_existing_key, true, &elem); + EXPECT_TRUE(s.IsNotFound()); + std::vector elems; + s = list->PopMulti(non_existing_key, true, 10, &elems); + EXPECT_TRUE(s.IsNotFound()); +} + +TEST_F(RedisListTest, LPopOneElement) { + int ret; + list->Push(key_, fields_, false, &ret); + EXPECT_EQ(fields_.size(), ret); + for (size_t i = 0; i < fields_.size(); i++) { + std::string elem; + list->Pop(key_, true, &elem); + EXPECT_EQ(elem, fields_[i].ToString()); + } + std::string elem; + auto s = list->Pop(key_, true, &elem); + EXPECT_TRUE(s.IsNotFound()); + list->Del(key_); +} + +TEST_F(RedisListTest, LPopMulti) { + int ret; + list->Push(key_, fields_, false, &ret); + EXPECT_EQ(fields_.size(), ret); + std::vector elems; + size_t requested_size = fields_.size() / 3; + auto s = list->PopMulti(key_, true, requested_size, &elems); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(elems.size(), requested_size); + for (size_t i = 0; i < elems.size(); ++i) { + EXPECT_EQ(elems[i], fields_[i].ToString()); + } + list->Del(key_); +} + +TEST_F(RedisListTest, LPopMultiCountGreaterThanListSize) { + int ret; + list->Push(key_, fields_, false, &ret); + EXPECT_EQ(fields_.size(), ret); + std::vector elems; + auto s = list->PopMulti(key_, true, 2*ret, &elems); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(elems.size(), ret); + for (size_t i = 0; i < elems.size(); ++i) { + EXPECT_EQ(elems[i], fields_[i].ToString()); + } + list->Del(key_); +} + +TEST_F(RedisListTest, RPopEmptyList) { + std::string non_existing_key{"non-existing-key"}; + list->Del(non_existing_key); + std::string elem; + auto s = list->Pop(non_existing_key, false, &elem); + EXPECT_TRUE(s.IsNotFound()); + std::vector elems; + s = list->PopMulti(non_existing_key, false, 10, &elems); + EXPECT_TRUE(s.IsNotFound()); +} + +TEST_F(RedisListTest, RPopOneElement) { + int ret; + list->Push(key_, fields_, false, &ret); + EXPECT_EQ(fields_.size(), ret); + for (size_t i = 0; i < fields_.size(); i++) { + std::string elem; + list->Pop(key_, false, &elem); + EXPECT_EQ(elem, fields_[fields_.size() - i - 1].ToString()); + } + std::string elem; + auto s = list->Pop(key_, false, &elem); + EXPECT_TRUE(s.IsNotFound()); + list->Del(key_); +} + +TEST_F(RedisListTest, RPopMulti) { + int ret; + list->Push(key_, fields_, false, &ret); + EXPECT_EQ(fields_.size(), ret); + std::vector elems; + size_t requested_size = fields_.size() / 3; + auto s = list->PopMulti(key_, false, requested_size, &elems); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(elems.size(), requested_size); + for (size_t i = 0; i < elems.size(); ++i) { + EXPECT_EQ(elems[i], fields_[fields_.size() - i - 1].ToString()); + } + list->Del(key_); +} + +TEST_F(RedisListTest, RPopMultiCountGreaterThanListSize) { + int ret; + list->Push(key_, fields_, false, &ret); + EXPECT_EQ(fields_.size(), ret); + std::vector elems; + auto s = list->PopMulti(key_, false, 2*ret, &elems); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(elems.size(), ret); + for (size_t i = 0; i < elems.size(); ++i) { + EXPECT_EQ(elems[i], fields_[fields_.size() - i - 1].ToString()); + } + list->Del(key_); +} diff --git a/tests/tcl/tests/unit/type/list.tcl b/tests/tcl/tests/unit/type/list.tcl index bb67cdad828..a8b78392cb6 100644 --- a/tests/tcl/tests/unit/type/list.tcl +++ b/tests/tcl/tests/unit/type/list.tcl @@ -569,6 +569,56 @@ start_server { assert_error *WRONGTYPE* {r rpop notalist} } + test "LPOP/RPOP with wrong number of arguments" { + assert_error {*wrong number of arguments*} {r lpop key 1 1} + assert_error {*wrong number of arguments*} {r rpop key 2 2} + } + + test {RPOP/LPOP with the optional count argument} { + assert_equal 7 [r lpush listcount aa bb cc dd ee ff gg] + assert_equal {gg} [r lpop listcount 1] + assert_equal {ff ee} [r lpop listcount 2] + assert_equal {aa bb} [r rpop listcount 2] + assert_equal {cc} [r rpop listcount 1] + assert_equal {dd} [r rpop listcount 123] + assert_error "*ERR*range*" {r lpop forbarqaz -123} + } + + test "LPOP/RPOP with the count 0 returns an empty array" { + # Make sure we can distinguish between an empty array and a null response + r readraw 1 + + r lpush listcount zero + assert_equal {*0} [r lpop listcount 0] + assert_equal {*0} [r rpop listcount 0] + + r readraw 0 + } + + test "LPOP/RPOP against non existing key" { + r readraw 1 + + r del non_existing_key + assert_equal [r lpop non_existing_key] {$-1} + assert_equal [r rpop non_existing_key] {$-1} + + r readraw 0 + } + + test "LPOP/RPOP with against non existing key" { + r readraw 1 + + r del non_existing_key + + assert_equal [r lpop non_existing_key 0] {*-1} + assert_equal [r lpop non_existing_key 1] {*-1} + + assert_equal [r rpop non_existing_key 0] {*-1} + assert_equal [r rpop non_existing_key 1] {*-1} + + r readraw 0 + } + foreach {type num} {quicklist 250 quicklist 500} { test "Mass RPOP/LPOP - $type" { r del mylist