Skip to content

Commit

Permalink
Merge pull request #675 from elmarco/nospawn
Browse files Browse the repository at this point in the history
Make task spawning for method call optional
  • Loading branch information
zeenix authored Apr 28, 2024
2 parents 1429e77 + 52ae81b commit f4958e3
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 135 deletions.
23 changes: 11 additions & 12 deletions zbus/src/connection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@
use async_io::Async;
use event_listener::Event;
use static_assertions::assert_impl_all;
use std::collections::{HashMap, HashSet, VecDeque};
#[cfg(not(feature = "tokio"))]
use std::net::TcpStream;
#[cfg(all(unix, not(feature = "tokio")))]
use std::os::unix::net::UnixStream;
use std::{
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
};
#[cfg(feature = "tokio")]
use tokio::net::TcpStream;
#[cfg(all(unix, feature = "tokio"))]
Expand All @@ -25,7 +22,6 @@ use zvariant::{ObjectPath, Str};

use crate::{
address::{self, Address},
async_lock::RwLock,
names::{InterfaceName, WellKnownName},
object_server::{ArcInterface, Interface},
Connection, Error, Executor, Guid, OwnedGuid, Result,
Expand Down Expand Up @@ -304,6 +300,9 @@ impl<'a> Builder<'a> {
/// interfaces available immediately after the connection is established. Typically, this is
/// exactly what you'd want. Also in contrast to [`zbus::ObjectServer::at`], this method will
/// replace any previously added interface with the same name at the same path.
///
/// Standard interfaces (Peer, Introspectable, Properties) are added on your behalf. If you
/// attempt to add yours, [`Builder::build()`] will fail.
pub fn serve_at<P, I>(mut self, path: P, iface: I) -> Result<Self>
where
I: Interface,
Expand All @@ -312,8 +311,7 @@ impl<'a> Builder<'a> {
{
let path = path.try_into().map_err(Into::into)?;
let entry = self.interfaces.entry(path).or_default();
entry.insert(I::name(), ArcInterface(Arc::new(RwLock::new(iface))));

entry.insert(I::name(), ArcInterface::new(iface));
Ok(self)
}

Expand Down Expand Up @@ -452,12 +450,13 @@ impl<'a> Builder<'a> {
let object_server = conn.sync_object_server(false, None);
for (path, interfaces) in self.interfaces {
for (name, iface) in interfaces {
let future = object_server
let added = object_server
.inner()
.at_ready(path.to_owned(), name, || iface.0);
let added = future.await?;
// Duplicates shouldn't happen.
assert!(added);
.add_arc_interface(path.clone(), name.clone(), iface.clone())
.await?;
if !added {
return Err(Error::InterfaceExists(name.clone(), path.to_owned()));
}
}
}

Expand Down
42 changes: 13 additions & 29 deletions zbus/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,41 +971,25 @@ impl Connection {
// destination doesn't matter if no name has been registered
// (probably means name it's registered through external means).
if !names.is_empty() && !names.contains_key(dest) {
trace!("Got a method call for a different destination: {}", dest);
trace!(
"Got a method call for a different destination: {}",
dest
);

continue;
}
}
}
let member = match hdr.member() {
Some(member) => member,
None => {
warn!("Got a method call with no `MEMBER` field: {}", msg);

continue;
}
};
trace!("Got `{}`. Will spawn a task for dispatch..", msg);
let executor = conn.inner.executor.clone();
let task_name = format!("`{member}` method dispatcher");
executor
.spawn(
async move {
trace!("spawned a task to dispatch `{}`.", msg);
let server = conn.object_server();
if let Err(e) = server.dispatch_message(&msg).await {
debug!(
"Error dispatching message. Message: {:?}, error: {:?}",
msg, e
);
}
}
.instrument(trace_span!("{}", task_name)),
&task_name,
)
.detach();
let server = conn.object_server();
if let Err(e) = server.dispatch_call(&msg, &hdr).await {
debug!(
"Error dispatching message. Message: {:?}, error: {:?}",
msg, e
);
}
} else {
// If connection is completely gone, no reason to keep running the task anymore.
// If connection is completely gone, no reason to keep running the task
// anymore.
trace!("Connection is gone, stopping associated object server task");
break;
}
Expand Down
10 changes: 8 additions & 2 deletions zbus/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use static_assertions::assert_impl_all;
use std::{convert::Infallible, error, fmt, io, sync::Arc};
use zbus_names::{Error as NamesError, OwnedErrorName};
use zvariant::Error as VariantError;
use zbus_names::{Error as NamesError, InterfaceName, OwnedErrorName};
use zvariant::{Error as VariantError, ObjectPath};

use crate::{
fdo,
Expand Down Expand Up @@ -59,6 +59,8 @@ pub enum Error {
MissingParameter(&'static str),
/// Serial number in the message header is 0 (which is invalid).
InvalidSerial,
/// The given interface already exists at the given path.
InterfaceExists(InterfaceName<'static>, ObjectPath<'static>),
}

assert_impl_all!(Error: Send, Sync, Unpin);
Expand All @@ -85,6 +87,7 @@ impl PartialEq for Error {
(Self::NameTaken, Self::NameTaken) => true,
(Error::InputOutput(_), Self::InputOutput(_)) => false,
(Self::Failure(s1), Self::Failure(s2)) => s1 == s2,
(Self::InterfaceExists(s1, s2), Self::InterfaceExists(o1, o2)) => s1 == o1 && s2 == o2,
(_, _) => false,
}
}
Expand Down Expand Up @@ -113,6 +116,7 @@ impl error::Error for Error {
Error::Failure(_) => None,
Error::MissingParameter(_) => None,
Error::InvalidSerial => None,
Error::InterfaceExists(_, _) => None,
}
}
}
Expand Down Expand Up @@ -147,6 +151,7 @@ impl fmt::Display for Error {
write!(f, "Parameter `{}` was not specified but it is required", p)
}
Error::InvalidSerial => write!(f, "Serial number in the message header is 0"),
Error::InterfaceExists(i, p) => write!(f, "Interface `{i}` already exists at `{p}`"),
}
}
}
Expand Down Expand Up @@ -176,6 +181,7 @@ impl Clone for Error {
Error::Failure(e) => Error::Failure(e.clone()),
Error::MissingParameter(p) => Error::MissingParameter(p),
Error::InvalidSerial => Error::InvalidSerial,
Error::InterfaceExists(i, p) => Error::InterfaceExists(i.clone(), p.clone()),
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions zbus/src/fdo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl Properties {
Error::UnknownInterface(format!("Unknown interface '{interface_name}'"))
})?;

