Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Implement robust error & cancellation on replica #531

Merged
merged 6 commits into from
Dec 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <absl/strings/str_cat.h>
#include <mimalloc.h>

#include <system_error>

extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
Expand Down Expand Up @@ -250,4 +252,38 @@ std::string GenericError::Format() const {
return absl::StrCat(ec_.message(), ":", details_);
}

Context::operator dfly::GenericError() {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard lk(mu_);
return err_;
}

Context::operator std::error_code() {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
std::lock_guard lk(mu_);
return err_.GetError();
}

// Hands out this context viewed as a read-only Cancellation. Only a pointer to
// const is returned, so the non-const base Cancel() cannot be invoked through it.
Context::operator const dfly::Cancellation*() {
  return static_cast<const dfly::Cancellation*>(this);
}

void Context::Cancel() {
Error(std::make_error_code(errc::operation_canceled), "Context cancelled");
}

// Puts the context back into a clean state while holding mu_:
// the stored error is replaced by an empty one, `handler` becomes the
// error handler, and the cancellation flag is cleared.
void Context::Reset(ErrHandler handler) {
  std::lock_guard guard{mu_};

  err_ = GenericError{};
  err_handler_ = std::move(handler);
  flag_.store(false, std::memory_order_relaxed);
}

// Checks for cancellation and swaps in `handler` as one step under mu_.
// Returns true when the context is already cancelled; in that case the
// current error handler is left in place and `handler` is dropped.
// Returns false after `handler` has been installed.
bool Context::Switch(ErrHandler handler) {
  std::lock_guard guard{mu_};

  const bool already_cancelled = Cancellation::IsCancelled();
  if (!already_cancelled) {
    err_handler_ = std::move(handler);
  }
  return already_cancelled;
}

} // namespace dfly
64 changes: 47 additions & 17 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,19 @@ template <typename T> struct AggregateValue {
T current_{};
};

// Thread safe utility to store the first non null error.
using AggregateError = AggregateValue<std::error_code>;

// Thread safe utility to store the first non OK status.
using AggregateStatus = AggregateValue<facade::OpStatus>;
static_assert(facade::OpStatus::OK == facade::OpStatus{},
"Default intitialization should be OK value");

// Re-usable component for signaling cancellation.
// Simple wrapper around atomic flag.
// Simple wrapper interface around atomic cancellation flag.
struct Cancellation {
Cancellation() : flag_{false} {
}

void Cancel() {
flag_.store(true, std::memory_order_relaxed);
}
Expand All @@ -209,7 +213,7 @@ struct Cancellation {
return flag_.load(std::memory_order_relaxed);
}

private:
protected:
std::atomic_bool flag_;
};

Expand All @@ -226,10 +230,6 @@ class GenericError {
return ec_;
}

const std::string& GetDetails() const {
return details_;
}

operator bool() const {
return bool(ec_);
}
Expand All @@ -242,31 +242,61 @@ class GenericError {
std::string details_;
};

// Thread safe utility to store the first non null generic error.
using AggregateGenericError = AggregateValue<GenericError>;

