Skip to content

Commit

Permalink
fix(server): Fix a bug when an expired transaction stays in watched q…
Browse files Browse the repository at this point in the history
…ueue.

Now we remove the transaction from the watched queues in a consistent manner based on the
keys it was assigned to watch.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange committed Jan 14, 2023
1 parent 50e14db commit 1bc3c1f
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 80 deletions.
2 changes: 0 additions & 2 deletions src/server/blocking_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) {
if (wt.queue_map.empty()) {
watched_dbs_.erase(dbit);
}

awakened_transactions_.erase(trans);
}

// Called from commands like lpush.
Expand Down
6 changes: 4 additions & 2 deletions src/server/blocking_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class BlockingController {
size_t NumWatched(DbIndex db_indx) const;
std::vector<std::string> GetWatchedKeys(DbIndex db_indx) const;

void RemoveAwaked(Transaction* trans) {
awakened_transactions_.erase(trans);
}

private:
struct WatchQueue;
struct DbWatchTable;
Expand All @@ -70,7 +74,5 @@ class BlockingController {
// There can be multiple transactions like this because a transaction
// could awaken arbitrary number of keys.
absl::flat_hash_set<Transaction*> awakened_transactions_;

// absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
};
} // namespace dfly
3 changes: 1 addition & 2 deletions src/server/blocking_controller_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ TEST_F(BlockingControllerTest, Timeout) {
time_point tp = steady_clock::now() + chrono::milliseconds(10);

trans_->Schedule();
auto keys = trans_->ShardArgsInShard(0);
auto cb = [&](Transaction* t, EngineShard* shard) { return t->WatchInShard(keys, shard); };
auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->ShardArgsInShard(0); };

bool res = trans_->WaitOnWatch(tp, cb);

Expand Down
5 changes: 4 additions & 1 deletion src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,11 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {

CHECK_EQ(committed_txid_, trans->notify_txid());
bool keep = trans->RunInShard(this);
if (keep)
if (keep) {
return;
} else {
blocking_controller_->RemoveAwaked(trans);
}
}

if (continuation_trans_) {
Expand Down
1 change: 1 addition & 0 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class EngineShard {

uint32_t periodic_task_ = 0;
uint32_t defrag_task_ = 0;

DefragTaskState defrag_state_;
std::unique_ptr<TieredStorage> tiered_storage_;
std::unique_ptr<BlockingController> blocking_controller_;
Expand Down
18 changes: 6 additions & 12 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,12 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
}

// Block
auto cb = [&](Transaction* t, EngineShard* shard) {
auto keys = t->ShardArgsInShard(shard->shard_id());
return t->WatchInShard(keys, shard);
auto wcb = [&](Transaction* t, EngineShard* shard) {
return t->ShardArgsInShard(shard->shard_id());
};

++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp, std::move(cb));
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
--stats->num_blocked_clients;

if (!wait_succeeded)
Expand Down Expand Up @@ -884,10 +883,8 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
}

auto* stats = ServerState::tl_connection_stats();
auto wcb = [&](Transaction* t, EngineShard* shard) {
ArgSlice keys{&this->pop_key_, 1};
return t->WatchInShard(keys, shard);
};

auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };

// Block
++stats->num_blocked_clients;
Expand Down Expand Up @@ -919,10 +916,7 @@ OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) {
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggerred.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) {
ArgSlice keys{&this->pop_key_, 1};
return t->WatchInShard(keys, shard);
};
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };

++stats->num_blocked_clients;

Expand Down
30 changes: 26 additions & 4 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
Expand All @@ -28,6 +29,17 @@ class ListFamilyTest : public BaseFamilyTest {
ListFamilyTest() {
num_threads_ = 4;
}

unsigned NumWatched() {
atomic_uint32_t sum{0};
shard_set->RunBriefInParallel([&](EngineShard* es) {
auto* bc = es->blocking_controller();
if (bc)
sum.fetch_add(bc->NumWatched(0), memory_order_relaxed);
});

return sum.load();
}
};

