Skip to content

Commit

Permalink
feat(server): Implement robust error & cancellation on replica
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg committed Dec 4, 2022
1 parent 74d1839 commit d8ac791
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 77 deletions.
6 changes: 6 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <absl/strings/str_cat.h>
#include <mimalloc.h>

#include <system_error>

extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
Expand Down Expand Up @@ -250,4 +252,8 @@ std::string GenericError::Format() const {
return absl::StrCat(ec_.message(), ":", details_);
}

void Context::Cancel() {
Error(std::make_error_code(errc::operation_canceled), "Context cancelled");
}

} // namespace dfly
22 changes: 20 additions & 2 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ using AggregateGenericError = AggregateValue<GenericError>;

// Contest combines Cancellation and AggregateGenericError in one class.
// Allows setting an error_handler to run on errors.
class Context : public Cancellation {
class Context : private Cancellation {
public:
// The error handler should return false if this error is ignored.
using ErrHandler = std::function<bool(const GenericError&)>;
Expand All @@ -255,6 +255,24 @@ class Context : public Cancellation {
Context(ErrHandler err_handler) : Cancellation{}, err_handler_{std::move(err_handler)} {
}

operator GenericError() {
return err_;
}

operator std::error_code() {
return err_.GetError();
}

// Cancelling the internal context is only possible through the context directly,
// because it needs to emit an cancellation error.
operator const Cancellation*() {
return this;
}

using Cancellation::IsCancelled;

void Cancel();

template <typename... T> void Error(T... ts) {
std::lock_guard lk{mu_};
if (err_)
Expand All @@ -263,7 +281,7 @@ class Context : public Cancellation {
GenericError new_err{std::forward<T>(ts)...};
if (!err_handler_ || err_handler_(new_err)) {
err_ = std::move(new_err);
Cancel();
Cancellation::Cancel();
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
status = StartFullSyncInThread(&replica_ptr->flows[index], &replica_ptr->cntx,
EngineShard::tlocal());
};
Expand Down Expand Up @@ -283,7 +283,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
EngineShard* shard = EngineShard::tlocal();
FlowInfo* flow = &replica_ptr->flows[index];

Expand Down Expand Up @@ -325,7 +325,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// Shard can be null for io thread.
if (shard != nullptr) {
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
flow->saver->StartSnapshotInShard(true, cntx, shard);
flow->saver->StartSnapshotInShard(true, *cntx, shard);
}

flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
Expand Down Expand Up @@ -383,7 +383,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
return cntx->Error(ec);
}

if (ec = saver->SaveBody(cntx, nullptr); ec) {
if (ec = saver->SaveBody(*cntx, nullptr); ec) {
return cntx->Error(ec);
}

Expand Down
Loading

0 comments on commit d8ac791

Please sign in to comment.