Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update up-client-zenoh-rust with up-spec 1.5.7 #21

Merged
merged 27 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1ff329a
Port to up-rust latest version (up-spec 1.5.7)
evshary Mar 26, 2024
dcb3fb0
Use better way to create uAttributes.
evshary Mar 26, 2024
c21b99c
Update how to log the errors.
evshary Mar 26, 2024
93826d9
Fix running test errors.
evshary Mar 26, 2024
1dce54b
Add document to the contstructor.
evshary Mar 27, 2024
4a9cf24
Add some comments to the code for easy understanding.
evshary Mar 27, 2024
51a1bd2
Reorganize the test files.
evshary Mar 27, 2024
c1b5c14
Fix clippy warning in tests.
evshary Mar 27, 2024
d9262db
Update notification test.
evshary Mar 27, 2024
da3ca4d
Update the comments in tests.
evshary Mar 27, 2024
2e97832
gse correct source address from the constructor.
evshary Mar 28, 2024
d067c6c
Add request and response test.
evshary Mar 28, 2024
bde5232
Use API to simplify creating UMessage.
evshary Mar 28, 2024
347743d
Fix the comments.
evshary Mar 29, 2024
8a9fb6f
Remove some wrong TODO items.
evshary Mar 29, 2024
dee0951
Show the minimum UAttributes version.
evshary Mar 29, 2024
7e9bccb
ghange the publish & notification tests.
evshary Mar 29, 2024
4b0d0e7
Update the notification test.
evshary Mar 30, 2024
784ed68
Comment some code to pass the test for the time being.
evshary Mar 30, 2024
c92aa17
Rename register_publish_listener to register_publish_notification_lis…
evshary Mar 30, 2024
6a2796d
Rename uuri to source_uuri.
evshary Mar 30, 2024
21c32db
Use UAuthority and UEntity instead of UUri.
evshary Mar 30, 2024
c71af31
Update the constructors.
evshary Mar 30, 2024
64dec84
Remove Zenoh key in send_response to avoid the misunderstanding.
evshary Mar 30, 2024
c8c718c
Bump up-rust to the latest version with notification validator patch.
evshary Apr 2, 2024
07eb32f
Add validation of UAuthority and UEntity.
evshary Apr 3, 2024
547b9ac
Rename send function and also remove the TODO.
evshary Apr 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ log = "0.4.17"
prost = "0.12"
prost-types = "0.12"
protobuf = { version = "3.3" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "68c8a1d94f0006daf4ba135c9cbbfddcd793108d" }
rand = "0.8.5"
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "953ac0e8da3607330d6df183437114b916fd2879" }
zenoh = { version = "0.10.1-rc", features = ["unstable"]}
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Rust UPClient implementation for the Zenoh transport

# Build
## Build

```shell
# Check clippy
Expand All @@ -13,6 +13,6 @@ cargo build
cargo test
```

# Note
## Note

