From 9c6b4e80248310bb4114650f367fab7bca50e94f Mon Sep 17 00:00:00 2001 From: HolyLow Date: Wed, 20 Sep 2023 20:38:07 +0800 Subject: [PATCH 1/5] implement the blmpop with parent class --- src/commands/cmd_list.cc | 114 ++++++++++++++++++++++++++++++++++++++ src/commands/commander.cc | 75 +++++++++++++++++++++++++ src/commands/commander.h | 34 ++++++++++++ 3 files changed, 223 insertions(+) diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index ad5a50dbd41..2d6bbf8a370 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -334,6 +334,119 @@ class CommandBRPop : public CommandBPop { CommandBRPop() : CommandBPop(false) {} }; +// todo: implement the BLMPOP command here. +// the method is: +// 1. copy the code structure of BPOP here; +// 2. replace the logic with the LMPOP's logic; +// 3. handle the error returning, cancellation, etc. +// 4. add test... +// when implementing, try to separate the code to prepare for further abstraction. +class CommandBLMPop : public BlockedPopCommander { + public: + CommandBLMPop() = default; + CommandBLMPop(const CommandBLMPop &) = delete; + CommandBLMPop &operator=(const CommandBLMPop &) = delete; + + ~CommandBLMPop() override = default; + + // format: BLMPOP timeout #numkeys key0 [key1 ...] [COUNT count] + Status Parse(const std::vector &args) override { + CommandParser parser(args, 1); + + auto timeout = GET_OR_RET(parser.TakeFloat()); + setTimeout(static_cast(timeout * 1000 * 1000)); + + auto num_keys = GET_OR_RET(parser.TakeInt()); + keys_.clear(); + keys_.reserve(num_keys); + for (uint32_t i = 0; i < num_keys; ++i) { + keys_.emplace_back(GET_OR_RET(parser.TakeStr())); + } + + auto left_or_right = util::ToLower(GET_OR_RET(parser.TakeStr())); + if (left_or_right == "left") { + left_ = true; + } else if (left_or_right == "right") { + left_ = false; + } else { + return {Status::RedisParseErr, errInvalidSyntax}; + } + + while (parser.Good()) { + if (parser.EatEqICase("count") && count_ == static_cast(-1)) { + count_ = GET_OR_RET(parser.TakeInt()); + } else { + return parser.InvalidSyntax(); + } + } + if (count_ == static_cast(-1)) { + count_ = 1; + } + + return Status::OK(); + } + + static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector &args) { + CommandKeyRange range; + range.first_key = 3; + range.key_step = 1; + // This parsing would always succeed as this cmd has been parsed before. + auto num_key = *ParseInt(args[2], 10); + range.last_key = range.first_key + num_key - 1; + return range; + }; + + private: + rocksdb::Status executeUnblocked() override { + redis::List list_db(svr_->storage, conn_->GetNamespace()); + std::vector elems; + std::string chosen_key; + rocksdb::Status s; + for (const auto &key : keys_) { + s = list_db.PopMulti(key, left_, count_, &elems); + if (s.ok() && !elems.empty()) { + chosen_key = key; + break; + } + if (!s.IsNotFound()) { + break; + } + } + + if (s.ok()) { + if (!elems.empty()) { + conn_->GetServer()->UpdateWatchedKeysManually({chosen_key}); + std::string elems_bulk = redis::MultiBulkString(elems); + conn_->Reply(redis::Array({redis::BulkString(chosen_key), std::move(elems_bulk)})); + } + } else if (!s.IsNotFound()) { + conn_->Reply(redis::Error("ERR " + s.ToString())); + } + + return s; + } + + std::string emptyOutput() override { + return redis::NilString(); + } + + void blockAllKeys() override { + for (const auto &key : keys_) { + svr_->BlockOnKey(key, conn_); + } + } + + void unblockAllKeys() override { + for (const auto &key : keys_) { + svr_->UnblockOnKey(key, conn_); + } + } + + bool left_; + uint32_t count_ = -1; + std::vector keys_; +}; + class CommandLRem : public Commander { public: Status Parse(const std::vector &args) override { @@ -727,6 +840,7 @@ class CommandLPos : public Commander { REDIS_REGISTER_COMMANDS(MakeCmdAttr("blpop", -3, "write no-script", 1, -2, 1), MakeCmdAttr("brpop", -3, "write no-script", 1, -2, 1), + MakeCmdAttr("blmpop", -5, "write no-script", CommandBLMPop::keyRangeGen), MakeCmdAttr("lindex", 3, "read-only", 1, 1, 1), MakeCmdAttr("linsert", 5, "write", 1, 1, 1), MakeCmdAttr("llen", 2, "read-only", 1, 1, 1), diff --git a/src/commands/commander.cc b/src/commands/commander.cc index b767b017408..9bfca8e7d47 100644 --- a/src/commands/commander.cc +++ b/src/commands/commander.cc @@ -21,6 +21,7 @@ #include "commander.h" #include "cluster/cluster_defs.h" +#include "server/redis_connection.h" namespace redis { @@ -32,6 +33,80 @@ RegisterToCommandTable::RegisterToCommandTable(std::initializer_listGetBufferEvent(); + auto s = executeUnblocked(); + if (s.ok() || !s.IsNotFound()) { + return Status::OK(); // error has already output in executeUnblocked + } + + if (conn->IsInExec()) { + *output = emptyOutput(); + return Status::OK(); // No blocking in multi-exec + } + + blockAllKeys(); + + SetCB(bev); + + if (timeout_) { + timer_.reset(NewTimer(bufferevent_get_base(bev))); + int64_t timeout_second = timeout_ / 1000 / 1000; + int64_t timeout_microsecond = timeout_ % (1000 * 1000); + timeval tm = {timeout_second, static_cast(timeout_microsecond)}; + evtimer_add(timer_.get(), &tm); + } + + return {Status::BlockingCmd}; +} + +void BlockedPopCommander::OnWrite(bufferevent *bev) { + auto s = executeUnblocked(); + if (s.IsNotFound()) { + // The connection may be waked up but can't pop from list. For example, + // connection A is blocking on list and connection B push a new element + // then wake up the connection A, but this element may be token by other connection C. + // So we need to wait for the wake event again by disabling the WRITE event. + bufferevent_disable(bev, EV_WRITE); + return; + } + + if (timer_) { + timer_.reset(); + } + + unblockAllKeys(); + conn_->SetCB(bev); + bufferevent_enable(bev, EV_READ); + // We need to manually trigger the read event since we will stop processing commands + // in connection after the blocking command, so there may have some commands to be processed. + // Related issue: https://github.com/apache/kvrocks/issues/831 + bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); +} + + +void BlockedPopCommander::OnEvent(bufferevent *bev, int16_t events) { + if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { + if (timer_ != nullptr) { + timer_.reset(); + } + unblockAllKeys(); + } + conn_->OnEvent(bev, events); +} + +void BlockedPopCommander::TimerCB(int, int16_t events) { + conn_->Reply(redis::NilString()); + timer_.reset(); + unblockAllKeys(); + auto bev = conn_->GetBufferEvent(); + conn_->SetCB(bev); + bufferevent_enable(bev, EV_READ); +} + size_t GetCommandNum() { return command_details::redis_command_table.size(); } const CommandMap *GetOriginalCommands() { return &command_details::original_commands; } diff --git a/src/commands/commander.h b/src/commands/commander.h index f6caad240fc..761900090e1 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -38,6 +38,7 @@ #include #include "cluster/cluster_defs.h" +#include "event_util.h" #include "parse_util.h" #include "server/redis_reply.h" #include "status.h" @@ -83,6 +84,39 @@ class Commander { const CommandAttributes *attributes_ = nullptr; }; +class BlockedPopCommander : public Commander, + private EvbufCallbackBase, + private EventCallbackBase { + public: + Status Execute(Server *svr, Connection *conn, std::string *output) final; + + void OnWrite(bufferevent *bev); + + void OnEvent(bufferevent *bev, int16_t events); + + void TimerCB(int, int16_t events); + + protected: + virtual rocksdb::Status executeUnblocked() = 0; + + virtual void blockAllKeys() = 0; + + virtual void unblockAllKeys() = 0; + + virtual std::string emptyOutput() = 0; + + void setTimeout(int64_t timeout) { + timeout_ = timeout; + } + + Server *svr_ = nullptr; + Connection *conn_ = nullptr; + + private: + int64_t timeout_ = 0; // microseconds + UniqueEvent timer_; +}; + class CommanderWithParseMove : Commander { public: Status Parse() override { return ParseMove(std::move(args_)); } From 117b7a4325b6601e15afdcf8409465120121b884 Mon Sep 17 00:00:00 2001 From: HolyLow Date: Wed, 20 Sep 2023 20:39:37 +0800 Subject: [PATCH 2/5] clang format --- src/commands/cmd_list.cc | 4 +--- src/commands/commander.cc | 1 - src/commands/commander.h | 8 +++----- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index 2d6bbf8a370..7125bf2c8bc 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -426,9 +426,7 @@ class CommandBLMPop : public BlockedPopCommander { return s; } - std::string emptyOutput() override { - return redis::NilString(); - } + std::string emptyOutput() override { return redis::NilString(); } void blockAllKeys() override { for (const auto &key : keys_) { diff --git a/src/commands/commander.cc b/src/commands/commander.cc index 9bfca8e7d47..4945a795c86 100644 --- a/src/commands/commander.cc +++ b/src/commands/commander.cc @@ -87,7 +87,6 @@ void BlockedPopCommander::OnWrite(bufferevent *bev) { bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); } - void BlockedPopCommander::OnEvent(bufferevent *bev, int16_t events) { if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { if (timer_ != nullptr) { diff --git a/src/commands/commander.h b/src/commands/commander.h index 761900090e1..a03657e7cc6 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -85,8 +85,8 @@ class Commander { }; class BlockedPopCommander : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { + private EvbufCallbackBase, + private EventCallbackBase { public: Status Execute(Server *svr, Connection *conn, std::string *output) final; @@ -105,9 +105,7 @@ class BlockedPopCommander : public Commander, virtual std::string emptyOutput() = 0; - void setTimeout(int64_t timeout) { - timeout_ = timeout; - } + void setTimeout(int64_t timeout) { timeout_ = timeout; } Server *svr_ = nullptr; Connection *conn_ = nullptr; From 21a96cc01b4beff6d217e3b00780693100223ea3 Mon Sep 17 00:00:00 2001 From: HolyLow Date: Fri, 22 Sep 2023 14:40:43 +0800 Subject: [PATCH 3/5] adding uts, but one ut fails, because of racing? --- src/commands/commander.cc | 2 +- tests/gocase/unit/type/list/list_test.go | 272 +++++++++++++++++++++++ tests/gocase/util/tcp_client.go | 15 ++ 3 files changed, 288 insertions(+), 1 deletion(-) diff --git a/src/commands/commander.cc b/src/commands/commander.cc index 4945a795c86..49583f9567a 100644 --- a/src/commands/commander.cc +++ b/src/commands/commander.cc @@ -98,7 +98,7 @@ void BlockedPopCommander::OnEvent(bufferevent *bev, int16_t events) { } void BlockedPopCommander::TimerCB(int, int16_t events) { - conn_->Reply(redis::NilString()); + conn_->Reply(emptyOutput()); timer_.reset(); unblockAllKeys(); auto bev = conn_->GetBufferEvent(); diff --git a/tests/gocase/unit/type/list/list_test.go b/tests/gocase/unit/type/list/list_test.go index 0522e783afd..327d34e6c48 100644 --- a/tests/gocase/unit/type/list/list_test.go +++ b/tests/gocase/unit/type/list/list_test.go @@ -1177,4 +1177,276 @@ func TestList(t *testing.T) { require.EqualError(t, lmpopNoCount(rdb, ctx, direction, key1, key2).Err(), redis.Nil.Error()) }) } + + // test cases for BLMPOP: + // overall: consider both directions; consider poped from first / second list; + // case 1: has data already; todo: what if has data but not enough number of data? + // case 2: no data, but served within the timeout; todo: served with non-enough key and served with enough key? + // case 3: no data, and timeout ends; + // case 3: no data, timeout=0, should block infinitely. + for _, direction := range []string{"LEFT", "RIGHT"} { + key1 := "blmpop-list1" + key2 := "blmpop-list2" + rdb.Del(ctx, key1, key2) + require.EqualValues(t, 5, rdb.LPush(ctx, key1, "one", "two", "three", "four", "five").Val()) + require.EqualValues(t, 5, rdb.LPush(ctx, key2, "ONE", "TWO", "THREE", "FOUR", "FIVE").Val()) + + zeroTimeout := time.Second * 0 + + // TEST SUIT #1: non-blocking scenario (at least one queried list is not empty). + // In these cases, the behavior should be the same to LMPOP. + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countSingle %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 1, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"five"}, resultVal) + } else { + require.Equal(t, []string{"one"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countMulti %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 2, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"four", "three"}, resultVal) + } else { + require.Equal(t, []string{"two", "three"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countTooMuch %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"two", "one"}, resultVal) + } else { + require.Equal(t, []string{"four", "five"}, resultVal) + } + }) + + require.EqualValues(t, 2, rdb.LPush(ctx, key1, "six", "seven").Val()) + t.Run(fmt.Sprintf("BLMPOP test unblocked firstKey countOver %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"seven", "six"}, resultVal) + } else { + require.Equal(t, []string{"six", "seven"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countSingle %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 1, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"FIVE"}, resultVal) + } else { + require.Equal(t, []string{"ONE"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countMulti %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 2, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"FOUR", "THREE"}, resultVal) + } else { + require.Equal(t, []string{"TWO", "THREE"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countOver %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"TWO", "ONE"}, resultVal) + } else { + require.Equal(t, []string{"FOUR", "FIVE"}, resultVal) + } + }) + + blmpopNoCount := func(c *redis.Client, ctx context.Context, timeout string, direction string, keys ...string) *redis.KeyValuesCmd { + args := make([]interface{}, 3+len(keys), 6+len(keys)) + args[0] = "blmpop" + args[1] = timeout + args[2] = len(keys) + for i, key := range keys { + args[3+i] = key + } + args = append(args, strings.ToLower(direction)) + cmd := redis.NewKeyValuesCmd(ctx, args...) + _ = c.Process(ctx, cmd) + return cmd + } + rdb.Del(ctx, key1, key2) + require.EqualValues(t, 2, rdb.LPush(ctx, key1, "one", "two").Val()) + require.EqualValues(t, 2, rdb.LPush(ctx, key2, "ONE", "TWO").Val()) + + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"two"}, resultVal) + } else { + require.Equal(t, []string{"one"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked firstKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"one"}, resultVal) + } else { + require.Equal(t, []string{"two"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"TWO"}, resultVal) + } else { + require.Equal(t, []string{"ONE"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"ONE"}, resultVal) + } else { + require.Equal(t, []string{"TWO"}, resultVal) + } + }) + + // TEST SUIT #2: blocking scenario, but data reaches within timeout. + t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countSingle %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "1", key1, direction, "count", "1")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO"}) + } + require.EqualValues(t, 1, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countMulti %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "1", key1, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO", "ONE"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "1", key1, direction, "count", "10")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO", "ONE"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served firstKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO", "ONE"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served secondKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two", "one"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served bothKey FIFO countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two", "one"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val()) + require.EqualValues(t, 2, rdb.Exists(ctx, key1).Val()) + }) + + // TEST SUIT #3: blocking scenario, and timeout is triggered. + + // TEST SUIT #4: blocking scenario, and timeout is 0 (permanently blocked). + } } diff --git a/tests/gocase/util/tcp_client.go b/tests/gocase/util/tcp_client.go index 1a65bcb1a48..46ed3ac9af5 100644 --- a/tests/gocase/util/tcp_client.go +++ b/tests/gocase/util/tcp_client.go @@ -87,6 +87,21 @@ func (c *TCPClient) MustReadStrings(t testing.TB, s []string) { } } +func (c *TCPClient) MustReadStringsWithKey(t testing.TB, key string, s []string) { + r, err := c.ReadLine() + require.NoError(t, err) + require.EqualValues(t, '*', r[0]) + n, err := strconv.Atoi(r[1:]) + require.NoError(t, err) + require.Equal(t, n, 2) + + _, err = c.ReadLine() + require.NoError(t, err) + c.MustRead(t, key) + + c.MustReadStrings(t, s) +} + func (c *TCPClient) MustMatch(t testing.TB, rx string) { r, err := c.ReadLine() require.NoError(t, err) From 62895570f8aad608f8383cd68f113f57de3d6646 Mon Sep 17 00:00:00 2001 From: HolyLow Date: Fri, 22 Sep 2023 15:15:03 +0800 Subject: [PATCH 4/5] add full uts --- src/commands/cmd_list.cc | 7 --- tests/gocase/unit/type/list/list_test.go | 59 ++++++++++++++++++------ 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index 7125bf2c8bc..63fc782625a 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -334,13 +334,6 @@ class CommandBRPop : public CommandBPop { CommandBRPop() : CommandBPop(false) {} }; -// todo: implement the BLMPOP command here. -// the method is: -// 1. copy the code structure of BPOP here; -// 2. replace the logic with the LMPOP's logic; -// 3. handle the error returning, cancellation, etc. -// 4. add test... -// when implementing, try to separate the code to prepare for further abstraction. class CommandBLMPop : public BlockedPopCommander { public: CommandBLMPop() = default; diff --git a/tests/gocase/unit/type/list/list_test.go b/tests/gocase/unit/type/list/list_test.go index 327d34e6c48..e7f744e02d7 100644 --- a/tests/gocase/unit/type/list/list_test.go +++ b/tests/gocase/unit/type/list/list_test.go @@ -1178,12 +1178,6 @@ func TestList(t *testing.T) { }) } - // test cases for BLMPOP: - // overall: consider both directions; consider poped from first / second list; - // case 1: has data already; todo: what if has data but not enough number of data? - // case 2: no data, but served within the timeout; todo: served with non-enough key and served with enough key? - // case 3: no data, and timeout ends; - // case 3: no data, timeout=0, should block infinitely. for _, direction := range []string{"LEFT", "RIGHT"} { key1 := "blmpop-list1" key2 := "blmpop-list2" @@ -1350,7 +1344,7 @@ func TestList(t *testing.T) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - require.NoError(t, rd.WriteArgs("blmpop", "0", "1", key1, direction, "count", "1")) + require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "1")) time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) time.Sleep(time.Millisecond * 100) @@ -1366,7 +1360,7 @@ func TestList(t *testing.T) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - require.NoError(t, rd.WriteArgs("blmpop", "0", "1", key1, direction, "count", "2")) + require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "2")) time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) time.Sleep(time.Millisecond * 100) @@ -1382,7 +1376,7 @@ func TestList(t *testing.T) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - require.NoError(t, rd.WriteArgs("blmpop", "0", "1", key1, direction, "count", "10")) + require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "10")) time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) time.Sleep(time.Millisecond * 100) @@ -1398,7 +1392,7 @@ func TestList(t *testing.T) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) time.Sleep(time.Millisecond * 100) @@ -1414,7 +1408,7 @@ func TestList(t *testing.T) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) time.Sleep(time.Millisecond * 100) @@ -1430,7 +1424,7 @@ func TestList(t *testing.T) { rd := srv.NewTCPClient() defer func() { require.NoError(t, rd.Close()) }() require.NoError(t, rdb.Del(ctx, key1, key2).Err()) - require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) time.Sleep(time.Millisecond * 100) require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) time.Sleep(time.Millisecond * 100) @@ -1442,11 +1436,50 @@ func TestList(t *testing.T) { rd.MustReadStringsWithKey(t, key2, []string{"two", "one"}) } require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val()) - require.EqualValues(t, 2, rdb.Exists(ctx, key1).Val()) + require.EqualValues(t, 2, rdb.LLen(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served secondKey noCount %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction)) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two"}) + } + require.EqualValues(t, 1, rdb.LLen(ctx, key2).Val()) }) // TEST SUIT #3: blocking scenario, and timeout is triggered. + t.Run(fmt.Sprintf("BLMPOP test blocked timeout %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction)) + time.Sleep(time.Millisecond * 1200) + rd.MustMatch(t, "") + }) // TEST SUIT #4: blocking scenario, and timeout is 0 (permanently blocked). + t.Run(fmt.Sprintf("BLMPOP test blocked infinitely served secondKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 1200) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two", "one"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val()) + }) } } From 0b4df62608e9c67bf7a2fb94a6a498257f01d187 Mon Sep 17 00:00:00 2001 From: HolyLow Date: Fri, 22 Sep 2023 16:12:59 +0800 Subject: [PATCH 5/5] refactor to adapt to BlockingCommander --- src/commands/cmd_list.cc | 53 +++++++++++++++++++--------- src/commands/commander.cc | 74 --------------------------------------- src/commands/commander.h | 32 ----------------- 3 files changed, 36 insertions(+), 123 deletions(-) diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index 63fc782625a..a9612351734 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -334,7 +334,7 @@ class CommandBRPop : public CommandBPop { CommandBRPop() : CommandBPop(false) {} }; -class CommandBLMPop : public BlockedPopCommander { +class CommandBLMPop : public BlockingCommander { public: CommandBLMPop() = default; CommandBLMPop(const CommandBLMPop &) = delete; @@ -347,7 +347,7 @@ class CommandBLMPop : public BlockedPopCommander { CommandParser parser(args, 1); auto timeout = GET_OR_RET(parser.TakeFloat()); - setTimeout(static_cast(timeout * 1000 * 1000)); + timeout_ = static_cast(timeout * 1000 * 1000); auto num_keys = GET_OR_RET(parser.TakeInt()); keys_.clear(); @@ -379,18 +379,19 @@ class CommandBLMPop : public BlockedPopCommander { return Status::OK(); } - static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector &args) { - CommandKeyRange range; - range.first_key = 3; - range.key_step = 1; - // This parsing would always succeed as this cmd has been parsed before. - auto num_key = *ParseInt(args[2], 10); - range.last_key = range.first_key + num_key - 1; - return range; - }; + Status Execute(Server *svr, Connection *conn, std::string *output) override { + svr_ = svr; + InitConnection(conn); - private: - rocksdb::Status executeUnblocked() override { + auto s = ExecuteUnblocked(); + if (s.ok() || !s.IsNotFound()) { + return Status::OK(); // error has already been output + } + + return StartBlocking(timeout_, output); + } + + rocksdb::Status ExecuteUnblocked() { redis::List list_db(svr_->storage, conn_->GetNamespace()); std::vector elems; std::string chosen_key; @@ -419,23 +420,41 @@ class CommandBLMPop : public BlockedPopCommander { return s; } - std::string emptyOutput() override { return redis::NilString(); } - - void blockAllKeys() override { + void BlockKeys() override { for (const auto &key : keys_) { svr_->BlockOnKey(key, conn_); } } - void unblockAllKeys() override { + void UnblockKeys() override { for (const auto &key : keys_) { svr_->UnblockOnKey(key, conn_); } } + bool OnBlockingWrite() override { + auto s = ExecuteUnblocked(); + return !s.IsNotFound(); + } + + std::string NoopReply() override { return redis::NilString(); } + + static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector &args) { + CommandKeyRange range; + range.first_key = 3; + range.key_step = 1; + // This parsing would always succeed as this cmd has been parsed before. + auto num_key = *ParseInt(args[2], 10); + range.last_key = range.first_key + num_key - 1; + return range; + }; + + private: bool left_; uint32_t count_ = -1; + int64_t timeout_ = 0; // microseconds std::vector keys_; + Server *svr_ = nullptr; }; class CommandLRem : public Commander { diff --git a/src/commands/commander.cc b/src/commands/commander.cc index 49583f9567a..b767b017408 100644 --- a/src/commands/commander.cc +++ b/src/commands/commander.cc @@ -21,7 +21,6 @@ #include "commander.h" #include "cluster/cluster_defs.h" -#include "server/redis_connection.h" namespace redis { @@ -33,79 +32,6 @@ RegisterToCommandTable::RegisterToCommandTable(std::initializer_listGetBufferEvent(); - auto s = executeUnblocked(); - if (s.ok() || !s.IsNotFound()) { - return Status::OK(); // error has already output in executeUnblocked - } - - if (conn->IsInExec()) { - *output = emptyOutput(); - return Status::OK(); // No blocking in multi-exec - } - - blockAllKeys(); - - SetCB(bev); - - if (timeout_) { - timer_.reset(NewTimer(bufferevent_get_base(bev))); - int64_t timeout_second = timeout_ / 1000 / 1000; - int64_t timeout_microsecond = timeout_ % (1000 * 1000); - timeval tm = {timeout_second, static_cast(timeout_microsecond)}; - evtimer_add(timer_.get(), &tm); - } - - return {Status::BlockingCmd}; -} - -void BlockedPopCommander::OnWrite(bufferevent *bev) { - auto s = executeUnblocked(); - if (s.IsNotFound()) { - // The connection may be waked up but can't pop from list. For example, - // connection A is blocking on list and connection B push a new element - // then wake up the connection A, but this element may be token by other connection C. - // So we need to wait for the wake event again by disabling the WRITE event. - bufferevent_disable(bev, EV_WRITE); - return; - } - - if (timer_) { - timer_.reset(); - } - - unblockAllKeys(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); - // We need to manually trigger the read event since we will stop processing commands - // in connection after the blocking command, so there may have some commands to be processed. - // Related issue: https://github.com/apache/kvrocks/issues/831 - bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); -} - -void BlockedPopCommander::OnEvent(bufferevent *bev, int16_t events) { - if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - if (timer_ != nullptr) { - timer_.reset(); - } - unblockAllKeys(); - } - conn_->OnEvent(bev, events); -} - -void BlockedPopCommander::TimerCB(int, int16_t events) { - conn_->Reply(emptyOutput()); - timer_.reset(); - unblockAllKeys(); - auto bev = conn_->GetBufferEvent(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); -} - size_t GetCommandNum() { return command_details::redis_command_table.size(); } const CommandMap *GetOriginalCommands() { return &command_details::original_commands; } diff --git a/src/commands/commander.h b/src/commands/commander.h index a03657e7cc6..f6caad240fc 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -38,7 +38,6 @@ #include #include "cluster/cluster_defs.h" -#include "event_util.h" #include "parse_util.h" #include "server/redis_reply.h" #include "status.h" @@ -84,37 +83,6 @@ class Commander { const CommandAttributes *attributes_ = nullptr; }; -class BlockedPopCommander : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { - public: - Status Execute(Server *svr, Connection *conn, std::string *output) final; - - void OnWrite(bufferevent *bev); - - void OnEvent(bufferevent *bev, int16_t events); - - void TimerCB(int, int16_t events); - - protected: - virtual rocksdb::Status executeUnblocked() = 0; - - virtual void blockAllKeys() = 0; - - virtual void unblockAllKeys() = 0; - - virtual std::string emptyOutput() = 0; - - void setTimeout(int64_t timeout) { timeout_ = timeout; } - - Server *svr_ = nullptr; - Connection *conn_ = nullptr; - - private: - int64_t timeout_ = 0; // microseconds - UniqueEvent timer_; -}; - class CommanderWithParseMove : Commander { public: Status Parse() override { return ParseMove(std::move(args_)); }