const char kKey1[] = "x";
Expand Down Expand Up @@ -114,6 +126,7 @@ TEST_F(ListFamilyTest, BLPopBlocking) {
ASSERT_THAT(resp0, ArrLen(2));
EXPECT_THAT(resp0.GetVec(), ElementsAre("x", "1"));
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_EQ(0, NumWatched());
}

TEST_F(ListFamilyTest, BLPopMultiple) {
Expand All @@ -137,7 +150,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) {
EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3"));
ASSERT_FALSE(IsLocked(0, kKey1));
ASSERT_FALSE(IsLocked(0, kKey2));
// ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); });
ASSERT_EQ(0, NumWatched());
}

TEST_F(ListFamilyTest, BLPopTimeout) {
Expand All @@ -155,6 +168,7 @@ TEST_F(ListFamilyTest, BLPopTimeout) {

EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY));
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_EQ(0, NumWatched());
}

TEST_F(ListFamilyTest, BLPopTimeout2) {
Expand All @@ -171,6 +185,7 @@ TEST_F(ListFamilyTest, BLPopTimeout2) {
Run({"DEL", "blist2"});
Run({"RPUSH", "blist2", "d"});
Run({"BLPOP", "blist1", "blist2", "1"});
ASSERT_EQ(0, NumWatched());
}

TEST_F(ListFamilyTest, BLPopMultiPush) {
Expand Down Expand Up @@ -210,6 +225,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, "A"));
ASSERT_EQ(0, NumWatched());
}

