Skip to content

Commit

Permalink
feat(server): Implement robust error & cancellation on replica
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Dec 7, 2022
1 parent 74d1839 commit 97b146e
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 124 deletions.
36 changes: 36 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <absl/strings/str_cat.h>
#include <mimalloc.h>

#include <system_error>

extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
Expand Down Expand Up @@ -250,4 +252,38 @@ std::string GenericError::Format() const {
return absl::StrCat(ec_.message(), ":", details_);
}

// Converts the context to its stored error.
// The error is read while holding mu_ and returned by value.
Context::operator dfly::GenericError() {
  std::lock_guard guard{mu_};
  return err_;
}

// Converts the context to the error code of its stored error.
// The code is obtained through GenericError::GetError() while holding mu_.
Context::operator std::error_code() {
  std::lock_guard guard{mu_};
  std::error_code code = err_.GetError();
  return code;
}

// Exposes this context as a read-only pointer to its Cancellation base.
Context::operator const dfly::Cancellation*() {
  return static_cast<const dfly::Cancellation*>(this);
}

void Context::Cancel() {
Error(std::make_error_code(errc::operation_canceled), "Context cancelled");
}

// Returns the context to its initial state under mu_: clears the stored error,
// installs `handler` as the new error handler and clears the cancellation flag.
void Context::Reset(ErrHandler handler) {
  std::lock_guard guard(mu_);

  err_ = {};
  err_handler_ = std::move(handler);

  // flag_ is the atomic inherited from Cancellation.
  flag_.store(false, std::memory_order_relaxed);
}

// Checks for cancellation and, only if the context is not cancelled, replaces
// the error handler with `handler`. Both steps happen under mu_.
// Returns true when the context was already cancelled (handler left untouched).
bool Context::Switch(ErrHandler handler) {
  std::lock_guard guard(mu_);

  const bool cancelled = Cancellation::IsCancelled();
  if (!cancelled) {
    err_handler_ = std::move(handler);
  }
  return cancelled;
}

} // namespace dfly
64 changes: 47 additions & 17 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,19 @@ template <typename T> struct AggregateValue {
T current_{};
};

// Thread safe utility to store the first non null error.
using AggregateError = AggregateValue<std::error_code>;

// Thread safe utility to store the first non OK status.
using AggregateStatus = AggregateValue<facade::OpStatus>;
static_assert(facade::OpStatus::OK == facade::OpStatus{},
"Default intitialization should be OK value");

// Re-usable component for signaling cancellation.
// Simple wrapper around atomic flag.
// Simple wrapper interface around atomic cancellation flag.
struct Cancellation {
Cancellation() : flag_{false} {
}

void Cancel() {
flag_.store(true, std::memory_order_relaxed);
}
Expand All @@ -209,7 +213,7 @@ struct Cancellation {
return flag_.load(std::memory_order_relaxed);
}

private:
protected:
std::atomic_bool flag_;
};

Expand All @@ -226,10 +230,6 @@ class GenericError {
return ec_;
}

const std::string& GetDetails() const {
return details_;
}

operator bool() const {
return bool(ec_);
}
Expand All @@ -242,31 +242,61 @@ class GenericError {
std::string details_;
};

// Thread safe utility to store the first non null generic error.
using AggregateGenericError = AggregateValue<GenericError>;