The implementation follows the spec defined in [up-l1/zenoh](https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/zenoh.adoc).
165 changes: 127 additions & 38 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc, Mutex},
};
use up_rust::uprotocol::{UAttributes, UCode, UMessage, UPayloadFormat, UPriority, UStatus, UUri};
use up_rust::{
UAttributes, UCode, UEntity, UMessage, UPayloadFormat, UPriority, UResourceBuilder, UStatus,
UUri,
};
use zenoh::{
config::Config,
prelude::r#async::*,
Expand All @@ -43,18 +46,86 @@ pub struct UPClientZenoh {
query_map: Arc<Mutex<HashMap<String, Query>>>,
// Save the callback for RPC response
rpc_callback_map: Arc<Mutex<HashMap<String, Arc<UtransportListener>>>>,
// Used to identify different callback
callback_counter: AtomicU64,
// Source UUri in RPC
source_uuri: UUri,
evshary marked this conversation as resolved.
Show resolved Hide resolved
}

impl UPClientZenoh {
/// Create `UPClientZenoh` by applying the Zenoh configuration.
/// The `UUri` will be generated randomly.
///
/// # Arguments
///
/// * `config` - Zenoh configuration. You can refer to [here](https://github.com/eclipse-zenoh/zenoh/blob/0.10.1-rc/DEFAULT_CONFIG.json5) for more configuration details.
///
/// # Errors
/// Will return `Err` if unable to create Zenoh session
/// Will return `Err` if unable to create `UPClientZenoh`
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// use up_client_zenoh::UPClientZenoh;
/// use zenoh::config::Config;
/// let upclient = UPClientZenoh::new(Config::default()).await.unwrap();
/// # });
/// ```
pub async fn new(config: Config) -> Result<UPClientZenoh, UStatus> {
evshary marked this conversation as resolved.
Show resolved Hide resolved
let uuri = UUri {
entity: Some(UEntity {
name: "default.entity".to_string(),
id: Some(u32::from(rand::random::<u16>())),
version_major: Some(1),
version_minor: None,
..Default::default()
})
.into(),
..Default::default()
};
UPClientZenoh::new_with_uuri(config, uuri).await
}

/// Create `UPClientZenoh` by applying the Zenoh configuration and self-defined `UUri`.
///
/// # Arguments
///
/// * `config` - Zenoh configuration. You can refer to [here](https://github.com/eclipse-zenoh/zenoh/blob/0.10.1-rc/DEFAULT_CONFIG.json5) for more configuration details.
/// * `source_uuri` - The `UUri` which is put in source while sending RPC request.
///
/// # Errors
/// Will return `Err` if unable to create `UPClientZenoh`
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// use up_client_zenoh::UPClientZenoh;
/// use up_rust::{UEntity, UUri};
/// use zenoh::config::Config;
/// let uuri = UUri {
/// entity: Some(UEntity {
/// name: "default.entity".to_string(),
/// id: Some(u32::from(rand::random::<u16>())),
/// version_major: Some(1),
/// version_minor: None,
/// ..Default::default()
/// })
/// .into(),
/// ..Default::default()
/// };
/// let upclient = UPClientZenoh::new_with_uuri(Config::default(), uuri).await.unwrap();
/// # });
/// ```
pub async fn new_with_uuri(
config: Config,
source_uuri: UUri,
) -> Result<UPClientZenoh, UStatus> {
let Ok(session) = zenoh::open(config).res().await else {
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to open Zenoh session",
));
let msg = "Unable to open Zenoh session".to_string();
log::error!("{msg}");
evshary marked this conversation as resolved.
Show resolved Hide resolved
return Err(UStatus::fail_with_code(UCode::INTERNAL, msg));
};
Ok(UPClientZenoh {
session: Arc::new(session),
Expand All @@ -63,25 +134,46 @@ impl UPClientZenoh {
query_map: Arc::new(Mutex::new(HashMap::new())),
rpc_callback_map: Arc::new(Mutex::new(HashMap::new())),
callback_counter: AtomicU64::new(0),
source_uuri,
})
}

/// Get the `UUri` of `UPClientZenoh` in for RPC response
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// use up_client_zenoh::UPClientZenoh;
/// use up_rust::{UUri, UriValidator};
/// use zenoh::config::Config;
/// let upclient = UPClientZenoh::new(Config::default()).await.unwrap();
/// let uuri = upclient.get_response_uuri();
/// assert!(uuri.authority.is_none());
/// assert!(UriValidator::is_rpc_response(&uuri));
/// assert_eq!(uuri.entity.unwrap().name, "default.entity");
/// # });
/// ```
pub fn get_response_uuri(&self) -> UUri {
let mut source = self.source_uuri.clone();
source.resource = Some(UResourceBuilder::for_rpc_response()).into();
source
}

fn get_uauth_from_uuri(uri: &UUri) -> Result<String, UStatus> {
if let Some(authority) = uri.authority.as_ref() {
let buf: Vec<u8> = authority.try_into().map_err(|_| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to transform UAuthority into micro form",
)
let msg = "Unable to transform UAuthority into micro form".to_string();
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
Ok(buf
.iter()
.fold(String::new(), |s, c| s + &format!("{c:02x}")))
} else {
Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Empty UAuthority",
))
let msg = "UAuthority is empty".to_string();
log::error!("{msg}");
Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg))
}
}

Expand All @@ -90,11 +182,10 @@ impl UPClientZenoh {
if uri.authority.is_some() && uri.entity.is_none() && uri.resource.is_none() {
Ok(String::from("upr/") + &UPClientZenoh::get_uauth_from_uuri(uri)? + "/**")
} else {
let micro_uuri: Vec<u8> = uri.try_into().map_err(|_| {
UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to serialize into micro format",
)
let micro_uuri: Vec<u8> = uri.try_into().map_err(|e| {
let msg = format!("Unable to serialize into micro format: {e}");
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
// If the UUri is larger than 8 bytes, then it should be remote UUri with UAuthority
// We should prepend it to the Zenoh key.
Expand Down Expand Up @@ -148,30 +239,28 @@ impl UPClientZenoh {
fn attachment_to_uattributes(attachment: &Attachment) -> anyhow::Result<UAttributes> {
let mut attachment_iter = attachment.iter();
if let Some((_, value)) = attachment_iter.next() {
let version = *value.as_slice().first().ok_or(UStatus::fail_with_code(
UCode::INTERNAL,
"uAttributes version is empty",
))?;
if version != 1 {
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"uAttributes version should be 1",
)
.into());
let version = *value.as_slice().first().ok_or_else(|| {
let msg = format!("UAttributes version is empty (should be {UATTRIBUTE_VERSION})");
log::error!("{msg}");
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg)
})?;
if version != UATTRIBUTE_VERSION {
let msg =
format!("UAttributes version is {version} (should be {UATTRIBUTE_VERSION})");
log::error!("{msg}");
return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg).into());
}
} else {
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to get the uAttributes version",
)
.into());
let msg = "Unable to get the UAttributes version".to_string();
log::error!("{msg}");
return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg).into());
}
let uattributes = if let Some((_, value)) = attachment_iter.next() {
UAttributes::parse_from_bytes(value.as_slice())?
} else {
return Err(
UStatus::fail_with_code(UCode::INTERNAL, "Unable to get the uAttributes").into(),
);
let msg = "Unable to get the UAttributes".to_string();
log::error!("{msg}");
return Err(UStatus::fail_with_code(UCode::INVALID_ARGUMENT, msg).into());
};
Ok(uattributes)
}
Expand All @@ -180,7 +269,7 @@ impl UPClientZenoh {
#[cfg(test)]
mod tests {
use super::*;
use up_rust::uprotocol::{uri::uauthority::Number, UAuthority, UEntity, UResource, UUri};
use up_rust::{Number, UAuthority, UEntity, UResource, UUri};

#[test]
fn test_to_zenoh_key_string() {
Expand Down
Loading