From 5a61c3942feea4ebe4c41a2e393ebeb57cc499f3 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 10 Nov 2022 00:00:11 +0300 Subject: [PATCH 1/8] feat(server): Swith to stable state replication Signed-off-by: Vladislav Oleshko --- src/redis/rdb.h | 3 + src/server/dflycmd.cc | 117 +++++++++++++++++++++++++++++------ src/server/dflycmd.h | 19 ++++-- src/server/rdb_load.cc | 8 +++ src/server/rdb_load.h | 2 + src/server/rdb_save.cc | 20 ++++-- src/server/rdb_save.h | 2 + src/server/replica.cc | 134 ++++++++++++++++++++++++++++++++++------- src/server/replica.h | 20 +++++- src/server/snapshot.cc | 4 ++ 10 files changed, 274 insertions(+), 55 deletions(-) diff --git a/src/redis/rdb.h b/src/redis/rdb.h index 78770e59b517..3919929850aa 100644 --- a/src/redis/rdb.h +++ b/src/redis/rdb.h @@ -103,6 +103,9 @@ /* Test if a type is an object type. */ #define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18)) +// DFLY EXTENSIONS: WHAT FILE TO PUT THEM? +#define RDB_OPCODE_FULLSYNC_END 233 + /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_FUNCTION 246 /* engine data */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index a9cdea5508b5..f1c9868aa11b 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -87,6 +87,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { return Sync(args, cntx); } + if (sub_cmd == "STARTSTABLE" && args.size() == 3) { + return StartStable(args, cntx); + } + if (sub_cmd == "EXPIRE") { return Expire(args, cntx); } @@ -258,16 +262,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) { return; unique_lock lk(sync_info->mu); - if (sync_info->state != SyncState::PREPARATION) - return rb->SendError(kInvalidState); - - // Check all flows are connected. - // This might happen if a flow abruptly disconnected before sending the SYNC request. - for (const FlowInfo& flow : sync_info->flows) { - if (!flow.conn) { - return rb->SendError(kInvalidState); - } - } + if (!CheckReplicaStateOrReply(*sync_info, SyncState::PREPARATION, rb)) + return; // Start full sync. 
{ @@ -288,6 +284,38 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) { return rb->SendOk(); } +void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) { + RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); + string_view sync_id_str = ArgS(args, 2); + + VLOG(0) << "Got DFLY STARTSTABLE " << sync_id_str; + + auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, rb); + if (!sync_id) + return; + + unique_lock lk(sync_info->mu); + if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb)) + return; + + { + TransactionGuard tg{cntx->transaction}; + AggregateStatus status; + + auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) { + status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal()); + return OpStatus::OK; + }; + shard_set->pool()->AwaitFiberOnAll(std::move(cb)); + + if (*status != OpStatus::OK) + return rb->SendError(kInvalidState); + } + + sync_info->state = SyncState::STABLE_SYNC; + return rb->SendOk(); +} + void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) { RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard) { @@ -305,14 +333,37 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) { flow->saver.reset(new RdbSaver(flow->conn->socket(), save_mode, false)); if (shard != nullptr) { - flow->saver->StartSnapshotInShard(false, shard); + auto ec = sf_->journal()->OpenInThread(false, string_view()); + CHECK(!ec); + flow->saver->StartSnapshotInShard(true, shard); } flow->fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow); return OpStatus::OK; } +OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) { + if (shard != nullptr) { + flow->saver->StopSnapshotInShard(shard); + } + + // Wait for full sync to finish. + if (flow->fb.joinable()) { + flow->fb.join(); + } + + if (shard != nullptr) { + flow->saver.reset(); + + // Start stable state fiber. + flow->fb = boost::fibers::fiber(&DflyCmd::StableSyncFb, this, flow); + } + + return OpStatus::OK; +} + void DflyCmd::FullSyncFb(FlowInfo* flow) { + VLOG(0) << "Fullsyncfb"; error_code ec; RdbSaver* saver = flow->saver.get(); @@ -328,22 +379,32 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) { return; } - if (saver->Mode() != SaveMode::SUMMARY) { - // TODO: we should be able to stop earlier if requested. - ec = saver->SaveBody(nullptr); - if (ec) { - LOG(ERROR) << ec; - return; - } + // TODO: we should be able to stop earlier if requested. + ec = saver->SaveBody(nullptr); + if (ec) { + LOG(ERROR) << ec; + return; } + VLOG(0) << "SaveBody finished"; + ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token)); if (ec) { LOG(ERROR) << ec; return; } - ec = flow->conn->socket()->Shutdown(SHUT_RDWR); + //ec = flow->conn->socket()->Shutdown(SHUT_RDWR); +} + +void DflyCmd::StableSyncFb(FlowInfo* flow) { + auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) { + // TODO: Serialize event. 
+ VLOG(0) << "Got change event " << je.key; + ReqSerializer serializer{flow->conn->socket()}; + serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString())); + CHECK(!serializer.ec()); + }); } uint32_t DflyCmd::CreateSyncSession() { @@ -429,6 +490,24 @@ pair> DflyCmd::GetSyncInfoOrReply(std::s return {sync_id, sync_it->second}; } +bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected, RedisReplyBuilder* rb) { + if (sync_info.state != expected) { + rb->SendError(kInvalidState); + return false; + } + + // Check all flows are connected. + // This might happen if a flow abruptly disconnected before sending the SYNC request. + for (const FlowInfo& flow : sync_info.flows) { + if (!flow.conn) { + rb->SendError(kInvalidState); + return false; + } + } + + return true; +} + void DflyCmd::BreakOnShutdown() { VLOG(1) << "BreakOnShutdown"; } diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 47a77f646f64..ee7400a8db0e 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -31,7 +31,7 @@ class Journal; class DflyCmd { public: - enum class SyncState { PREPARATION, FULL_SYNC, CANCELLED }; + enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED }; struct FlowInfo { FlowInfo() = default; @@ -80,11 +80,14 @@ class DflyCmd { // Register connection as flow for sync session. void Flow(CmdArgList args, ConnectionContext* cntx); - // SYNC - // Migrate connection to required flow thread. - // Stub: will be replcaed with full sync. + // SYNC + // Initiate full sync. void Sync(CmdArgList args, ConnectionContext* cntx); + // STARTSTABLE + // Switch to stable state replication. + void StartStable(CmdArgList args, ConnectionContext* cntx); + // EXPIRE // Check all keys for expiry. void Expire(CmdArgList args, ConnectionContext* cntx); @@ -92,9 +95,15 @@ class DflyCmd { // Start full sync in thread. Start FullSyncFb. Called for each flow. facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard); + // Start stable sync in thread. Start StableSyncFB. Called for each flow. + facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard); + // Fiber that runs full sync for each flow. void FullSyncFb(FlowInfo* flow); + // Fiber that runs stable sync for each flow. + void StableSyncFb(FlowInfo* flow); + // Unregister flow. Must be called when flow disconnects. 
void UnregisterFlow(FlowInfo*); @@ -108,6 +117,8 @@ class DflyCmd { std::pair> GetSyncInfoOrReply(std::string_view id, facade::RedisReplyBuilder* rb); + bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected, facade::RedisReplyBuilder* rb); + ServerFamily* sf_; util::ListenerInterface* listener_; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 958089e3e671..6ff6afe8e301 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1553,6 +1553,14 @@ error_code RdbLoader::Load(io::Source* src) { break; } + if (type == RDB_OPCODE_FULLSYNC_END) { + VLOG(0) << "GOT FULLSYNC OPCODE"; + if (fullsyncb) + fullsyncb(); + // notify full sync end + continue; + } + if (type == RDB_OPCODE_SELECTDB) { unsigned dbid = 0; diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index ff1994e82b85..3acc582f6319 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -172,6 +172,8 @@ class RdbLoader : protected RdbLoaderBase { return load_time_; } + std::function fullsyncb; + private: struct ObjSettings; std::error_code LoadKeyValPair(int type, ObjSettings* settings); diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 45382fb1dbde..89f14e5a89a1 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -581,6 +581,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { return error_code{}; } +error_code RdbSerializer::SendFullSyncCut() { + RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END)); + return FlushMem(); +} + // TODO: if buf is large enough, it makes sense to write both mem_buf and buf // directly to sink_. error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { @@ -918,12 +923,15 @@ error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) { error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) { RETURN_ON_ERR(impl_->serializer()->FlushMem()); - VLOG(1) << "SaveBody , snapshots count: " << impl_->Size(); - - error_code io_error = impl_->ConsumeChannel(); - if (io_error) { - VLOG(1) << "io error " << io_error; - return io_error; + if (save_mode_ == SaveMode::SUMMARY) { + return impl_->serializer()->SendFullSyncCut(); + } else { + VLOG(1) << "SaveBody , snapshots count: " << impl_->Size(); + error_code io_error = impl_->ConsumeChannel(); + if (io_error) { + VLOG(1) << "io error " << io_error; + return io_error; + } } RETURN_ON_ERR(SaveEpilog()); diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 66d5ea6a35fb..0a6b62e4fa03 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -140,6 +140,8 @@ class RdbSerializer { // for the dump command - thus it is public function std::error_code SaveValue(const PrimeValue& pv); + std::error_code SendFullSyncCut(); + private: std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); std::error_code SaveObject(const PrimeValue& pv); diff --git a/src/server/replica.cc b/src/server/replica.cc index f61495006456..60c7ab6a5201 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -204,13 +204,13 @@ void Replica::MainReplicationFb() { this_fiber::sleep_for(50ms); } + service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); if (ec) { LOG(WARNING) << "Error syncing " << ec << " " << ec.message(); state_mask_ &= R_ENABLED; // reset all flags besides R_ENABLED continue; } - service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); VLOG(1) << "Replica greet ok"; } @@ -422,17 +422,18 @@ error_code Replica::InitiateDflySync() { shard_flows_[i].reset(new Replica(master_context_, i, &service_)); } + SyncBlock 
sb{num_df_flows_}; + AggregateError ec; auto partition = Partition(num_df_flows_); shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) { for (auto id : partition[index]) { - if (ec = shard_flows_[id]->StartAsDflyFlow()) + if (ec = shard_flows_[id]->StartFullSyncFlow(&sb)) break; } }); - if (ec) - return *ec; + RETURN_ON_ERR(*ec); ReqSerializer serializer{sock_.get()}; @@ -447,8 +448,13 @@ error_code Replica::InitiateDflySync() { return make_error_code(errc::bad_message); } - for (auto& flow : shard_flows_) - flow->sync_fb_.join(); + // Wait for all flows to receive full sync cut. + { + VLOG(0) << "BEFORE FULL SYNC WAIT"; + std::unique_lock lk(sb.mu_); + sb.cv_.wait(lk, [&]() { return sb.flows_left == 0; }); + VLOG(0) << "AFTER FULL SYNC WAIT"; + } LOG(INFO) << "Full sync finished"; state_mask_ |= R_SYNC_OK; @@ -503,20 +509,45 @@ error_code Replica::ConsumeRedisStream() { } error_code Replica::ConsumeDflyStream() { - ReqSerializer serializer{sock_.get()}; - // TBD - serializer.SendCommand("QUIT"); - state_mask_ &= ~R_ENABLED; // disable further - TODO: not finished. - RETURN_ON_ERR(serializer.ec()); + // Request master to transition to stable sync. + { + ReqSerializer serializer{sock_.get()}; + serializer.SendCommand(StrCat("DFLY STARTSTABLE ", master_context_.dfly_session_id)); + RETURN_ON_ERR(serializer.ec()); + } - base::IoBuf io_buf{128}; + // Wait for all flows to finish full sync. + for (auto& sub_repl : shard_flows_) + sub_repl->sync_fb_.join(); + + VLOG(0) << "FULL SYNC FIBERS JOINED"; + + AggregateError all_ec; + vector> partition = Partition(num_df_flows_); + shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) { + const auto& local_ids = partition[index]; + for (unsigned id : local_ids) { + all_ec = shard_flows_[id]->StartStableSyncFlow(); + if (all_ec) + break; + } + }); + + RETURN_ON_ERR(*all_ec); - RETURN_ON_ERR(Recv(sock_.get(), &io_buf)); + base::IoBuf io_buf(16_KB); + std::error_code ec; + while (!ec) { + io::MutableBytes buf = io_buf.AppendBuffer(); + io::Result size_res = sock_->Recv(buf); + if (!size_res) + return size_res.error(); + } return error_code{}; } -error_code Replica::StartAsDflyFlow() { +error_code Replica::StartFullSyncFlow(SyncBlock* sb) { CHECK(!sock_); DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); @@ -558,32 +589,89 @@ error_code Replica::StartAsDflyFlow() { // We can not discard io_buf because it may contain data // besides the response we parsed. Therefore we pass it further to ReplicateDFFb. 
- sync_fb_ = - ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, std::move(io_buf), move(eof_token)); + sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, sb, std::move(io_buf), + move(eof_token)); return error_code{}; } -void Replica::FullSyncDflyFb(unique_ptr io_buf, string eof_token) { +error_code Replica::StartStableSyncFlow() { + VLOG(0) << "Starting consume flow"; + + DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); + ProactorBase* mythread = ProactorBase::me(); + CHECK(mythread); + + CHECK(sock_->IsOpen()); + // sock_.reset(mythread->CreateSocket()); + // RETURN_ON_ERR(sock_->Connect(master_context_.master_ep)); + sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyFb, this); + + return std::error_code{}; +} + +void Replica::FullSyncDflyFb(SyncBlock* sb, unique_ptr io_buf, string eof_token) { + VLOG(0) << "Start Fullsyncfb"; SocketSource ss{sock_.get()}; io::PrefixSource ps{io_buf->InputBuffer(), &ss}; RdbLoader loader(NULL); + loader.fullsyncb = [this, sb, ran = false]() mutable { + { + std::unique_lock lk(sb->mu_); + if (!ran) // TODO: Remove this!!!!. + sb->flows_left--; + ran = true; + } + sb->cv_.notify_all(); + }; loader.Load(&ps); + VLOG(0) << "Done Fullsyncfb"; + if (!eof_token.empty()) { unique_ptr buf(new uint8_t[eof_token.size()]); // pass leftover data from the loader. io::PrefixSource chained(loader.Leftover(), &ps); - VLOG(1) << "Before reading from chained stream"; - io::Result eof_res = chained.Read(io::MutableBytes{buf.get(), eof_token.size()}); - if (!eof_res || *eof_res != eof_token.size()) { - LOG(ERROR) << "Error finding eof token in the stream"; - } + VLOG(0) << "Before reading from chained stream"; + //io::Result eof_res = chained.Read(io::MutableBytes{buf.get(), eof_token.size()}); + //if (!eof_res || *eof_res != eof_token.size()) { + // LOG(ERROR) << "Error finding eof token in the stream"; + //} // TODO - to compare tokens } - VLOG(1) << "ReplicateDFFb finished after reading " << loader.bytes_read() << " bytes"; + VLOG(0) << "ReplicateDFFb finished after reading " << loader.bytes_read() << " bytes"; +} + +void Replica::StableSyncDflyFb() { + VLOG(0) << "RUNNING STABLE SYNC"; + base::IoBuf io_buf(16_KB); + parser_.reset(new RedisParser); + + error_code ec; + time_t last_ack = time(nullptr); + string ack_cmd; + + while (!ec) { + VLOG(0) << "StableSyncDflyFb iteration"; + io::MutableBytes buf = io_buf.AppendBuffer(); + io::Result size_res = sock_->Recv(buf); + if (!size_res) + return; + // return size_res.error(); + + VLOG(0) << "Read replication stream of " << *size_res << " bytes"; + last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); + + io_buf.CommitWrite(*size_res); + repl_offs_ += *size_res; + + ec = ParseAndExecute(&io_buf); + } + + VLOG(0) << "StableSyncDflyFb finished"; + return; } error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) { diff --git a/src/server/replica.h b/src/server/replica.h index 4de130f6a412..29f3ca9be71a 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -3,7 +3,9 @@ // #pragma once +#include #include +#include #include #include "base/io_buf.h" @@ -44,6 +46,14 @@ class Replica { R_SYNC_OK = 0x10, }; + struct SyncBlock { + SyncBlock(unsigned flows) : flows_left{flows} { + } + unsigned flows_left; + ::boost::fibers::mutex mu_; + ::boost::fibers::condition_variable cv_; + }; + public: Replica(std::string master_host, uint16_t port, Service* se); ~Replica(); @@ -75,10 +85,14 @@ class Replica { Replica(const MasterContext& 
context, uint32_t dfly_flow_id, Service* service); // Start replica initialized as dfly flow. - std::error_code StartAsDflyFlow(); + std::error_code StartFullSyncFlow(SyncBlock* block); + + std::error_code StartStableSyncFlow(); + + // Single flow Dragonfly full sync fiber spawned by StartFullSyncFlow. + void FullSyncDflyFb(SyncBlock* block, std::unique_ptr io_buf, std::string eof_token); - // Sindle flow Dragonfly full sync fiber spawned by StartAsDflyFlow. - void FullSyncDflyFb(std::unique_ptr io_buf, std::string eof_token); + void StableSyncDflyFb(); private: /* Utility */ struct PSyncResponse { diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 7ce91ba007c2..545f54f7d56d 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -126,6 +126,10 @@ void SliceSnapshot::SerializeEntriesFb() { mu_.lock(); mu_.unlock(); + for (int i = 10; i >= 1; i--) + CHECK(!rdb_serializer_->SendFullSyncCut()); + FlushSfile(true); + VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/" << side_saved_ << "/" << savecb_calls_; } From da7c901c9def56e381d9f972a0e930c7aee0afed Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 10 Nov 2022 11:34:19 +0300 Subject: [PATCH 2/8] feat(server): Full pipeline replication test Signed-off-by: Vladislav Oleshko --- src/server/rdb_load.cc | 4 +++ tests/dragonfly/replication_test.py | 47 +++++++++++++++++++++++++++++ tests/dragonfly/utility.py | 18 +++++------ 3 files changed, 60 insertions(+), 9 deletions(-) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 6ff6afe8e301..a334021845de 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1823,6 +1823,10 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms) continue; + auto [fit, _] = db_slice.FindExt(db_cntx, item.key); + if (IsValid(fit)) + db_slice.Del(db_cntx.db_index, fit); + auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms); if (!added) { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 96703a2fa768..f48ce5defda2 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -6,10 +6,57 @@ import time from .utility import * +from . import dfly_args BASE_PORT = 1111 +""" +Test full replication pipeline. Test full sync with streaming changes and stable state streaming. 
+""" + +replication_cases = [ + (2, 2, 1500, 200) +] + + +@pytest.mark.asyncio +@pytest.mark.parametrize("t_master, t_replica, n_keys, n_stream_keys", replication_cases) +@dfly_args({"logtostdout": ""}) +async def test_replication_all(df_local_factory, t_master, t_replica, n_keys, n_stream_keys): + master = df_local_factory.create(port=1111, proactor_threads=t_master) + replica = df_local_factory.create(port=1112, proactor_threads=t_replica) + + # Start master and fill with test data + master.start() + c_master = aioredis.Redis(port=master.port) + await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=1)) + + # Start replica + replica.start() + c_replica = aioredis.Redis(port=replica.port) + + async def stream_data(): + for k, v in gen_test_data(n_stream_keys, seed=2): + await c_master.set(k, v) + + # Start streaming data and run REPLICAOF in parallel + stream_fut = asyncio.create_task(stream_data()) + await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) + await stream_fut + + await wait_available_async(c_replica) + # Check range [n_stream_keys, n_keys] is of seed 1 + await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1)) + # Check range [0, n_stream_keys] is of seed 2 + await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2)) + + # Check stable state streaming + await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3)) + await asyncio.sleep(0.1) + await batch_check_data_async(c_replica, gen_test_data(n_keys, seed=3)) + + """ Test simple full sync on one replica without altering data during replication. """ diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 57b0b142ff8e..25d569e5d8f2 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -18,9 +18,9 @@ def grouper(n, iterable): BATCH_SIZE = 100 -def gen_test_data(n): - for i in range(n): - yield "k-"+str(i), "v-"+str(i) +def gen_test_data(n, start=0, seed=None): + for i in range(start, n): + yield "k-"+str(i), "v-"+str(i) + ("-"+str(seed) if seed else "") def batch_fill_data(client: redis.Redis, gen): @@ -44,15 +44,15 @@ def as_str_val(v) -> str: def batch_check_data(client: redis.Redis, gen): for group in grouper(BATCH_SIZE, gen): - vals = client.mget(k for k, _ in group) - assert all(as_str_val(vals[i]) == v for i, (_, v) in enumerate(group)) - + vals = [as_str_val(v) for v in client.mget(k for k, _ in group)] + gvals = [v for _, v in group] + assert vals == gvals async def batch_check_data_async(client: aioredis.Redis, gen): for group in grouper(BATCH_SIZE, gen): - vals = await client.mget(k for k, _ in group) - assert all(as_str_val(vals[i]) == v for i, (_, v) in enumerate(group)) - + vals = [as_str_val(v) for v in await client.mget(k for k, _ in group)] + gvals = [v for _, v in group] + assert vals == gvals def wait_available(client: redis.Redis): its = 0 From e495985594bbd92cef62a2c619a5446deafa887f Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Sun, 13 Nov 2022 14:09:27 +0300 Subject: [PATCH 3/8] feat(server): Transfer leftover bytes between repl stages Signed-off-by: Vladislav Oleshko --- src/server/dflycmd.cc | 47 +++++++++++++---- src/server/rdb_save.cc | 2 +- src/server/replica.cc | 79 ++++++++++++++++------------- src/server/replica.h | 3 +- src/server/snapshot.cc | 4 +- src/server/transaction.cc | 2 +- tests/dragonfly/replication_test.py | 11 ++-- 7 files changed, 95 insertions(+), 53 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 
f1c9868aa11b..279158eead49 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -105,6 +105,8 @@ void DflyCmd::OnClose(ConnectionContext* cntx) { if (!session_id) return; + VLOG(0) << "Disconnected !!! " << flow_id; + if (flow_id == kuint32max) { DeleteSyncSession(session_id); } else { @@ -298,20 +300,42 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) { if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb)) return; + // TODO: Temporary solution { - TransactionGuard tg{cntx->transaction}; AggregateStatus status; - auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) { + auto cb = [this, &status, sync_info = sync_info](Transaction* t, EngineShard* shard) { + unsigned index = shard->shard_id(); + status = StartStableSyncInThread(&sync_info->flows[index], shard); + return OpStatus::OK; + }; + cntx->transaction->ScheduleSingleHop(std::move(cb)); + + auto cb2 = [this, &status, sync_info = sync_info](unsigned index, auto*) { + if (EngineShard::tlocal() != nullptr) return OpStatus::OK; status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal()); return OpStatus::OK; }; - shard_set->pool()->AwaitFiberOnAll(std::move(cb)); + shard_set->pool()->AwaitFiberOnAll(std::move(cb2)); if (*status != OpStatus::OK) return rb->SendError(kInvalidState); } + //{ + // TransactionGuard tg{cntx->transaction}; + // AggregateStatus status; + + // auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) { + // status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal()); + // return OpStatus::OK; + // }; + // shard_set->pool()->AwaitFiberOnAll(std::move(cb)); + + // if (*status != OpStatus::OK) + // return rb->SendError(kInvalidState); + //} + sync_info->state = SyncState::STABLE_SYNC; return rb->SendOk(); } @@ -357,13 +381,15 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) { // Start stable state fiber. flow->fb = boost::fibers::fiber(&DflyCmd::StableSyncFb, this, flow); + + // TODO: Temporary solution + flow->fb.join(); } return OpStatus::OK; } void DflyCmd::FullSyncFb(FlowInfo* flow) { - VLOG(0) << "Fullsyncfb"; error_code ec; RdbSaver* saver = flow->saver.get(); @@ -386,7 +412,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) { return; } - VLOG(0) << "SaveBody finished"; + VLOG(1) << "Sending full sync EOF"; ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token)); if (ec) { @@ -394,16 +420,18 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) { return; } - //ec = flow->conn->socket()->Shutdown(SHUT_RDWR); + // ec = flow->conn->socket()->Shutdown(SHUT_RDWR); } void DflyCmd::StableSyncFb(FlowInfo* flow) { auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) { + // TODO: Temporary solution + if (flow->conn == nullptr) return; + // TODO: Serialize event. 
- VLOG(0) << "Got change event " << je.key; ReqSerializer serializer{flow->conn->socket()}; serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString())); - CHECK(!serializer.ec()); + //CHECK(!serializer.ec()); }); } @@ -490,7 +518,8 @@ pair> DflyCmd::GetSyncInfoOrReply(std::s return {sync_id, sync_it->second}; } -bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected, RedisReplyBuilder* rb) { +bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected, + RedisReplyBuilder* rb) { if (sync_info.state != expected) { rb->SendError(kInvalidState); return false; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 89f14e5a89a1..45ad8f59ba41 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -924,7 +924,7 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) { RETURN_ON_ERR(impl_->serializer()->FlushMem()); if (save_mode_ == SaveMode::SUMMARY) { - return impl_->serializer()->SendFullSyncCut(); + impl_->serializer()->SendFullSyncCut(); } else { VLOG(1) << "SaveBody , snapshots count: " << impl_->Size(); error_code io_error = impl_->ConsumeChannel(); diff --git a/src/server/replica.cc b/src/server/replica.cc index 60c7ab6a5201..2d76e95c13ba 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -567,12 +567,12 @@ error_code Replica::StartFullSyncFlow(SyncBlock* sb) { parser_.reset(new RedisParser{false}); // client mode - std::unique_ptr io_buf{new base::IoBuf(128)}; + leftover_buf_.reset(new base::IoBuf(128)); unsigned consumed = 0; - RETURN_ON_ERR(ReadRespReply(io_buf.get(), &consumed)); // uses parser_ + RETURN_ON_ERR(ReadRespReply(leftover_buf_.get(), &consumed)); // uses parser_ if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING})) { - LOG(ERROR) << "Bad FLOW response " << ToSV(io_buf->InputBuffer()); + LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer()); return make_error_code(errc::bad_message); } @@ -581,23 +581,20 @@ error_code Replica::StartFullSyncFlow(SyncBlock* sb) { if (flow_directive == "FULL") { eof_token = ToSV(resp_args_[1].GetBuf()); } else { - LOG(ERROR) << "Bad FLOW response " << ToSV(io_buf->InputBuffer()); + LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer()); } - io_buf->ConsumeInput(consumed); + leftover_buf_->ConsumeInput(consumed); state_mask_ = R_ENABLED | R_TCP_CONNECTED; // We can not discard io_buf because it may contain data // besides the response we parsed. Therefore we pass it further to ReplicateDFFb. 
- sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, sb, std::move(io_buf), - move(eof_token)); + sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, sb, move(eof_token)); return error_code{}; } error_code Replica::StartStableSyncFlow() { - VLOG(0) << "Starting consume flow"; - DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); ProactorBase* mythread = ProactorBase::me(); CHECK(mythread); @@ -610,58 +607,71 @@ error_code Replica::StartStableSyncFlow() { return std::error_code{}; } -void Replica::FullSyncDflyFb(SyncBlock* sb, unique_ptr io_buf, string eof_token) { - VLOG(0) << "Start Fullsyncfb"; +void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) { + DCHECK(leftover_buf_); SocketSource ss{sock_.get()}; - io::PrefixSource ps{io_buf->InputBuffer(), &ss}; + io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss}; RdbLoader loader(NULL); loader.fullsyncb = [this, sb, ran = false]() mutable { + if (ran) return; { std::unique_lock lk(sb->mu_); - if (!ran) // TODO: Remove this!!!!. - sb->flows_left--; + sb->flows_left--; ran = true; } sb->cv_.notify_all(); }; loader.Load(&ps); - VLOG(0) << "Done Fullsyncfb"; - + // Try finding eof token. + io::PrefixSource chained_tail{loader.Leftover(), &ps}; if (!eof_token.empty()) { - unique_ptr buf(new uint8_t[eof_token.size()]); - // pass leftover data from the loader. - io::PrefixSource chained(loader.Leftover(), &ps); - VLOG(0) << "Before reading from chained stream"; - //io::Result eof_res = chained.Read(io::MutableBytes{buf.get(), eof_token.size()}); - //if (!eof_res || *eof_res != eof_token.size()) { - // LOG(ERROR) << "Error finding eof token in the stream"; - //} - - // TODO - to compare tokens - } - VLOG(0) << "ReplicateDFFb finished after reading " << loader.bytes_read() << " bytes"; + unique_ptr buf{new uint8_t[eof_token.size()]}; + + io::Result res = + chained_tail.ReadAtLeast(io::MutableBytes{buf.get(), eof_token.size()}, eof_token.size()); + + if (!res || *res != eof_token.size()) { + LOG(ERROR) << "Error finding eof token in the stream"; + } + } + + // Keep laoder leftover. + io::Bytes unused = chained_tail.unused_prefix(); + if (unused.size() > 0) { + leftover_buf_.reset(new base::IoBuf{unused.size()}); + auto mut_bytes = leftover_buf_->AppendBuffer(); + memcpy(mut_bytes.data(), unused.data(), unused.size()); + leftover_buf_->CommitWrite(unused.size()); + } else { + leftover_buf_.reset(); + } + + VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes"; } void Replica::StableSyncDflyFb() { - VLOG(0) << "RUNNING STABLE SYNC"; base::IoBuf io_buf(16_KB); parser_.reset(new RedisParser); + // Check leftover from stable state. 
+ if (leftover_buf_ && leftover_buf_->InputLen() > 0) { + size_t len = leftover_buf_->InputLen(); + leftover_buf_->ReadAndConsume(len, io_buf.AppendBuffer().data()); + io_buf.CommitWrite(len); + leftover_buf_.reset(); + } + error_code ec; - time_t last_ack = time(nullptr); string ack_cmd; while (!ec) { - VLOG(0) << "StableSyncDflyFb iteration"; io::MutableBytes buf = io_buf.AppendBuffer(); io::Result size_res = sock_->Recv(buf); if (!size_res) return; - // return size_res.error(); - VLOG(0) << "Read replication stream of " << *size_res << " bytes"; last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); io_buf.CommitWrite(*size_res); @@ -670,7 +680,8 @@ void Replica::StableSyncDflyFb() { ec = ParseAndExecute(&io_buf); } - VLOG(0) << "StableSyncDflyFb finished"; + VLOG(0) << "GOT EC " << ec.message(); + return; } diff --git a/src/server/replica.h b/src/server/replica.h index 29f3ca9be71a..aaa82b87342d 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -90,7 +90,7 @@ class Replica { std::error_code StartStableSyncFlow(); // Single flow Dragonfly full sync fiber spawned by StartFullSyncFlow. - void FullSyncDflyFb(SyncBlock* block, std::unique_ptr io_buf, std::string eof_token); + void FullSyncDflyFb(SyncBlock* block, std::string eof_token); void StableSyncDflyFb(); @@ -156,6 +156,7 @@ class Replica { ::boost::fibers::fiber sync_fb_; std::vector> shard_flows_; + std::unique_ptr leftover_buf_; std::unique_ptr parser_; facade::RespVec resp_args_; facade::CmdArgVec cmd_str_args_; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 545f54f7d56d..66047a1bd973 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -126,8 +126,7 @@ void SliceSnapshot::SerializeEntriesFb() { mu_.lock(); mu_.unlock(); - for (int i = 10; i >= 1; i--) - CHECK(!rdb_serializer_->SendFullSyncCut()); + CHECK(!rdb_serializer_->SendFullSyncCut()); FlushSfile(true); VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/" @@ -254,6 +253,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { CHECK(!ec && !sfile.val.empty()); DbRecord rec = GetDbRecord(entry.db_ind, std::move(sfile.val), 1); + dest_->Push(std::move(rec)); } } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7f38c2aaf8ad..f35d3561a811 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -560,7 +560,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue and then // call PollExecute that runs the callback which calls DecreaseRunCnt. // As a result WaitForShardCallbacks below is unblocked. - auto schedule_cb = [&] { + auto schedule_cb = [this] { bool run_eager = ScheduleUniqueShard(EngineShard::tlocal()); if (run_eager) { // it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned. diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index f48ce5defda2..a094a12b7091 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -3,7 +3,6 @@ import asyncio import aioredis import redis -import time from .utility import * from . 
import dfly_args @@ -16,16 +15,17 @@ """ replication_cases = [ - (2, 2, 1500, 200) + (2, 2, 500, 200), + (8, 8, 5000, 1000), + (12, 8, 20000, 4000) ] @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replica, n_keys, n_stream_keys", replication_cases) -@dfly_args({"logtostdout": ""}) async def test_replication_all(df_local_factory, t_master, t_replica, n_keys, n_stream_keys): - master = df_local_factory.create(port=1111, proactor_threads=t_master) - replica = df_local_factory.create(port=1112, proactor_threads=t_replica) + master = df_local_factory.create(port=1111, proactor_threads=t_master, logtostdout="") + replica = df_local_factory.create(port=1112, proactor_threads=t_replica, logtostdout="") # Start master and fill with test data master.start() @@ -49,6 +49,7 @@ async def stream_data(): # Check range [n_stream_keys, n_keys] is of seed 1 await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1)) # Check range [0, n_stream_keys] is of seed 2 + await asyncio.sleep(0.1) await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2)) # Check stable state streaming From 62b6f48c72f6f74b82bc156bc5e87c60ea2f831a Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Sun, 13 Nov 2022 23:41:52 +0300 Subject: [PATCH 4/8] feat(server): Multi instance repl tests + small fixes Signed-off-by: Vladislav Oleshko --- src/redis/rdb.h | 3 - src/server/dflycmd.cc | 53 ++------- src/server/dflycmd.h | 8 +- src/server/rdb_extensions.h | 13 +++ src/server/rdb_load.cc | 7 +- src/server/rdb_load.h | 6 +- src/server/rdb_save.cc | 1 + src/server/rdb_save.h | 4 +- src/server/replica.cc | 7 +- src/server/transaction.cc | 10 +- tests/dragonfly/replication_test.py | 160 ++++++++-------------------- 11 files changed, 87 insertions(+), 185 deletions(-) create mode 100644 src/server/rdb_extensions.h diff --git a/src/redis/rdb.h b/src/redis/rdb.h index 3919929850aa..78770e59b517 100644 --- a/src/redis/rdb.h +++ b/src/redis/rdb.h @@ -103,9 +103,6 @@ /* Test if a type is an object type. */ #define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18)) -// DFLY EXTENSIONS: WHAT FILE TO PUT THEM? -#define RDB_OPCODE_FULLSYNC_END 233 - /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_FUNCTION 246 /* engine data */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. 
*/ diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 279158eead49..86f1062a0dbf 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -300,42 +300,20 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) { if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb)) return; - // TODO: Temporary solution { + TransactionGuard tg{cntx->transaction}; AggregateStatus status; - auto cb = [this, &status, sync_info = sync_info](Transaction* t, EngineShard* shard) { - unsigned index = shard->shard_id(); - status = StartStableSyncInThread(&sync_info->flows[index], shard); - return OpStatus::OK; - }; - cntx->transaction->ScheduleSingleHop(std::move(cb)); - - auto cb2 = [this, &status, sync_info = sync_info](unsigned index, auto*) { - if (EngineShard::tlocal() != nullptr) return OpStatus::OK; + auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) { status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal()); return OpStatus::OK; }; - shard_set->pool()->AwaitFiberOnAll(std::move(cb2)); + shard_set->pool()->AwaitFiberOnAll(std::move(cb)); if (*status != OpStatus::OK) return rb->SendError(kInvalidState); } - //{ - // TransactionGuard tg{cntx->transaction}; - // AggregateStatus status; - - // auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) { - // status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal()); - // return OpStatus::OK; - // }; - // shard_set->pool()->AwaitFiberOnAll(std::move(cb)); - - // if (*status != OpStatus::OK) - // return rb->SendError(kInvalidState); - //} - sync_info->state = SyncState::STABLE_SYNC; return rb->SendOk(); } @@ -379,11 +357,12 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) { if (shard != nullptr) { flow->saver.reset(); - // Start stable state fiber. - flow->fb = boost::fibers::fiber(&DflyCmd::StableSyncFb, this, flow); - - // TODO: Temporary solution - flow->fb.join(); + // TODO: Add cancellation. + auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) { + // TODO: Serialize event. + ReqSerializer serializer{flow->conn->socket()}; + serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString())); + }); } return OpStatus::OK; @@ -419,20 +398,6 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) { LOG(ERROR) << ec; return; } - - // ec = flow->conn->socket()->Shutdown(SHUT_RDWR); -} - -void DflyCmd::StableSyncFb(FlowInfo* flow) { - auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) { - // TODO: Temporary solution - if (flow->conn == nullptr) return; - - // TODO: Serialize event. - ReqSerializer serializer{flow->conn->socket()}; - serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString())); - //CHECK(!serializer.ec()); - }); } uint32_t DflyCmd::CreateSyncSession() { diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index ee7400a8db0e..abbfbf5ab4b4 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -95,15 +95,12 @@ class DflyCmd { // Start full sync in thread. Start FullSyncFb. Called for each flow. facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard); - // Start stable sync in thread. Start StableSyncFB. Called for each flow. + // Start stable sync in thread. Called for each flow. facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard); // Fiber that runs full sync for each flow. void FullSyncFb(FlowInfo* flow); - // Fiber that runs stable sync for each flow. 
- void StableSyncFb(FlowInfo* flow); - // Unregister flow. Must be called when flow disconnects. void UnregisterFlow(FlowInfo*); @@ -117,7 +114,8 @@ class DflyCmd { std::pair> GetSyncInfoOrReply(std::string_view id, facade::RedisReplyBuilder* rb); - bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected, facade::RedisReplyBuilder* rb); + bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected, + facade::RedisReplyBuilder* rb); ServerFamily* sf_; diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h new file mode 100644 index 000000000000..65044db3440f --- /dev/null +++ b/src/server/rdb_extensions.h @@ -0,0 +1,13 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +/* +Opcode range 230-240 is used by DF extensions. +*/ + +const uint8_t RDB_OPCODE_FULLSYNC_END = 230; + + diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index a334021845de..61a51a6ca5a9 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -27,6 +27,7 @@ extern "C" { #include "server/engine_shard_set.h" #include "server/error.h" #include "server/hset_family.h" +#include "server/rdb_extensions.h" #include "server/script_mgr.h" #include "server/server_state.h" #include "server/set_family.h" @@ -1554,10 +1555,8 @@ error_code RdbLoader::Load(io::Source* src) { } if (type == RDB_OPCODE_FULLSYNC_END) { - VLOG(0) << "GOT FULLSYNC OPCODE"; - if (fullsyncb) - fullsyncb(); - // notify full sync end + if (full_sync_cut_cb) + full_sync_cut_cb(); continue; } diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 3acc582f6319..5b998e8880d8 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -172,7 +172,9 @@ class RdbLoader : protected RdbLoaderBase { return load_time_; } - std::function fullsyncb; + void SetFullSyncCutCb(std::function cb) { + full_sync_cut_cb = std::move(cb); + } private: struct ObjSettings; @@ -196,6 +198,8 @@ class RdbLoader : protected RdbLoaderBase { ::boost::fibers::mutex mu_; std::error_code ec_; // guarded by mu_ std::atomic_bool stop_early_{false}; + + std::function full_sync_cut_cb; }; } // namespace dfly diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 45ad8f59ba41..c8109968c609 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -25,6 +25,7 @@ extern "C" { #include "server/engine_shard_set.h" #include "server/error.h" #include "server/snapshot.h" +#include "server/rdb_extensions.h" #include "util/fibers/simple_channel.h" namespace dfly { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 0a6b62e4fa03..02185eb3beab 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -85,7 +85,9 @@ class RdbSaver { // freq_map can optionally be null. 
std::error_code SaveBody(RdbTypeFreqMap* freq_map); - SaveMode Mode() const { return save_mode_; } + SaveMode Mode() const { + return save_mode_; + } private: class Impl; diff --git a/src/server/replica.cc b/src/server/replica.cc index 2d76e95c13ba..66833a10f1cd 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -613,15 +613,14 @@ void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) { io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss}; RdbLoader loader(NULL); - loader.fullsyncb = [this, sb, ran = false]() mutable { - if (ran) return; - { + loader.SetFullSyncCutCb([this, sb, ran = false]() mutable { + if (!ran) { std::unique_lock lk(sb->mu_); sb->flows_left--; ran = true; } sb->cv_.notify_all(); - }; + }); loader.Load(&ps); // Try finding eof token. diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f35d3561a811..848e65a53393 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -341,7 +341,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // if transaction is suspended (blocked in watched queue), then it's a noop. OpStatus status = OpStatus::OK; - if (!was_suspended) { + if (!was_suspended) { status = cb_(this, shard); } @@ -802,7 +802,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { // Fast path - for uncontended keys, just run the callback. // That applies for single key operations like set, get, lpush etc. - if (shard->db_slice().CheckLock(mode, lock_args)) { + if (shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode)) { RunQuickie(shard); return true; } @@ -814,7 +814,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED); bool lock_acquired = shard->db_slice().Acquire(mode, lock_args); sd.local_mask |= KEYLOCK_ACQUIRED; - DCHECK(!lock_acquired); // Because CheckLock above failed. + // DCHECK(!lock_acquired); // Because CheckLock above failed. DVLOG(1) << "Rescheduling into TxQueue " << DebugId(); @@ -1137,8 +1137,8 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } - DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask - << " by " << committed_txid; + DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by " + << committed_txid; // local_mask could be awaked (i.e. not suspended) if the transaction has been // awakened by another key or awakened by the same key multiple times. 
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index a094a12b7091..d412ae0b949f 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -15,144 +15,68 @@ """ replication_cases = [ - (2, 2, 500, 200), - (8, 8, 5000, 1000), - (12, 8, 20000, 4000) + (8, [8], 20000, 5000), + (8, [8], 10000, 10000), + (8, [2, 2, 2, 2], 20000, 5000), + (6, [6, 6, 6], 30000, 15000), + (4, [1] * 12, 10000, 4000), ] - @pytest.mark.asyncio -@pytest.mark.parametrize("t_master, t_replica, n_keys, n_stream_keys", replication_cases) -async def test_replication_all(df_local_factory, t_master, t_replica, n_keys, n_stream_keys): - master = df_local_factory.create(port=1111, proactor_threads=t_master, logtostdout="") - replica = df_local_factory.create(port=1112, proactor_threads=t_replica, logtostdout="") +@pytest.mark.parametrize("t_master, t_replicas, n_keys, n_stream_keys", replication_cases) +async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_stream_keys): + master = df_local_factory.create(port=1111, proactor_threads=t_master) + replicas = [ + df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t) + for i, t in enumerate(t_replicas) + ] # Start master and fill with test data master.start() c_master = aioredis.Redis(port=master.port) await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=1)) - # Start replica - replica.start() - c_replica = aioredis.Redis(port=replica.port) + # Start replicas + for replica in replicas: + replica.start() + + c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas] async def stream_data(): - for k, v in gen_test_data(n_stream_keys, seed=2): - await c_master.set(k, v) + """ Stream data during stable state replication phase and afterwards """ + gen = gen_test_data(n_stream_keys, seed=2) + for chunk in grouper(3, gen): + await c_master.mset({k:v for k,v in chunk}) + + async def run_replication(c_replica): + await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) + + async def check_replication(c_replica): + """ Check that static and streamed data arrived """ + await wait_available_async(c_replica) + # Check range [n_stream_keys, n_keys] is of seed 1 + await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1)) + # Check range [0, n_stream_keys] is of seed 2 + await asyncio.sleep(0.2) + await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2)) # Start streaming data and run REPLICAOF in parallel stream_fut = asyncio.create_task(stream_data()) - await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) + await asyncio.gather(*(asyncio.create_task(run_replication(c)) + for c in c_replicas)) + + assert not stream_fut.done(), "Weak testcase. 
Increase number of streamed keys to surpass full sync" await stream_fut - await wait_available_async(c_replica) - # Check range [n_stream_keys, n_keys] is of seed 1 - await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1)) - # Check range [0, n_stream_keys] is of seed 2 - await asyncio.sleep(0.1) - await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2)) + # Check full sync results + await asyncio.gather(*(check_replication(c) for c in c_replicas)) # Check stable state streaming await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3)) - await asyncio.sleep(0.1) - await batch_check_data_async(c_replica, gen_test_data(n_keys, seed=3)) - - -""" -Test simple full sync on one replica without altering data during replication. -""" - -# (threads_master, threads_replica, n entries) -simple_full_sync_cases = [ - (2, 2, 100), - (8, 2, 500), - (2, 8, 500), - (6, 4, 500) -] - - -@pytest.mark.parametrize("t_master, t_replica, n_keys", simple_full_sync_cases) -def test_simple_full_sync(df_local_factory, t_master, t_replica, n_keys): - master = df_local_factory.create(port=1111, proactor_threads=t_master) - replica = df_local_factory.create(port=1112, proactor_threads=t_replica) - - # Start master and fill with test data - master.start() - c_master = redis.Redis(port=master.port) - batch_fill_data(c_master, gen_test_data(n_keys)) - - # Start replica and run REPLICAOF - replica.start() - c_replica = redis.Redis(port=replica.port) - c_replica.replicaof("localhost", str(master.port)) - - # Check replica received test data - wait_available(c_replica) - batch_check_data(c_replica, gen_test_data(n_keys)) - - # Stop replication manually - c_replica.replicaof("NO", "ONE") - assert c_replica.set("writeable", "true") - - # Check test data persisted - batch_check_data(c_replica, gen_test_data(n_keys)) - - -""" -Test simple full sync on multiple replicas without altering data during replication. -The replicas start running in parallel. 
-""" - -# (threads_master, threads_replicas, n entries) -simple_full_sync_multi_cases = [ - (4, [3, 2], 500), - (8, [6, 5, 4], 500), - (8, [2] * 5, 100), - (4, [1] * 20, 500) -] - -@pytest.mark.asyncio -@pytest.mark.parametrize("t_master, t_replicas, n_keys", simple_full_sync_multi_cases) -async def test_simple_full_sync_multi(df_local_factory, t_master, t_replicas, n_keys): - def data_gen(): return gen_test_data(n_keys) - - master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master) - replicas = [ - df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t) - for i, t in enumerate(t_replicas) - ] - - # Start master and fill with test data - master.start() - c_master = aioredis.Redis(port=master.port, single_connection_client=True) - await batch_fill_data_async(c_master, data_gen()) - - # Start replica tasks in parallel - tasks = [ - asyncio.create_task(run_sfs_replica( - replica, master, data_gen), name="replica-"+str(replica.port)) - for replica in replicas - ] - - for task in tasks: - assert await task - - await c_master.connection_pool.disconnect() - - -async def run_sfs_replica(replica, master, data_gen): - replica.start() - c_replica = aioredis.Redis( - port=replica.port, single_connection_client=None) - - await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) - - await wait_available_async(c_replica) - await batch_check_data_async(c_replica, data_gen()) - - await c_replica.connection_pool.disconnect() - return True + await asyncio.sleep(0.5) + await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3)) + for c in c_replicas)) """ From aa18ee2ff0ba7afe793c1b998adb91ce5117cdea Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Mon, 14 Nov 2022 17:20:57 +0300 Subject: [PATCH 5/8] feat(server): Add AddOrUpdate to db slice Signed-off-by: Vladislav Oleshko --- src/server/db_slice.cc | 24 ++++++++++++++++++++---- src/server/db_slice.h | 9 +++++++++ src/server/rdb_extensions.h | 2 -- src/server/rdb_load.cc | 7 +------ 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 6ccc841cadd0..69bd71e0336e 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -571,12 +571,13 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it, return OpStatus::OK; } -pair DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj, - uint64_t expire_at_ms) noexcept(false) { +std::pair DbSlice::AddOrUpdateInternal(const Context& cntx, std::string_view key, + PrimeValue obj, uint64_t expire_at_ms, + bool force_update) noexcept(false) { DCHECK(!obj.IsRef()); pair res = AddOrFind(cntx, key); - if (!res.second) // have not inserted. + if (!res.second && !force_update) // have not inserted. 
return res; auto& db = *db_arr_[cntx.db_index]; @@ -588,12 +589,27 @@ pair DbSlice::AddEntry(const Context& cntx, string_view key if (expire_at_ms) { it->second.SetExpire(true); uint64_t delta = expire_at_ms - expire_base_[0]; - CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second); + auto [eit, inserted] = db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)); + CHECK(inserted || force_update); + if (!inserted) { + eit->second = ExpirePeriod(delta); + } } return res; } +pair DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj, + uint64_t expire_at_ms) noexcept(false) { + return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true); +} + + +pair DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj, + uint64_t expire_at_ms) noexcept(false) { + return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false); +} + size_t DbSlice::DbSize(DbIndex db_ind) const { DCHECK_LT(db_ind, db_array_size()); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 295af37ee2a6..64c6512594d2 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -150,6 +150,11 @@ class DbSlice { std::tuple AddOrFind2(const Context& cntx, std::string_view key) noexcept(false); + // Same as AddEntry, but overwrites in case entry exists. Returns second=true + // if insertion took place. + std::pair AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj, + uint64_t expire_at_ms) noexcept(false); + // Returns second=true if insertion took place, false otherwise. // expire_at_ms equal to 0 - means no expiry. // throws: bad_alloc is insertion could not happen due to out of memory. @@ -285,6 +290,10 @@ class DbSlice { void InvalidateDbWatches(DbIndex db_indx); private: + std::pair AddOrUpdateInternal(const Context& cntx, std::string_view key, + PrimeValue obj, uint64_t expire_at_ms, + bool force_update) noexcept(false); + void CreateDb(DbIndex index); size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table); diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h index 65044db3440f..cc482e9596c6 100644 --- a/src/server/rdb_extensions.h +++ b/src/server/rdb_extensions.h @@ -9,5 +9,3 @@ Opcode range 230-240 is used by DF extensions. 
*/ const uint8_t RDB_OPCODE_FULLSYNC_END = 230; - - diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 61a51a6ca5a9..63c1a41fbecc 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1822,12 +1822,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms) continue; - auto [fit, _] = db_slice.FindExt(db_cntx, item.key); - if (IsValid(fit)) - db_slice.Del(db_cntx.db_index, fit); - - auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms); - + auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms); if (!added) { LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind; } From d57a17e9597c840d8f332df324f04f81ae08ecdc Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Tue, 15 Nov 2022 01:15:34 +0300 Subject: [PATCH 6/8] fix(server): Small fixes in replication code Signed-off-by: Vladislav Oleshko --- src/redis/rdb.h | 2 ++ src/server/db_slice.cc | 10 +++++----- src/server/db_slice.h | 8 ++++---- src/server/dflycmd.cc | 4 +--- src/server/rdb_extensions.h | 4 ++-- src/server/rdb_save.cc | 2 +- src/server/replica.cc | 9 ++------- src/server/replica.h | 6 +++++- tests/dragonfly/replication_test.py | 8 ++++---- 9 files changed, 26 insertions(+), 27 deletions(-) diff --git a/src/redis/rdb.h b/src/redis/rdb.h index 78770e59b517..c9d58eb36689 100644 --- a/src/redis/rdb.h +++ b/src/redis/rdb.h @@ -103,6 +103,8 @@ /* Test if a type is an object type. */ #define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18)) +/* Range 200-240 is used by Dragonfly specific opcodes */ + /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_FUNCTION 246 /* engine data */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 69bd71e0336e..2b19afa09590 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -571,9 +571,10 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it, return OpStatus::OK; } -std::pair DbSlice::AddOrUpdateInternal(const Context& cntx, std::string_view key, - PrimeValue obj, uint64_t expire_at_ms, - bool force_update) noexcept(false) { +std::pair DbSlice::AddOrUpdateInternal(const Context& cntx, + std::string_view key, PrimeValue obj, + uint64_t expire_at_ms, + bool force_update) noexcept(false) { DCHECK(!obj.IsRef()); pair res = AddOrFind(cntx, key); @@ -600,11 +601,10 @@ std::pair DbSlice::AddOrUpdateInternal(const Context& cntx, } pair DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj, - uint64_t expire_at_ms) noexcept(false) { + uint64_t expire_at_ms) noexcept(false) { return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true); } - pair DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false) { return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 64c6512594d2..d84cb793d654 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -150,10 +150,10 @@ class DbSlice { std::tuple AddOrFind2(const Context& cntx, std::string_view key) noexcept(false); - // Same as AddEntry, but overwrites in case entry exists. Returns second=true - // if insertion took place. 
From d57a17e9597c840d8f332df324f04f81ae08ecdc Mon Sep 17 00:00:00 2001
From: Vladislav Oleshko 
Date: Tue, 15 Nov 2022 01:15:34 +0300
Subject: [PATCH 6/8] fix(server): Small fixes in replication code

Signed-off-by: Vladislav Oleshko 
---
 src/redis/rdb.h                     |  2 ++
 src/server/db_slice.cc              | 10 +++++-----
 src/server/db_slice.h               |  8 ++++----
 src/server/dflycmd.cc               |  4 +---
 src/server/rdb_extensions.h         |  4 ++--
 src/server/rdb_save.cc              |  2 +-
 src/server/replica.cc               |  9 ++-------
 src/server/replica.h                |  6 +++++-
 tests/dragonfly/replication_test.py |  8 ++++----
 9 files changed, 26 insertions(+), 27 deletions(-)

diff --git a/src/redis/rdb.h b/src/redis/rdb.h
index 78770e59b517..c9d58eb36689 100644
--- a/src/redis/rdb.h
+++ b/src/redis/rdb.h
@@ -103,6 +103,8 @@
 /* Test if a type is an object type. */
 #define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
 
+/* Range 200-240 is used by Dragonfly specific opcodes */
+
 /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
 #define RDB_OPCODE_FUNCTION 246 /* engine data */
 #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index 69bd71e0336e..2b19afa09590 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -571,9 +571,10 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
   return OpStatus::OK;
 }
 
-std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx, std::string_view key,
-                                                            PrimeValue obj, uint64_t expire_at_ms,
-                                                            bool force_update) noexcept(false) {
+std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
+                                                            std::string_view key, PrimeValue obj,
+                                                            uint64_t expire_at_ms,
+                                                            bool force_update) noexcept(false) {
   DCHECK(!obj.IsRef());
 
   pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
@@ -600,11 +601,10 @@ std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
 }
 
 pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
-                                               uint64_t expire_at_ms) noexcept(false) {
+                                               uint64_t expire_at_ms) noexcept(false) {
   return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
 }
 
-
 pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
                                             uint64_t expire_at_ms) noexcept(false) {
   return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
 }
diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index 64c6512594d2..d84cb793d654 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -150,10 +150,10 @@ class DbSlice {
   std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
                                                              std::string_view key) noexcept(false);
 
-  // Same as AddEntry, but overwrites in case entry exists. Returns second=true
-  // if insertion took place.
-  std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
-                                             uint64_t expire_at_ms) noexcept(false);
+  // Same as AddEntry, but overwrites in case entry exists.
+  // Returns second=true if insertion took place.
+  std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key,
+                                             PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
 
   // Returns second=true if insertion took place, false otherwise.
   // expire_at_ms equal to 0 - means no expiry.
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index 86f1062a0dbf..f7d70a0fff3d 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -105,8 +105,6 @@ void DflyCmd::OnClose(ConnectionContext* cntx) {
   if (!session_id)
     return;
 
-  VLOG(0) << "Disconnected !!! " << flow_id;
-
   if (flow_id == kuint32max) {
     DeleteSyncSession(session_id);
   } else {
@@ -290,7 +288,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
   RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
   string_view sync_id_str = ArgS(args, 2);
 
-  VLOG(0) << "Got DFLY STARTSTABLE " << sync_id_str;
+  VLOG(1) << "Got DFLY STARTSTABLE " << sync_id_str;
 
   auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, rb);
   if (!sync_id)
diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h
index cc482e9596c6..f41076859c69 100644
--- a/src/server/rdb_extensions.h
+++ b/src/server/rdb_extensions.h
@@ -5,7 +5,7 @@
 #pragma once
 
 /*
-Opcode range 230-240 is used by DF extensions.
+Range 200-240 is used by DF extensions.
 */
-const uint8_t RDB_OPCODE_FULLSYNC_END = 230;
+const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index c8109968c609..ef093aa31a40 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -24,8 +24,8 @@ extern "C" {
 #include "base/logging.h"
 #include "server/engine_shard_set.h"
 #include "server/error.h"
-#include "server/snapshot.h"
 #include "server/rdb_extensions.h"
+#include "server/snapshot.h"
 #include "util/fibers/simple_channel.h"
 
 namespace dfly {
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 66833a10f1cd..611c47eaad80 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -450,10 +450,9 @@ error_code Replica::InitiateDflySync() {
 
   // Wait for all flows to receive full sync cut.
   {
-    VLOG(0) << "BEFORE FULL SYNC WAIT";
+    VLOG(1) << "Blocking before full sync cut";
     std::unique_lock lk(sb.mu_);
     sb.cv_.wait(lk, [&]() { return sb.flows_left == 0; });
-    VLOG(0) << "AFTER FULL SYNC WAIT";
   }
 
   LOG(INFO) << "Full sync finished";
@@ -520,8 +519,6 @@ error_code Replica::ConsumeDflyStream() {
   for (auto& sub_repl : shard_flows_)
     sub_repl->sync_fb_.join();
 
-  VLOG(0) << "FULL SYNC FIBERS JOINED";
-
   AggregateError all_ec;
   vector<vector<unsigned>> partition = Partition(num_df_flows_);
   shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
@@ -636,7 +633,7 @@ void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) {
     }
   }
 
-  // Keep laoder leftover.
+  // Keep loader leftover.
   io::Bytes unused = chained_tail.unused_prefix();
   if (unused.size() > 0) {
     leftover_buf_.reset(new base::IoBuf{unused.size()});
@@ -679,8 +676,6 @@ void Replica::StableSyncDflyFb() {
     ec = ParseAndExecute(&io_buf);
   }
 
-  VLOG(0) << "GOT EC " << ec.message();
-
   return;
 }
 
diff --git a/src/server/replica.h b/src/server/replica.h
index aaa82b87342d..6c2500306e45 100644
--- a/src/server/replica.h
+++ b/src/server/replica.h
@@ -46,6 +46,8 @@ class Replica {
     R_SYNC_OK = 0x10,
   };
 
+  // A generic barrier used to wait for flow fibers to become ready
+  // for the stable state switch.
  struct SyncBlock {
    SyncBlock(unsigned flows) : flows_left{flows} {
    }
@@ -87,11 +89,13 @@ class Replica {
   // Start replica initialized as dfly flow.
   std::error_code StartFullSyncFlow(SyncBlock* block);
 
+  // Transition into stable state mode as dfly flow.
   std::error_code StartStableSyncFlow();
 
-  // Single flow Dragonfly full sync fiber spawned by StartFullSyncFlow.
+  // Single flow full sync fiber spawned by StartFullSyncFlow.
   void FullSyncDflyFb(SyncBlock* block, std::string eof_token);
 
+  // Single flow stable state sync fiber spawned by StartStableSyncFlow.
   void StableSyncDflyFb();
 
  private: /* Utility */
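The SyncBlock comment added above describes a counting barrier: every flow fiber reports once it has observed the full-sync cut, and InitiateDflySync blocks until the counter drains (the cv_.wait in the hunk further up). A self-contained sketch of the same idea using std:: primitives, whereas the patch itself relies on the Boost.Fibers equivalents:

    #include <condition_variable>
    #include <mutex>

    struct SyncBlockSketch {
      explicit SyncBlockSketch(unsigned flows) : flows_left{flows} {
      }

      // Called by each flow fiber after it sees the full-sync cut.
      void Arrive() {
        std::unique_lock lk(mu_);
        if (--flows_left == 0)
          cv_.notify_all();
      }

      // Called by the coordinator; returns once all flows have arrived.
      void Wait() {
        std::unique_lock lk(mu_);
        cv_.wait(lk, [&] { return flows_left == 0; });
      }

      unsigned flows_left;
      std::mutex mu_;
      std::condition_variable cv_;
    };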
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index d412ae0b949f..785ac96f75cb 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -2,10 +2,8 @@
 import pytest
 import asyncio
 import aioredis
-import redis
 
 from .utility import *
-from . import dfly_args
 
 BASE_PORT = 1111
 
@@ -22,6 +20,7 @@
     (4, [1] * 12, 10000, 4000),
 ]
 
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize("t_master, t_replicas, n_keys, n_stream_keys", replication_cases)
 async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_stream_keys):
@@ -46,7 +45,7 @@ async def stream_data():
         """ Stream data during stable state replication phase and afterwards """
         gen = gen_test_data(n_stream_keys, seed=2)
         for chunk in grouper(3, gen):
-            await c_master.mset({k:v for k,v in chunk})
+            await c_master.mset({k: v for k, v in chunk})
 
     async def run_replication(c_replica):
         await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
@@ -65,7 +64,8 @@ async def check_replication(c_replica):
 
     await asyncio.gather(*(asyncio.create_task(run_replication(c)) for c in c_replicas))
 
-    assert not stream_fut.done(), "Weak testcase. Increase number of streamed keys to surpass full sync"
+    assert not stream_fut.done(
+    ), "Weak testcase. Increase number of streamed keys to surpass full sync"
 
     await stream_fut
 
     # Check full sync results

From 8f4ff43b26b33701ba535db2b2dee3fdcba173d2 Mon Sep 17 00:00:00 2001
From: Vladislav Oleshko 
Date: Wed, 16 Nov 2022 15:19:21 +0300
Subject: [PATCH 7/8] fix(server): Small replication code fixes

Signed-off-by: Vladislav Oleshko 
---
 src/server/db_slice.cc       | 4 ++--
 src/server/db_slice.h        | 4 ++--
 src/server/dflycmd.cc        | 2 ++
 src/server/generic_family.cc | 2 +-
 src/server/rdb_extensions.h  | 7 ++++---
 src/server/rdb_load.h        | 4 ++++
 src/server/transaction.cc    | 1 -
 7 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index 2b19afa09590..e899929d02d2 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -542,7 +542,7 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
 
 PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj,
                               uint64_t expire_at_ms) noexcept(false) {
-  auto [it, added] = AddEntry(cntx, key, std::move(obj), expire_at_ms);
+  auto [it, added] = AddOrSkip(cntx, key, std::move(obj), expire_at_ms);
   CHECK(added);
 
   return it;
@@ -605,7 +605,7 @@ pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view
   return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
 }
 
-pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
+pair<PrimeIterator, bool> DbSlice::AddOrSkip(const Context& cntx, string_view key, PrimeValue obj,
                                             uint64_t expire_at_ms) noexcept(false) {
   return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
 }
diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index d84cb793d654..0ae2d7e0b2de 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -150,7 +150,7 @@ class DbSlice {
   std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
                                                              std::string_view key) noexcept(false);
 
-  // Same as AddEntry, but overwrites in case entry exists.
+  // Same as AddOrSkip, but overwrites in case entry exists.
   // Returns second=true if insertion took place.
   std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key,
                                              PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
@@ -158,7 +158,7 @@ class DbSlice {
   // Returns second=true if insertion took place, false otherwise.
   // expire_at_ms equal to 0 - means no expiry.
   // throws: bad_alloc if insertion could not happen due to out of memory.
-  std::pair<PrimeIterator, bool> AddEntry(const Context& cntx, std::string_view key, PrimeValue obj,
+  std::pair<PrimeIterator, bool> AddOrSkip(const Context& cntx, std::string_view key, PrimeValue obj,
                                           uint64_t expire_at_ms) noexcept(false);
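The rename makes the three insertion contracts explicit: AddOrUpdate overwrites an existing entry, AddOrSkip leaves it untouched, and AddNew requires that the key is absent (CHECK(added) above). A runnable analogue of the distinction, modelled on std::map rather than the real slice:

    #include <cassert>
    #include <map>
    #include <string>

    int main() {
      std::map<std::string, std::string> db;

      // AddOrUpdate analogue: insert or overwrite; second says "inserted".
      auto [it1, added1] = db.insert_or_assign("k", "v1");
      assert(added1);

      // AddOrSkip analogue: insert only if absent; the old value survives.
      auto [it2, added2] = db.emplace("k", "v2");
      assert(!added2 && it2->second == "v1");

      // AddNew analogue: the caller guarantees absence, so a failed insert
      // would be a bug (the patch CHECK-fails in that case).
      auto [it3, added3] = db.emplace("fresh", "v3");
      assert(added3);
      return 0;
    }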
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index f7d70a0fff3d..6531d27997a8 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -332,6 +332,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
   SaveMode save_mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
   flow->saver.reset(new RdbSaver(flow->conn->socket(), save_mode, false));
 
+  // Shard can be null for io thread.
   if (shard != nullptr) {
     auto ec = sf_->journal()->OpenInThread(false, string_view());
     CHECK(!ec);
@@ -343,6 +344,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
 }
 
 OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
+  // Shard can be null for io thread.
   if (shard != nullptr) {
     flow->saver->StopSnapshotInShard(shard);
   }
diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc
index c743966a395f..ceb9f22e46ef 100644
--- a/src/server/generic_family.cc
+++ b/src/server/generic_family.cc
@@ -161,7 +161,7 @@ bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice&
     return false;
   }
 
   DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()};
-  auto [it, added] = db_slice.AddEntry(context, key, std::move(pv), item.expire_ms);
+  auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), item.expire_ms);
 
   return added;
 }
diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h
index f41076859c69..2a9243fd911c 100644
--- a/src/server/rdb_extensions.h
+++ b/src/server/rdb_extensions.h
@@ -4,8 +4,9 @@
 
 #pragma once
 
-/*
-Range 200-240 is used by DF extensions.
-*/
+// Range 200-240 is used by DF extensions.
+// This opcode is sent by the master Dragonfly instance to a replica
+// to notify that it finished streaming static data and is ready
+// to switch to the stable state replication phase.
 
 const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h
index 5b998e8880d8..b8d04bb6429c 100644
--- a/src/server/rdb_load.h
+++ b/src/server/rdb_load.h
@@ -172,6 +172,9 @@ class RdbLoader : protected RdbLoaderBase {
     return load_time_;
   }
 
+  // Set a callback for receiving RDB_OPCODE_FULLSYNC_END.
+  // This opcode is used by a master instance to notify that it has finished
+  // streaming static data and is ready to switch to stable state sync.
   void SetFullSyncCutCb(std::function<void()> cb) {
     full_sync_cut_cb = std::move(cb);
   }
@@ -199,6 +202,7 @@ class RdbLoader : protected RdbLoaderBase {
   std::error_code ec_;  // guarded by mu_
   std::atomic_bool stop_early_{false};
 
+  // Callback invoked upon receiving RDB_OPCODE_FULLSYNC_END.
   std::function<void()> full_sync_cut_cb;
 };
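The hunk above only shows where the callback is registered, not how it is triggered. The sketch below illustrates one plausible dispatch shape; everything except RDB_OPCODE_FULLSYNC_END's value and the callback type is assumed for illustration: when the Dragonfly-specific opcode arrives, the loader fires the registered callback and keeps consuming the stream rather than treating the opcode as an object type.

    #include <cstdint>
    #include <functional>
    #include <utility>

    constexpr uint8_t kOpcodeFullSyncEnd = 200;  // RDB_OPCODE_FULLSYNC_END

    class LoaderSketch {
     public:
      void SetFullSyncCutCb(std::function<void()> cb) {
        full_sync_cut_cb_ = std::move(cb);
      }

      // Returns true if the opcode was a Dragonfly extension handled here;
      // false means the caller should parse it as a regular RDB opcode.
      bool HandleDflyOpcode(uint8_t opcode) {
        if (opcode == kOpcodeFullSyncEnd) {
          if (full_sync_cut_cb_)
            full_sync_cut_cb_();  // replica can flip this flow to stable sync
          return true;
        }
        return false;
      }

     private:
      std::function<void()> full_sync_cut_cb_;
    };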
DVLOG(1) << "Rescheduling into TxQueue " << DebugId(); From 2acfe2298241bb7af3bc43ec9aae5e6f80d2982f Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 17 Nov 2022 18:57:05 +0300 Subject: [PATCH 8/8] fix(server): Fix formatting & pre-commit hook Signed-off-by: Vladislav Oleshko --- .pre-commit-config.yaml | 1 + src/server/db_slice.cc | 2 +- src/server/db_slice.h | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0bf3585ba4e1..2384c99e54f6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,5 @@ default_stages: [commit] +exclude: 'src\/redis\/.*' repos: - repo: local hooks: diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index dfadfbf34161..eb1d14abe840 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -606,7 +606,7 @@ pair DbSlice::AddOrUpdate(const Context& cntx, string_view } pair DbSlice::AddOrSkip(const Context& cntx, string_view key, PrimeValue obj, - uint64_t expire_at_ms) noexcept(false) { + uint64_t expire_at_ms) noexcept(false) { return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false); } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 0ae2d7e0b2de..97c0893018ec 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -158,8 +158,8 @@ class DbSlice { // Returns second=true if insertion took place, false otherwise. // expire_at_ms equal to 0 - means no expiry. // throws: bad_alloc is insertion could not happen due to out of memory. - std::pair AddOrSkip(const Context& cntx, std::string_view key, PrimeValue obj, - uint64_t expire_at_ms) noexcept(false); + std::pair AddOrSkip(const Context& cntx, std::string_view key, + PrimeValue obj, uint64_t expire_at_ms) noexcept(false); // Adds a new entry. Requires: key does not exist in this slice. // Returns the iterator to the newly added entry.