// Contest combines Cancellation and AggregateGenericError in one class.
// Allows setting an error_handler to run on errors.
class Context : public Cancellation {
// Context is a utility for managing error reporting and cancellation for complex tasks.
//
// When submitting an error with `Error`, only the first is stored (as in aggregate values).
// Then a special error handler is run, if present, and the context is cancelled.
//
// Manual cancellation with `Cancel` is simulated by reporting an `errc::operation_canceled` error.
// This allows running the error handler and representing this scenario as an error.
class Context : protected Cancellation {
public:
// The error handler should return false if this error is ignored.
using ErrHandler = std::function<bool(const GenericError&)>;
using ErrHandler = std::function<void(const GenericError&)>;

Context() = default;
Context(ErrHandler err_handler) : Cancellation{}, err_handler_{std::move(err_handler)} {
Context(ErrHandler err_handler) : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
}

operator GenericError();
operator std::error_code();

void Cancel(); // Cancels the context by submitting an `errc::operation_canceled` error.
using Cancellation::IsCancelled;
operator const Cancellation*();

// Report an error by submitting arguments for GenericError.
// If this is the first error that occurred, then the error handler is run
// and the context is cancelled.
//
// Note: this function blocks when called from inside an error handler.
template <typename... T> void Error(T... ts) {
std::lock_guard lk{mu_};
if (err_)
return;

GenericError new_err{std::forward<T>(ts)...};
if (!err_handler_ || err_handler_(new_err)) {
err_ = std::move(new_err);
Cancel();
}
if (err_handler_)
err_handler_(new_err);

err_ = std::move(new_err);
Cancellation::Cancel();
}

// Reset the error and cancellation flag, assign the new error handler.
void Reset(ErrHandler handler);

// Check for cancellation and replace the error handler atomically.
// Returns whether the context is cancelled. This function can be used
// to transfer cleanup responsibility safely.
//
// Beware, never do this manually in two steps. If you check for cancellation,
// set the error handler and initialize resources, then the new error handler
// will never run if the context was cancelled between the first two steps.
bool Switch(ErrHandler handler);

private:
GenericError err_;
ErrHandler err_handler_;
Expand Down
26 changes: 19 additions & 7 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
status = StartFullSyncInThread(&replica_ptr->flows[index], &replica_ptr->cntx,
EngineShard::tlocal());
};
Expand Down Expand Up @@ -283,7 +283,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
EngineShard* shard = EngineShard::tlocal();
FlowInfo* flow = &replica_ptr->flows[index];

Expand Down Expand Up @@ -325,7 +325,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// Shard can be null for io thread.
if (shard != nullptr) {
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
flow->saver->StartSnapshotInShard(true, cntx, shard);
flow->saver->StartSnapshotInShard(true, *cntx, shard);
}

flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
Expand Down Expand Up @@ -383,7 +383,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
return cntx->Error(ec);
}

if (ec = saver->SaveBody(cntx, nullptr); ec) {
if (ec = saver->SaveBody(*cntx, nullptr); ec) {
return cntx->Error(ec);
}

Expand All @@ -405,8 +405,6 @@ uint32_t DflyCmd::CreateSyncSession() {
// StopReplication needs to run async to prevent blocking
// the error handler.
::boost::fibers::fiber{&DflyCmd::StopReplication, this, sync_id}.detach();

return true; // Cancel context
};

auto replica_ptr = make_shared<ReplicaInfo>(flow_count, std::move(err_handler));
Expand Down Expand Up @@ -532,7 +530,21 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState e
}

void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}

// Cancels replication for every sync session currently stored in replica_infos_.
//
// The entries are first copied into a local vector while holding mu_; the
// CancelReplication calls are issued only after the lock has been released.
void DflyCmd::Shutdown() {
  vector<pair<uint32_t, shared_ptr<ReplicaInfo>>> pending;

  // Copy all sync infos to prevent blocking.
  {
    std::lock_guard lk(mu_);
    // Range-assign builds each element directly from the source entry instead of
    // default-constructing it with resize() and then overwriting it.
    pending.assign(replica_infos_.begin(), replica_infos_.end());
  }

  // Bind by reference: a by-value binding would copy the pair, and with it the
  // shared_ptr (an atomic ref-count round trip), on every iteration.
  for (auto& [sync_id, replica_ptr] : pending) {
    CancelReplication(sync_id, replica_ptr);
  }
}

void DflyCmd::FlowInfo::TryShutdownSocket() {
Expand Down
4 changes: 3 additions & 1 deletion src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ class DflyCmd {

void OnClose(ConnectionContext* cntx);

// Stop all background processes so we can exit in orderly manner.
void BreakOnShutdown();

// Stop all background processes so we can exit in orderly manner.
void Shutdown();

// Create new sync session.
uint32_t CreateSyncSession();

Expand Down
3 changes: 2 additions & 1 deletion src/server/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <atomic>
#include <string>
#include <system_error>

#include "facade/error.h"

Expand All @@ -21,7 +22,7 @@ using facade::kWrongTypeErr;

#define RETURN_ON_ERR(x) \
do { \
auto __ec = (x); \
std::error_code __ec = (x); \
if (__ec) { \
LOG(ERROR) << "Error " << __ec << " while calling " #x; \
return __ec; \
Expand Down
Loading

0 comments on commit 97b146e

Please sign in to comment.