Skip to content

Commit

Permalink
refactor(storage-manager): reply_events -> reply_event_retrieval
Browse files Browse the repository at this point in the history
This commit refactors, renames and changes the method `reply_events` to
`reply_event_retrieval`: it now only sends a single
`AlignmentReply::Retrieval` to the Replica that requested data.

The code of the method was also reworked to make it clearer and less
error-prone: there are no `continue;` statement in the middle which
makes the flow less "jumpy".

These changes prepare the introduction of the Wildcard Update.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_query.rs:
  refactor, rename and change the method `reply_events` to
  `reply_event_retrieval`.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet authored and gabrik committed Oct 11, 2024
1 parent 55eb81a commit bf7e62b
Showing 1 changed file with 28 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Replication {
.collect::<Vec<_>>();

for interval_idx in idx_intervals {
let mut events_to_send = Vec::default();
let mut events_to_retrieve = Vec::default();
if let Some(interval) = self
.replication_log
.read()
Expand All @@ -108,14 +108,15 @@ impl Replication {
.get(&interval_idx)
{
interval.sub_intervals().for_each(|sub_interval| {
events_to_send.extend(sub_interval.events().map(Into::into));
events_to_retrieve.extend(sub_interval.events().map(Into::into));
});
}

// NOTE: As we took the lock in the `if let` block, it is released here,
// diminishing contention.

self.reply_events(&query, events_to_send).await;
for event_to_retrieve in events_to_retrieve {
self.reply_event_retrieval(&query, event_to_retrieve).await;
}
}
}
AlignmentQuery::Diff(digest_diff) => {
Expand Down Expand Up @@ -149,8 +150,8 @@ impl Replication {
}
AlignmentQuery::Events(events_to_retrieve) => {
tracing::trace!("Processing `AlignmentQuery::Events`");
if !events_to_retrieve.is_empty() {
self.reply_events(&query, events_to_retrieve).await;
for event_to_retrieve in events_to_retrieve {
self.reply_event_retrieval(&query, event_to_retrieve).await;
}
}
}
Expand Down Expand Up @@ -268,44 +269,40 @@ impl Replication {
reply_to_query(query, reply, None).await;
}

/// Replies to the [Query] with the [EventMetadata] and [Value] that were identified as missing.
/// Replies to the [Query] with the [EventMetadata] and [Value] identified as missing.
///
/// This method will fetch the [StoredData] from the Storage for each provided [EventMetadata],
/// making a distinct reply for each. The fact that multiple replies are sent to the same Query
/// is the reason why we need the consolidation to set to be `None` (⚠️).
pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec<EventMetadata>) {
for event_metadata in events_to_retrieve {
if event_metadata.action == SampleKind::Delete {
reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await;
continue;
}
/// This method will fetch the [StoredData] from the Storage.
pub(crate) async fn reply_event_retrieval(
&self,
query: &Query,
event_to_retrieve: EventMetadata,
) {
let mut value = None;

if event_to_retrieve.action == SampleKind::Put {
let stored_data = {
let mut storage = self.storage.lock().await;
match storage.get(event_metadata.stripped_key.clone(), "").await {
match storage
.get(event_to_retrieve.stripped_key.clone(), "")
.await
{
Ok(stored_data) => stored_data,
Err(e) => {
tracing::error!(
"Failed to retrieve data associated to key < {:?} >: {e:?}",
event_metadata.key_expr()
event_to_retrieve.key_expr()
);
continue;
return;
}
}
};

let requested_data = stored_data
.into_iter()
.find(|data| data.timestamp == *event_metadata.timestamp());
.find(|data| data.timestamp == *event_to_retrieve.timestamp());
match requested_data {
Some(data) => {
tracing::trace!("Sending Sample: {:?}", event_metadata.stripped_key);
reply_to_query(
query,
AlignmentReply::Retrieval(event_metadata),
Some(data.value),
)
.await;
value = Some(data.value);
}
None => {
// NOTE: This is not necessarily an error. There is a possibility that the data
Expand All @@ -316,12 +313,14 @@ impl Replication {
tracing::debug!(
"Found no data in the Storage associated to key < {:?} > with a Timestamp \
equal to: {}",
event_metadata.key_expr(),
event_metadata.timestamp()
event_to_retrieve.key_expr(),
event_to_retrieve.timestamp()
);
}
}
}

reply_to_query(query, AlignmentReply::Retrieval(event_to_retrieve), value).await;
}
}

Expand Down

0 comments on commit bf7e62b

Please sign in to comment.