Skip to content

Commit

Permalink
Fix successor commands won't be processed before receiving the next r…
Browse files Browse the repository at this point in the history
…ead event (#839)

Currently, we will stop processing commands when running into block commands
like BRPOP/BLPOP and there remained commands in the connection, but the connection
continues handling those commands since the read event was triggered.
  • Loading branch information
git-hulk authored Sep 9, 2022
1 parent a8d8ccb commit 1469746
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
15 changes: 10 additions & 5 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1635,12 +1635,13 @@ class CommandBPop : public Commander {
static void WriteCB(bufferevent *bev, void *ctx) {
auto self = reinterpret_cast<CommandBPop *>(ctx);
auto s = self->TryPopFromList();
// if pop fail ,currently we compromised to close bpop request
if (s.IsNotFound()) {
self->conn_->Reply(Redis::NilString());
LOG(ERROR) << "[BPOP] Failed to execute redis command: " << self->conn_->current_cmd_->GetAttributes()->name
<< ", err: another concurrent pop request must have stole the data before this bpop request"
<< " or bpop is in a pipeline cmd list(cmd before bpop replyed trigger this writecb)";
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
// then wake up the connection A, but this element may be token by other connection C.
// So we need to wait for the wake event again by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}
if (self->timer_ != nullptr) {
event_free(self->timer_);
Expand All @@ -1650,6 +1651,10 @@ class CommandBPop : public Commander {
bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite,
Redis::Connection::OnEvent, self->conn_);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/incubator-kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}

static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
Expand Down
11 changes: 5 additions & 6 deletions tests/gocase/unit/protocol/regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ func TestRegression(t *testing.T) {
v = rdb.RPush(ctx, "handle", "a")
require.EqualValues(t, 1, v.Val())

// TODO should read the second pushed element
//for _, res := range resList {
// r, err := c.ReadLine()
// require.NoError(t, err)
// require.Equal(t, res, r)
//}
for _, res := range resList {
r, err := c.ReadLine()
require.NoError(t, err)
require.Equal(t, res, r)
}
}

0 comments on commit 1469746

Please sign in to comment.