let res = iface.read().await.get(property_name).await;
let res = iface.instance.read().await.get(property_name).await;
res.unwrap_or_else(|| {
Err(Error::UnknownProperty(format!(
"Unknown property '{property_name}'"
Expand All @@ -160,7 +160,12 @@ impl Properties {
Error::UnknownInterface(format!("Unknown interface '{interface_name}'"))
})?;

match iface.read().await.set(property_name, &value, &ctxt) {
match iface
.instance
.read()
.await
.set(property_name, &value, &ctxt)
{
zbus::object_server::DispatchResult::RequiresMut => {}
zbus::object_server::DispatchResult::NotFound => {
return Err(Error::UnknownProperty(format!(
Expand All @@ -172,6 +177,7 @@ impl Properties {
}
}
let res = iface
.instance
.write()
.await
.set_mut(property_name, &value, &ctxt)
Expand All @@ -198,7 +204,7 @@ impl Properties {
Error::UnknownInterface(format!("Unknown interface '{interface_name}'"))
})?;

let res = iface.read().await.get_all().await?;
let res = iface.instance.read().await.get_all().await?;
Ok(res)
}

Expand Down
2 changes: 1 addition & 1 deletion zbus/src/message/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(super) enum FieldCode {
ErrorName = 4,
/// Code for [`Field::ReplySerial`](enum.Field.html#variant.ReplySerial)
ReplySerial = 5,
/// Code for [`Field::Destinatione`](enum.Field.html#variant.Destination)
/// Code for [`Field::Destination`](enum.Field.html#variant.Destination)
Destination = 6,
/// Code for [`Field::Sender`](enum.Field.html#variant.Sender)
Sender = 7,
Expand Down
1 change: 1 addition & 0 deletions zbus/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ impl fmt::Debug for Message {
let mut msg = f.debug_struct("Msg");
let h = self.header();
msg.field("type", &h.message_type());
msg.field("serial", &self.primary_header().serial_num());
if let Some(sender) = h.sender() {
msg.field("sender", &sender);
}
Expand Down
29 changes: 27 additions & 2 deletions zbus/src/object_server/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ pub trait Interface: Any + Send + Sync {
where
Self: Sized;

/// Whether each method call will be handled from a different spawned task.
///
/// Note: When methods are called from separate tasks, they may not be run in the order in which
/// they were called.
fn spawn_tasks_for_methods(&self) -> bool {
true
}

/// Get a property value. Returns `None` if the property doesn't exist.
async fn get(&self, property_name: &str) -> Option<fdo::Result<OwnedValue>>;

Expand Down Expand Up @@ -134,9 +142,26 @@ pub trait Interface: Any + Send + Sync {
fn introspect_to_writer(&self, writer: &mut dyn Write, level: usize);
}

/// A newtype for a reference counted Interface trait-object, with a manual Debug impl.
/// A type for a reference counted Interface trait-object, with associated run-time details and a
/// manual Debug impl.
#[derive(Clone)]
pub(crate) struct ArcInterface(pub(crate) Arc<RwLock<dyn Interface>>);
pub(crate) struct ArcInterface {
pub instance: Arc<RwLock<dyn Interface>>,
pub spawn_tasks_for_methods: bool,
}

impl ArcInterface {
pub fn new<I>(iface: I) -> Self
where
I: Interface,
{
let spawn_tasks_for_methods = iface.spawn_tasks_for_methods();
Self {
instance: Arc::new(RwLock::new(iface)),
spawn_tasks_for_methods,
}
}
}

impl fmt::Debug for ArcInterface {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
Loading

0 comments on commit f4958e3

Please sign in to comment.