diff --git a/Cargo.lock b/Cargo.lock index 94542bf63..4c1a9eec4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2374,6 +2374,15 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65" +[[package]] +name = "nolocal-block-on" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acd62b6605db2ca3eb1448fb02436ff88304555104ae38e74a8dc00ce94a877d" +dependencies = [ + "parking", +] + [[package]] name = "nom" version = "7.1.3" @@ -5446,6 +5455,7 @@ dependencies = [ "itertools 0.13.0", "json5", "lazy_static", + "nolocal-block-on", "once_cell", "paste", "petgraph", diff --git a/Cargo.toml b/Cargo.toml index eef6f1ef4..ad016c4df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,6 +120,7 @@ libloading = "0.8" tracing = "0.1" lockfree = "0.5" lz4_flex = "0.11" +nolocal-block-on = "1.0.1" nix = { version = "0.29.0", features = ["fs"] } num_cpus = "1.16.0" num-traits = { version = "0.2.19", default-features = false } diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 94f6d6eb4..656954561 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -79,6 +79,7 @@ git-version = { workspace = true } itertools = { workspace = true } json5 = { workspace = true } lazy_static = { workspace = true } +nolocal-block-on = { workspace = true } tracing = { workspace = true } paste = { workspace = true } petgraph = { workspace = true } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c5ba24b98..6d5690878 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -17,14 +17,15 @@ use std::{ collections::HashMap, convert::TryInto, fmt, + future::{Future, IntoFuture}, ops::Deref, + pin::Pin, sync::{ atomic::{AtomicU16, Ordering}, Arc, Mutex, RwLock, }, time::{Duration, SystemTime, UNIX_EPOCH}, }; - use tracing::{error, info, trace, warn}; use uhlc::Timestamp; #[cfg(feature = "internal")] @@ -32,7 +33,7 @@ use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; use zenoh_config::{unwrap_or_default, wrappers::ZenohId}; -use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait}; +use zenoh_core::{zconfigurable, zread, Resolvable, Resolve, ResolveClosure, ResolveFuture, Wait}; #[cfg(feature = "unstable")] use zenoh_protocol::network::{ declare::{DeclareToken, SubscriberId, TokenId, UndeclareToken}, @@ -615,8 +616,8 @@ impl Session { /// subscriber_task.await.unwrap(); /// # } /// ``` - pub fn close(&self) -> impl Resolve> + '_ { - self.0.close() + pub fn close(&self) -> SessionCloseBuilder { + SessionCloseBuilder::new(self.0.clone()) } /// Check if the session has been closed. @@ -1073,38 +1074,39 @@ impl Session { }) } } + impl SessionInner { pub fn zid(&self) -> ZenohId { self.runtime.zid() } - fn close(&self) -> impl Resolve> + '_ { - ResolveFuture::new(async move { - let Some(primitives) = zwrite!(self.state).primitives.take() else { - return Ok(()); - }; - if self.owns_runtime { - info!(zid = %self.zid(), "close session"); - } - self.task_controller.terminate_all(Duration::from_secs(10)); - if self.owns_runtime { - self.runtime.close().await?; - } else { - primitives.send_close(); - } - let mut state = zwrite!(self.state); - state.queryables.clear(); - state.subscribers.clear(); - state.liveliness_subscribers.clear(); - state.local_resources.clear(); - state.remote_resources.clear(); - #[cfg(feature = "unstable")] - { - state.tokens.clear(); - state.matching_listeners.clear(); - } - Ok(()) - }) + async fn close(&self, _timeout: Duration) -> ZResult<()> { + let Some(primitives) = zwrite!(self.state).primitives.take() else { + return Ok(()); + }; + if self.owns_runtime { + info!(zid = %self.zid(), "close session"); + } + self.task_controller + .terminate_all_async(Duration::from_secs(10)) + .await; + if self.owns_runtime { + self.runtime.close().await? + } else { + primitives.send_close(); + } + let mut state = zwrite!(self.state); + state.queryables.clear(); + state.subscribers.clear(); + state.liveliness_subscribers.clear(); + state.local_resources.clear(); + state.remote_resources.clear(); + #[cfg(feature = "unstable")] + { + state.tokens.clear(); + state.matching_listeners.clear(); + } + Ok(()) } pub(crate) fn declare_prefix<'a>( @@ -2880,3 +2882,56 @@ where { OpenBuilder::new(config) } + +/// A builder for closing a [`crate::Session`]. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// session.close() +/// .timeout(std::time::Duration::from_secs(10)) +/// .await +/// .unwrap(); +/// # } +/// ``` + +pub struct SessionCloseBuilder { + inner: Arc, + timeout: Duration, +} + +impl SessionCloseBuilder { + fn new(inner: Arc) -> Self { + Self { + inner, + timeout: Duration::MAX, + } + } + + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +impl Resolvable for SessionCloseBuilder { + type To = ZResult<()>; +} + +impl Wait for SessionCloseBuilder { + fn wait(self) -> Self::To { + nolocal_block_on::block_on(self.into_future()) + } +} + +impl IntoFuture for SessionCloseBuilder { + type Output = ::To; + type IntoFuture = Pin::Output> + Send>>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { self.inner.close(self.timeout).await }.into_future()) + } +}