
feat(server): Implement robust error & cancellation on replica #531

Merged: 6 commits merged into dragonflydb:main on Dec 11, 2022

Conversation

dranikpg (Contributor) commented Dec 4, 2022

Implements robust error handling and cancellation for the replica.

Resolved review threads (outdated): tests/dragonfly/replication_test.py, src/server/replica.cc (×2), src/server/replica.h
@dranikpg dranikpg force-pushed the replica-cancellation branch from 39d86c7 to d8ac791 on December 4, 2022 21:23
Resolved review threads: src/server/replica.cc (outdated), src/server/common.h (outdated), src/server/dflycmd.cc
@@ -325,7 +325,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
   // Shard can be null for io thread.
   if (shard != nullptr) {
     CHECK(!sf_->journal()->OpenInThread(false, ""sv));  // can only happen in persistent mode.
-    flow->saver->StartSnapshotInShard(true, cntx, shard);
+    flow->saver->StartSnapshotInShard(true, *cntx, shard);
Collaborator:

I did not understand this either. I am guessing it's somehow related to the cast you added?

Resolved review threads (outdated): src/server/replica.cc (×2)
@dranikpg dranikpg force-pushed the replica-cancellation branch 2 times, most recently from b3a186e to debed54 on December 7, 2022 20:18
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
@dranikpg dranikpg force-pushed the replica-cancellation branch from debed54 to 97b146e on December 7, 2022 20:24
Resolved review threads: src/server/dflycmd.h, src/server/replica.cc (outdated, ×2)
Comment on lines 438 to 444
auto err_handler = [this, sync_block](const auto& ge) {
  sync_block->Add(num_df_flows_);  // Unblock sync_block.
  DefaultErrorHandler(ge);         // Close sockets to unblock flows.
};
if (cntx_.Switch(std::move(err_handler)))
  return cntx_;

dranikpg (Contributor, author):

I've added Switch to the context, which atomically checks for an error and exchanges the error handler.
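The primitive described here can be sketched as follows. This is a minimal illustration under assumed names (Switch, ReportError, and GetError follow the discussion, not necessarily the actual Dragonfly signatures), with a plain std::mutex standing in for fiber synchronization:

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <system_error>

// Hypothetical sketch of a Context whose Switch() atomically checks for a
// previously reported error and, only if none exists, installs a new handler.
class Context {
 public:
  using ErrHandler = std::function<void(const std::error_code&)>;

  // Returns the stored error if one was already reported (so the caller can
  // bail out); otherwise installs `handler` and returns a success code.
  std::error_code Switch(ErrHandler handler) {
    std::lock_guard<std::mutex> lk{mu_};
    if (err_)
      return err_;  // Error already reported: do not swap the handler.
    err_handler_ = std::move(handler);
    return {};
  }

  void ReportError(std::error_code ec) {
    ErrHandler handler;
    {
      std::lock_guard<std::mutex> lk{mu_};
      if (err_)
        return;  // Only the first reported error wins.
      err_ = ec;
      handler = err_handler_;
    }
    if (handler)
      handler(ec);  // Run the handler outside the lock.
  }

  std::error_code GetError() const {
    std::lock_guard<std::mutex> lk{mu_};
    return err_;
  }

 private:
  mutable std::mutex mu_;
  std::error_code err_;
  ErrHandler err_handler_;
};
```

Holding one lock across the check and the exchange is what makes the operation atomic: a handler can never be installed after an error has already fired, so it cannot be skipped.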

Resolved review threads: src/server/common.h (outdated), tests/dragonfly/replication_test.py
@dranikpg dranikpg marked this pull request as ready for review December 7, 2022 20:34
@dranikpg dranikpg requested a review from romange December 7, 2022 20:34
dranikpg (Contributor, author) commented Dec 7, 2022

This should be a working and more or less simple version.


void Cancel(); // Cancels the context by submitting an `errc::operation_canceled` error.
using Cancellation::IsCancelled;
operator const Cancellation*();
Collaborator:

Why do you need to introduce an implicit cast from an object of type D to B* when D derives from B?

dranikpg (Contributor, author):

Because I need to forbid getting a non-const Cancellation* so that its real Cancel function cannot be called.

Otherwise, we could replace the Cancellation type with Context everywhere, or make Cancellation an interface.
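The access-restriction pattern being discussed can be sketched roughly like this (class layout and member names are illustrative, not the actual Dragonfly code):

```cpp
#include <atomic>
#include <cassert>

// A base type whose Cancel() should not be reachable by consumers.
class Cancellation {
 public:
  void Cancel() { flag_.store(true, std::memory_order_relaxed); }
  bool IsCancelled() const { return flag_.load(std::memory_order_relaxed); }

 private:
  std::atomic<bool> flag_{false};
};

// Context derives non-publicly, then re-exposes only the read-only parts.
class Context : protected Cancellation {
 public:
  using Cancellation::IsCancelled;  // Re-expose the read-only query.

  // The conversion hands out only a const view of the base, so consumers
  // can poll IsCancelled() but cannot call the non-const Cancel().
  operator const Cancellation*() const { return this; }

  // Cancellation happens through the context's own entry point.
  void Cancel() { Cancellation::Cancel(); }
};
```

Because the conversion yields a pointer to const, view->Cancel() through the handed-out pointer fails to compile, while view->IsCancelled() stays available.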

Collaborator:

If you try to avoid virtual inheritance, then I think it will be clearer than using these operators.

dranikpg (Contributor, author) commented Dec 8, 2022:

Or we can just add an explicit GetCancellation (I wouldn't say it's unusual to hide inheritance for classes and re-expose the interface in a limited way).

Resolved review threads: src/server/common.h (×2), src/server/dflycmd.cc (×2), src/server/replica.cc (×2), src/server/common.cc (×2)
romange (Collaborator) commented Dec 8, 2022

Can you please make sure that the replica status is reflected by the "info replication" command? I.e., make it easy to read what stage we are at, and also make sure that the other stats make sense.

dranikpg (Contributor, author) left a comment:

I've added an explicit GetError function to the context and fixed other small issues.

Comment on lines +1538 to +1542
if (!cntx->replica_conn) {
  ServerState::tl_connection_stats()->num_replicas += 1;
}
cntx->replica_conn = true;

dranikpg (Contributor, author):

You do this inside PSYNC for Redis.

Does a replica count if it's still setting up flows and preparing to do a full sync (which should be a very short phase)? I guess yes.

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
@dranikpg dranikpg force-pushed the replica-cancellation branch from a91ceca to d437950 on December 8, 2022 10:00
@dranikpg dranikpg requested a review from romange December 9, 2022 21:02
::boost::fibers::mutex mu_;
::boost::fibers::condition_variable cv_;

void Add(unsigned delta);
Collaborator:

Why does BlockingCounter not fit here?

dranikpg (Contributor, author) commented Dec 9, 2022

Hm... I need to change it a little, though.

In case of an error, I need a way to unblock it. I could add a function like Reset() that resets the count to 0. However, flows might still decrement it, and we'd get an underflow.

So I either:

  • Use a signed int inside and <= in the comparison
  • Use a CAS decrement inside the blocking counter
  • Allow locking the context externally, so I can do

with cntx.Lock():
  if not cntx.Error():
    counter.Dec()

What do you think?

romange (Collaborator) commented Dec 9, 2022

You can do it differently. You do not really need 64 bits for the count, so you can use the high bit as a cancel bit. I do not like the name Reset, but you can use:

void Cancel() {
  count_.fetch_or(1ULL << 63, std::memory_order_acquire);
  ec_.notify();  // releases inside
}

void Wait() {
  ec_.await([this] {
    auto cnt = count_.load(std::memory_order_acquire);
    return cnt == 0 || (cnt & (1ULL << 63));
  });
}
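For illustration, the high-bit idea can be made self-contained roughly as follows, with std::condition_variable standing in for the fiber EventCount (ec_) used in the snippet above; the class name and members are hypothetical:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Sketch of a blocking counter whose top bit doubles as a cancel flag:
// Wait() returns when the count reaches zero OR Cancel() was called.
class CancellableBlockingCounter {
  static constexpr uint64_t kCancelBit = 1ULL << 63;

 public:
  explicit CancellableBlockingCounter(uint64_t start) : count_{start} {}

  void Dec() {
    {
      std::lock_guard<std::mutex> lk{mu_};
      count_.fetch_sub(1, std::memory_order_acq_rel);
    }
    cv_.notify_all();
  }

  void Cancel() {
    {
      std::lock_guard<std::mutex> lk{mu_};
      count_.fetch_or(kCancelBit, std::memory_order_acq_rel);
    }
    cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> lk{mu_};
    cv_.wait(lk, [this] {
      uint64_t cnt = count_.load(std::memory_order_acquire);
      return cnt == 0 || (cnt & kCancelBit);
    });
  }

  bool Cancelled() const {
    return count_.load(std::memory_order_acquire) & kCancelBit;
  }

 private:
  std::atomic<uint64_t> count_;
  std::mutex mu_;
  std::condition_variable cv_;
};
```

As in the discussion above, this assumes flows never decrement more times than the initial count; otherwise the low bits would underflow into the cancel bit. Setting the bit rather than resetting the count is what lets late decrements stay safe.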

romange (Collaborator) commented Dec 9, 2022

LGTM. You probably need to rebase and start using fiber_ext::Fiber.

Signed-off-by: Vladislav <vlad@dragonflydb.io>
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
@dranikpg dranikpg merged commit f98d6f3 into dragonflydb:main Dec 11, 2022
@dranikpg dranikpg deleted the replica-cancellation branch December 25, 2022 12:00