diff --git a/zbus/src/connection/builder.rs b/zbus/src/connection/builder.rs index 28ffd18b6..3b9d71500 100644 --- a/zbus/src/connection/builder.rs +++ b/zbus/src/connection/builder.rs @@ -2,14 +2,11 @@ use async_io::Async; use event_listener::Event; use static_assertions::assert_impl_all; +use std::collections::{HashMap, HashSet, VecDeque}; #[cfg(not(feature = "tokio"))] use std::net::TcpStream; #[cfg(all(unix, not(feature = "tokio")))] use std::os::unix::net::UnixStream; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - sync::Arc, -}; #[cfg(feature = "tokio")] use tokio::net::TcpStream; #[cfg(all(unix, feature = "tokio"))] @@ -25,7 +22,6 @@ use zvariant::{ObjectPath, Str}; use crate::{ address::{self, Address}, - async_lock::RwLock, names::{InterfaceName, WellKnownName}, object_server::{ArcInterface, Interface}, Connection, Error, Executor, Guid, OwnedGuid, Result, @@ -304,6 +300,9 @@ impl<'a> Builder<'a> { /// interfaces available immediately after the connection is established. Typically, this is /// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will /// replace any previously added interface with the same name at the same path. + /// + /// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you + /// attempt to add yours, [`Builder::build()`] will fail. pub fn serve_at(mut self, path: P, iface: I) -> Result where I: Interface, @@ -312,8 +311,7 @@ impl<'a> Builder<'a> { { let path = path.try_into().map_err(Into::into)?; let entry = self.interfaces.entry(path).or_default(); - entry.insert(I::name(), ArcInterface(Arc::new(RwLock::new(iface)))); - + entry.insert(I::name(), ArcInterface::new(iface)); Ok(self) } @@ -452,12 +450,13 @@ impl<'a> Builder<'a> { let object_server = conn.sync_object_server(false, None); for (path, interfaces) in self.interfaces { for (name, iface) in interfaces { - let future = object_server + let added = object_server .inner() - .at_ready(path.to_owned(), name, || iface.0); - let added = future.await?; - // Duplicates shouldn't happen. - assert!(added); + .add_arc_interface(path.clone(), name.clone(), iface.clone()) + .await?; + if !added { + return Err(Error::InterfaceExists(name.clone(), path.to_owned())); + } } } diff --git a/zbus/src/connection/mod.rs b/zbus/src/connection/mod.rs index 81ce355d3..51998e810 100644 --- a/zbus/src/connection/mod.rs +++ b/zbus/src/connection/mod.rs @@ -971,41 +971,25 @@ impl Connection { // destination doesn't matter if no name has been registered // (probably means name it's registered through external means). if !names.is_empty() && !names.contains_key(dest) { - trace!("Got a method call for a different destination: {}", dest); + trace!( + "Got a method call for a different destination: {}", + dest + ); continue; } } } - let member = match hdr.member() { - Some(member) => member, - None => { - warn!("Got a method call with no `MEMBER` field: {}", msg); - - continue; - } - }; - trace!("Got `{}`. Will spawn a task for dispatch..", msg); - let executor = conn.inner.executor.clone(); - let task_name = format!("`{member}` method dispatcher"); - executor - .spawn( - async move { - trace!("spawned a task to dispatch `{}`.", msg); - let server = conn.object_server(); - if let Err(e) = server.dispatch_message(&msg).await { - debug!( - "Error dispatching message. Message: {:?}, error: {:?}", - msg, e - ); - } - } - .instrument(trace_span!("{}", task_name)), - &task_name, - ) - .detach(); + let server = conn.object_server(); + if let Err(e) = server.dispatch_call(&msg, &hdr).await { + debug!( + "Error dispatching message. Message: {:?}, error: {:?}", + msg, e + ); + } } else { - // If connection is completely gone, no reason to keep running the task anymore. + // If connection is completely gone, no reason to keep running the task + // anymore. trace!("Connection is gone, stopping associated object server task"); break; } diff --git a/zbus/src/error.rs b/zbus/src/error.rs index dfee91039..ded39d466 100644 --- a/zbus/src/error.rs +++ b/zbus/src/error.rs @@ -1,7 +1,7 @@ use static_assertions::assert_impl_all; use std::{convert::Infallible, error, fmt, io, sync::Arc}; -use zbus_names::{Error as NamesError, OwnedErrorName}; -use zvariant::Error as VariantError; +use zbus_names::{Error as NamesError, InterfaceName, OwnedErrorName}; +use zvariant::{Error as VariantError, ObjectPath}; use crate::{ fdo, @@ -59,6 +59,8 @@ pub enum Error { MissingParameter(&'static str), /// Serial number in the message header is 0 (which is invalid). InvalidSerial, + /// The given interface already exists at the given path. + InterfaceExists(InterfaceName<'static>, ObjectPath<'static>), } assert_impl_all!(Error: Send, Sync, Unpin); @@ -85,6 +87,7 @@ impl PartialEq for Error { (Self::NameTaken, Self::NameTaken) => true, (Error::InputOutput(_), Self::InputOutput(_)) => false, (Self::Failure(s1), Self::Failure(s2)) => s1 == s2, + (Self::InterfaceExists(s1, s2), Self::InterfaceExists(o1, o2)) => s1 == o1 && s2 == o2, (_, _) => false, } } @@ -113,6 +116,7 @@ impl error::Error for Error { Error::Failure(_) => None, Error::MissingParameter(_) => None, Error::InvalidSerial => None, + Error::InterfaceExists(_, _) => None, } } } @@ -147,6 +151,7 @@ impl fmt::Display for Error { write!(f, "Parameter `{}` was not specified but it is required", p) } Error::InvalidSerial => write!(f, "Serial number in the message header is 0"), + Error::InterfaceExists(i, p) => write!(f, "Interface `{i}` already exists at `{p}`"), } } } @@ -176,6 +181,7 @@ impl Clone for Error { Error::Failure(e) => Error::Failure(e.clone()), Error::MissingParameter(p) => Error::MissingParameter(p), Error::InvalidSerial => Error::InvalidSerial, + Error::InterfaceExists(i, p) => Error::InterfaceExists(i.clone(), p.clone()), } } } diff --git a/zbus/src/fdo.rs b/zbus/src/fdo.rs index 4258b3759..ebd39fc20 100644 --- a/zbus/src/fdo.rs +++ b/zbus/src/fdo.rs @@ -134,7 +134,7 @@ impl Properties { Error::UnknownInterface(format!("Unknown interface '{interface_name}'")) })?; - let res = iface.read().await.get(property_name).await; + let res = iface.instance.read().await.get(property_name).await; res.unwrap_or_else(|| { Err(Error::UnknownProperty(format!( "Unknown property '{property_name}'" @@ -160,7 +160,12 @@ impl Properties { Error::UnknownInterface(format!("Unknown interface '{interface_name}'")) })?; - match iface.read().await.set(property_name, &value, &ctxt) { + match iface + .instance + .read() + .await + .set(property_name, &value, &ctxt) + { zbus::object_server::DispatchResult::RequiresMut => {} zbus::object_server::DispatchResult::NotFound => { return Err(Error::UnknownProperty(format!( @@ -172,6 +177,7 @@ impl Properties { } } let res = iface + .instance .write() .await .set_mut(property_name, &value, &ctxt) @@ -198,7 +204,7 @@ impl Properties { Error::UnknownInterface(format!("Unknown interface '{interface_name}'")) })?; - let res = iface.read().await.get_all().await?; + let res = iface.instance.read().await.get_all().await?; Ok(res) } diff --git a/zbus/src/message/field.rs b/zbus/src/message/field.rs index 90df58c28..e6427fe0a 100644 --- a/zbus/src/message/field.rs +++ b/zbus/src/message/field.rs @@ -32,7 +32,7 @@ pub(super) enum FieldCode { ErrorName = 4, /// Code for [`Field::ReplySerial`](enum.Field.html#variant.ReplySerial) ReplySerial = 5, - /// Code for [`Field::Destinatione`](enum.Field.html#variant.Destination) + /// Code for [`Field::Destination`](enum.Field.html#variant.Destination) Destination = 6, /// Code for [`Field::Sender`](enum.Field.html#variant.Sender) Sender = 7, diff --git a/zbus/src/message/mod.rs b/zbus/src/message/mod.rs index de2f4b21c..46f8c5ab8 100644 --- a/zbus/src/message/mod.rs +++ b/zbus/src/message/mod.rs @@ -295,6 +295,7 @@ impl fmt::Debug for Message { let mut msg = f.debug_struct("Msg"); let h = self.header(); msg.field("type", &h.message_type()); + msg.field("serial", &self.primary_header().serial_num()); if let Some(sender) = h.sender() { msg.field("sender", &sender); } diff --git a/zbus/src/object_server/interface.rs b/zbus/src/object_server/interface.rs index 810ae58dd..3eaedcb19 100644 --- a/zbus/src/object_server/interface.rs +++ b/zbus/src/object_server/interface.rs @@ -72,6 +72,14 @@ pub trait Interface: Any + Send + Sync { where Self: Sized; + /// Whether each method call will be handled from a different spawned task. + /// + /// Note: When methods are called from separate tasks, they may not be run in the order in which + /// they were called. + fn spawn_tasks_for_methods(&self) -> bool { + true + } + /// Get a property value. Returns `None` if the property doesn't exist. async fn get(&self, property_name: &str) -> Option>; @@ -134,9 +142,26 @@ pub trait Interface: Any + Send + Sync { fn introspect_to_writer(&self, writer: &mut dyn Write, level: usize); } -/// A newtype for a reference counted Interface trait-object, with a manual Debug impl. +/// A type for a reference counted Interface trait-object, with associated run-time details and a +/// manual Debug impl. #[derive(Clone)] -pub(crate) struct ArcInterface(pub(crate) Arc>); +pub(crate) struct ArcInterface { + pub instance: Arc>, + pub spawn_tasks_for_methods: bool, +} + +impl ArcInterface { + pub fn new(iface: I) -> Self + where + I: Interface, + { + let spawn_tasks_for_methods = iface.spawn_tasks_for_methods(); + Self { + instance: Arc::new(RwLock::new(iface)), + spawn_tasks_for_methods, + } + } +} impl fmt::Debug for ArcInterface { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/zbus/src/object_server/mod.rs b/zbus/src/object_server/mod.rs index 098508409..cf03a11fe 100644 --- a/zbus/src/object_server/mod.rs +++ b/zbus/src/object_server/mod.rs @@ -9,7 +9,7 @@ use std::{ ops::{Deref, DerefMut}, sync::Arc, }; -use tracing::{debug, instrument, trace}; +use tracing::{debug, instrument, trace, trace_span, Instrument}; use static_assertions::assert_impl_all; use zbus_names::InterfaceName; @@ -20,7 +20,7 @@ use crate::{ connection::WeakConnection, fdo, fdo::{Introspectable, ManagedObjects, ObjectManager, Peer, Properties}, - message::Message, + message::{Header, Message}, Connection, Error, Result, }; @@ -194,11 +194,9 @@ impl Node { path, ..Default::default() }; - node.at(Peer::name(), || Arc::new(RwLock::new(Peer))); - node.at(Introspectable::name(), || { - Arc::new(RwLock::new(Introspectable)) - }); - node.at(Properties::name(), || Arc::new(RwLock::new(Properties))); + debug_assert!(node.add_interface(Peer)); + debug_assert!(node.add_interface(Introspectable)); + debug_assert!(node.add_interface(Properties)); node } @@ -258,11 +256,8 @@ impl Node { (Some(node), obj_manager_path) } - pub(crate) fn interface_lock( - &self, - interface_name: InterfaceName<'_>, - ) -> Option>> { - self.interfaces.get(&interface_name).map(|x| x.0.clone()) + pub(crate) fn interface_lock(&self, interface_name: InterfaceName<'_>) -> Option { + self.interfaces.get(&interface_name).cloned() } fn remove_interface(&mut self, interface_name: InterfaceName<'static>) -> bool { @@ -282,18 +277,21 @@ impl Node { self.children.remove(node).is_some() } - // Takes a closure so caller can avoid having to create an Arc & RwLock in case interface was - // already added. - fn at(&mut self, name: InterfaceName<'static>, iface_creator: F) -> bool - where - F: FnOnce() -> Arc>, - { + fn add_arc_interface(&mut self, name: InterfaceName<'static>, arc_iface: ArcInterface) -> bool { match self.interfaces.entry(name) { - Entry::Vacant(e) => e.insert(ArcInterface(iface_creator())), - Entry::Occupied(_) => return false, - }; + Entry::Vacant(e) => { + e.insert(arc_iface); + true + } + Entry::Occupied(_) => false, + } + } - true + fn add_interface(&mut self, iface: I) -> bool + where + I: Interface, + { + self.add_arc_interface(I::name(), ArcInterface::new(iface)) } async fn introspect_to_writer(&self, writer: &mut W) { @@ -352,7 +350,11 @@ impl Node { } for iface in node.interfaces.values() { - iface.0.read().await.introspect_to_writer(writer, level + 2); + iface + .instance + .read() + .await + .introspect_to_writer(writer, level + 2); } } Fragment::End { level } => { @@ -400,6 +402,7 @@ impl Node { ) -> fdo::Result> { self.interface_lock(interface_name) .expect("Interface was added but not found") + .instance .read() .await .get_all() @@ -499,28 +502,25 @@ impl ObjectServer { P: TryInto>, P::Error: Into, { - self.at_ready(path, I::name(), move || Arc::new(RwLock::new(iface))) + self.add_arc_interface(path, I::name(), ArcInterface::new(iface)) .await } - /// Same as `at` but expects an interface already in `Arc>` form. - // FIXME: Better name? - pub(crate) async fn at_ready<'node, 'p, P, F>( - &'node self, + pub(crate) async fn add_arc_interface<'p, P>( + &self, path: P, name: InterfaceName<'static>, - iface_creator: F, + arc_iface: ArcInterface, ) -> Result where P: TryInto>, P::Error: Into, - F: FnOnce() -> Arc>, { let path = path.try_into().map_err(Into::into)?; let mut root = self.root().write().await; let (node, manager_path) = root.get_child_mut(&path, true); let node = node.unwrap(); - let added = node.at(name.clone(), iface_creator); + let added = node.add_arc_interface(name.clone(), arc_iface); if added { if name == ObjectManager::name() { // Just added an object manager. Need to signal all managed objects under it. @@ -647,6 +647,7 @@ impl ObjectServer { let lock = node .interface_lock(I::name()) .ok_or(Error::InterfaceNotFound)? + .instance .clone(); // Ensure what we return can later be dowcasted safely. @@ -666,40 +667,19 @@ impl ObjectServer { }) } - #[instrument(skip(self, connection))] - async fn dispatch_method_call_try( + async fn dispatch_call_to_iface( &self, + iface: Arc>, connection: &Connection, msg: &Message, - ) -> fdo::Result> { - let hdr = msg.header(); - let path = hdr - .path() - .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?; - let iface_name = hdr - .interface() - // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same - // object have a method with the same name, it is undefined which of those - // methods will be invoked. Implementations may choose to either return an - // error, or deliver the message as though it had an arbitrary one of those - // interfaces. - .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?; + hdr: &Header<'_>, + ) -> fdo::Result<()> { let member = hdr .member() .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?; - - // Ensure the root lock isn't held while dispatching the message. That - // way, the object server can be mutated during that time. - let iface = { - let root = self.root.read().await; - let node = root - .get_child(path) - .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?; - - node.interface_lock(iface_name.as_ref()).ok_or_else(|| { - fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'")) - })? - }; + let iface_name = hdr + .interface() + .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?; trace!("acquiring read lock on interface `{}`", iface_name); let read_lock = iface.read().await; @@ -711,7 +691,10 @@ impl ObjectServer { ))); } DispatchResult::Async(f) => { - return Ok(f.await); + return f.await.map_err(|e| match e { + Error::FDO(e) => *e, + e => fdo::Error::Failed(format!("{e}")), + }); } DispatchResult::RequiresMut => {} } @@ -723,7 +706,10 @@ impl ObjectServer { DispatchResult::NotFound => {} DispatchResult::RequiresMut => {} DispatchResult::Async(f) => { - return Ok(f.await); + return f.await.map_err(|e| match e { + Error::FDO(e) => *e, + e => fdo::Error::Failed(format!("{e}")), + }); } } drop(write_lock); @@ -732,16 +718,67 @@ impl ObjectServer { ))) } - #[instrument(skip(self, connection))] - async fn dispatch_method_call(&self, connection: &Connection, msg: &Message) -> Result<()> { - match self.dispatch_method_call_try(connection, msg).await { - Err(e) => { - let hdr = msg.header(); - debug!("Returning error: {}", e); - connection.reply_dbus_error(&hdr, e).await?; - Ok(()) - } - Ok(r) => r, + async fn dispatch_method_call_try( + &self, + connection: &Connection, + msg: &Message, + hdr: &Header<'_>, + ) -> fdo::Result<()> { + let path = hdr + .path() + .ok_or_else(|| fdo::Error::Failed("Missing object path".into()))?; + let iface_name = hdr + .interface() + // TODO: In the absence of an INTERFACE field, if two or more interfaces on the same + // object have a method with the same name, it is undefined which of those + // methods will be invoked. Implementations may choose to either return an + // error, or deliver the message as though it had an arbitrary one of those + // interfaces. + .ok_or_else(|| fdo::Error::Failed("Missing interface".into()))?; + // Check that the message has a member before spawning. + // Note that an unknown member will still spawn a task. We should instead gather + // all the details for the call before spawning. + // See also https://github.com/dbus2/zbus/issues/674 for future of Interface. + let _ = hdr + .member() + .ok_or_else(|| fdo::Error::Failed("Missing member".into()))?; + + // Ensure the root lock isn't held while dispatching the message. That + // way, the object server can be mutated during that time. + let (iface, with_spawn) = { + let root = self.root.read().await; + let node = root + .get_child(path) + .ok_or_else(|| fdo::Error::UnknownObject(format!("Unknown object '{path}'")))?; + + let iface = node.interface_lock(iface_name.as_ref()).ok_or_else(|| { + fdo::Error::UnknownInterface(format!("Unknown interface '{iface_name}'")) + })?; + (iface.instance, iface.spawn_tasks_for_methods) + }; + + if with_spawn { + let executor = connection.executor().clone(); + let task_name = format!("`{msg}` method dispatcher"); + let connection = connection.clone(); + let msg = msg.clone(); + executor + .spawn( + async move { + let server = connection.object_server(); + let hdr = msg.header(); + server + .dispatch_call_to_iface(iface, &connection, &msg, &hdr) + .await + } + .instrument(trace_span!("{}", task_name)), + &task_name, + ) + .detach(); + Ok(()) + } else { + self.dispatch_call_to_iface(iface, connection, msg, hdr) + .await } } @@ -756,14 +793,18 @@ impl ObjectServer { /// - returning a message (responding to the caller with either a return or error message) to /// the caller through the associated server connection. /// - /// Returns an error if the message is malformed, true if it's handled, false otherwise. + /// Returns an error if the message is malformed. #[instrument(skip(self))] - pub(crate) async fn dispatch_message(&self, msg: &Message) -> Result { + pub(crate) async fn dispatch_call(&self, msg: &Message, hdr: &Header<'_>) -> Result<()> { let conn = self.connection(); - self.dispatch_method_call(&conn, msg).await?; + + if let Err(e) = self.dispatch_method_call_try(&conn, msg, hdr).await { + debug!("Returning error: {}", e); + conn.reply_dbus_error(hdr, e).await?; + } trace!("Handled: {}", msg); - Ok(true) + Ok(()) } pub(crate) fn connection(&self) -> Connection { diff --git a/zbus_macros/src/iface.rs b/zbus_macros/src/iface.rs index 6e86f69e9..0fae06a8d 100644 --- a/zbus_macros/src/iface.rs +++ b/zbus_macros/src/iface.rs @@ -18,7 +18,8 @@ pub mod old { pub TraitAttributes("trait") { interface str, - name str + name str, + spawn bool }; pub MethodAttributes("method") { @@ -39,7 +40,8 @@ def_attrs! { pub TraitAttributes("trait") { interface str, - name str + name str, + spawn bool }; pub MethodAttributes("method") { @@ -300,13 +302,13 @@ pub fn expand, M: AttrParse + Into> _ => return Err(Error::new_spanned(&input.self_ty, "Invalid type")), }; - let iface_name = - { - let (name, interface) = match T::parse_nested_metas(&args)?.into() { - TraitAttrs::New(new) => (new.name, new.interface), - TraitAttrs::Old(old) => (old.name, old.interface), - }; + let (iface_name, with_spawn) = { + let (name, interface, spawn) = match T::parse_nested_metas(&args)?.into() { + TraitAttrs::New(new) => (new.name, new.interface, new.spawn), + TraitAttrs::Old(old) => (old.name, old.interface, old.spawn), + }; + let name = match (name, interface) { (Some(name), None) | (None, Some(name)) => name, (None, None) => format!("org.freedesktop.{ty}"), @@ -314,8 +316,10 @@ pub fn expand, M: AttrParse + Into> input.span(), "`name` and `interface` attributes should not be specified at the same time", )), - } - }; + }; + + (name, !spawn.unwrap_or(true)) + }; // Store parsed information about each method let mut methods = vec![]; @@ -713,6 +717,10 @@ pub fn expand, M: AttrParse + Into> #zbus::names::InterfaceName::from_static_str_unchecked(#iface_name) } + fn spawn_tasks_for_methods(&self) -> bool { + #with_spawn + } + async fn get( &self, property_name: &str, diff --git a/zbus_macros/src/lib.rs b/zbus_macros/src/lib.rs index 973189686..78126bd10 100644 --- a/zbus_macros/src/lib.rs +++ b/zbus_macros/src/lib.rs @@ -215,6 +215,25 @@ pub fn dbus_proxy(attr: TokenStream, item: TokenStream) -> TokenStream { /// properties or signal depending on the item attributes. It will implement the [`Interface`] trait /// `for T` on your behalf, to handle the message dispatching and introspection support. /// +/// The trait accepts the `interface` attributes: +/// +/// * `name` - the D-Bus interface name +/// +/// * `spawn` - Controls the spawning of tasks for method calls. By default, `true`, allowing zbus +/// to spawn a separate task for each method call. This default behavior can lead to methods being +/// handled out of their received order, which might not always align with expected or desired +/// behavior. +/// +/// - **When True (Default):** Suitable for interfaces where method calls are independent of each +/// other or can be processed asynchronously without strict ordering. In scenarios where a client +/// must wait for a reply before making further dependent calls, this default behavior is +/// appropriate. +/// +/// - **When False:** Use this setting to ensure methods are handled in the order they are +/// received, which is crucial for interfaces requiring sequential processing of method calls. +/// However, care must be taken to avoid making D-Bus method calls from within your interface +/// methods when this setting is false, as it may lead to deadlocks under certain conditions. +/// /// The methods accepts the `interface` attributes: /// /// * `name` - override the D-Bus name (pascal case form of the method by default) diff --git a/zbus_macros/tests/tests.rs b/zbus_macros/tests/tests.rs index 3ba2f2e75..c2ee628b9 100644 --- a/zbus_macros/tests/tests.rs +++ b/zbus_macros/tests/tests.rs @@ -139,7 +139,7 @@ fn test_interface() { #[derive(Serialize, Deserialize, Type, Value)] struct MyCustomPropertyType(u32); - #[interface(name = "org.freedesktop.zbus.Test")] + #[interface(name = "org.freedesktop.zbus.Test", spawn = false)] impl Test where T: serde::ser::Serialize + zbus::zvariant::Type + Send + Sync,