diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs index 6e2350766..566b01f33 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs @@ -23,7 +23,7 @@ use zenoh::{ use zenoh_backend_traits::StorageInsertionResult; use crate::replication::{ - classification::{IntervalIdx, SubIntervalIdx}, + classification::{EventRemoval, IntervalIdx, SubIntervalIdx}, core::{aligner_key_expr_formatter, aligner_query::AlignmentQuery, Replication}, digest::Fingerprint, log::EventMetadata, @@ -254,32 +254,31 @@ impl Replication { } SampleKind::Delete => { let mut replication_log_guard = self.replication_log.write().await; - if let Some(latest_event) = - replication_log_guard.lookup(&replica_event.stripped_key) + match replication_log_guard + .remove_older(&replica_event.stripped_key, &replica_event.timestamp) { - if latest_event.timestamp >= replica_event.timestamp { - return None; + EventRemoval::NotFound => {} + EventRemoval::KeptNewer => return None, + EventRemoval::RemovedOlder(older_event) => { + if older_event.action == SampleKind::Put { + // NOTE: In some of our backend implementation, a deletion on a + // non-existing key will return an error. Given that we cannot + // distinguish an error from a missing key, we will assume + // the latter and move forward. + // + // FIXME: Once the behaviour described above is fixed, check for + // errors. + let _ = self + .storage + .lock() + .await + .delete(replica_event.stripped_key.clone(), replica_event.timestamp) + .await; + } } } - if matches!( - self.storage - .lock() - .await - .delete(replica_event.stripped_key.clone(), replica_event.timestamp) - .await, - // NOTE: In some of our backend implementation, a deletion on a - // non-existing key will return an error. Given that we cannot - // distinguish an error from a missing key, we will assume - // the latter and move forward. - // - // FIXME: Once the behaviour described above is fixed, check for - // errors. - Ok(StorageInsertionResult::Outdated) - ) { - return None; - } - replication_log_guard.insert_event(replica_event.clone().into()); + replication_log_guard.insert_event_unchecked(replica_event.clone().into()); } } @@ -313,10 +312,11 @@ impl Replication { } let mut replication_log_guard = self.replication_log.write().await; - if let Some(latest_event) = replication_log_guard.lookup(&replica_event.stripped_key) { - if latest_event.timestamp >= replica_event.timestamp { - return; - } + match replication_log_guard + .remove_older(&replica_event.stripped_key, &replica_event.timestamp) + { + EventRemoval::KeptNewer => return, + EventRemoval::RemovedOlder(_) | EventRemoval::NotFound => {} } // NOTE: This code can only be called with `action` set to `delete` on an initial @@ -342,6 +342,6 @@ impl Replication { return; } - replication_log_guard.insert_event(replica_event.into()); + replication_log_guard.insert_event_unchecked(replica_event.into()); } } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/log.rs b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs index c5864a69f..a757bce18 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/log.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/log.rs @@ -142,7 +142,6 @@ pub enum EventInsertion { New(Event), ReplacedOlder(Event), NotInsertedAsOlder, - NotInsertedAsOutOfBound, } /// The `LogLatest` keeps track of the last publication that happened on a key expression. @@ -215,6 +214,9 @@ impl LogLatest { /// Attempts to insert the provided [Event] in the replication log and return the [Insertion] /// outcome. /// + /// This method will first go through the Replication Log to determine if the provided [Event] + /// is indeed newer. If not then this method will do nothing. + /// /// # Caveat: out of bound /// /// This method will record an error in the Zenoh log if the timestamp associated with the @@ -228,15 +230,38 @@ impl LogLatest { EventRemoval::NotFound => EventInsertion::New(event.clone()), }; - let (interval_idx, sub_interval_idx) = match self + self.insert_event_unchecked(event); + + event_insertion + } + + /// Inserts the provided [Event] in the replication log *without checking if there is another + /// [Event] with the same key expression*. + /// + /// ⚠️ This method is meant to be used *after having called [remove_older] and processed its + /// result*, ensuring that the provided event is indeed more recent and the only one for + /// that key expression. + /// + /// This method will first go through the Replication Log to determine if the provided [Event] + /// is indeed newer. If not then this method will do nothing. + /// + /// # Caveat: out of bound + /// + /// This method will record an error in the Zenoh log if the timestamp associated with the + /// [Event] is so far in the future that the index of its interval is higher than + /// [u64::MAX]. This should not happen unless a specially crafted [Event] is sent to this node + /// or if the internal clock of the host that produced it is (very) far in the future. + pub(crate) fn insert_event_unchecked(&mut self, event: Event) { + let Ok((interval_idx, sub_interval_idx)) = self .configuration .get_time_classification(event.timestamp()) - { - Ok((interval_idx, sub_interval_idx)) => (interval_idx, sub_interval_idx), - Err(e) => { - tracing::error!("{e:?}"); - return EventInsertion::NotInsertedAsOutOfBound; - } + else { + tracing::error!( + "Fatal error: timestamp of Event < {:?} > is out of bounds: {}", + event.maybe_stripped_key, + event.timestamp + ); + return; }; self.bloom_filter_event.set(event.key_expr()); @@ -245,8 +270,6 @@ impl LogLatest { .entry(interval_idx) .or_default() .insert_unchecked(sub_interval_idx, event); - - event_insertion } /// Removes, if there is one, the previous event from the Replication Log for the provided key