Skip to content

Commit

Permalink
remove obsolete zenoh::internal::Value
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 31, 2024
1 parent bb24e95 commit ccb0964
Show file tree
Hide file tree
Showing 16 changed files with 187 additions and 212 deletions.
22 changes: 18 additions & 4 deletions plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use std::collections::{hash_map::Entry, HashMap};

use async_trait::async_trait;
use tokio::sync::RwLock;
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp, Result as ZResult};
use zenoh::{
bytes::{Encoding, ZBytes},
key_expr::OwnedKeyExpr,
time::Timestamp,
Result as ZResult,
};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
Expand Down Expand Up @@ -77,17 +82,26 @@ impl Storage for ExampleStorage {
async fn put(
&mut self,
key: Option<OwnedKeyExpr>,
value: Value,
payload: ZBytes,
encoding: Encoding,
timestamp: Timestamp,
) -> ZResult<StorageInsertionResult> {
let mut map = self.map.write().await;
match map.entry(key) {
Entry::Occupied(mut e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Replaced);
}
Entry::Vacant(e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Inserted);
}
}
Expand Down
8 changes: 5 additions & 3 deletions plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
use async_trait::async_trait;
use const_format::concatcp;
use zenoh::{
internal::Value,
bytes::{Encoding, ZBytes},
key_expr::{keyexpr, OwnedKeyExpr},
time::Timestamp,
Result as ZResult,
Expand Down Expand Up @@ -176,7 +176,8 @@ pub enum StorageInsertionResult {

#[derive(Debug, Clone)]
pub struct StoredData {
pub value: Value,
pub payload: ZBytes,
pub encoding: Encoding,
pub timestamp: Timestamp,
}

Expand Down Expand Up @@ -227,7 +228,8 @@ pub trait Storage: Send + Sync {
async fn put(
&mut self,
key: Option<OwnedKeyExpr>,
value: Value,
payload: ZBytes,
encoding: Encoding,
timestamp: Timestamp,
) -> ZResult<StorageInsertionResult>;

Expand Down
22 changes: 18 additions & 4 deletions plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use tokio::sync::RwLock;
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp, Result as ZResult};
use zenoh::{
bytes::{Encoding, ZBytes},
key_expr::OwnedKeyExpr,
time::Timestamp,
Result as ZResult,
};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
*,
Expand Down Expand Up @@ -92,18 +97,27 @@ impl Storage for MemoryStorage {
async fn put(
&mut self,
key: Option<OwnedKeyExpr>,
value: Value,
payload: ZBytes,
encoding: Encoding,
timestamp: Timestamp,
) -> ZResult<StorageInsertionResult> {
tracing::trace!("put for {:?}", key);
let mut map = self.map.write().await;
match map.entry(key) {
std::collections::hash_map::Entry::Occupied(mut e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Replaced);
}
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(StoredData { value, timestamp });
e.insert(StoredData {
payload,
encoding,
timestamp,
});
return Ok(StorageInsertionResult::Inserted);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};
use zenoh::{bytes::ZBytes, internal::Value, key_expr::keyexpr_tree::IKeyExprTree, query::Query};
use zenoh::{
bytes::{Encoding, ZBytes},
key_expr::keyexpr_tree::IKeyExprTree,
query::Query,
};

use super::aligner_reply::AlignmentReply;
use crate::replication::{
Expand Down Expand Up @@ -304,7 +308,7 @@ impl Replication {
.into_iter()
.find(|data| data.timestamp == *event_to_retrieve.timestamp());
match requested_data {
Some(data) => Some(data.value),
Some(data) => Some((data.payload, data.encoding)),
None => {
// NOTE: This is not necessarily an error. There is a possibility that the
// data associated with this specific key was updated between the time
Expand All @@ -326,7 +330,7 @@ impl Replication {
let wildcard_puts_guard = self.storage_service.wildcard_puts.read().await;

if let Some(update) = wildcard_puts_guard.weight_at(wildcard_ke) {
Some(update.value().clone())
Some((update.payload().clone(), update.encoding().clone()))
} else {
tracing::error!(
"Ignoring Wildcard Update < {wildcard_ke} >: found no associated `Update`."
Expand All @@ -340,9 +344,9 @@ impl Replication {
}
}

/// Replies to a Query, adding the [AlignmentReply] as an attachment and, if provided, the [Value]
/// as the payload (not forgetting to set the Encoding!).
async fn reply_to_query(query: &Query, reply: AlignmentReply, value: Option<Value>) {
/// Replies to a Query, adding the [AlignmentReply] as an attachment and, if provided, the payload
/// with the corresponding [zenoh::bytes::Encoding].
async fn reply_to_query(query: &Query, reply: AlignmentReply, value: Option<(ZBytes, Encoding)>) {
let attachment = match bincode::serialize(&reply) {
Ok(attachment) => attachment,
Err(e) => {
Expand All @@ -353,8 +357,8 @@ async fn reply_to_query(query: &Query, reply: AlignmentReply, value: Option<Valu

let reply_fut = if let Some(value) = value {
query
.reply(query.key_expr(), value.payload)
.encoding(value.encoding)
.reply(query.key_expr(), value.0)
.encoding(value.1)
.attachment(attachment)
} else {
query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use std::collections::{HashMap, HashSet};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLockWriteGuard;
use zenoh::{
internal::Value,
bytes::{Encoding, ZBytes},
key_expr::{format::keformat, keyexpr_tree::IKeyExprTreeMut, OwnedKeyExpr},
sample::{Sample, SampleKind},
sample::{Sample, SampleFields, SampleKind},
session::ZenohId,
Result as ZResult,
};
use zenoh_backend_traits::StorageInsertionResult;
use zenoh_backend_traits::{StorageInsertionResult, StoredData};

use crate::{
replication::{
Expand Down Expand Up @@ -295,7 +295,8 @@ impl Replication {
self.apply_wildcard_update(
&mut replication_log_guard,
&replica_event,
Value::empty(),
ZBytes::default(),
Encoding::default(),
)
.await;
}
Expand Down Expand Up @@ -358,19 +359,24 @@ impl Replication {
wildcard_delete_ke.clone(),
SampleKind::Delete,
replica_event.timestamp,
zenoh::internal::Value::empty(),
ZBytes::default(),
Encoding::default(),
)
.await;
}
Action::Put => {
let SampleFields {
payload, encoding, ..
} = sample.into();
if matches!(
self.storage_service
.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
payload,
encoding,
replica_event.timestamp,
)
.await,
Expand All @@ -385,10 +391,14 @@ impl Replication {
}
}
Action::WildcardPut(_) => {
let SampleFields {
payload, encoding, ..
} = sample.into();
self.apply_wildcard_update(
&mut replication_log_guard,
&replica_event,
sample.into(),
payload,
encoding,
)
.await;
}
Expand Down Expand Up @@ -544,7 +554,8 @@ impl Replication {
&self,
replication_log_guard: &mut RwLockWriteGuard<'_, LogLatest>,
replica_event: &EventMetadata,
value: Value,
payload: ZBytes,
encoding: Encoding,
) {
let (wildcard_ke, wildcard_kind) = match &replica_event.action {
Action::Put | Action::Delete => unreachable!(),
Expand Down Expand Up @@ -603,7 +614,8 @@ impl Replication {
.await
.put(
overridden_event.key_expr().clone(),
value.clone(),
payload.clone(),
encoding.clone(),
replica_event.timestamp,
)
.await,
Expand Down Expand Up @@ -664,7 +676,8 @@ impl Replication {
wildcard_ke.clone(),
(&replica_event.action).into(),
replica_event.timestamp,
value,
payload,
encoding,
)
.await;
}
Expand Down Expand Up @@ -724,7 +737,12 @@ impl Replication {
wildcard_ke: OwnedKeyExpr,
wildcard_update: Update,
) {
let wildcard_timestamp = *wildcard_update.timestamp();
let kind = wildcard_update.kind();
let StoredData {
payload,
encoding,
timestamp,
} = wildcard_update.into();

// A Wildcard Update overrides another Wildcard Update, we have nothing to do.
if matches!(
Expand All @@ -736,26 +754,27 @@ impl Replication {

tracing::trace!(
"Overriding < {replica_event:?} > with < {wildcard_ke} {} >",
wildcard_timestamp
timestamp
);

// Generate the action that will be used to override the metadata of the Event. We do it
// now to avoid having to clone the `wildcard_update` because we move it below.
let wildcard_action = match wildcard_update.kind() {
let wildcard_action = match kind {
SampleKind::Put => Action::WildcardPut(wildcard_ke),
SampleKind::Delete => Action::WildcardDelete(wildcard_ke),
};

if wildcard_update.kind() == SampleKind::Put
if kind == SampleKind::Put
&& matches!(
self.storage_service
.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
wildcard_update.into_value(),
wildcard_timestamp
payload,
encoding,
timestamp
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
Expand All @@ -772,7 +791,7 @@ impl Replication {
// `timestamp_last_non_wildcard_update`.
let mut event: Event = replica_event.into();
// Then update the Event with the values of the Wildcard Update.
event.set_timestamp_and_action(wildcard_timestamp, wildcard_action);
event.set_timestamp_and_action(timestamp, wildcard_action);

replication_log_guard.insert_event(event);
}
Expand Down
Loading

0 comments on commit ccb0964

Please sign in to comment.