diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index c6ba98d86e96..1475830aac08 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -852,6 +852,22 @@ uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { return ver; } +void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it, + uint64_t upper_bound) { + uint64_t bucket_version = it.GetVersion(); + // change_cb_ is ordered by vesion. + for (const auto& ccb : change_cb_) { + uint64_t cb_vesrion = ccb.first; + DCHECK_LE(cb_vesrion, upper_bound); + if (cb_vesrion == upper_bound) { + return; + } + if (bucket_version < cb_vesrion) { + ccb.second(db_ind, ChangeReq{it}); + } + } +} + //! Unregisters the callback. void DbSlice::UnregisterOnChange(uint64_t id) { for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) { diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 7f94741246e4..9f27ca53470b 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -278,6 +278,9 @@ class DbSlice { //! at a time of the call. uint64_t RegisterOnChange(ChangeCallback cb); + // Call registered callbacks with vesrion less than upper_bound. + void FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it, uint64_t upper_bound); + //! Unregisters the callback. void UnregisterOnChange(uint64_t id); diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index d3c929bb2398..173d163a9f77 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -178,6 +178,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) { ++stats_.skipped; return false; } + db_slice_->FlushChangeToEarlierCallbacks(current_db_, it, snapshot_version_); stats_.loop_serialized += SerializeBucket(current_db_, it); return false;