diff --git a/src/server/common.cc b/src/server/common.cc
index 62046a5c4c6b..2030bf505e42 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -9,6 +9,8 @@
 #include
 #include
+#include
+
 extern "C" {
 #include "redis/object.h"
 #include "redis/rdb.h"
@@ -250,4 +252,19 @@ std::string GenericError::Format() const {
   return absl::StrCat(ec_.message(), ":", details_);
 }
 
+void Context::Cancel() {
+  Error(std::make_error_code(errc::operation_canceled), "Context cancelled");
+}
+
+bool Context::Reset(ErrHandler handler) {
+  std::lock_guard lk{mu_};
+  if (Cancellation::IsCancelled())
+    return true;
+
+  err_ = {};
+  err_handler_ = std::move(handler);
+  Cancellation::flag_.store(false, std::memory_order_relaxed);
+  return false;
+}
+
 }  // namespace dfly
diff --git a/src/server/common.h b/src/server/common.h
index e0e74bdfbfb5..fead2fe98efe 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -201,6 +201,8 @@ static_assert(facade::OpStatus::OK == facade::OpStatus{},
 // Re-usable component for signaling cancellation.
 // Simple wrapper around atomic flag.
 struct Cancellation {
+  Cancellation() : flag_{false} {}
+
   void Cancel() {
     flag_.store(true, std::memory_order_relaxed);
   }
@@ -209,7 +211,7 @@ struct Cancellation {
     return flag_.load(std::memory_order_relaxed);
   }
 
- private:
+ protected:
   std::atomic_bool flag_;
 };
 
@@ -246,15 +248,35 @@ using AggregateGenericError = AggregateValue<GenericError>;
 
 // Contest combines Cancellation and AggregateGenericError in one class.
 // Allows setting an error_handler to run on errors.
-class Context : public Cancellation {
+class Context : protected Cancellation {
  public:
  // The error handler should return false if this error is ignored.
  using ErrHandler = std::function<bool(const GenericError&)>;
 
  Context() = default;
-  Context(ErrHandler err_handler) : Cancellation{}, err_handler_{std::move(err_handler)} {
+  Context(ErrHandler err_handler) : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
+  }
+
+  operator GenericError() {
+    std::lock_guard lk(mu_);
+    return err_;
+  }
+
+  operator std::error_code() {
+    std::lock_guard lk(mu_);
+    return err_.GetError();
   }
 
+  // Cancelling the context is only possible through the Context directly,
+  // because it needs to emit a cancellation error.
+  operator const Cancellation*() {
+    return this;
+  }
+
+  using Cancellation::IsCancelled;
+
+  void Cancel();
+
  template <typename... T> void Error(T... ts) {
    std::lock_guard lk{mu_};
    if (err_)
@@ -263,10 +285,12 @@ class Context : public Cancellation {
     GenericError new_err{std::forward<T>(ts)...};
     if (!err_handler_ || err_handler_(new_err)) {
       err_ = std::move(new_err);
-      Cancel();
+      Cancellation::Cancel();
     }
   }
 
+  bool Reset(ErrHandler handler);
+
  private:
   GenericError err_;
   ErrHandler err_handler_;
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index 544119203fb6..824b3d080b6c 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -250,7 +250,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
   TransactionGuard tg{cntx->transaction};
 
   AggregateStatus status;
-  auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
+  auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
     status = StartFullSyncInThread(&replica_ptr->flows[index], &replica_ptr->cntx,
                                    EngineShard::tlocal());
   };
@@ -283,7 +283,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
   TransactionGuard tg{cntx->transaction};
 
   AggregateStatus status;
-  auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
+  auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
     EngineShard* shard = EngineShard::tlocal();
     FlowInfo* flow = &replica_ptr->flows[index];
 
@@ -325,7 +325,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
   // Shard can be null for io thread.
   if (shard != nullptr) {
     CHECK(!sf_->journal()->OpenInThread(false, ""sv));  // can only happen in persistent mode.
-    flow->saver->StartSnapshotInShard(true, cntx, shard);
+    flow->saver->StartSnapshotInShard(true, *cntx, shard);
   }
 
   flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
@@ -383,7 +383,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
     return cntx->Error(ec);
   }
 
-  if (ec = saver->SaveBody(cntx, nullptr); ec) {
+  if (ec = saver->SaveBody(*cntx, nullptr); ec) {
     return cntx->Error(ec);
   }
 
@@ -532,7 +532,21 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState e
 }
 
 void DflyCmd::BreakOnShutdown() {
-  VLOG(1) << "BreakOnShutdown";
+}
+
+void DflyCmd::Shutdown() {
+  vector<pair<uint32_t, shared_ptr<ReplicaInfo>>> pending;
+
+  // Copy all sync infos under the lock, so we don't block while cancelling.
+  {
+    std::lock_guard lk(mu_);
+    pending.resize(replica_infos_.size());
+    std::copy(replica_infos_.begin(), replica_infos_.end(), pending.begin());
+  }
+
+  for (auto [sync_id, replica_ptr] : pending) {
+    CancelReplication(sync_id, replica_ptr);
+  }
 }
 
 void DflyCmd::FlowInfo::TryShutdownSocket() {
diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h
index f05a4c9bfae2..aed5fb8bd975 100644
--- a/src/server/dflycmd.h
+++ b/src/server/dflycmd.h
@@ -118,9 +118,11 @@ class DflyCmd {
 
   void OnClose(ConnectionContext* cntx);
 
-  // Stop all background processes so we can exit in orderly manner.
   void BreakOnShutdown();
 
+  // Stop all background processes so we can exit in an orderly manner.
+  void Shutdown();
+
   // Create new sync session.
   uint32_t CreateSyncSession();
diff --git a/src/server/replica.cc b/src/server/replica.cc
index c5562ea7c325..9a3d56bb64f2 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -7,6 +7,7 @@ extern "C" {
 #include "redis/rdb.h"
 }
 
+#include <absl/functional/bind_front.h>
 #include
 #include
 #include
@@ -134,7 +135,14 @@ bool Replica::Start(ConnectionContext* cntx) {
     return false;
   }
 
-  // 3. Spawn main coordination fiber.
+  // 3. Init the basic context, which is used only for cancellation.
+  // The full and stable sync contexts will be used for error propagation as well.
+  if (cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this))) {
+    (*cntx)->SendError(StrCat("Cancelled on setup"));
+    return false;
+  }
+
+  // 4. Spawn main coordination fiber.
   sync_fb_ = ::boost::fibers::fiber(&Replica::MainReplicationFb, this);
 
   (*cntx)->SendOk();
@@ -142,24 +150,17 @@
 }
 
 void Replica::Stop() {
+  // Mark disabled, prevent retries.
   if (sock_) {
     sock_->proactor()->Await([this] {
       state_mask_ = 0;  // Specifically ~R_ENABLED.
-      auto ec = sock_->Shutdown(SHUT_RDWR);
-      LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
+      cntx_.Cancel();  // The context is now fully responsible for cleanup.
     });
   }
 
-  // Close sub flows.
-  auto partition = Partition(num_df_flows_);
-  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
-    for (auto id : partition[index]) {
-      shard_flows_[id]->Stop();
-    }
-  });
-
-  if (sync_fb_.joinable())
-    sync_fb_.join();
+  // Make sure the replica fully stopped and did all cleanups,
+  // so we can freely release resources (connections).
+  sync_fb_.join();
 }
 
 void Replica::Pause(bool pause) {
@@ -169,6 +170,9 @@
 void Replica::MainReplicationFb() {
   error_code ec;
   while (state_mask_ & R_ENABLED) {
+    // Switch to a fresh context.
+    cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this));
+
     // 1. Connect socket.
     if ((state_mask_ & R_TCP_CONNECTED) == 0) {
       this_fiber::sleep_for(500ms);
@@ -431,43 +435,60 @@ error_code Replica::InitiateDflySync() {
     shard_flows_[i].reset(new Replica(master_context_, i, &service_));
   }
 
-  SyncBlock sb{num_df_flows_};
+  // Allocate shared, because the error handler might outlive the whole task.
+  auto sync_block = std::make_shared<SyncBlock>();
+
+  // Init context.
+  {
+    auto err_handler = [this, sync_block](const auto& ge) {
+      // Unblock the sync block.
+      {
+        lock_guard lk(sync_block->mu_);
+        sync_block->flows_done = num_df_flows_;
+      }
+      sync_block->cv_.notify_all();
 
-  AggregateError ec;
+      // Close sockets to unblock flows in case of cancellation.
+      CloseAllSockets();
+      return true;
+    };
+    if (cntx_.Reset(std::move(err_handler)))
+      return cntx_;
+  }
+
+  // Start full sync flows.
   auto partition = Partition(num_df_flows_);
   shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
     for (auto id : partition[index]) {
-      if ((ec = shard_flows_[id]->StartFullSyncFlow(&sb)))
-        break;
+      auto ec = shard_flows_[id]->StartFullSyncFlow(sync_block.get(), &cntx_);
+      if (ec)
+        cntx_.Error(ec);
     }
   });
 
-  RETURN_ON_ERR(*ec);
-
-  ReqSerializer serializer{sock_.get()};
-
-  // Master waits for this command in order to start sending replication stream.
-  RETURN_ON_ERR(SendCommand(StrCat("DFLY SYNC ", master_context_.dfly_session_id), &serializer));
-
-  base::IoBuf io_buf{128};
-  unsigned consumed = 0;
-  RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));
-  if (!CheckRespIsSimpleReply("OK")) {
-    LOG(ERROR) << "Sync failed " << ToSV(io_buf.InputBuffer());
-    return make_error_code(errc::bad_message);
+  // Send DFLY SYNC.
+  if (auto ec = SendNextPhaseRequest(); ec) {
+    cntx_.Error(ec);
   }
 
   // Wait for all flows to receive full sync cut.
-  {
-    VLOG(1) << "Blocking before full sync cut";
-    std::unique_lock lk(sb.mu_);
-    sb.cv_.wait(lk, [&]() { return sb.flows_left == 0; });
+  // In case of an error, this is unblocked by the error handler.
+  if (!cntx_.IsCancelled()) {
+    LOG(INFO) << "Waiting for all full sync cut confirmations";
+    std::unique_lock lk(sync_block->mu_);
+    sync_block->cv_.wait(lk, [&]() { return sync_block->flows_done >= num_df_flows_; });
+  }
+
+  // If we had an error, then we must guarantee that JoinAllFlows has run,
+  // so we fetch the error with a single access.
+  if (std::error_code ec = cntx_; ec) {
+    JoinAllFlows();
+    return ec;
+  } else {
+    LOG(INFO) << "Full sync finished";
+    state_mask_ |= R_SYNC_OK;
+    return {};
   }
-
-  LOG(INFO) << "Full sync finished";
-  state_mask_ |= R_SYNC_OK;
-
-  return error_code{};
 }
 
 error_code Replica::ConsumeRedisStream() {
@@ -517,43 +538,82 @@
 }
 
 error_code Replica::ConsumeDflyStream() {
-  // Request master to transition to stable sync.
-  {
-    ReqSerializer serializer{sock_.get()};
-    serializer.SendCommand(StrCat("DFLY STARTSTABLE ", master_context_.dfly_session_id));
-    RETURN_ON_ERR(serializer.ec());
+  // Send DFLY STARTSTABLE.
+  if (auto ec = SendNextPhaseRequest(); ec) {
+    cntx_.Error(ec);
+    // Don't return because we must join flows.
   }
 
   // Wait for all flows to finish full sync.
-  for (auto& sub_repl : shard_flows_)
-    sub_repl->sync_fb_.join();
+  JoinAllFlows();
+
+  // Replace the full sync cleanup handler.
+  if (cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this))) {
+    return cntx_;
+  }
 
-  AggregateError all_ec;
   vector<vector<unsigned>> partition = Partition(num_df_flows_);
   shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
     const auto& local_ids = partition[index];
     for (unsigned id : local_ids) {
-      all_ec = shard_flows_[id]->StartStableSyncFlow();
-      if (all_ec)
-        break;
+      auto ec = shard_flows_[id]->StartStableSyncFlow(&cntx_);
+      if (ec)
+        cntx_.Error(ec);
     }
   });
 
-  RETURN_ON_ERR(*all_ec);
+  // Wait for all shard flows to join.
+  JoinAllFlows();
 
-  base::IoBuf io_buf(16_KB);
-  std::error_code ec;
-  while (!ec) {
-    io::MutableBytes buf = io_buf.AppendBuffer();
-    io::Result<size_t> size_res = sock_->Recv(buf);
-    if (!size_res)
-      return size_res.error();
+  return cntx_;
+}
+
+void Replica::CloseAllSockets() {
+  if (sock_) {
+    sock_->proactor()->Await([this] {
+      auto ec = sock_->Shutdown(SHUT_RDWR);
+      LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
+    });
   }
 
-  return error_code{};
+  for (auto& flow : shard_flows_) {
+    flow->CloseAllSockets();
+  }
+}
+
+void Replica::JoinAllFlows() {
+  for (auto& flow : shard_flows_) {
+    if (flow->sync_fb_.joinable()) {
+      flow->sync_fb_.join();
+    }
+  }
+}
+
+bool Replica::DefaultErrorHandler(const GenericError& err) {
+  CloseAllSockets();
+  return true;
+}
+
+error_code Replica::SendNextPhaseRequest() {
+  ReqSerializer serializer{sock_.get()};
+
+  // Ask the master to start sending the replication stream.
"STARTSTABLE" : "SYNC"; + RETURN_ON_ERR( + SendCommand(StrCat("DFLY ", request, " ", master_context_.dfly_session_id), &serializer)); + + base::IoBuf io_buf{128}; + unsigned consumed = 0; + RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed)); + if (!CheckRespIsSimpleReply("OK")) { + LOG(ERROR) << "Phase transition failed " << ToSV(io_buf.InputBuffer()); + return make_error_code(errc::bad_message); + } + + return std::error_code{}; } -error_code Replica::StartFullSyncFlow(SyncBlock* sb) { +error_code Replica::StartFullSyncFlow(SyncBlock* sb, Context* cntx) { CHECK(!sock_); DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); @@ -595,12 +655,12 @@ error_code Replica::StartFullSyncFlow(SyncBlock* sb) { // We can not discard io_buf because it may contain data // besides the response we parsed. Therefore we pass it further to ReplicateDFFb. - sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, sb, move(eof_token)); + sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx); return error_code{}; } -error_code Replica::StartStableSyncFlow() { +error_code Replica::StartStableSyncFlow(Context* cntx) { DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); ProactorBase* mythread = ProactorBase::me(); CHECK(mythread); @@ -608,12 +668,12 @@ error_code Replica::StartStableSyncFlow() { CHECK(sock_->IsOpen()); // sock_.reset(mythread->CreateSocket()); // RETURN_ON_ERR(sock_->Connect(master_context_.master_ep)); - sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyFb, this); + sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyFb, this, cntx); return std::error_code{}; } -void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) { +void Replica::FullSyncDflyFb(string eof_token, SyncBlock* sb, Context* cntx) { DCHECK(leftover_buf_); SocketSource ss{sock_.get()}; io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss}; @@ -622,12 +682,15 @@ void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) { loader.SetFullSyncCutCb([sb, ran = false]() mutable { if (!ran) { std::unique_lock lk(sb->mu_); - sb->flows_left--; + sb->flows_done++; ran = true; sb->cv_.notify_all(); } }); - loader.Load(&ps); + + // Load incoming rdb stream. + if (std::error_code ec = loader.Load(&ps); ec) + return cntx->Error(ec, "Error loading rdb format"); // Try finding eof token. 
   io::PrefixSource chained_tail{loader.Leftover(), &ps};
@@ -638,7 +701,8 @@
     chained_tail.ReadAtLeast(io::MutableBytes{buf.get(), eof_token.size()}, eof_token.size());
 
     if (!res || *res != eof_token.size()) {
-      LOG(ERROR) << "Error finding eof token in the stream";
+      return cntx->Error(std::make_error_code(errc::protocol_error),
+                         "Error finding eof token in stream");
     }
   }
 
@@ -656,7 +720,7 @@
   VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
 }
 
-void Replica::StableSyncDflyFb() {
+void Replica::StableSyncDflyFb(Context* cntx) {
   base::IoBuf io_buf(16_KB);
   parser_.reset(new RedisParser);
 
@@ -668,24 +732,22 @@
     leftover_buf_.reset();
   }
 
-  error_code ec;
   string ack_cmd;
 
-  while (!ec) {
+  while (!cntx->IsCancelled()) {
     io::MutableBytes buf = io_buf.AppendBuffer();
     io::Result<size_t> size_res = sock_->Recv(buf);
     if (!size_res)
-      return;
+      return cntx->Error(size_res.error());
 
     last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
 
     io_buf.CommitWrite(*size_res);
     repl_offs_ += *size_res;
 
-    ec = ParseAndExecute(&io_buf);
+    if (auto ec = ParseAndExecute(&io_buf); ec)
+      return cntx->Error(ec);
   }
-
-  return;
 }
 
 error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) {
diff --git a/src/server/replica.h b/src/server/replica.h
index 6c2500306e45..889794169369 100644
--- a/src/server/replica.h
+++ b/src/server/replica.h
@@ -11,6 +11,7 @@
 #include "base/io_buf.h"
 #include "facade/facade_types.h"
 #include "facade/redis_parser.h"
+#include "server/common.h"
 #include "util/fiber_socket_base.h"
 
 namespace facade {
@@ -49,11 +50,9 @@ class Replica {
   // A generic barrier that is used for waiting for
   // flow fibers to become ready for the stable state switch.
   struct SyncBlock {
-    SyncBlock(unsigned flows) : flows_left{flows} {
-    }
-    unsigned flows_left;
-    ::boost::fibers::mutex mu_;
-    ::boost::fibers::condition_variable cv_;
+    unsigned flows_done{0};
+    ::boost::fibers::mutex mu_{};
+    ::boost::fibers::condition_variable cv_{};
   };
 
  public:
@@ -82,21 +81,28 @@ class Replica {
   std::error_code ConsumeRedisStream();  // Redis stable state.
   std::error_code ConsumeDflyStream();   // Dragonfly stable state.
 
+  void CloseAllSockets();  // Close all sockets.
+  void JoinAllFlows();     // Join all flows if possible.
+
+  std::error_code SendNextPhaseRequest();  // Send DFLY SYNC or DFLY STARTSTABLE.
+
+  bool DefaultErrorHandler(const GenericError& err);
+
  private: /* Main dlfly flow mode functions */
   // Initialize as single dfly flow.
   Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service);
 
   // Start replica initialized as dfly flow.
-  std::error_code StartFullSyncFlow(SyncBlock* block);
+  std::error_code StartFullSyncFlow(SyncBlock* block, Context* cntx);
 
   // Transition into stable state mode as dfly flow.
-  std::error_code StartStableSyncFlow();
+  std::error_code StartStableSyncFlow(Context* cntx);
 
   // Single flow full sync fiber spawned by StartFullSyncFlow.
-  void FullSyncDflyFb(SyncBlock* block, std::string eof_token);
+  void FullSyncDflyFb(std::string eof_token, SyncBlock* block, Context* cntx);
 
   // Single flow stable state sync fiber spawned by StartStableSyncFlow.
-  void StableSyncDflyFb();
+  void StableSyncDflyFb(Context* cntx);
 
  private: /* Utility */
   struct PSyncResponse {
@@ -165,6 +171,8 @@
   facade::RespVec resp_args_;
   facade::CmdArgVec cmd_str_args_;
 
+  Context cntx_;  // Context for replication tasks of this replica.
+
   // repl_offs - till what offset we've already read from the master.
   // ack_offs_ last acknowledged offset.
   size_t repl_offs_ = 0, ack_offs_ = 0;
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index dcb1d6ac3d7f..7dcdefe8b770 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -420,6 +420,8 @@ void ServerFamily::Shutdown() {
     if (replica_) {
       replica_->Stop();
     }
+
+    dfly_cmd_->Shutdown();
   });
 }
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 4f65916a2d24..754586ce84f6 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -313,6 +313,7 @@ unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bu
 
 void SliceSnapshot::PushFileToChannel(io::StringFile* sfile, DbIndex db_index, unsigned num_records,
                                       bool should_compress) {
+  should_compress = false;
   string string_to_push = std::move(sfile->val);
 
   if (should_compress) {
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index b0eab05276d8..520d4a830455 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -226,3 +226,57 @@ async def disconnect(replica, c_replica, crash_type):
 
     # Check master survived all disconnects
     assert await c_master.ping()
+
+
+"""
+Test crashing master and letting replicas re-connect to it.
+"""
+
+master_crash_cases = [
+    (8, [8], 100),
+    (6, [6, 6, 6], 500),
+    (4, [2] * 8, 500),
+]
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("t_master, t_replicas, n_keys", master_crash_cases)
+async def test_master_crash(df_local_factory, t_master, t_replicas, n_keys):
+    master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master)
+    replicas = [
+        df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t)
+        for i, t in enumerate(t_replicas)
+    ]
+
+    master.start()
+    for replica in replicas:
+        replica.start()
+
+    c_master = aioredis.Redis(port=master.port)
+    c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
+
+    # Fill master with test data
+    await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=0))
+
+    # Do full sync
+    async def full_sync(c_replica):
+        await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
+        await wait_available_async(c_replica)
+
+    await asyncio.gather(*(full_sync(c) for c in c_replicas))
+
+    # Crash master
+    master.stop(kill=True)
+
+    # Restart master
+    master.start()
+
+    # Fill master with data generated from a new seed
+    c_master = aioredis.Redis(port=master.port)
+    await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=1))
+
+    # Check replicas received it
+    await asyncio.sleep(0.5 * len(replicas))  # propagation takes time, really
+    for c_replica in c_replicas:
+        await wait_available_async(c_replica)
+        await batch_check_data_async(c_replica, gen_test_data(n_keys, seed=1))
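
Reviewer note: below is a minimal sketch of how the new Context API introduced above is meant to be used by a coordinating fiber. It relies only on what the patch itself defines (Reset, Error, IsCancelled and the conversion to std::error_code); the names RunPhase and DoStep are hypothetical, chosen purely for illustration and not part of the patch.

    #include <system_error>

    #include "server/common.h"  // Context, GenericError as modified above.

    namespace dfly {

    // Hypothetical worker step standing in for a socket read / RDB load.
    std::error_code DoStep();

    // Sketch of the coordinator/worker pattern this patch introduces.
    void RunPhase(Context* cntx) {
      // Install a fresh handler for this phase. Reset() returns true if the
      // context was already cancelled (e.g. via Replica::Stop), in which case
      // the phase must not start.
      if (cntx->Reset([](const GenericError&) {
            // Unblock anything waiting on this phase (close sockets, notify cvs).
            return true;  // Returning false tells Error() to ignore the error.
          }))
        return;

      // Workers poll IsCancelled() and report failures through Error(), which
      // records the first error, runs the handler and cancels the context.
      while (!cntx->IsCancelled()) {
        if (std::error_code ec = DoStep(); ec)
          return cntx->Error(ec, "step failed");
      }
    }

    }  // namespace dfly

The coordinator later fetches the aggregated error exactly once via the std::error_code conversion, mirroring what InitiateDflySync does after joining the flows.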
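A second sketch, restating the reworked SyncBlock barrier for clarity: the flows_left countdown becomes a flows_done counter, and the error handler force-completes the barrier so a failure cannot leave the waiter blocked. It assumes only the Boost.Fibers primitives already used by replica.h; SignalFullSyncCut and WaitForFullSyncCut are illustrative names, not part of the patch.

    #include <boost/fiber/condition_variable.hpp>
    #include <boost/fiber/mutex.hpp>
    #include <mutex>

    // Shape of the barrier after this patch (see SyncBlock in replica.h).
    struct SyncBlock {
      unsigned flows_done{0};
      ::boost::fibers::mutex mu_{};
      ::boost::fibers::condition_variable cv_{};
    };

    // Each flow signals exactly once when it reaches the full sync cut.
    void SignalFullSyncCut(SyncBlock* sb) {
      {
        std::unique_lock lk(sb->mu_);
        sb->flows_done++;
      }
      sb->cv_.notify_all();
    }

    // The coordinator waits until every flow has signalled; the error handler
    // installed in InitiateDflySync sets flows_done = num_df_flows to unblock it.
    void WaitForFullSyncCut(SyncBlock* sb, unsigned num_df_flows) {
      std::unique_lock lk(sb->mu_);
      sb->cv_.wait(lk, [&] { return sb->flows_done >= num_df_flows; });
    }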