TEST_F(ListFamilyTest, BLPopSerialize) {
Expand Down Expand Up @@ -638,10 +654,10 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {

auto push_fiber = [&]() {
auto id = "t-" + std::to_string(it_cnt.fetch_add(1));
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 300; i++) {
Run(id, {"rpush", "a", "DATA"});
}
fibers_ext::SleepFor(100ms);
fibers_ext::SleepFor(50ms);
running = false;
};

Expand All @@ -662,6 +678,7 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {

TEST_F(ListFamilyTest, BRPopLPushSingleShard) {
EXPECT_THAT(Run({"brpoplpush", "x", "y", "0.05"}), ArgType(RespExpr::NIL));
ASSERT_EQ(0, NumWatched());

EXPECT_THAT(Run({"lpush", "x", "val1"}), IntArg(1));
EXPECT_EQ(Run({"brpoplpush", "x", "y", "0.01"}), "val1");
Expand All @@ -682,6 +699,7 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShard) {
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_FALSE(IsLocked(0, "y"));
ASSERT_EQ(0, NumWatched());
}

TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
Expand All @@ -699,12 +717,15 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
ASSERT_EQ(resp, "1");
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_FALSE(IsLocked(0, "y"));
ASSERT_EQ(0, NumWatched());
}

TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
RespExpr resp;

EXPECT_THAT(Run({"brpoplpush", "x", "z", "0.05"}), ArgType(RespExpr::NIL));

ASSERT_EQ(0, NumWatched());

Run({"lpush", "x", "val"});
EXPECT_EQ(Run({"brpoplpush", "x", "z", "0"}), "val");
resp = Run({"lrange", "z", "0", "-1"});
Expand Down Expand Up @@ -732,6 +753,7 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2"));
ASSERT_FALSE(IsLocked(0, "x"));
ASSERT_FALSE(IsLocked(0, "z"));
ASSERT_EQ(0, NumWatched());
// TODO: there is a bug here.
// we do not wake the dest shard, when source is awaked which prevents
// the atomicity and causes the first bug as well.
Expand Down
81 changes: 30 additions & 51 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,6 @@ bool Transaction::RunInShard(EngineShard* shard) {
if (was_suspended || !become_suspended) {
shard->db_slice().Release(mode, largs);
sd.local_mask &= ~KEYLOCK_ACQUIRED;

if (was_suspended || (sd.local_mask & AWAKED_Q)) {
shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this);
}
}
sd.local_mask &= ~OUT_OF_ORDER;
}
Expand Down Expand Up @@ -813,13 +809,18 @@ void Transaction::RunQuickie(EngineShard* shard) {

// runs in coordinator thread.
// Marks the transaction as expired and removes it from the waiting queue.
void Transaction::ExpireBlocking() {
DVLOG(1) << "ExpireBlocking " << DebugId();
void Transaction::UnwatchBlocking(bool should_expire, WaitKeysPovider wcb) {
DVLOG(1) << "UnwatchBlocking " << DebugId();
DCHECK(!IsGlobal());

run_count_.store(unique_shard_cnt_, memory_order_release);

auto expire_cb = [this] { ExpireShardCb(EngineShard::tlocal()); };
auto expire_cb = [&] {
EngineShard* es = EngineShard::tlocal();
ArgSlice wkeys = wcb(this, es);

UnwatchShardCb(wkeys, should_expire, es);
};

if (unique_shard_cnt_ == 1) {
DCHECK_LT(unique_shard_id_, shard_set->size());
Expand All @@ -837,7 +838,7 @@ void Transaction::ExpireBlocking() {

// Wait for all callbacks to conclude.
WaitForShardCallbacks();
DVLOG(1) << "ExpireBlocking finished " << DebugId();
DVLOG(1) << "UnwatchBlocking finished " << DebugId();
}

const char* Transaction::Name() const {
Expand Down Expand Up @@ -1009,12 +1010,17 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
return reverse_index_[sd.arg_start + arg_index];
}

bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) {
bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysPovider wkeys_provider) {
// Assumes that transaction is pending and scheduled. TODO: To verify it with state machine.
VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")";
using namespace chrono;

Execute(cb, true);
auto cb = [&](Transaction* t, EngineShard* shard) {
auto keys = wkeys_provider(t, shard);
return t->WatchInShard(keys, shard);
};

Execute(move(cb), true);

coordinator_state_ |= COORD_BLOCKED;

Expand All @@ -1038,40 +1044,11 @@ bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) {
DVLOG(1) << "WaitOnWatch await_until " << int(status);
}

if ((coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout) {
ExpireBlocking();
coordinator_state_ &= ~COORD_BLOCKED;
return false;
}

#if 0
// We were notified by a shard, so lets make sure that our notifications converged to a stable
// form.
if (unique_shard_cnt_ > 1) {
run_count_.store(unique_shard_cnt_, memory_order_release);

auto converge_cb = [this] {
this->CheckForConvergence(EngineShard::tlocal());
};

for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
DCHECK_EQ(0, sd.local_mask & ARMED);
if (sd.arg_count == 0)
continue;
shard_set->Add(i, converge_cb);
}

// Wait for all callbacks to conclude.
WaitForShardCallbacks();
DVLOG(1) << "Convergence finished " << DebugId();
}
#endif

// Lift blocking mask.
bool is_expired = (coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout;
UnwatchBlocking(is_expired, wkeys_provider);
coordinator_state_ &= ~COORD_BLOCKED;

return true;
return !is_expired;
}

// Runs only in the shard thread.
Expand All @@ -1091,22 +1068,24 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {
return OpStatus::OK;
}

void Transaction::ExpireShardCb(EngineShard* shard) {
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(Mode(), lock_args);
void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard) {
if (should_expire) {
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(Mode(), lock_args);

unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
sd.local_mask &= ~KEYLOCK_ACQUIRED;
unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}

shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this);
shard->blocking_controller()->RemoveWatched(wkeys, this);

// Need to see why I decided to call this.
// My guess - probably to trigger the run of stalled transactions in case
// this shard concurrently awoke this transaction and stalled the processing
// of TxQueue.
shard->PollExecution("expirecb", nullptr);
shard->PollExecution("unwatchcb", nullptr);

CHECK_GE(DecreaseRunCnt(), 1u);
}
Expand Down
Loading

0 comments on commit 1bc3c1f

Please sign in to comment.