Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Implement robust error & cancellation on replica #531

Merged
merged 6 commits into from
Dec 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <absl/strings/str_cat.h>
#include <mimalloc.h>

#include <system_error>

extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
Expand Down Expand Up @@ -240,6 +242,14 @@ bool ScanOpts::Matches(std::string_view val_name) const {
return stringmatchlen(pattern.data(), pattern.size(), val_name.data(), val_name.size(), 0) == 1;
}

GenericError::operator std::error_code() const {
return ec_;
}

GenericError::operator bool() const {
return bool(ec_);
}

std::string GenericError::Format() const {
if (!ec_)
return "";
Expand All @@ -250,4 +260,31 @@ std::string GenericError::Format() const {
return absl::StrCat(ec_.message(), ":", details_);
}

GenericError Context::GetError() {
std::lock_guard lk(mu_);
return err_;
}

const Cancellation* Context::GetCancellation() const {
return this;
}

void Context::Cancel() {
Error(std::make_error_code(errc::operation_canceled), "Context cancelled");
}

void Context::Reset(ErrHandler handler) {
std::lock_guard lk{mu_};
err_ = {};
err_handler_ = std::move(handler);
Cancellation::flag_.store(false, std::memory_order_relaxed);
}

GenericError Context::Switch(ErrHandler handler) {
std::lock_guard lk{mu_};
if (!err_)
err_handler_ = std::move(handler);
return err_;
}

} // namespace dfly
94 changes: 60 additions & 34 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ template <typename RandGen> std::string GetRandomHex(RandGen& gen, size_t len) {
}

// AggregateValue is a thread safe utility to store the first
// non-default value.
// truthy value;
template <typename T> struct AggregateValue {
bool operator=(T val) {
std::lock_guard l{mu_};
if (current_ == T{} && val != T{}) {
if (!bool(current_) && bool(val)) {
current_ = val;
}
return val != T{};
return bool(val);
}

T operator*() {
Expand All @@ -184,23 +184,27 @@ template <typename T> struct AggregateValue {
}

operator bool() {
return **this != T{};
return bool(**this);
}

private:
::boost::fibers::mutex mu_{};
T current_{};
};

// Thread safe utility to store the first non null error.
using AggregateError = AggregateValue<std::error_code>;

// Thread safe utility to store the first non OK status.
using AggregateStatus = AggregateValue<facade::OpStatus>;
static_assert(facade::OpStatus::OK == facade::OpStatus{},
"Default intitialization should be OK value");
static_assert(bool(facade::OpStatus::OK) == false,
"Default intitialization should be a falsy OK value");

// Re-usable component for signaling cancellation.
// Simple wrapper around atomic flag.
// Simple wrapper interface around atomic cancellation flag.
struct Cancellation {
Cancellation() : flag_{false} {
}

void Cancel() {
flag_.store(true, std::memory_order_relaxed);
}
Expand All @@ -209,7 +213,7 @@ struct Cancellation {
return flag_.load(std::memory_order_relaxed);
}

private:
protected:
std::atomic_bool flag_;
};

Expand All @@ -222,51 +226,73 @@ class GenericError {
GenericError(std::error_code ec, std::string details) : ec_{ec}, details_{std::move(details)} {
}

std::error_code GetError() const {
return ec_;
}
operator std::error_code() const;
operator bool() const;

const std::string& GetDetails() const {
return details_;
}

operator bool() const {
return bool(ec_);
}

// Get string representation of error.
std::string Format() const;
std::string Format() const; // Get string representation of error.

private:
std::error_code ec_;
std::string details_;
};

// Thread safe utility to store the first non null generic error.
using AggregateGenericError = AggregateValue<GenericError>;

// Contest combines Cancellation and AggregateGenericError in one class.
// Allows setting an error_handler to run on errors.
class Context : public Cancellation {
// Context is a utility for managing error reporting and cancellation for complex tasks.
//
// When submitting an error with `Error`, only the first is stored (as in aggregate values).
// Then a special error handler is run, if present, and the context is cancelled.
//
// Manual cancellation with `Cancel` is simulated by reporting an `errc::operation_canceled` error.
// This allows running the error handler and representing this scenario as an error.
class Context : protected Cancellation {
public:
// The error handler should return false if this error is ignored.
using ErrHandler = std::function<bool(const GenericError&)>;
using ErrHandler = std::function<void(const GenericError&)>;

Context() = default;
Context(ErrHandler err_handler) : Cancellation{}, err_handler_{std::move(err_handler)} {
Context(ErrHandler err_handler) : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
}

template <typename... T> void Error(T... ts) {
// Cancels the context by submitting an `errc::operation_canceled` error.
void Cancel();
using Cancellation::IsCancelled;

const Cancellation* GetCancellation() const;

GenericError GetError();

// Report an error by submitting arguments for GenericError.
// If this is the first error that occured, then the error handler is run
// and the context is cancelled.
//
// Note: this function blocks when called from inside an error handler.
template <typename... T> GenericError Error(T... ts) {
std::lock_guard lk{mu_};
if (err_)
return;
return err_;

GenericError new_err{std::forward<T>(ts)...};
if (!err_handler_ || err_handler_(new_err)) {
err_ = std::move(new_err);
Cancel();
}
if (err_handler_)
err_handler_(new_err);

err_ = std::move(new_err);
Cancellation::Cancel();

return err_;
}

// Reset error and cancellation flag, assign new error handler.
void Reset(ErrHandler handler);

// Atomically replace the error handler if no error is present, and return the
// current stored error. This function can be used to transfer cleanup responsibility safely
//
// Beware, never do this manually in two steps. If you check for cancellation,
// set the error handler and initialize resources, then the new error handler
// will never run if the context was cancelled between the first two steps.
GenericError Switch(ErrHandler handler);

private:
GenericError err_;
ErrHandler err_handler_;
Expand Down
33 changes: 23 additions & 10 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
// Use explicit assignment for replica_ptr, because capturing structured bindings is C++20.
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
status = StartFullSyncInThread(&replica_ptr->flows[index], &replica_ptr->cntx,
EngineShard::tlocal());
};
Expand Down Expand Up @@ -283,7 +284,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
EngineShard* shard = EngineShard::tlocal();
FlowInfo* flow = &replica_ptr->flows[index];

Expand Down Expand Up @@ -325,7 +326,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// Shard can be null for io thread.
if (shard != nullptr) {
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
flow->saver->StartSnapshotInShard(true, cntx, shard);
flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
}

flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
Expand Down Expand Up @@ -380,16 +381,19 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
}

if (ec) {
return cntx->Error(ec);
cntx->Error(ec);
return;
}

if (ec = saver->SaveBody(cntx, nullptr); ec) {
return cntx->Error(ec);
if (ec = saver->SaveBody(cntx->GetCancellation(), nullptr); ec) {
cntx->Error(ec);
return;
}

ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
return cntx->Error(ec);
cntx->Error(ec);
return;
}
}

Expand All @@ -405,8 +409,6 @@ uint32_t DflyCmd::CreateSyncSession() {
// StopReplication needs to run async to prevent blocking
// the error handler.
::boost::fibers::fiber{&DflyCmd::StopReplication, this, sync_id}.detach();

return true; // Cancel context
};

auto replica_ptr = make_shared<ReplicaInfo>(flow_count, std::move(err_handler));
Expand Down Expand Up @@ -532,7 +534,18 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState e
}

void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}

