Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Testcase for listener snapshot #508

174 changes: 168 additions & 6 deletions src/kvstore/test/NebulaListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
#include "meta/ActiveHostsMan.h"

DECLARE_uint32(raft_heartbeat_interval_secs);
DECLARE_int32(wal_ttl);
DECLARE_int64(wal_file_size);
DECLARE_int32(wal_buffer_size);
DECLARE_int32(listener_pursue_leader_threshold);
DECLARE_int32(clean_wal_interval_secs);

using nebula::meta::PartHosts;
using nebula::meta::ListenerHosts;

Expand All @@ -41,12 +47,31 @@ class DummyListener : public Listener {
return data_;
}

void clearData() {
data_.clear();
}

std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override {
std::lock_guard<std::mutex> g(raftLock_);
return Listener::commitSnapshot(data, committedLogId, committedLogTerm, finished);
bool unl = raftLock_.try_lock();
auto result = Listener::commitSnapshot(data, committedLogId,
committedLogTerm, finished);
if (unl) {
raftLock_.unlock();
}
committedSnapshot_.first += result.first;
committedSnapshot_.second += result.second;
return result;
}

std::pair<int64_t, int64_t> committedSnapshot() {
return committedSnapshot_;
}

std::pair<LogID, TermID> committedId() {
return lastCommittedLogId();
}

protected:
Expand All @@ -65,18 +90,16 @@ class DummyListener : public Listener {
}

std::pair<LogID, TermID> lastCommittedLogId() override {
return std::make_pair(committedLogId_, term_);
return std::make_pair(committedLogId_, lastLogTerm_);
}

LogID lastApplyLogId() override {
return lastApplyLogId_;
}

void cleanup() override {
}

private:
std::vector<KV> data_;
std::pair<int64_t, int64_t> committedSnapshot_{0, 0};
};

class ListenerBasicTest : public ::testing::TestWithParam<std::tuple<int32_t, int32_t, int32_t>> {
Expand Down Expand Up @@ -251,6 +274,7 @@ class ListenerBasicTest : public ::testing::TestWithParam<std::tuple<int32_t, in
}
}
LOG(FATAL) << "Should not reach here!";
return 0;
}

protected:
Expand All @@ -268,6 +292,19 @@ class ListenerBasicTest : public ::testing::TestWithParam<std::tuple<int32_t, in
std::unordered_map<PartitionID, std::shared_ptr<DummyListener>> dummys_;
};

class ListenerAdvanceTest : public ListenerBasicTest {
public:
void SetUp() override {
FLAGS_wal_ttl = 3;
FLAGS_wal_file_size = 128;
FLAGS_wal_buffer_size = 1;
FLAGS_listener_pursue_leader_threshold = 0;
FLAGS_clean_wal_interval_secs = 3;
FLAGS_raft_heartbeat_interval_secs = 5;
ListenerBasicTest::SetUp();
}
};

TEST_P(ListenerBasicTest, SimpleTest) {
LOG(INFO) << "Insert some data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
Expand Down Expand Up @@ -407,13 +444,138 @@ TEST_P(ListenerBasicTest, CommitSnapshotTest) {
}
}

TEST_P(ListenerBasicTest, ListenerResetByWalTest) {
FLAGS_wal_ttl = 14400;
FLAGS_wal_file_size = 1024;
FLAGS_wal_buffer_size = 512;
FLAGS_listener_pursue_leader_threshold = 0;
for (int32_t partId = 1; partId <= partCount_; partId++) {
std::vector<KV> data;
for (int32_t i = 0; i < 100000; i++) {
data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i),
folly::stringPrintf("val_%d_%d", partId, i));
}
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
folly::Baton<true, std::atomic> baton;
stores_[index]->asyncMultiPut(spaceId_, partId, std::move(data),
[&baton](cpp2::ErrorCode code) {
EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();
}

// wait listener commit
sleep(FLAGS_raft_heartbeat_interval_secs + 3);

for (int32_t partId = 1; partId <= partCount_; partId++) {
auto dummy = dummys_[partId];
const auto& data = dummy->data();
CHECK_EQ(100000, data.size());
}

for (int32_t partId = 1; partId <= partCount_; partId++) {
dummys_[partId]->clearData();
dummys_[partId]->resetListener();
CHECK_EQ(0, dummys_[partId]->data().size());
CHECK_EQ(0, dummys_[partId]->getApplyId());
}

sleep(FLAGS_raft_heartbeat_interval_secs + 3);

for (int32_t partId = 1; partId <= partCount_; partId++) {
while (true) {
if (dummys_[partId]->pursueLeaderDone()) {
break;
}
}
}

for (int32_t partId = 1; partId <= partCount_; partId++) {
auto dummy = dummys_[partId];
const auto& data = dummy->data();
CHECK_EQ(100000, data.size());
}
}

TEST_P(ListenerAdvanceTest, ListenerResetBySnapshotTest) {
for (int32_t partId = 1; partId <= partCount_; partId++) {
std::vector<KV> data;
for (int32_t i = 0; i < 1000000; i++) {
auto vKey = NebulaKeyUtils::vertexKey(8, partId, folly::to<std::string>(i), 5);
data.emplace_back(std::move(vKey),
folly::stringPrintf("val_%d_%d", partId, i));
}
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
folly::Baton<true, std::atomic> baton;
stores_[index]->asyncMultiPut(spaceId_, partId, std::move(data),
[&baton](cpp2::ErrorCode code) {
EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();
}

// wait listener commit
sleep(FLAGS_raft_heartbeat_interval_secs + 1);

for (int32_t partId = 1; partId <= partCount_; partId++) {
auto dummy = dummys_[partId];
const auto& data = dummy->data();
CHECK_EQ(1000000, data.size());
}

sleep(FLAGS_clean_wal_interval_secs + 1);

for (int32_t partId = 1; partId <= partCount_; partId++) {
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
auto res = stores_[index]->part(spaceId_, partId);
CHECK(ok(res));
auto part = value(std::move(res));
part->wal()->reset();
}

sleep(FLAGS_clean_wal_interval_secs);

for (int32_t partId = 1; partId <= partCount_; partId++) {
dummys_[partId]->resetListener();
CHECK_EQ(0, dummys_[partId]->getApplyId());
auto termAndId = dummys_[partId]->committedId();
CHECK_EQ(0, termAndId.first);
CHECK_EQ(0, termAndId.second);
}

sleep(FLAGS_clean_wal_interval_secs + 1);

std::vector<bool> partResult;
for (int32_t partId = 1; partId <= partCount_; partId++) {
auto retry = 0;
while (retry++ < 6) {
auto result = dummys_[partId]->committedSnapshot();
if (result.first >= 1000000) {
partResult.emplace_back(true);
break;
}
}
CHECK_LT(retry, 6);
}
CHECK_EQ(partResult.size(), partCount_);
}


INSTANTIATE_TEST_CASE_P(
PartCount_Replicas_ListenerCount,
ListenerBasicTest,
::testing::Values(std::make_tuple(1, 1, 1)));

INSTANTIATE_TEST_CASE_P(
PartCount_Replicas_ListenerCount,
ListenerAdvanceTest,
::testing::Values(std::make_tuple(1, 1, 1)));

} // namespace kvstore
} // namespace nebula

Expand Down