Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(snapshot): Fix unwriten entries in multiple snapshotting Fixes #823 #825

Merged
merged 2 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,22 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
return ver;
}

void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it,
uint64_t upper_bound) {
uint64_t bucket_version = it.GetVersion();
// change_cb_ is ordered by vesion.
for (const auto& ccb : change_cb_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a bug here if ccb.second preempts and some other snapshot (un)registers to change_cb_.
changing change_cb_ type to map might fix the issue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vesion -> version

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our callbacks do not preempt. We should not allow for preemptable callbacks as they are getting iterator and it could be invalidated if there is a preempt.

Copy link
Contributor

@dranikpg dranikpg Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our callbacks can preempt when writing to the channel, when calling PushBytesToChannel

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dranikpg you are right. They can preempt when pushing to channel. In this case we should check if we dont have a bug in the flow, as we are using iterator..

uint64_t cb_vesrion = ccb.first;
DCHECK_LE(cb_vesrion, upper_bound);
if (cb_vesrion == upper_bound) {
return;
}
if (bucket_version < cb_vesrion) {
ccb.second(db_ind, ChangeReq{it});
}
}
}

//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
Expand Down
3 changes: 3 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ class DbSlice {
//! at a time of the call.
uint64_t RegisterOnChange(ChangeCallback cb);

// Call registered callbacks with vesrion less than upper_bound.
void FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it, uint64_t upper_bound);

//! Unregisters the callback.
void UnregisterOnChange(uint64_t id);

Expand Down
1 change: 1 addition & 0 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
++stats_.skipped;
return false;
}
db_slice_->FlushChangeToEarlierCallbacks(current_db_, it, snapshot_version_);

stats_.loop_serialized += SerializeBucket(current_db_, it);
return false;
Expand Down