void DflyCmd::Shutdown() {
ReplicaInfoMap pending;
{
std::lock_guard lk(mu_);
pending = std::move(replica_infos_);
}

for (auto [sync_id, replica_ptr] : pending) {
CancelReplication(sync_id, replica_ptr);
}
}

void DflyCmd::FlowInfo::TryShutdownSocket() {
Expand Down
8 changes: 6 additions & 2 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ class DflyCmd {

void OnClose(ConnectionContext* cntx);

// Stop all background processes so we can exit in orderly manner.
void BreakOnShutdown();
romange marked this conversation as resolved.
Show resolved Hide resolved

// Stop all background processes so we can exit in orderly manner.
void Shutdown();

// Create new sync session.
uint32_t CreateSyncSession();

Expand Down Expand Up @@ -185,7 +187,9 @@ class DflyCmd {
TxId journal_txid_ = 0;

uint32_t next_sync_id_ = 1;
absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>> replica_infos_;

using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
ReplicaInfoMap replica_infos_;

::boost::fibers::mutex mu_; // Guard global operations. See header top for locking levels.
};
Expand Down
3 changes: 2 additions & 1 deletion src/server/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <atomic>
#include <string>
#include <system_error>

#include "facade/error.h"

Expand All @@ -21,7 +22,7 @@ using facade::kWrongTypeErr;

#define RETURN_ON_ERR(x) \
do { \
auto __ec = (x); \
std::error_code __ec = (x); \
if (__ec) { \
LOG(ERROR) << "Error " << __ec << " while calling " #x; \
return __ec; \
Expand Down
Loading