feat(storage-manager): Replication supports Wildcard Updates
This commit adds support for Wildcard Updates (be it Delete or Put) when
aligning replicated Storage.

To make this feature possible the following changes were made:

- A new enumeration `Action` was introduced. This enumeration builds on
  top of `SampleKind` and adds two variants: `WildcardPut` and
  `WildcardDelete`, each of which contains its full key expression.
  We track the full key expression to avoid dealing with `Option` and
  the `strip_prefix`: there are cases where the `strip_prefix` cannot be
  removed from a Wildcard Update, yet the Wildcard Update should still
  be recorded.
  For instance, if a storage subscribes to "test/replication/**" and has
  its strip prefix set to "test/replication", a delete on "test/**"
  should be recorded while the `strip_prefix` cannot be removed.

- The `Event` and `EventMetadata` structures were reworked to avoid
  repetitions and to prevent modifications without updating the
  Fingerprint.

- The keys of the Bloom Filter, `LogLatest` and `LatestUpdates` were
  changed from `Option<OwnedKeyExpr>` to the new structure
  `LogLatestKey`. This is because we need to remember, for Wildcard
  Updates, the last Put and the last Delete on the exact same key
  expression. This modification impacted many parts of the code base:
  the signatures of many functions that were accepting a key expression
  and a Timestamp now require an `Event` such that we can generate the
  associated `LogLatestKey` through the dedicated method `log_key()`.

- In addition to the change above, and to still support keeping track of
  the last Put and last Delete on the same Wildcard Update, the
  `tombstones` and `wildcard_updates` fields of the `StorageService`
  were, respectively, renamed `wildcard_deletes` and
  `wildcard_puts`. The purpose of the former `tombstones` was changed to
  keep track of the Wildcard Delete.
  As a consequence the `is_deleted` method was removed. This futuristic
  deletion check will however still be performed, by checking the
  `wildcard_deletes`.

- As a consequence of the change above, the logic of the method
  `overriding_wild_update` was completely reworked: the check for an
  overriding Wildcard Delete is different from the check for an
  overriding Wildcard Put. In addition, neither check applies to every
  `Action`.

- The `LogLatest`, `Interval` and `SubInterval` structures now have the
  method `remove_events_overridden_by_wildcard_update`: this method
  removes all the Events that are impacted by a Wildcard Update and
  updates the Fingerprint of the impacted Interval / SubInterval.

- The core `Replication` structure now keeps an `Arc` over the
  `StorageService` such that it can add Wildcard Updates to it when it
  receives some.

- The visibility of the following fields / structures was changed to
  `pub(crate)` such that the Replication can access them:
  - The `Update` structure that contains the payload and timestamp
    associated with a Wildcard Update.
  - The `configuration`, `wildcard_puts`, and `wildcard_deletes` fields
    of the `StorageService` structure.

- The signature of the method `register_wildcard_update` was changed: it
  no longer extracts all the metadata from the `sample` argument as,
  when called from the Replication Aligner, these values are not
  "correct" (they are linked to a reply and not to the contained
  payload).
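
As an illustration of the `Action` and `LogLatestKey` bullets above, here is a minimal sketch. It is not the plugin's code. It uses plain `String`s instead of `OwnedKeyExpr`, and the helpers `strip_prefix`, `classify` and `record`, the toy wildcard test (`contains('*')`) and the tuple used as a map key are assumptions made for this example. Only the four `Action` variant names come from the commit message. The sketch shows why a Wildcard Update keeps its full key expression (the strip prefix cannot be removed from "test/**") and why the action has to be part of the key.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the `Action` enumeration: the wildcard variants
/// carry their full key expression (a `String` here, not an `OwnedKeyExpr`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Action {
    Put,
    Delete,
    WildcardPut(String),
    WildcardDelete(String),
}

/// Hypothetical helper: removes `prefix` and the following `/` from `key_expr`.
/// Returns `None` when the prefix cannot be removed.
fn strip_prefix<'a>(prefix: &str, key_expr: &'a str) -> Option<&'a str> {
    key_expr.strip_prefix(prefix)?.strip_prefix('/')
}

/// Hypothetical helper: derives the stripped key (if any) and the `Action`.
/// A key expression is treated as a wildcard as soon as it contains `*`.
fn classify(prefix: &str, key_expr: &str, is_delete: bool) -> (Option<String>, Action) {
    let stripped = strip_prefix(prefix, key_expr).map(str::to_string);
    let action = match (key_expr.contains('*'), is_delete) {
        (true, true) => Action::WildcardDelete(key_expr.to_string()),
        (true, false) => Action::WildcardPut(key_expr.to_string()),
        (false, true) => Action::Delete,
        (false, false) => Action::Put,
    };
    (stripped, action)
}

/// Records the latest timestamp per (stripped key, action) pair, so that a
/// Wildcard Put and a Wildcard Delete on the same key expression coexist.
fn record(
    log: &mut HashMap<(Option<String>, Action), u64>,
    prefix: &str,
    key_expr: &str,
    is_delete: bool,
    timestamp: u64,
) {
    log.insert(classify(prefix, key_expr, is_delete), timestamp);
}
```

With the strip prefix "test/replication", classifying a delete on "test/**" yields no stripped key but a `WildcardDelete` that still holds "test/**", so the update can be recorded. Keying the map on the action as well keeps the last Wildcard Put and the last Wildcard Delete on "test/replication/**" as two separate entries.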

* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  - Added the method `remove_events_overridden_by_wildcard_update` to
    the `Interval` and `SubInterval` structures.
  - Added the method `remove_event` to the `Interval` and `SubInterval`
    structures.
  - Updated the keys of the Hash-based structures to use the
    `LogLatestKey` instead of an `Option<OwnedKeyExpr>`.
  - Updated methods to pass an `&EventMetadata` or `&Event` to be able
    to obtain their `LogLatestKey`.

* plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs:
  added the `prefix` accessor.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Changed the `Replication` structure to keep track of the
    `StorageService`. The latter is used to access and add/remove
    Wildcard Updates.
  - Added the function `remove_events_overridden_by_wildcard_update`
    that takes a `HashMap`, as this logic is shared between the latest
    cache and the Replication Log.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs:
  updated the `reply_event_retrieval` method to handle Wildcard Updates.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs:
  - Added some traces to ease debugging.
  - Added the method `needs_further_processing` that, given an
    EventMetadata, checks if it needs to be processed, processes it if it
    can and otherwise returns true -- indicating that further processing
    is needed.
  - Added the method `apply_wildcard_update` that applies a Wildcard
    Update.
  - Added the method `is_overridden_by_wildcard_update` that checks if
    there is a Wildcard Update that overrides the provided key
    expression for the provided timestamp.
  - Added the method `store_event_overridden_by_wildcard_update` that
    overrides the provided EventMetadata with the Wildcard Update and
    stores the result.

* plugins/zenoh-plugin-storage-manager/src/replication/log.rs:
  - Added the `Action` enumeration.
  - Added the `ActionKind` enumeration.
  - Added the `LogLatestKey` structure.
  - Reworked the `Event` and `EventMetadata` structures.
  - Added the method `remove_events_overridden_by_wildcard_update` to
    the `LogLatest` structure.

* plugins/zenoh-plugin-storage-manager/src/replication/mod.rs: make the
  `Action` and `LogLatestKey` visible from the `replication` module.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  changed the `spawn_start` function to take an `Arc<StorageService>`
  instead of a reference.

* plugins/zenoh-plugin-storage-manager/src/replication/tests/classification.test.rs:
  updated unit tests due to changes on the methods tested.

* plugins/zenoh-plugin-storage-manager/src/replication/tests/log.test.rs:
  added new unit tests to ensure that the generation of an Event happens
  as expected.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs: wrap the
  `StorageService` inside of an `Arc` such that the `Replication` can use
  it in its tasks.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Changed the visibility of the `Update` structure to `pub(crate)` and
    added accessors.
  - Changed the visibility of the `configuration`, `wildcard_puts` and
    `wildcard_deletes` fields of the `StorageService` structure to
    `pub(crate)` as the `Replication` uses them when dealing with
    Wildcard Updates.
  - Changed the signature of the method `register_wildcard_update` to
    not extract the metadata from the `Sample` -- as the values are not
    correct when processing an Alignment reply.
  - Changed the visibility of the methods `register_wildcard_update` and
    `overriding_wild_update` to `pub(crate)` as they are used by the
    Replication to deal with Wildcard Updates.
  - Reworked the logic of the `overriding_wild_update` to handle
    Wildcard Delete and Wildcard Put separately. In addition, this
    method no longer queries the database every time an overriding
    Wildcard Update is found.
  - Reworked the `register_wildcard_update` method.
  - Removed the `mark_tombstone` method.
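
The Fingerprint bookkeeping that `remove_event` and `remove_events_overridden_by_wildcard_update` rely on can be sketched as follows. This is a toy model under stated assumptions, not the plugin's implementation: fingerprints are plain `u64` values, keys are `String`s, a key-prefix match stands in for the Wildcard Update check, and the `insert` / `remove_matching` methods are hypothetical. What it shares with the diff is the XOR discipline: an Event is XORed out of its SubInterval when removed, and the Interval XORs the SubInterval's fingerprint out before the removal and back in afterwards.

```rust
use std::collections::{BTreeMap, HashMap};

/// Toy sub-interval: maps a key to the fingerprint of its event.
#[derive(Debug, Default)]
struct SubInterval {
    fingerprint: u64,
    events: HashMap<String, u64>,
}

impl SubInterval {
    /// Inserts an event and XORs its fingerprint in.
    fn insert(&mut self, key: &str, event_fingerprint: u64) {
        if let Some(replaced) = self.events.insert(key.to_string(), event_fingerprint) {
            self.fingerprint ^= replaced; // XOR out the event that was replaced
        }
        self.fingerprint ^= event_fingerprint;
    }

    /// Removes every event whose key starts with `key_prefix`, XORing each out.
    fn remove_matching(&mut self, key_prefix: &str) -> Vec<u64> {
        let mut removed = Vec::new();
        self.events.retain(|key, event_fingerprint| {
            if key.starts_with(key_prefix) {
                removed.push(*event_fingerprint);
                return false;
            }
            true
        });
        for event_fingerprint in &removed {
            self.fingerprint ^= *event_fingerprint;
        }
        removed
    }
}

/// Toy interval: its fingerprint is the XOR of its sub-intervals' fingerprints.
#[derive(Debug, Default)]
struct Interval {
    fingerprint: u64,
    sub_intervals: BTreeMap<u64, SubInterval>,
}

impl Interval {
    fn insert(&mut self, sub_interval_idx: u64, key: &str, event_fingerprint: u64) {
        let sub_interval = self.sub_intervals.entry(sub_interval_idx).or_default();
        self.fingerprint ^= sub_interval.fingerprint; // drop the stale contribution
        sub_interval.insert(key, event_fingerprint);
        self.fingerprint ^= sub_interval.fingerprint; // add the updated one back
    }

    fn remove_matching(&mut self, key_prefix: &str) -> Vec<u64> {
        let mut removed = Vec::new();
        for sub_interval in self.sub_intervals.values_mut() {
            self.fingerprint ^= sub_interval.fingerprint; // drop the stale contribution
            removed.extend(sub_interval.remove_matching(key_prefix));
            self.fingerprint ^= sub_interval.fingerprint; // add the updated one back
        }
        removed
    }
}
```

Because XOR is its own inverse, the Interval never has to recompute its fingerprint from scratch: removing and re-adding a SubInterval's contribution around each mutation keeps both levels consistent.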

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
J-Loudet authored and gabrik committed Oct 11, 2024
1 parent bf7e62b commit ae9719f
Showing 12 changed files with 1,358 additions and 461 deletions.
139 changes: 111 additions & 28 deletions plugins/zenoh-plugin-storage-manager/src/replication/classification.rs
@@ -18,9 +18,12 @@ use std::{
 };
 
 use serde::{Deserialize, Serialize};
-use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp};
+use zenoh::{key_expr::OwnedKeyExpr, sample::SampleKind, time::Timestamp};
 
-use super::{digest::Fingerprint, log::Event};
+use super::{
+    digest::Fingerprint,
+    log::{Event, EventMetadata, LogLatestKey},
+};
 
 /// The `EventRemoval` enumeration lists the possible outcomes when searching for an older [Event]
 /// and removing it if one was found.
@@ -103,7 +106,7 @@ impl Interval {
     #[cfg(debug_assertions)]
     pub(crate) fn assert_only_one_event_per_key_expr(
         &self,
-        events: &mut HashSet<Option<OwnedKeyExpr>>,
+        events: &mut HashSet<LogLatestKey>,
     ) -> bool {
         for sub_interval in self.sub_intervals.values() {
             if !sub_interval.assert_only_one_event_per_key_expr(events) {
@@ -137,9 +140,9 @@ impl Interval {
     }
 
     /// Lookup the provided key expression and return, if found, its associated [Event].
-    pub(crate) fn lookup(&self, stripped_key: &Option<OwnedKeyExpr>) -> Option<&Event> {
+    pub(crate) fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
         for sub_interval in self.sub_intervals.values() {
-            if let Some(event) = sub_interval.events.get(stripped_key) {
+            if let Some(event) = sub_interval.lookup(event_to_lookup) {
                 return Some(event);
             }
         }
@@ -187,16 +190,12 @@ impl Interval {
     /// The [Fingerprint] of this Interval will be updated accordingly.
     ///
     /// This method returns, through the [EventRemoval] enumeration, the action that was performed.
-    pub(crate) fn remove_older(
-        &mut self,
-        key_expr: &Option<OwnedKeyExpr>,
-        timestamp: &Timestamp,
-    ) -> EventRemoval {
+    pub(crate) fn remove_older(&mut self, event_to_remove: &EventMetadata) -> EventRemoval {
         let mut sub_interval_idx_to_remove = None;
         let mut result = EventRemoval::NotFound;
 
         for (sub_interval_idx, sub_interval) in self.sub_intervals.iter_mut() {
-            result = sub_interval.remove_older(key_expr, timestamp);
+            result = sub_interval.remove_older(event_to_remove);
             if let EventRemoval::RemovedOlder(ref old_event) = result {
                 self.fingerprint ^= old_event.fingerprint();
                 if sub_interval.events.is_empty() {
@@ -216,6 +215,54 @@ impl Interval {
 
         result
     }
+
+    pub(crate) fn remove_event(
+        &mut self,
+        sub_interval_idx: &SubIntervalIdx,
+        event_to_remove: &EventMetadata,
+    ) -> Option<Event> {
+        let removed_event = self
+            .sub_intervals
+            .get_mut(sub_interval_idx)
+            .and_then(|sub_interval| sub_interval.remove_event(event_to_remove));
+
+        if let Some(event) = &removed_event {
+            self.fingerprint ^= event.fingerprint();
+        }
+
+        removed_event
+    }
+
+    /// Removes and returns the [Event] present in this `Interval` that are overridden by the
+    /// provided Wildcard Update.
+    ///
+    /// If the Wildcard Update should be recorded in this `Interval` then the index of the
+    /// `SubInterval` should also be provided as the removal can be stopped right after: all the
+    /// [Event]s contained in greater `SubInterval` will, by construction of the Replication Log,
+    /// only have greater timestamps and thus cannot be overridden by this Wildcard Update.
+    pub(crate) fn remove_events_overridden_by_wildcard_update(
+        &mut self,
+        prefix: Option<&OwnedKeyExpr>,
+        wildcard_key_expr: &OwnedKeyExpr,
+        wildcard_timestamp: &Timestamp,
+        wildcard_kind: SampleKind,
+    ) -> HashSet<Event> {
+        let mut overridden_events = HashSet::new();
+        for sub_interval in self.sub_intervals.values_mut() {
+            self.fingerprint ^= sub_interval.fingerprint;
+
+            overridden_events.extend(sub_interval.remove_events_overridden_by_wildcard_update(
+                prefix,
+                wildcard_key_expr,
+                wildcard_timestamp,
+                wildcard_kind,
+            ));
+
+            self.fingerprint ^= sub_interval.fingerprint;
+        }
+
+        overridden_events
+    }
 }
 
 /// A `SubIntervalIdx` represents the index of a [SubInterval].
@@ -249,7 +296,7 @@ pub(crate) struct SubInterval {
     fingerprint: Fingerprint,
     // ⚠️ This field should remain private: we cannot manipulate the `Events` without updating the
     // Fingerprint.
-    events: HashMap<Option<OwnedKeyExpr>, Event>,
+    events: HashMap<LogLatestKey, Event>,
 }
 
 impl<const N: usize> From<[Event; N]> for SubInterval {
@@ -262,7 +309,7 @@ impl<const N: usize> From<[Event; N]> for SubInterval {
             fingerprint,
             events: events
                 .into_iter()
-                .map(|event| (event.key_expr().clone(), event))
+                .map(|event| (event.log_key(), event))
                 .collect(),
         }
     }
@@ -276,15 +323,12 @@ impl SubInterval {
     ///
     /// ⚠️ This method will only be called if Zenoh is compiled in Debug mode.
     #[cfg(debug_assertions)]
-    fn assert_only_one_event_per_key_expr(
-        &self,
-        events: &mut HashSet<Option<OwnedKeyExpr>>,
-    ) -> bool {
-        for event_ke in self.events.keys() {
-            if !events.insert(event_ke.clone()) {
+    fn assert_only_one_event_per_key_expr(&self, events: &mut HashSet<LogLatestKey>) -> bool {
+        for event_log_key in self.events.keys() {
+            if !events.insert(event_log_key.clone()) {
                 tracing::error!(
                     "FATAL ERROR, REPLICATION LOG INVARIANT VIOLATED, KEY APPEARS MULTIPLE TIMES: \
-                     < {event_ke:?} >"
+                     < {event_log_key:?} >"
                 );
                 return false;
             }
@@ -323,7 +367,7 @@ impl SubInterval {
     /// be updated to keep it correct and a warning message will be emitted.
     fn insert_unchecked(&mut self, event: Event) {
         self.fingerprint ^= event.fingerprint();
-        if let Some(replaced_event) = self.events.insert(event.key_expr().clone(), event) {
+        if let Some(replaced_event) = self.events.insert(event.log_key(), event) {
             tracing::warn!(
                 "Call to `insert_unchecked` replaced an Event in the replication Log, this should \
                  NOT have happened: {replaced_event:?}"
@@ -339,13 +383,9 @@ impl SubInterval {
     /// performed.
     ///
     /// The [Fingerprint] of this SubInterval will be updated accordingly.
-    fn remove_older(
-        &mut self,
-        key_expr: &Option<OwnedKeyExpr>,
-        timestamp: &Timestamp,
-    ) -> EventRemoval {
-        if let Some((key_expr, event)) = self.events.remove_entry(key_expr) {
-            if event.timestamp() < timestamp {
+    fn remove_older(&mut self, event_to_remove: &EventMetadata) -> EventRemoval {
+        if let Some((key_expr, event)) = self.events.remove_entry(&event_to_remove.log_key()) {
+            if event.timestamp() < &event_to_remove.timestamp {
                 self.fingerprint ^= event.fingerprint();
                 return EventRemoval::RemovedOlder(event);
             } else {
@@ -356,6 +396,49 @@ impl SubInterval {
 
         EventRemoval::NotFound
     }
+
+    fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
+        self.events.get(&event_to_lookup.log_key())
+    }
+
+    fn remove_event(&mut self, event_to_remove: &EventMetadata) -> Option<Event> {
+        let removed_event = self.events.remove(&event_to_remove.log_key());
+        if let Some(event) = &removed_event {
+            self.fingerprint ^= event.fingerprint();
+        }
+
+        removed_event
+    }
+
+    /// Removes and returns the [Event] present in this `SubInterval` that are overridden by the
+    /// provided Wildcard Update.
+    ///
+    /// The timestamp of the Wildcard Update should only be provided if the considered `SubInterval`
+    /// is where the Wildcard Update should be recorded.
+    /// It is only in that specific scenario that we are not sure that all [Event]s have a lower
+    /// timestamp.
+    fn remove_events_overridden_by_wildcard_update(
+        &mut self,
+        prefix: Option<&OwnedKeyExpr>,
+        wildcard_key_expr: &OwnedKeyExpr,
+        wildcard_timestamp: &Timestamp,
+        wildcard_kind: SampleKind,
+    ) -> HashSet<Event> {
+        let overridden_events =
+            crate::replication::core::remove_events_overridden_by_wildcard_update(
+                &mut self.events,
+                prefix,
+                wildcard_key_expr,
+                wildcard_timestamp,
+                wildcard_kind,
+            );
+
+        overridden_events
+            .iter()
+            .for_each(|overridden_event| self.fingerprint ^= overridden_event.fingerprint());
+
+        overridden_events
+    }
 }
 
 #[cfg(test)]
4 changes: 4 additions & 0 deletions plugins/zenoh-plugin-storage-manager/src/replication/configuration.rs
@@ -79,6 +79,10 @@ impl Configuration {
         }
     }
 
+    pub fn prefix(&self) -> Option<&OwnedKeyExpr> {
+        self.prefix.as_ref()
+    }
+
     /// Returns the [Fingerprint] of the `Configuration`.
     ///
     /// The fingerprint is the hash of all its fields, using the `xxhash_rust` crate.
109 changes: 99 additions & 10 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
@@ -16,31 +16,31 @@ mod aligner_query;
 mod aligner_reply;
 
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     sync::Arc,
     time::{Duration, Instant, SystemTime},
 };
 
 use rand::Rng;
-use tokio::{
-    sync::{Mutex, RwLock},
-    task::JoinHandle,
-};
+use tokio::{sync::RwLock, task::JoinHandle};
 use tracing::{debug_span, Instrument};
 use zenoh::{
     key_expr::{
         format::{kedefine, keformat},
         OwnedKeyExpr,
     },
     query::{ConsolidationMode, Selector},
-    sample::Locality,
+    sample::{Locality, SampleKind},
+    time::Timestamp,
     Session,
 };
-use zenoh_backend_traits::Storage;
 
 use self::aligner_reply::AlignmentReply;
-use super::{digest::Digest, log::LogLatest};
-use crate::{replication::core::aligner_query::AlignmentQuery, storages_mgt::LatestUpdates};
+use super::{digest::Digest, log::LogLatest, Action, Event, LogLatestKey};
+use crate::{
+    replication::core::aligner_query::AlignmentQuery,
+    storages_mgt::{LatestUpdates, StorageService},
+};
 
 kedefine!(
     pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
@@ -53,7 +53,7 @@ pub(crate) struct Replication {
     pub(crate) replication_log: Arc<RwLock<LogLatest>>,
     pub(crate) storage_key_expr: OwnedKeyExpr,
     pub(crate) latest_updates: Arc<RwLock<LatestUpdates>>,
-    pub(crate) storage: Arc<Mutex<Box<dyn Storage>>>,
+    pub(crate) storage_service: Arc<StorageService>,
 }
 
 impl Replication {
@@ -579,3 +579,92 @@ impl Replication {
         })
     }
 }
+
+pub(crate) fn remove_events_overridden_by_wildcard_update(
+    events: &mut HashMap<LogLatestKey, Event>,
+    prefix: Option<&OwnedKeyExpr>,
+    wildcard_ke: &OwnedKeyExpr,
+    wildcard_ts: &Timestamp,
+    wildcard_kind: SampleKind,
+) -> HashSet<Event> {
+    let mut overridden_events = HashSet::default();
+
+    events.retain(|_, event| {
+        // We only provide the timestamp of the Wildcard Update if the Wildcard Update belongs
+        // in this SubInterval.
+        //
+        // Only then do we need to compare its timestamp with the timestamp of the Events
+        // contained in the SubInterval.
+        if event.timestamp() >= wildcard_ts {
+            // Very specific scenario: we are processing a Wildcard Delete that should have been
+            // applied before another Wildcard Update.
+            //
+            // With an example, we had the events:
+            // - put "a = 1" @t0
+            // - put "** = 42" @t2
+            //
+            // That leads the Event in the Replication Log associated to "a" to be:
+            // - timestamp = @t2
+            // - timestamp_last_non_wildcard_update = @t0
+            //
+            // And now we receive:
+            // - delete "**" @t1 (@t0 < @t1 < @t2)
+            //
+            // As the Wildcard Delete should have arrived before the Wildcard Update (put "** =
+            // 42"), "a" should have been deleted and the Wildcard Update Put not applied.
+            //
+            // These `if` check that very specific scenario. Basically we should only retain the
+            // Event if its `timestamp_last_non_wildcard_update` exists and is greater than the
+            // timestamp of the Wildcard Delete.
+            if wildcard_kind == SampleKind::Delete && event.action() == &Action::Put {
+                if let Some(timestamp_last_non_wildcard_update) =
+                    event.timestamp_last_non_wildcard_update
+                {
+                    if timestamp_last_non_wildcard_update > *wildcard_ts {
+                        return true;
+                    }
+                }
+            } else {
+                return true;
+            }
+        }
+
+        let full_event_key_expr = match event.action() {
+            // We do not want to override deleted Events, either with another Wildcard Update or
+            // with a Wildcard Delete.
+            Action::Delete => return true,
+            Action::Put => match crate::prefix(prefix, event.stripped_key.as_ref()) {
+                Ok(full_ke) => full_ke,
+                Err(e) => {
+                    tracing::error!(
+                        "Internal error while attempting to prefix < {:?} > with < {:?} >: {e:?}",
+                        event.stripped_key,
+                        prefix
+                    );
+                    return true;
+                }
+            },
+            Action::WildcardPut(wildcard_ke) | Action::WildcardDelete(wildcard_ke) => {
+                wildcard_ke.clone()
+            }
+        };
+
+        if wildcard_ke.includes(&full_event_key_expr) {
+            // A Wildcard Update cannot override a Wildcard Delete. A Wildcard Delete can only be
+            // overridden by another Wildcard Delete.
+            //
+            // The opposite is not true: a Wildcard Delete can override a Wildcard Update.
+            if wildcard_kind == SampleKind::Put && matches!(event.action, Action::WildcardDelete(_))
+            {
+                return true;
+            }
+
+            overridden_events.insert(event.clone());
+            return false;
+        }
+
+        true
+    });
+
+    overridden_events
+}
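
The "late Wildcard Delete" scenario described in the comments of `remove_events_overridden_by_wildcard_update` can be replayed with a small standalone sketch. It mirrors the branching of the function above but is not the plugin's code: timestamps are `u64`, key expressions are `String`s, no prefix is applied, and `Kind`, `Event`, `includes` (a toy inclusion check that only understands `**` and a trailing `/**`) and `remove_overridden` are hypothetical names introduced for this example.

```rust
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Action {
    Put,
    Delete,
    WildcardPut(String),
    WildcardDelete(String),
}

/// Stand-in for `SampleKind`, i.e. the kind of the incoming Wildcard Update.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Put,
    Delete,
}

#[derive(Debug, Clone, PartialEq)]
struct Event {
    key: String, // full key expression: this sketch does not strip any prefix
    timestamp: u64,
    last_non_wildcard: Option<u64>,
    action: Action,
}

/// Toy inclusion check: `**` includes everything, `p/**` includes `p` and
/// anything below it, any other key expression only includes itself.
fn includes(wildcard: &str, key: &str) -> bool {
    if wildcard == "**" {
        return true;
    }
    match wildcard.strip_suffix("/**") {
        Some(prefix) => key == prefix || key.starts_with(format!("{prefix}/").as_str()),
        None => wildcard == key,
    }
}

/// Removes and returns the events overridden by the Wildcard Update.
fn remove_overridden(
    events: &mut HashMap<String, Event>,
    wild_ke: &str,
    wild_ts: u64,
    wild_kind: Kind,
) -> Vec<Event> {
    let mut overridden = Vec::new();
    events.retain(|_, event| {
        if event.timestamp >= wild_ts {
            // Only a late Wildcard Delete can still override a newer Put, and only
            // if the last non-wildcard update is not newer than that Delete.
            let late_delete = wild_kind == Kind::Delete && event.action == Action::Put;
            let newer_put = matches!(event.last_non_wildcard, Some(ts) if ts > wild_ts);
            if !late_delete || newer_put {
                return true;
            }
        }
        let target = match &event.action {
            Action::Delete => return true, // deleted events are never overridden
            Action::Put => event.key.clone(),
            Action::WildcardPut(ke) | Action::WildcardDelete(ke) => ke.clone(),
        };
        if !includes(wild_ke, &target) {
            return true;
        }
        // A Wildcard Put cannot override a Wildcard Delete.
        if wild_kind == Kind::Put && matches!(event.action, Action::WildcardDelete(_)) {
            return true;
        }
        overridden.push(event.clone());
        false
    });
    overridden
}
```

Suppose "a" was put at t=0 and "b" at t=15, then `put "**"` arrived at t=20, so both events carry timestamp 20. A `delete "**"` stamped t=10 that arrives late removes "a" (its last non-wildcard update, t=0, is older than the delete) but keeps "b" (t=15 is newer).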