Skip to content

Commit

Permalink
feat(server): Implement robust error & cancellation on replica
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Dec 7, 2022
1 parent 74d1839 commit b3a186e
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 93 deletions.
17 changes: 17 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <absl/strings/str_cat.h>
#include <mimalloc.h>

#include <system_error>

extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
Expand Down Expand Up @@ -250,4 +252,19 @@ std::string GenericError::Format() const {
return absl::StrCat(ec_.message(), ":", details_);
}

void Context::Cancel() {
Error(std::make_error_code(errc::operation_canceled), "Context cancelled");
}

// Re-arms the context with a new error handler while holding mu_.
//
// Returns true and leaves all state untouched if the context is already cancelled.
// Otherwise clears the stored error, installs `handler`, stores false into the
// cancellation flag and returns false.
bool Context::Reset(ErrHandler handler) {
  std::lock_guard guard{mu_};

  const bool already_cancelled = Cancellation::IsCancelled();
  if (!already_cancelled) {
    err_ = {};
    err_handler_ = std::move(handler);
    Cancellation::flag_.store(false, std::memory_order_relaxed);
  }
  return already_cancelled;
}

} // namespace dfly
32 changes: 28 additions & 4 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ static_assert(facade::OpStatus::OK == facade::OpStatus{},
// Re-usable component for signaling cancellation.
// Simple wrapper around atomic flag.
struct Cancellation {
Cancellation() : flag_{false} {}

void Cancel() {
flag_.store(true, std::memory_order_relaxed);
}
Expand All @@ -209,7 +211,7 @@ struct Cancellation {
return flag_.load(std::memory_order_relaxed);
}

private:
protected:
std::atomic_bool flag_;
};

Expand Down Expand Up @@ -246,15 +248,35 @@ using AggregateGenericError = AggregateValue<GenericError>;

// Context combines Cancellation and AggregateGenericError in one class.
// Allows setting an error_handler to run on errors.
class Context : public Cancellation {
class Context : protected Cancellation {
public:
// The error handler should return false if this error is ignored.
using ErrHandler = std::function<bool(const GenericError&)>;

Context() = default;
Context(ErrHandler err_handler) : Cancellation{}, err_handler_{std::move(err_handler)} {
Context(ErrHandler err_handler) : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
}

operator GenericError() {
std::lock_guard lk(mu_);
return err_;
}

operator std::error_code() {
std::lock_guard lk(mu_);
return err_.GetError();
}

// Cancelling the internal context is only possible through the context directly,
// because it needs to emit a cancellation error.
operator const Cancellation*() {
return this;
}

using Cancellation::IsCancelled;

void Cancel();

template <typename... T> void Error(T... ts) {
std::lock_guard lk{mu_};
if (err_)
Expand All @@ -263,10 +285,12 @@ class Context : public Cancellation {
GenericError new_err{std::forward<T>(ts)...};
if (!err_handler_ || err_handler_(new_err)) {
err_ = std::move(new_err);
Cancel();
Cancellation::Cancel();
}
}

bool Reset(ErrHandler handler);

private:
GenericError err_;
ErrHandler err_handler_;
Expand Down
24 changes: 19 additions & 5 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
status = StartFullSyncInThread(&replica_ptr->flows[index], &replica_ptr->cntx,
EngineShard::tlocal());
};
Expand Down Expand Up @@ -283,7 +283,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
EngineShard* shard = EngineShard::tlocal();
FlowInfo* flow = &replica_ptr->flows[index];

Expand Down Expand Up @@ -325,7 +325,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// Shard can be null for io thread.
if (shard != nullptr) {
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
flow->saver->StartSnapshotInShard(true, cntx, shard);
flow->saver->StartSnapshotInShard(true, *cntx, shard);
}

flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
Expand Down Expand Up @@ -383,7 +383,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
return cntx->Error(ec);
}

if (ec = saver->SaveBody(cntx, nullptr); ec) {
if (ec = saver->SaveBody(*cntx, nullptr); ec) {
return cntx->Error(ec);
}

Expand Down Expand Up @@ -532,7 +532,21 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState e
}

// Shutdown hook; as written here it only emits a verbosity-level-1 log line
// and performs no other work.
void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}

// Calls CancelReplication() for every (sync_id, replica) entry currently held in
// replica_infos_.
//
// The entries are snapshotted while holding mu_ and the cancellation calls are made
// after the lock is released, so mu_ is not held while CancelReplication() runs.
void DflyCmd::Shutdown() {
  vector<pair<uint32_t, shared_ptr<ReplicaInfo>>> pending;

  // Copy all sync infos to prevent blocking.
  // assign() builds each element directly from the source entry instead of
  // default-constructing it first (resize) and then overwriting it (copy).
  {
    std::lock_guard lk(mu_);
    pending.assign(replica_infos_.begin(), replica_infos_.end());
  }

  // Bind by reference: `pending` already owns a shared_ptr copy that keeps each
  // replica alive, so a second per-iteration copy of the pair is unnecessary.
  for (auto& [sync_id, replica_ptr] : pending) {
    CancelReplication(sync_id, replica_ptr);
  }
}

void DflyCmd::FlowInfo::TryShutdownSocket() {
Expand Down
4 changes: 3 additions & 1 deletion src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ class DflyCmd {

void OnClose(ConnectionContext* cntx);

// Stop all background processes so we can exit in an orderly manner.
void BreakOnShutdown();

// Stop all background processes so we can exit in an orderly manner.
void Shutdown();

// Create new sync session.
uint32_t CreateSyncSession();

Expand Down
Loading

0 comments on commit b3a186e

Please sign in to comment.