diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index ad5a50dbd41..a9612351734 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -334,6 +334,129 @@ class CommandBRPop : public CommandBPop { CommandBRPop() : CommandBPop(false) {} }; +class CommandBLMPop : public BlockingCommander { + public: + CommandBLMPop() = default; + CommandBLMPop(const CommandBLMPop &) = delete; + CommandBLMPop &operator=(const CommandBLMPop &) = delete; + + ~CommandBLMPop() override = default; + + // format: BLMPOP timeout #numkeys key0 [key1 ...] [COUNT count] + Status Parse(const std::vector &args) override { + CommandParser parser(args, 1); + + auto timeout = GET_OR_RET(parser.TakeFloat()); + timeout_ = static_cast(timeout * 1000 * 1000); + + auto num_keys = GET_OR_RET(parser.TakeInt()); + keys_.clear(); + keys_.reserve(num_keys); + for (uint32_t i = 0; i < num_keys; ++i) { + keys_.emplace_back(GET_OR_RET(parser.TakeStr())); + } + + auto left_or_right = util::ToLower(GET_OR_RET(parser.TakeStr())); + if (left_or_right == "left") { + left_ = true; + } else if (left_or_right == "right") { + left_ = false; + } else { + return {Status::RedisParseErr, errInvalidSyntax}; + } + + while (parser.Good()) { + if (parser.EatEqICase("count") && count_ == static_cast(-1)) { + count_ = GET_OR_RET(parser.TakeInt()); + } else { + return parser.InvalidSyntax(); + } + } + if (count_ == static_cast(-1)) { + count_ = 1; + } + + return Status::OK(); + } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + svr_ = svr; + InitConnection(conn); + + auto s = ExecuteUnblocked(); + if (s.ok() || !s.IsNotFound()) { + return Status::OK(); // error has already been output + } + + return StartBlocking(timeout_, output); + } + + rocksdb::Status ExecuteUnblocked() { + redis::List list_db(svr_->storage, conn_->GetNamespace()); + std::vector elems; + std::string chosen_key; + rocksdb::Status s; + for (const auto &key : keys_) { + s = list_db.PopMulti(key, left_, count_, &elems); + if (s.ok() && !elems.empty()) { + chosen_key = key; + break; + } + if (!s.IsNotFound()) { + break; + } + } + + if (s.ok()) { + if (!elems.empty()) { + conn_->GetServer()->UpdateWatchedKeysManually({chosen_key}); + std::string elems_bulk = redis::MultiBulkString(elems); + conn_->Reply(redis::Array({redis::BulkString(chosen_key), std::move(elems_bulk)})); + } + } else if (!s.IsNotFound()) { + conn_->Reply(redis::Error("ERR " + s.ToString())); + } + + return s; + } + + void BlockKeys() override { + for (const auto &key : keys_) { + svr_->BlockOnKey(key, conn_); + } + } + + void UnblockKeys() override { + for (const auto &key : keys_) { + svr_->UnblockOnKey(key, conn_); + } + } + + bool OnBlockingWrite() override { + auto s = ExecuteUnblocked(); + return !s.IsNotFound(); + } + + std::string NoopReply() override { return redis::NilString(); } + + static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector &args) { + CommandKeyRange range; + range.first_key = 3; + range.key_step = 1; + // This parsing would always succeed as this cmd has been parsed before. + auto num_key = *ParseInt(args[2], 10); + range.last_key = range.first_key + num_key - 1; + return range; + }; + + private: + bool left_; + uint32_t count_ = -1; + int64_t timeout_ = 0; // microseconds + std::vector keys_; + Server *svr_ = nullptr; +}; + class CommandLRem : public Commander { public: Status Parse(const std::vector &args) override { @@ -727,6 +850,7 @@ class CommandLPos : public Commander { REDIS_REGISTER_COMMANDS(MakeCmdAttr("blpop", -3, "write no-script", 1, -2, 1), MakeCmdAttr("brpop", -3, "write no-script", 1, -2, 1), + MakeCmdAttr("blmpop", -5, "write no-script", CommandBLMPop::keyRangeGen), MakeCmdAttr("lindex", 3, "read-only", 1, 1, 1), MakeCmdAttr("linsert", 5, "write", 1, 1, 1), MakeCmdAttr("llen", 2, "read-only", 1, 1, 1), diff --git a/tests/gocase/unit/type/list/list_test.go b/tests/gocase/unit/type/list/list_test.go index 0522e783afd..e7f744e02d7 100644 --- a/tests/gocase/unit/type/list/list_test.go +++ b/tests/gocase/unit/type/list/list_test.go @@ -1177,4 +1177,309 @@ func TestList(t *testing.T) { require.EqualError(t, lmpopNoCount(rdb, ctx, direction, key1, key2).Err(), redis.Nil.Error()) }) } + + for _, direction := range []string{"LEFT", "RIGHT"} { + key1 := "blmpop-list1" + key2 := "blmpop-list2" + rdb.Del(ctx, key1, key2) + require.EqualValues(t, 5, rdb.LPush(ctx, key1, "one", "two", "three", "four", "five").Val()) + require.EqualValues(t, 5, rdb.LPush(ctx, key2, "ONE", "TWO", "THREE", "FOUR", "FIVE").Val()) + + zeroTimeout := time.Second * 0 + + // TEST SUIT #1: non-blocking scenario (at least one queried list is not empty). + // In these cases, the behavior should be the same to LMPOP. + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countSingle %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 1, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"five"}, resultVal) + } else { + require.Equal(t, []string{"one"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countMulti %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 2, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"four", "three"}, resultVal) + } else { + require.Equal(t, []string{"two", "three"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey countTooMuch %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"two", "one"}, resultVal) + } else { + require.Equal(t, []string{"four", "five"}, resultVal) + } + }) + + require.EqualValues(t, 2, rdb.LPush(ctx, key1, "six", "seven").Val()) + t.Run(fmt.Sprintf("BLMPOP test unblocked firstKey countOver %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"seven", "six"}, resultVal) + } else { + require.Equal(t, []string{"six", "seven"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countSingle %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 1, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"FIVE"}, resultVal) + } else { + require.Equal(t, []string{"ONE"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countMulti %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 2, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"FOUR", "THREE"}, resultVal) + } else { + require.Equal(t, []string{"TWO", "THREE"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey countOver %s", direction), func(t *testing.T) { + result := rdb.BLMPop(ctx, zeroTimeout, direction, 10, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"TWO", "ONE"}, resultVal) + } else { + require.Equal(t, []string{"FOUR", "FIVE"}, resultVal) + } + }) + + blmpopNoCount := func(c *redis.Client, ctx context.Context, timeout string, direction string, keys ...string) *redis.KeyValuesCmd { + args := make([]interface{}, 3+len(keys), 6+len(keys)) + args[0] = "blmpop" + args[1] = timeout + args[2] = len(keys) + for i, key := range keys { + args[3+i] = key + } + args = append(args, strings.ToLower(direction)) + cmd := redis.NewKeyValuesCmd(ctx, args...) + _ = c.Process(ctx, cmd) + return cmd + } + rdb.Del(ctx, key1, key2) + require.EqualValues(t, 2, rdb.LPush(ctx, key1, "one", "two").Val()) + require.EqualValues(t, 2, rdb.LPush(ctx, key2, "ONE", "TWO").Val()) + + t.Run(fmt.Sprintf("BLMPOP test unblocked oneKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"two"}, resultVal) + } else { + require.Equal(t, []string{"one"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked firstKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key1, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"one"}, resultVal) + } else { + require.Equal(t, []string{"two"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"TWO"}, resultVal) + } else { + require.Equal(t, []string{"ONE"}, resultVal) + } + }) + + t.Run(fmt.Sprintf("BLMPOP test unblocked secondKey noCount one %s", direction), func(t *testing.T) { + result := blmpopNoCount(rdb, ctx, "0", direction, key1, key2) + resultKey, resultVal := result.Val() + require.NoError(t, result.Err()) + require.EqualValues(t, key2, resultKey) + if direction == "LEFT" { + require.Equal(t, []string{"ONE"}, resultVal) + } else { + require.Equal(t, []string{"TWO"}, resultVal) + } + }) + + // TEST SUIT #2: blocking scenario, but data reaches within timeout. + t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countSingle %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "1")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO"}) + } + require.EqualValues(t, 1, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countMulti %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO", "ONE"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served oneKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "1", key1, direction, "count", "10")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO", "ONE"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served firstKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key1, []string{"ONE", "TWO"}) + } else { + rd.MustReadStringsWithKey(t, key1, []string{"TWO", "ONE"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served secondKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two", "one"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served bothKey FIFO countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key1, "ONE", "TWO").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two", "one"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val()) + require.EqualValues(t, 2, rdb.LLen(ctx, key1).Val()) + }) + + t.Run(fmt.Sprintf("BLMPOP test blocked served secondKey noCount %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction)) + time.Sleep(time.Millisecond * 100) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two"}) + } + require.EqualValues(t, 1, rdb.LLen(ctx, key2).Val()) + }) + + // TEST SUIT #3: blocking scenario, and timeout is triggered. + t.Run(fmt.Sprintf("BLMPOP test blocked timeout %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "1", "2", key1, key2, direction)) + time.Sleep(time.Millisecond * 1200) + rd.MustMatch(t, "") + }) + + // TEST SUIT #4: blocking scenario, and timeout is 0 (permanently blocked). + t.Run(fmt.Sprintf("BLMPOP test blocked infinitely served secondKey countOver %s", direction), func(t *testing.T) { + rd := srv.NewTCPClient() + defer func() { require.NoError(t, rd.Close()) }() + require.NoError(t, rdb.Del(ctx, key1, key2).Err()) + require.NoError(t, rd.WriteArgs("blmpop", "0", "2", key1, key2, direction, "count", "2")) + time.Sleep(time.Millisecond * 1200) + require.NoError(t, rdb.RPush(ctx, key2, "one", "two").Err()) + time.Sleep(time.Millisecond * 100) + if direction == "LEFT" { + rd.MustReadStringsWithKey(t, key2, []string{"one", "two"}) + } else { + rd.MustReadStringsWithKey(t, key2, []string{"two", "one"}) + } + require.EqualValues(t, 0, rdb.Exists(ctx, key2).Val()) + }) + } } diff --git a/tests/gocase/util/tcp_client.go b/tests/gocase/util/tcp_client.go index 1a65bcb1a48..46ed3ac9af5 100644 --- a/tests/gocase/util/tcp_client.go +++ b/tests/gocase/util/tcp_client.go @@ -87,6 +87,21 @@ func (c *TCPClient) MustReadStrings(t testing.TB, s []string) { } } +func (c *TCPClient) MustReadStringsWithKey(t testing.TB, key string, s []string) { + r, err := c.ReadLine() + require.NoError(t, err) + require.EqualValues(t, '*', r[0]) + n, err := strconv.Atoi(r[1:]) + require.NoError(t, err) + require.Equal(t, n, 2) + + _, err = c.ReadLine() + require.NoError(t, err) + c.MustRead(t, key) + + c.MustReadStrings(t, s) +} + func (c *TCPClient) MustMatch(t testing.TB, rx string) { r, err := c.ReadLine() require.NoError(t, err)