// Context combines Cancellation and AggregateGenericError in one class.
// Allows setting an error_handler to run on errors.
class Context : public Cancellation {
// Context is a utility for managing error reporting and cancellation for complex tasks.
//
// When submitting an error with `Error`, only the first is stored (as in aggregate values).
// Then a special error handler is run, if present, and the context is cancelled.
//
// Manual cancellation with `Cancel` is simulated by reporting an `errc::operation_canceled` error.
// This allows running the error handler and representing this scenario as an error.
class Context : protected Cancellation {
public:
// The error handler should return false if this error is ignored.
using ErrHandler = std::function<bool(const GenericError&)>;
using ErrHandler = std::function<void(const GenericError&)>;

Context() = default;
Context(ErrHandler err_handler) : Cancellation{}, err_handler_{std::move(err_handler)} {
Context(ErrHandler err_handler) : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
}

operator GenericError();
operator std::error_code();

void Cancel(); // Cancels the context by submitting an `errc::operation_canceled` error.
using Cancellation::IsCancelled;
operator const Cancellation*();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to introduce an implicit casting from object D to B* when D derives from B?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I need to forbid getting non-const Cancellation* so that its real Cancel function cannot be called.

Otherwise, we can replace the Cancellation type by the Context everywhere, or make Cancellation an interface

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you try to avoid virtual inheritance, then I think it will be clearer than using these operators

Copy link
Contributor Author

@dranikpg dranikpg Dec 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can just add explicit GetCancellation (I wouldn't say it's unusual to hide inheritance for classes and re-expose the interface in a limited way)


// Report an error by submitting arguments for GenericError.
// If this is the first error that occurred, then the error handler is run
// and the context is cancelled.
//
// Note: this function blocks when called from inside an error handler.
template <typename... T> void Error(T... ts) {
std::lock_guard lk{mu_};
if (err_)
return;

GenericError new_err{std::forward<T>(ts)...};
if (!err_handler_ || err_handler_(new_err)) {
err_ = std::move(new_err);
Cancel();
}
if (err_handler_)
err_handler_(new_err);

err_ = std::move(new_err);
Cancellation::Cancel();
}

// Reset the error and cancellation flag, assign the new error handler.
void Reset(ErrHandler handler);

// Check for cancellation and replace the error handler atomically.
// Returns whether the context is cancelled. This function can be used
// to transfer cleanup responsibility safely.
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
//
// Beware, never do this manually in two steps. If you check for cancellation,
// set the error handler and initialize resources, then the new error handler
// will never run if the context was cancelled between the first two steps.
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
bool Switch(ErrHandler handler);

romange marked this conversation as resolved.
Show resolved Hide resolved
private:
GenericError err_;
ErrHandler err_handler_;
Expand Down
26 changes: 19 additions & 7 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
status = StartFullSyncInThread(&replica_ptr->flows[index], &replica_ptr->cntx,
EngineShard::tlocal());
};
Expand Down Expand Up @@ -283,7 +283,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
EngineShard* shard = EngineShard::tlocal();
FlowInfo* flow = &replica_ptr->flows[index];

Expand Down Expand Up @@ -325,7 +325,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// Shard can be null for io thread.
if (shard != nullptr) {
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
flow->saver->StartSnapshotInShard(true, cntx, shard);
flow->saver->StartSnapshotInShard(true, *cntx, shard);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not understand this either. I am guessing it's related somehow to the cast you added?

}

flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
Expand Down Expand Up @@ -383,7 +383,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
return cntx->Error(ec);
}

if (ec = saver->SaveBody(cntx, nullptr); ec) {
if (ec = saver->SaveBody(*cntx, nullptr); ec) {
return cntx->Error(ec);
}

Expand All @@ -405,8 +405,6 @@ uint32_t DflyCmd::CreateSyncSession() {
// StopReplication needs to run async to prevent blocking
// the error handler.
::boost::fibers::fiber{&DflyCmd::StopReplication, this, sync_id}.detach();

return true; // Cancel context
};

auto replica_ptr = make_shared<ReplicaInfo>(flow_count, std::move(err_handler));
Expand Down Expand Up @@ -532,7 +530,21 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState e
}

// Currently does nothing except emit a verbosity-level-1 log line.
void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}

void DflyCmd::Shutdown() {
vector<pair<uint32_t, shared_ptr<ReplicaInfo>>> pending;

// Copy all sync infos to prevent blocking.
{
std::lock_guard lk(mu_);
pending.resize(replica_infos_.size());
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
std::copy(replica_infos_.begin(), replica_infos_.end(), pending.begin());
}

for (auto [sync_id, replica_ptr] : pending) {
CancelReplication(sync_id, replica_ptr);
}
}

void DflyCmd::FlowInfo::TryShutdownSocket() {
Expand Down
4 changes: 3 additions & 1 deletion src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ class DflyCmd {

void OnClose(ConnectionContext* cntx);

// Stop all background processes so we can exit in an orderly manner.
void BreakOnShutdown();
romange marked this conversation as resolved.
Show resolved Hide resolved

// Stop all background processes so we can exit in an orderly manner.
void Shutdown();

// Create new sync session.
uint32_t CreateSyncSession();

Expand Down
3 changes: 2 additions & 1 deletion src/server/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <atomic>
#include <string>
#include <system_error>

#include "facade/error.h"

Expand All @@ -21,7 +22,7 @@ using facade::kWrongTypeErr;

#define RETURN_ON_ERR(x) \
do { \
auto __ec = (x); \
std::error_code __ec = (x); \
if (__ec) { \
LOG(ERROR) << "Error " << __ec << " while calling " #x; \
return __ec; \
Expand Down
Loading