Skip to content

Commit

Permalink
Ignore broken pipe error from zbus
Browse files Browse the repository at this point in the history
  • Loading branch information
DataTriny committed Dec 27, 2023
1 parent d6394ba commit e38a55e
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 32 deletions.
14 changes: 7 additions & 7 deletions platforms/unix/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl AdapterImpl {
is_window_focused: bool,
root_window_bounds: WindowBounds,
action_handler: Box<dyn ActionHandler + Send>,
) -> Option<Self> {
) -> Self {
let tree = Tree::new(initial_state, is_window_focused);
let id = NEXT_ADAPTER_ID.fetch_add(1, Ordering::SeqCst);
let (messages, context) = {
Expand All @@ -222,11 +222,11 @@ impl AdapterImpl {
app_context.push_adapter(id, &context);
(messages, context)
};
Some(AdapterImpl {
AdapterImpl {
id,
messages,
context,
})
}
}

pub(crate) async fn register_tree(&self) {
Expand Down Expand Up @@ -428,7 +428,7 @@ impl Drop for AdapterImpl {
}
}

pub(crate) type LazyAdapter = Pin<Arc<Lazy<Option<AdapterImpl>, Boxed<Option<AdapterImpl>>>>>;
pub(crate) type LazyAdapter = Pin<Arc<Lazy<AdapterImpl, Boxed<AdapterImpl>>>>;

pub struct Adapter {
r#impl: LazyAdapter,
Expand Down Expand Up @@ -474,23 +474,23 @@ impl Adapter {
let mut bounds = self.root_window_bounds.lock().unwrap();
*bounds = new_bounds;
}
if let Some(Some(r#impl)) = Lazy::try_get(&self.r#impl) {
if let Some(r#impl) = Lazy::try_get(&self.r#impl) {
r#impl.set_root_window_bounds(new_bounds);
}
}

/// If and only if the tree has been initialized, call the provided function
/// and apply the resulting update.
pub fn update_if_active(&self, update_factory: impl FnOnce() -> TreeUpdate) {
if let Some(Some(r#impl)) = Lazy::try_get(&self.r#impl) {
if let Some(r#impl) = Lazy::try_get(&self.r#impl) {
r#impl.update(update_factory());
}
}

/// Update the tree state based on whether the window is focused.
pub fn update_window_focus_state(&self, is_focused: bool) {
self.is_window_focused.store(is_focused, Ordering::SeqCst);
if let Some(Some(r#impl)) = Lazy::try_get(&self.r#impl) {
if let Some(r#impl) = Lazy::try_get(&self.r#impl) {
r#impl.update_window_focus_state(is_focused);
}
}
Expand Down
59 changes: 43 additions & 16 deletions platforms/unix/src/atspi/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use atspi::{
Interface, InterfaceSet,
};
use serde::Serialize;
use std::{collections::HashMap, env::var, sync::Weak};
use std::{collections::HashMap, env::var, io, sync::Weak};
use zbus::{
names::{BusName, InterfaceName, MemberName, OwnedUniqueName},
zvariant::{Str, Value},
Expand Down Expand Up @@ -123,12 +123,15 @@ impl Bus {
Ok(())
}

async fn register_interface<T>(&self, path: &str, interface: T) -> Result<()>
async fn register_interface<T>(&self, path: &str, interface: T) -> Result<bool>
where
T: zbus::Interface,
{
self.conn.object_server().at(path, interface).await?;
Ok(())
map_or_ignoring_broken_pipe(
self.conn.object_server().at(path, interface).await,
false,
|result| result,
)
}

pub(crate) async fn unregister_interfaces(
Expand Down Expand Up @@ -160,12 +163,15 @@ impl Bus {
Ok(())
}

async fn unregister_interface<T>(&self, path: &str) -> Result<()>
async fn unregister_interface<T>(&self, path: &str) -> Result<bool>
where
T: zbus::Interface,
{
self.conn.object_server().remove::<T, _>(path).await?;
Ok(())
map_or_ignoring_broken_pipe(
self.conn.object_server().remove::<T, _>(path).await,
false,
|result| result,
)
}

pub(crate) async fn emit_object_event(
Expand Down Expand Up @@ -338,14 +344,35 @@ impl Bus {
signal_name: &str,
body: EventBody<'_, T>,
) -> Result<()> {
self.conn
.emit_signal(
Option::<BusName>::None,
target.path(),
InterfaceName::from_str_unchecked(interface),
MemberName::from_str_unchecked(signal_name),
&body,
)
.await
map_or_ignoring_broken_pipe(
self.conn
.emit_signal(
Option::<BusName>::None,
target.path(),
InterfaceName::from_str_unchecked(interface),
MemberName::from_str_unchecked(signal_name),
&body,
)
.await,
(),
|_| (),
)
}
}

pub(crate) fn map_or_ignoring_broken_pipe<T, U, F>(
result: zbus::Result<T>,
default: U,
f: F,
) -> zbus::Result<U>
where
F: FnOnce(T) -> U,
{
match result {
Ok(result) => Ok(f(result)),
Err(zbus::Error::InputOutput(error)) if error.kind() == io::ErrorKind::BrokenPipe => {
Ok(default)
}
Err(error) => Err(error),
}
}
2 changes: 1 addition & 1 deletion platforms/unix/src/atspi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ impl From<accesskit::Rect> for Rect {
}
}

pub(crate) use bus::Bus;
pub(crate) use bus::*;
pub(crate) use object_address::OwnedObjectAddress;
pub(crate) use object_id::ObjectId;
15 changes: 7 additions & 8 deletions platforms/unix/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use zbus::{Connection, Task};

use crate::{
adapter::{LazyAdapter, Message},
atspi::{interfaces::Event, Bus, OwnedObjectAddress},
atspi::{interfaces::Event, map_or_ignoring_broken_pipe, Bus, OwnedObjectAddress},
util::WindowBounds,
};

Expand Down Expand Up @@ -167,9 +167,10 @@ async fn listen(session_bus: Connection) -> zbus::Result<()> {
select! {
change = changes.next() => {
atspi_bus = if let Some(change) = change {
match change.get().await {
Ok(true) => Bus::new(&session_bus).await.ok(),
_ => None,
if change.get().await? {
map_or_ignoring_broken_pipe(Bus::new(&session_bus).await, None, Some)?
} else {
None
}
} else {
None
Expand All @@ -182,16 +183,14 @@ async fn listen(session_bus: Connection) -> zbus::Result<()> {
if let Some(activation_context) = ACTIVATION_CONTEXT.get() {
let activation_context = activation_context.lock().await;
for adapter in &activation_context.adapters {
if let Some(adapter) = &*adapter.as_ref().await {
adapter.register_tree().await;
}
adapter.as_ref().await.register_tree().await;
}
}
}
}
message = messages.next() => {
if let Some((message, atspi_bus)) = message.zip(atspi_bus.as_ref()) {
let _ = process_adapter_message(atspi_bus, message).await;
process_adapter_message(atspi_bus, message).await?;
}
}
complete => return Ok(()),
Expand Down

0 comments on commit e38a55e

Please sign in to comment.