bug: proxy::call appears to deadlock #350

Closed
OliverNChalk opened this issue May 18, 2023 · 9 comments

@OliverNChalk
Contributor

Looking for guidance on how to best create a minimum repro or provide useful info here. I have an application that is using systemd-zbus which should just be a zbus proxy with nicer types. I am running zbus v3.12 and I periodically get deadlocks where the application will wait on ManagerProxy::list_units forever.

Looking into the generated code, list_units expands to this:

        /// ListUnits method
        pub async fn list_units(&self) -> zbus::Result<Vec<Unit>> {
            let reply = self.0.call("ListUnits", &()).await?;
            ::std::result::Result::Ok(reply)
        }

It dispatches a call and then awaits a reply. Is there any possibility the reply is never getting delivered because a different signal is in the pipe? For reference, I subscribe to unit & job status changes, so it seems feasible those events are occurring at the same time I'm trying to push a call_method through the pipe. I don't have a mental model of the zbus queuing mechanism/async runtime.

Let me know if the best step forward is for me to pull out my systemd interaction code into a minimal repro, or if this is enough info to give you some ideas.

@zeenix
Contributor

zeenix commented May 18, 2023

Sounds like an issue with the rest of your runtime/async code, but of course it can always be a regression. A minimal reproducer would indeed be very helpful in identifying the issue, whether or not it's in zbus. 🙏

@zeenix
Contributor

zeenix commented May 18, 2023

For the record, we have a completely separate queue for handling incoming signals and one for receiving replies (i.e. MessageStream).

@OliverNChalk
Contributor Author

Sounds like an issue with the rest of your runtime/async code, but of course it can always be a regression. A minimal reproducer would indeed be very helpful in identifying the issue, whether or not it's in zbus. 🙏

Ahh, I hadn't considered that. I am using tokio and this is the function that never returns:

    async fn reload_unit_state(&mut self) -> Result<(), zbus::Error> {
        let units = self.manager.list_units().await?;

        self.units = units
            .into_iter()
            .map(|unit| (unit.name.clone().into(), unit))
            .collect();

        Ok(())
    }

Are you saying that if I have another task/call in parallel that blocks, I could lock up the whole tokio runtime? I'm assuming there's no way for me to prevent this call from returning short of completely locking up the tokio executor, right?

@OliverNChalk
Contributor Author

I've reviewed my code and I cannot see how it is possible that my application code is somehow sabotaging this async call. I will provide some additional context:

I have confirmed via a timeout that the tokio executor is not blocked; the timeout fires fine, indicating my event loop is turning over correctly:

// This timeout will trigger in the scenario where `list_units` fails to return (presumed deadlocked).
tokio::time::timeout(RELOAD_STATE_TIMEOUT, self.reload_unit_state()).await??;

Of interest is that I take out multiple signal streams. Re-reading the docs, I am reminded of this statement:

NOTE: You must ensure a MessageStream is continuously polled or you will experience hangs. If you don’t need to continuously poll the MessageStream but need to keep it around for later use, keep the connection around and convert it into a MessageStream when needed. The conversion is not an expensive operation so you don’t need to worry about performance, unless you do it very frequently. If you need to convert back and forth frequently, you may want to consider keeping both a connection and stream around.
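
For reference, the conversion the docs mention looks roughly like this (a minimal sketch assuming zbus 3.x, where MessageStream implements From<&Connection>):

    use futures_util::StreamExt;
    use zbus::{Connection, MessageStream};

    // Sketch: keep only the Connection around and materialize a MessageStream
    // when it is actually going to be polled.
    async fn drain_messages(conn: &Connection) -> zbus::Result<()> {
        let mut stream = MessageStream::from(conn);
        while let Some(msg) = stream.next().await {
            let _msg = msg?;
            // ...handle the message...
        }
        Ok(())
    }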

Is it the case that because I block waiting for the list_units call I have somehow produced the hang & I must service the stream while I am making the method call?

If it's relevant this is my core run loop (not expecting you to read/debug my code, only read this if it helps):

Run Loop

        loop {
            tokio::select! {
                // SOCKETS
                Some(update) = self.nexus_updates_rx.recv() => {
                    debug!("Received update from nexus");
                    match update {
                        NexusUpdate::DesiredState(state) => {
                            if self.units_dirty {
                                continue;
                            }

                            self.delta_close_state(state).await?;
                        }
                        NexusUpdate::RecheckUnitFiles(hosts) => {
                            let host_glob = hosts.compile_matcher();
                            if host_glob.is_match(self.host.deref()) {
                                info!(
                                    host = %self.host,
                                    "RecheckUnitFiles requested, reloading unit file state",
                                );

                                self.reload_unit_files_state().await?;
                            }
                        }
                    }

                    // Units will be reloaded by heartbeat interval.
                }

                // SYSTEMD UPDATES
                opt = self.unit_files_changed.next() => {
                    assert!(opt.is_some(), "UnitFilesChanged stream dropped unexpectedly");

                    info!("Reloading unit files");

                    self.reload_unit_files_state().await?;
                }
                opt = self.new_units.next() => {
                    assert!(opt.is_some(), "UnitNew stream dropped unexpectedly");

                    debug!("UnitNew signal received");

                    self.units_dirty = true;
                }
                opt = self.removed_units.next() => {
                    assert!(opt.is_some(), "UnitRemoved stream dropped unexpectedly");

                    debug!("UnitRemoved signal received");

                    self.units_dirty = true;
                }
                opt = self.new_jobs.next() => {
                    assert!(opt.is_some(), "JobNew stream dropped unexpectedly");

                    debug!("JobNew signal received");

                    self.units_dirty = true;
                }
                opt = self.removed_jobs.next() => {
                    assert!(opt.is_some(), "JobRemoved stream dropped unexpectedly");

                    debug!("JobRemoved signal received");

                    self.units_dirty = true;
                }

                // TIMERS
                _ = self.interval.tick() => {
                    if self.units_dirty {
                        // NB: This call appears to have the potential to
                        // deadlock in the Zbus library. For now we error out
                        // and restart the app in these cases.
                        debug!("Reloading unit state");
                        tokio::time::timeout(
                            RELOAD_STATE_TIMEOUT,
                            self.reload_unit_state(),
                        ).await??;
                        self.units_dirty = false;
                        debug!("Unit state reloaded");
                    }

                    let active_state = self.active_state();
                    debug!("Publishing status heartbeat");

                    self.outpost_updates_tx.try_send(active_state)?;
                }

                // CHILD THREADS
                res = &mut bridge => {
                    let res = res?.unwrap();
                    error!(?res, "ZeroMQ bridge thread exited");

                    return res;
                }
                res = &mut sockets => {
                    let res = res?.unwrap();
                    error!(?res, "ZeroMQ sockets thread exited");

                    return res;
                }

                // CONTROL
                _ = self.cxl.cancelled() => {
                    info!("Outpost server shutdown requested");
                    break;
                }
            }
        }

@zeenix
Contributor

zeenix commented May 19, 2023

Is it the case that because I block waiting for the list_units call I have somehow produced the hang & I must service the stream while I am making the method call?

Yes, that's very much possible (and likely causing your issue) if there are many messages coming in. I'd suggest re-organising your code to make use of separate tasks and communicate between them using channels, instead of one big select statement.

Or you could just launch the specific calls in question in a throw-away task. If you need to know the results of the calls in the loop, you can use channels and then read from the channel in one of the arms of your select. From the task you can send any errors back to the main task.
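
A rough sketch of that throw-away-task approach (the helper and channel names here are made up for illustration; ManagerProxy and Unit are the systemd-zbus types from above):

    use tokio::sync::mpsc;

    // Hypothetical helper: run the call in its own task so the select loop
    // keeps polling the signal streams while the reply is in flight.
    // `ManagerProxy` and `Unit` are assumed in scope from systemd-zbus.
    fn spawn_list_units(
        manager: ManagerProxy<'static>,
        results_tx: mpsc::Sender<zbus::Result<Vec<Unit>>>,
    ) {
        tokio::spawn(async move {
            // Send the outcome (including any error) back to the main task,
            // which picks it up in one of its select arms.
            let _ = results_tx.send(manager.list_units().await).await;
        });
    }

A `Some(result) = results_rx.recv()` arm in the select loop can then apply the units (or handle the error) without the loop ever blocking on the D-Bus reply.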

Since your hang is most likely caused by a well-documented limitation with solutions/workarounds, I'll close this now. Feel free to re-open this if you've a reason to believe that's not the case.

@zeenix zeenix closed this as completed May 19, 2023
@zeenix
Contributor

zeenix commented May 19, 2023

Actually, thinking more about this, I think with the recent separation of queues/channels, it should be possible to remove this limitation of MessageStream.

For whoever will solve this (most likely me): each MessageStream should run its own task to read messages sent by the socket reader task (thus ensuring that the broadcast channel is always getting polled) and use a unicast channel (a size of 1 is sufficient) to send messages to the Stream impl.

@zeenix zeenix reopened this May 19, 2023
@zeenix
Contributor

zeenix commented May 20, 2023

For whoever will solve this (most likely me): each MessageStream should run its own task to read messages sent by the socket reader task (thus ensuring that the broadcast channel is always getting polled) and use a unicast channel (a size of 1 is sufficient) to send messages to the Stream impl.

Very sorry to get your hopes up, but upon thinking more about this solution, I realized that it simply makes the queue bigger and adds an indirection. More importantly, it will not completely eliminate the possibility of deadlocks.

MessageStream already provides a way to set the queue size in for_match_rule, but most folks will only be using the signal streams, which do not provide such an API. So what we could do is provide:

  1. a method to set the queue size of a MessageStream after stream creation, and
  2. a similar method for SignalStream as well.

This won't solve the underlying issue completely either, though; it only helps make it less likely to happen. I'm afraid the real solution has to be in the client code, making use of tasks, select, and join APIs to avoid getting into such a situation.
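
For illustration, the existing knob looks roughly like this (a sketch against the zbus 3.x API, where the third argument of for_match_rule caps the per-stream queue; the rule contents are just an example):

    use zbus::{Connection, MatchRule, MessageStream, MessageType};

    // Sketch: a bigger queue makes the hang less likely, but the stream
    // still has to be polled regularly.
    async fn unit_new_stream(conn: &Connection) -> zbus::Result<MessageStream> {
        let rule = MatchRule::builder()
            .msg_type(MessageType::Signal)
            .interface("org.freedesktop.systemd1.Manager")?
            .member("UnitNew")?
            .build();
        MessageStream::for_match_rule(rule, conn, Some(256)).await
    }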

@OliverNChalk
Contributor Author

OliverNChalk commented May 20, 2023

For those coming across this issue in the future, here is a simple/common way I deal with keeping the event loop unblocked:

use std::future::Future;
use std::pin::Pin;

use futures::stream::FuturesUnordered;

type ReloadUnitStateFut =
    Pin<Box<dyn Future<Output = zbus::Result<Vec<Unit>>> + Send + Sync + 'static>>;

struct MyStruct {
    manager: ManagerProxy<'static>,
    reload_unit_files: FuturesUnordered<ReloadUnitStateFut>,
}


// Clone manager and move it into future to queue async.
let manager = self.manager.clone();
let work = Box::pin(async move { manager.list_units().await });

// Push the workload onto FuturesUnordered so we don't block the event loop.
// We need to continue processing signals, lest we risk a deadlock with
// zbus.
self.reload_unit_files.push(work as ReloadUnitStateFut);
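
The matching select arm might look something like this (a sketch; reload_unit_files is the field from the struct above, and the arm sits alongside the signal-stream arms in the loop):

    // Inside the existing `tokio::select!`: drain completed list_units calls
    // without ever blocking the loop on a pending D-Bus reply.
    Some(result) = self.reload_unit_files.next() => {
        let units = result?;
        self.units = units
            .into_iter()
            .map(|unit| (unit.name.clone().into(), unit))
            .collect();
    }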

The proxy being clonable (presumably through an Arc) makes this fairly easy to do.

Happy for this issue to be closed out; I think it's already documented. It's just quite surprising if you're in a rush ;P

zeenix added a commit to zeenix/zbus that referenced this issue May 20, 2023
Add getter and setter for message queue capacity.

Related: dbus2#350.
@zeenix
Contributor

zeenix commented May 20, 2023

The proxy being clonable (presumably through an Arc) makes this fairly easy to do.

That assumption is correct:

#[derive(Clone, Debug)]
pub struct Proxy<'a> {
    pub(crate) inner: Arc<ProxyInner<'a>>,
}

And this also means that the cloning is very cheap.

@zeenix zeenix closed this as completed May 20, 2023
Taha-Firoz pushed a commit to Taha-Firoz/zbus that referenced this issue Jun 9, 2023
Add getter and setter for message queue capacity.

Related: dbus2#350.