Add External Event Loop Example #72

Closed
zpfvo opened this issue Jun 8, 2016 · 9 comments
Labels
Status: Available No one has claimed responsibility for resolving this issue. Type: Enhancement A new feature for a minor or major release.

Comments

@zpfvo

zpfvo commented Jun 8, 2016

Hi,
I'd like to use this library together with the Twisted reactor. Since I'm new to both paho.mqtt and Twisted, it's hard for me to figure this out. It would be nice to have an example of external event loop usage (it doesn't have to be Twisted).

@ralight
Contributor

ralight commented Jun 9, 2016

That sounds like a reasonable request. Is there anybody out there who has an example they could share?

@ralight ralight added the Type: Enhancement A new feature for a minor or major release. label Jun 9, 2016
@zpfvo
Author

zpfvo commented Jun 9, 2016

I got it running, but I don't think it's very nice. I don't use Deferreds and I don't use select on the socket.
(not a complete example)

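import logging

import paho.mqtt.client as mqtt
from twisted.internet import reactor, task

def is_connected(mqttc):
    # socket() returns None once the connection has been closed.
    if mqttc.socket() is None:
        logging.error("Connection to mqtt broker lost. Stopping")
        reactor.stop()

def want_write(mqttc):
    # Only attempt a write when paho has outgoing data queued.
    if mqttc.want_write():
        mqttc.loop_write()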
# MQTT STUFF
mqttc = mqtt.Client(transport="websockets")
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
# Uncomment to enable debug messages
# mqttc.on_log = on_log
mqttc.connect("localhost", 1883, 60)
mqttc.subscribe("events/#", 0)
mqttc.message_callback_add('events/comp', on_component_event_message)

# mqtt event loop functions
task.LoopingCall(mqttc.loop_read).start(0.1)
task.LoopingCall(want_write, mqttc).start(0.1)
task.LoopingCall(mqttc.loop_misc).start(5)
task.LoopingCall(is_connected, mqttc).start(1)

reactor.run()

@oxbambooxo

oxbambooxo commented Nov 28, 2016

I've adapted it for gevent.
(not a complete example)

import paho.mqtt.client as mqtt
import gevent

def handle_write(mqttc):
    mqttc._sockpairR.recv(1)
    status = mqttc.loop_write()
    if status != mqtt.MQTT_ERR_SUCCESS:
        print('error', mqtt.error_string(status))
        mqttc._state = mqtt.mqtt_cs_disconnecting

def handle_read(mqttc):
    status = mqttc.loop_read()
    if status != mqtt.MQTT_ERR_SUCCESS:
        print('error', mqtt.error_string(status))
        mqttc._state = mqtt.mqtt_cs_disconnecting

def loop_forever(mqttc, idle=3):
    while mqttc._state != mqtt.mqtt_cs_disconnecting:
        status = mqttc.loop_misc()
        if status == mqtt.MQTT_ERR_SUCCESS:
            gevent.sleep(idle)
        else:
            print('error', mqtt.error_string(status))
            mqttc._state = mqtt.mqtt_cs_disconnecting
            break

# MQTT STUFF(modify it for your case)
mqttc = mqtt.Client(transport="websockets")
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
mqttc.connect("localhost", 1883, 60)
mqttc.subscribe("events/#", 0)
mqttc.message_callback_add('events/comp', on_component_event_message)
# ...

# mqtt event loop functions
loop = gevent.get_hub().loop
watcher_r = loop.io(mqttc.socket().fileno(), gevent.core.READ)
watcher_r.start(handle_read, mqttc)
watcher_w = loop.io(mqttc._sockpairR.fileno(), gevent.core.READ)
watcher_w.start(handle_write, mqttc)

# loop forever
# or gevent.spawn(loop_forever, mqttc)
loop_forever(mqttc)

@joernheissler

Both examples are flawed, according to https://dev.eclipse.org/mhonarc/lists/paho-dev/msg03998.html

The first one is a busy loop with a sleep. It wastes CPU time when there is nothing to do, and when there is something to do, it introduces an artificial delay of up to 0.1 s.
The second one relies on the internal socketpair workaround, and I bet that if you publish a huge message, you'll get stuck.

From the looks of it, I think the "external event loop" support is only meant for select. Here's an example:

#!/usr/bin/env python3

import paho.mqtt.client as mqtt
import socket
from select import select
from time import time

topic = 'paho-mqtt-python/issue72'

def on_connect(client, userdata, flags, rc):
    client.subscribe(topic)

def on_message(client, userdata, msg):
    print("Got message with len {}".format(len(msg.payload)))

client = mqtt.Client(client_id='paho-mqtt-python/issue72')
client.on_connect = on_connect
client.on_message = on_message

client.connect('test.mosquitto.org', 1883, 60)
client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

c = 0

while True:
    sock = client.socket()
    if not sock:
        break

    print("{:.03f} Selecting".format(time()))
    r, w, e = select(
        [sock],
        [sock] if client.want_write() else [],
        [],
        10
    )

    idle = True

    if sock in r:
        print("{:.03f}           Reading".format(time()))
        client.loop_read()
        idle = False

    if sock in w:
        print("{:.03f}                   Writing".format(time()))
        client.loop_write()
        idle = False

    if idle:
        if c == 3:
            break
        print("")
        print("{:.03f}                           Publishing".format(time()))
        client.publish(topic, b'Hello' * 40000)
        c += 1

    client.loop_misc()

client.disconnect()
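
The example above shrinks SO_SNDBUF and publishes a large payload so that a single write cannot finish the job. As a rough, paho-free illustration of why write readiness has to be watched, the sketch below pushes a large payload through a non-blocking socket pair using only the standard library. The helper name `pump` and the payload size are made up for this illustration; nothing here is part of paho.mqtt.

```python
import select
import socket


def pump(sender, receiver, payload):
    """Push payload through a non-blocking socket, using select() for readiness.

    Returns how many send() calls were needed (illustrative helper, not paho API).
    """
    view = memoryview(payload)
    sent = received = send_calls = 0
    while received < len(payload):
        # Only ask for write readiness while data is still pending,
        # which is the role want_write() plays in the example above.
        want_write = sent < len(payload)
        r, w, _ = select.select([receiver], [sender] if want_write else [], [], 5)
        if not r and not w:
            raise TimeoutError("no progress within 5 seconds")
        if w:
            try:
                sent += sender.send(view[sent:])  # may accept only part of the data
                send_calls += 1
            except BlockingIOError:
                pass  # buffer filled up again; wait for the next readiness event
        if r:
            received += len(receiver.recv(65536))
    return send_calls


a, b = socket.socketpair()
a.setblocking(False)
calls = pump(a, b, b"Hello" * 1_000_000)
a.close()
b.close()
print("send() calls needed:", calls)
```

A loop that calls the write function once and then stops watching for writability would leave the rest of such a payload unsent, which is presumably the "stuck" case described above.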

To support other high-level event loops (twisted, gevent, asyncio, ...), https://dev.eclipse.org/mhonarc/lists/paho-dev/msg03999.html suggests introducing another callback. I came to the same conclusion.
There should also be a callback when a socket is created, and the disconnect callback should always be called right before the socket is closed, not afterwards.
Another problem is that socket.create_connection is used, which is a blocking call.
I'll try to improve the code when I find some time.

joernheissler pushed a commit to joernheissler/paho.mqtt.python that referenced this issue Oct 8, 2017
Add four more callbacks and documentation to make it possible to use external
event loops other than select(). This could e.g. be asyncio, twisted or gevent.

  * on_socket_open - called when a new socket was opened. --> watch read
  * on_socket_close - called when the socket is about to be closed. --> unwatch read
  * on_socket_register_write - called when mqtt wants to write. --> watch write
  * on_socket_unregister_write - called when mqtt is finished writing. --> unwatch write

Add examples how to write external event loops:

  * loop_select.py - select() based event loop, works without the other changes.
  * loop_asyncio.py - asyncio based event loop, utilizes the new callbacks.

This should also fix the issues eclipse#72 and eclipse#147.

Signed-off-by: Jörn Heissler <eclipse.org@wulf.eu.org>
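
A minimal sketch of how the four callbacks listed in this commit message could be wired into asyncio follows. It is not the loop_asyncio.py file from the commit. The class name `AsyncioBridge` and the `misc_interval` parameter are invented for this illustration, and the callback signature `(client, userdata, sock)` is an assumption about how paho invokes them. The client is duck-typed: anything exposing `loop_read`, `loop_write` and `loop_misc` will do.

```python
import asyncio


class AsyncioBridge:
    """Hook a paho-style client into an asyncio loop (illustrative sketch)."""

    def __init__(self, loop, client, misc_interval=1.0):
        self.loop = loop
        self.client = client
        self.misc_interval = misc_interval  # hypothetical knob, seconds
        self.misc_task = None
        # Install the four callbacks named in the commit message.
        client.on_socket_open = self.on_socket_open
        client.on_socket_close = self.on_socket_close
        client.on_socket_register_write = self.on_socket_register_write
        client.on_socket_unregister_write = self.on_socket_unregister_write

    def on_socket_open(self, client, userdata, sock):
        # New socket: watch it for readability and start housekeeping.
        self.loop.add_reader(sock, client.loop_read)
        self.misc_task = self.loop.create_task(self.misc_loop())

    def on_socket_close(self, client, userdata, sock):
        # Called before the socket is closed, so it can still be unwatched.
        self.loop.remove_reader(sock)
        if self.misc_task is not None:
            self.misc_task.cancel()

    def on_socket_register_write(self, client, userdata, sock):
        # The client has data queued: watch for writability.
        self.loop.add_writer(sock, client.loop_write)

    def on_socket_unregister_write(self, client, userdata, sock):
        # Nothing left to write: stop watching for writability.
        self.loop.remove_writer(sock)

    async def misc_loop(self):
        # Periodic housekeeping (keepalive and similar) via loop_misc().
        while True:
            self.client.loop_misc()
            await asyncio.sleep(self.misc_interval)
```

With a real client, the bridge would presumably be constructed before `client.connect(...)` is called, so that the socket-open callback already points at the loop. Note that `add_reader` and `add_writer` need a selector-based event loop.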
joernheissler pushed a commit to joernheissler/paho.mqtt.python that referenced this issue Nov 2, 2017
@PierreF
Contributor

PierreF commented Aug 4, 2018

I think this is fixed in the develop branch by example/loop_asyncio.py. If not, feel free to reopen this issue.

@bkanuka
Contributor

bkanuka commented Dec 6, 2019

FYI: I've created a gist that shows how to use trio as the external event loop instead of the built-in loop or asyncio. I will open a PR to merge this into the examples.

@frederikaalund
Contributor

frederikaalund commented Apr 6, 2020

I've created asyncio-mqtt based on the mechanism described here. asyncio-mqtt attempts to wrap paho-mqtt in an idiomatic asyncio-based interface.

@jogehl

jogehl commented Mar 31, 2021

I use an external loop on Windows with select. The problem is that after executing client.loop_read() the socket is closed and returns -1 as its file descriptor. This behaviour can only be seen on some Linux distributions.
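
One defensive pattern for a select-based loop is to look the socket up again after every call that might close it, and to treat a missing socket or a file descriptor of -1 as "connection gone". The sketch below is only an illustration of that idea, not a confirmed fix for the behaviour reported above. The helper names `live_socket` and `select_step` are hypothetical; the client methods used (`socket`, `want_write`, `loop_read`, `loop_write`, `loop_misc`) are the ones that appear in the examples earlier in this thread.

```python
import select


def live_socket(client):
    """Return the client's socket if it is still open, else None."""
    sock = client.socket()
    if sock is None or sock.fileno() < 0:
        return None
    return sock


def select_step(client, timeout=1.0):
    """Run one select() iteration; return False once the socket is gone."""
    sock = live_socket(client)
    if sock is None:
        return False
    wlist = [sock] if client.want_write() else []
    readable, writable, _ = select.select([sock], wlist, [], timeout)
    if readable:
        client.loop_read()
    # loop_read() may have closed the socket, so check again before writing.
    if writable and live_socket(client) is not None:
        client.loop_write()
    client.loop_misc()
    return live_socket(client) is not None
```

A caller would run `while select_step(client): pass` and decide afterwards whether to reconnect or stop.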

@MattBrittan MattBrittan reopened this Dec 23, 2023
@github-actions github-actions bot added the Status: Available No one has claimed responsibility for resolving this issue. label Dec 23, 2023
@chintal

chintal commented Apr 19, 2024

I've had to get paho.mqtt to coexist (not necessarily fully integrate) with an existing Twisted reactor. I found that none of the examples really showed what I wanted to do. I've put together a small example of how to wrap paho.mqtt as a Twisted service, available in this gist.
