
Deadlock when publishing from multiple threads. #354

Closed
blockbomb opened this issue Jan 24, 2019 · 11 comments

@blockbomb

There seems to be an issue causing a deadlock in my codebase that appears related to #168 and #235, when using version 1.4.0 of the project.

I have a multi-threaded application that performs the client.loop_forever(retry_first_connection=True) at the end of my main thread. This performs all the event handling for the MQTT layer. On the underside of that I have another thread that reads messages that come from a USB device.

When I receive an MQTT message, my handler issues commands to the USB device and awaits their successful completion before exiting the handler (one MQTT message may require multiple command/response pairs on the USB comm). There are other asynchronous messages that may come up from the USB device from time to time, and I publish those on the broker when they arrive.

I use not returning from the callback as a way to throttle my consumption of the next message from the broker, to avoid queuing messages in the application myself. However, in version 1.4.0, if I receive one of the asynchronous messages from the USB device, the client attempts to publish from another thread, which ends up calling:

client.publish
  -> self._send_publish
  -> self._packet_queue
  -> self._call_socket_register_write

which of course waits on the self._callback_mutex that is already held by the first thread.
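The two-lock cycle described above can be reproduced without paho at all. Below is a minimal sketch using stand-in locks; all names are illustrative stand-ins for paho's internals, not the library's actual code:

```python
import threading

# Stand-ins for paho's _callback_mutex and _out_message_mutex
callback_mutex = threading.Lock()
out_message_mutex = threading.Lock()

barrier = threading.Barrier(2)  # make sure each thread holds its own lock first
result = {}

def net_loop_thread():
    # Simulates the network loop: _handle_on_message holds _callback_mutex
    # while the user's on_message handler runs and tries to publish.
    with callback_mutex:
        barrier.wait()
        result["handler_publish"] = out_message_mutex.acquire(timeout=0.5)
        if result["handler_publish"]:
            out_message_mutex.release()
        barrier.wait()  # keep holding callback_mutex until both attempts end

def publisher_thread():
    # Simulates publish() from another thread: it holds _out_message_mutex,
    # then _call_socket_register_write() blocks on _callback_mutex.
    with out_message_mutex:
        barrier.wait()
        result["register_write"] = callback_mutex.acquire(timeout=0.5)
        if result["register_write"]:
            callback_mutex.release()
        barrier.wait()

t1 = threading.Thread(target=net_loop_thread)
t2 = threading.Thread(target=publisher_thread)
t1.start(); t2.start(); t1.join(); t2.join()
print(result)  # both acquire attempts time out: a mutual wait, i.e. deadlock
```

Here the timeouts stand in for the real client's unbounded blocking: with plain `acquire()` calls, both threads would hang forever.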

A few questions.

  1. In Use mutexes as context managers #168 it was mentioned that the changes would prohibit two callbacks from being called at the same time. Is it undesirable to have two callbacks called at the same time? In my case I suppose I was relying on it being possible.

  2. When the point in 1. was raised, the enhancements for external loop control had not yet been added to the codebase, so maybe we need to be more careful about when we issue the callback for socket register write.

  3. Am I attempting to use the project in the wrong manner? If so, how would I stop the broker from publishing to my client while I wait for the USB device to respond to the initial request, while still being able to publish to the broker, and without maintaining an incoming queue of my own in the application? I would assume that the MQ in MQTT should be handling that aspect without my needing to.

3a) I suppose it is possible I could create another client associated with the USB device that wouldn't clash with the main client, but it was easier to reason about a single shared client for the application.

Any input would be greatly appreciated. Thank you for all of your hard work; I really enjoy this project.

-BB

@mehdilauters

mehdilauters commented Feb 22, 2019

We also have a deadlock using the mqtt client in 2 different threads with the same version (1.4.0)
As a quick (and ugly) workaround we did the following wrapper.

class MQTTClientWrapper(mqtt.Client):

    def __init__(self, name):
        mqtt.Client.__init__(self, name)

    def publish(self, topic, payload=None, qos=0, retain=False):
        # Serialize publish() against callback dispatch by taking the same
        # mutex the network loop holds while callbacks run.
        with self._callback_mutex:
            return mqtt.Client.publish(self, topic, payload, qos, retain)

@dw7086

dw7086 commented Oct 18, 2019

To avoid the deadlock issue, don't call publish from inside a callback while other threads are also publishing.

@corroleaus

For me this error started to happen in v1.4.0 because of the addition of

self._call_socket_register_write()

at line 2545 in client.py, in the method _packet_queue. _call_socket_register_write() tries to acquire the self._callback_mutex lock, which it never did in 1.3.1. Consequently, if another thread is receiving a packet at the same time and trying to acquire that same mutex to pass data to a callback, things can break.

In my case paho's locks interfere with application-level locks: the thread calling publish() (which ends up calling _call_socket_register_write()) waits for self._callback_mutex, which is held by another thread receiving an MQTT packet. That thread in turn waits for a lock outside of paho, which is held by the first thread that called publish().

It's entirely possible to redesign one's usage of paho to accommodate this behaviour, but in my opinion it is kind of weird that callback logic added solely to integrate with external event loops gets called by default.

Hopefully this can help someone with a similar issue.

@corroleaus

A simple fix for the above-mentioned issue would be to simply not acquire self._callback_mutex in _call_socket_register_write if no callback is defined.

--- a/src/paho/mqtt/client.py
+++ b/src/paho/mqtt/client.py
@@ -2137,8 +2137,8 @@ class Client(object):
         if not self._sock or self._registered_write:
             return
         self._registered_write = True
-        with self._callback_mutex:
-            if self.on_socket_register_write:
+        if self.on_socket_register_write:
+            with self._callback_mutex:
                 try:
                     self.on_socket_register_write(
                         self, self._userdata, self._sock)

@mjcumming

I have experienced this problem as well. I created a separate thread with an event loop and scheduled publishing using loop.call_soon_threadsafe.
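That pattern can be sketched as follows. The `publish` function here is a hypothetical stand-in for `client.publish`; the point is that `call_soon_threadsafe` hands every call over to the single loop thread, so the client is only ever touched from one thread:

```python
import asyncio
import threading

published = []

def publish(topic, payload):
    # Hypothetical stand-in for client.publish; only ever runs on the loop thread
    published.append((topic, payload))

# Dedicated event-loop thread that owns all MQTT publishing
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# Any other thread can schedule a publish without touching the client directly
loop.call_soon_threadsafe(publish, "sensors/usb", b"event")

# Shut down: stop the loop from its own thread, then join
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=1)
print(published)  # [('sensors/usb', b'event')]
```

Callbacks scheduled with `call_soon_threadsafe` run in FIFO order, so the publish is guaranteed to execute before the loop stops.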

@riccardoch

I had the same issue: I have multiple threads that can call client.publish.
I solved it by using a queue to send the messages. I don't call client.publish directly; instead, I created a utility method that adds messages to my queue. A separate thread drains the queue and sends the messages, so I'm sure client.publish is only ever called from one thread.
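A sketch of that approach, with `QueuedPublisher` and `FakeClient` as illustrative names (neither is part of paho):

```python
import queue
import threading

class QueuedPublisher:
    """Funnels all publishes through one worker thread, so the underlying
    client's publish() is only ever called from a single thread."""

    def __init__(self, client):
        self._client = client
        self._q = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def publish(self, topic, payload=None, qos=0, retain=False):
        # Safe to call from any thread; just enqueues the message
        self._q.put((topic, payload, qos, retain))

    def _drain(self):
        while True:
            item = self._q.get()
            if item is None:          # sentinel: shut down
                break
            topic, payload, qos, retain = item
            self._client.publish(topic, payload, qos, retain)

    def close(self):
        self._q.put(None)
        self._worker.join()

# Demo with a fake client standing in for paho's mqtt.Client
class FakeClient:
    def __init__(self):
        self.sent = []
    def publish(self, topic, payload, qos, retain):
        self.sent.append((topic, payload, qos, retain))

fake = FakeClient()
pub = QueuedPublisher(fake)
pub.publish("a/b", "hello")
pub.publish("a/c", "world", qos=1)
pub.close()
print(fake.sent)  # [('a/b', 'hello', 0, False), ('a/c', 'world', 1, False)]
```

One caveat of this design: the caller no longer receives the MQTTMessageInfo result from publish, so delivery confirmation has to come through the on_publish callback instead.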

@AndrewCarterUK

We're having this issue. The library should make it clear that it is not suitable for thread safe use. At the moment it occupies the dangerous space of pretending to be thread safe.

@dbeinder

dbeinder commented Jul 23, 2020

If you don't use the on_socket_register_write callbacks, this is a simple performance-neutral workaround:

#   NetLoop Thread:                                         User code in another thread
#   loop(): receives message
#   loop_read() => _packet_read()
#   _packet_handle() => handle_publish()
#   _handle_on_message(): holds _callback_mutex
#   USER CODE: long-running message handler,                User code tries to publish:
#              gets preempted by another thread             publish(): holds _out_message_mutex
#                                                           _send_publish()
#                                                           _packet_queue()
#                                                           _call_socket_register_write(): blocks on _callback_mutex
#   USER CODE: long-running message handler resumes 
#              and attempts to publish a response
#   publish(): blocks on _out_message_mutex
#              => deadlock

# avoid deadlock by nop-ing socket control callback stubs
import paho.mqtt.client as mqtt
mqtt.Client._call_socket_register_write = lambda _self: None
mqtt.Client._call_socket_unregister_write = lambda _self, _sock=None: None

Ultimately, the global _callback_mutex should either be removed or made more granular.
According to #168, the reason for holding _callback_mutex every time a callback is set or executed is to avoid this scenario:

  1. User in main thread sets on_subscribe to be a function
  2. Client acquires _callback_mutex, and self._on_subscribe is present
  3. Before the function is actually called, user sets on_subscribe to be None
  4. Client tries to call None, raises an exception

If the point is simply to avoid the exception, just take a copy of the callback before making the is None check.
What the current system actually does is turn the operation of setting a callback into:
block until any currently running callback has finished, then set the callback to the new function.
I'd argue that a potential multi-second block is not what the average developer expects from a simple property setter. Even worse, it waits for any running callback, not just the one being set.
If there is a need to set a callback in a way that guarantees the previous callback has finished executing by the time the setter returns, I think it would be better to expose a locking mechanism so a developer can implement this behavior only when needed.
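The copy-then-check idea looks like this. `Emitter` is a toy class, not paho's actual code; the trick relies on attribute rebinding being atomic in CPython:

```python
class Emitter:
    """Toy sketch of lock-free callback dispatch via copy-then-check."""

    def __init__(self):
        self.on_event = None   # user-settable callback attribute

    def fire(self, *args):
        cb = self.on_event     # snapshot the attribute first
        if cb is not None:     # even if on_event is set to None concurrently,
            cb(*args)          # the snapshot still points at a valid callable

seen = []
e = Emitter()
e.on_event = seen.append
e.fire(1)          # callback set: records 1
e.on_event = None
e.fire(2)          # callback cleared: no call, no exception
print(seen)  # [1]
```

This avoids the race from the numbered list above (calling None) without making the setter wait on running callbacks; what it does not guarantee is that a just-cleared callback won't fire one last time.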

@ralight
Contributor

ralight commented Jul 22, 2021

The 1.6.x branch has a lot of improvement in this area if you'd like to give it a try.

@jimfunk
Copy link

jimfunk commented Jun 13, 2022

I had the same issue and it went away after upgrading to 1.6.1.

@MattBrittan
Contributor

I'm going to close this due to its age and the fact that v1.6 improved things; if anyone is encountering similar issues, I'd suggest raising a new issue (we're trying to get the issue count down to a manageable level!).
