From 1469746ddc9105faa4054ea95d9af61c9dce5fd9 Mon Sep 17 00:00:00 2001 From: hulk Date: Fri, 9 Sep 2022 10:14:08 +0800 Subject: [PATCH] Fix successor commands won't be processed before receiving the next read event (#839) Currently, we will stop processing commands when running into block commands like BRPOP/BLPOP and there remained commands in the connection, but the connection continues handling those commands since the read event was triggered. --- src/redis_cmd.cc | 15 ++++++++++----- tests/gocase/unit/protocol/regression_test.go | 11 +++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index 52c381d0268..16b00e93856 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -1635,12 +1635,13 @@ class CommandBPop : public Commander { static void WriteCB(bufferevent *bev, void *ctx) { auto self = reinterpret_cast(ctx); auto s = self->TryPopFromList(); - // if pop fail ,currently we compromised to close bpop request if (s.IsNotFound()) { - self->conn_->Reply(Redis::NilString()); - LOG(ERROR) << "[BPOP] Failed to execute redis command: " << self->conn_->current_cmd_->GetAttributes()->name - << ", err: another concurrent pop request must have stole the data before this bpop request" - << " or bpop is in a pipeline cmd list(cmd before bpop replyed trigger this writecb)"; + // The connection may be waked up but can't pop from list. For example, + // connection A is blocking on list and connection B push a new element + // then wake up the connection A, but this element may be token by other connection C. + // So we need to wait for the wake event again by disabling the WRITE event. + bufferevent_disable(bev, EV_WRITE); + return; } if (self->timer_ != nullptr) { event_free(self->timer_); @@ -1650,6 +1651,10 @@ class CommandBPop : public Commander { bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, Redis::Connection::OnEvent, self->conn_); bufferevent_enable(bev, EV_READ); + // We need to manually trigger the read event since we will stop processing commands + // in connection after the blocking command, so there may have some commands to be processed. + // Related issue: https://github.com/apache/incubator-kvrocks/issues/831 + bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); } static void EventCB(bufferevent *bev, int16_t events, void *ctx) { diff --git a/tests/gocase/unit/protocol/regression_test.go b/tests/gocase/unit/protocol/regression_test.go index e5bb7794a13..4a6cfc5e46b 100644 --- a/tests/gocase/unit/protocol/regression_test.go +++ b/tests/gocase/unit/protocol/regression_test.go @@ -55,10 +55,9 @@ func TestRegression(t *testing.T) { v = rdb.RPush(ctx, "handle", "a") require.EqualValues(t, 1, v.Val()) - // TODO should read the second pushed element - //for _, res := range resList { - // r, err := c.ReadLine() - // require.NoError(t, err) - // require.Equal(t, res, r) - //} + for _, res := range resList { + r, err := c.ReadLine() + require.NoError(t, err) + require.Equal(t, res, r) + } }