Skip to content

Commit

Permalink
Improve support for external event loops.
Browse files Browse the repository at this point in the history
Add four more callbacks and documentation to make it possible to use external
event loops other than select(). This could e.g. be asyncio, twisted or gevent.

  * on_socket_open - called when a new socket was opened. --> watch read
  * on_socket_close - called when the socket is about to be closed. --> unwatch read
  * on_socket_register_write - called when mqtt wants to write. --> watch write
  * on_socket_unregister_write - called when mqtt is finished writing. --> unwatch write

Add examples how to write external event loops:

  * loop_select.py - select() based event loop, works without the other changes.
  * loop_asyncio.py - asyncio based event loop, utilizes the new callbacks.

This should also fix the issues eclipse#72 and eclipse#147.

Signed-off-by: Jörn Heissler <eclipse.org@wulf.eu.org>
  • Loading branch information
Jörn Heissler committed Nov 2, 2017
1 parent 129ab07 commit 68c165f
Show file tree
Hide file tree
Showing 4 changed files with 499 additions and 46 deletions.
83 changes: 83 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,46 @@ and ``MQTT_LOG_DEBUG``. The message itself is in ``buf``.
This may be used at the same time as the standard Python logging, which can be
enabled via the ``enable_logger`` method.

on_socket_open()
''''''''''''''''

::

on_socket_open(client, userdata, sock)

Called when the socket has been opened.
Use this to register the socket with an external event loop for reading.

on_socket_close()
'''''''''''''''''

::

on_socket_close(client, userdata, sock)

Called when the socket is about to be closed.
Use this to unregister a socket from an external event loop for reading.

on_socket_register_write()
''''''''''''''''''''''''''

::

on_socket_register_write(client, userdata, sock)

Called when a write operation to the socket failed because it would have blocked, e.g. output buffer full.
Use this to register the socket with an external event loop for writing.

on_socket_unregister_write()
''''''''''''''''''''''''''''

::

on_socket_unregister_write(client, userdata, sock)

Called when a write operation to the socket succeeded after it had previously failed.
Use this to unregister the socket from an external event loop for writing.

External event loop support
```````````````````````````

Expand Down Expand Up @@ -995,6 +1035,9 @@ socket()

Returns the socket object in use in the client to allow interfacing with other
event loops.
This call is particularly useful for select_ based loops. See ``examples/loop_select.py``.

.. _select: https://docs.python.org/3/library/select.html#select.select

want_write()
''''''''''''
Expand All @@ -1005,6 +1048,46 @@ want_write()

Returns true if there is data waiting to be written, to allow interfacing the
client with other event loops.
This call is particularly useful for select_ based loops. See ``examples/loop_select.py``.

.. _select: https://docs.python.org/3/library/select.html#select.select

callbacks
'''''''''

::

on_socket_open
on_socket_close
on_socket_register_write
on_socket_unregister_write

Use these callbacks to get notified about state changes in the socket.
This is particularly useful for event loops where you register or unregister a socket
for reading+writing. See ``examples/loop_asyncio.py`` for an example.

When the socket is opened, ``on_socket_open`` is called.
Register the socket with your event loop for reading.

When the socket is about to be closed, ``on_socket_close`` is called.
Unregister the socket from your event loop for reading.

When a write to the socket failed because it would have blocked, e.g. output buffer full,
``on_socket_register_write`` is called.
Register the socket with your event loop for writing.

When the next write to the socket succeeded, ``on_socket_unregister_write`` is called.
Unregister the socket from your event loop for writing.

The callbacks are always called in this order:

- ``on_socket_open``
- Zero or more times:

- ``on_socket_register_write``
- ``on_socket_unregister_write``

- ``on_socket_close``

Global helper functions
```````````````````````
Expand Down
108 changes: 108 additions & 0 deletions examples/loop_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env python3

import socket
import uuid
import paho.mqtt.client as mqtt
import asyncio

client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4())
topic = client_id
print("Using client_id / topic: " + client_id)


class AsyncioHelper:
def __init__(self, loop, client):
self.loop = loop
self.client = client
self.client.on_socket_open = self.on_socket_open
self.client.on_socket_close = self.on_socket_close
self.client.on_socket_register_write = self.on_socket_register_write
self.client.on_socket_unregister_write = self.on_socket_unregister_write

def on_socket_open(self, client, userdata, sock):
print("Socket opened")

def cb():
print("Socket is readable, calling loop_read")
client.loop_read()

self.loop.add_reader(sock, cb)
self.misc = self.loop.create_task(self.misc_loop())

def on_socket_close(self, client, userdata, sock):
print("Socket closed")
self.loop.remove_reader(sock)
self.misc.cancel()

def on_socket_register_write(self, client, userdata, sock):
print("Watching socket for writability.")

def cb():
print("Socket is writable, calling loop_write")
client.loop_write()

self.loop.add_writer(sock, cb)

def on_socket_unregister_write(self, client, userdata, sock):
print("Stop watching socket for writability.")
self.loop.remove_writer(sock)

async def misc_loop(self):
print("misc_loop started")
while self.client.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
try:
await asyncio.sleep(1)
except asyncio.CancelledError:
break
print("misc_loop finished")


class AsyncMqttExample:
def __init__(self, loop):
self.loop = loop

def on_connect(self, client, userdata, flags, rc):
print("Subscribing")
client.subscribe(topic)

def on_message(self, client, userdata, msg):
if not self.got_message:
print("Got unexpected message: {}".format(msg.decode()))
else:
self.got_message.set_result(msg.payload)

def on_disconnect(self, client, userdata, rc):
self.disconnected.set_result(rc)

async def main(self):
self.disconnected = self.loop.create_future()
self.got_message = None

self.client = mqtt.Client(client_id=client_id)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect

aioh = AsyncioHelper(self.loop, self.client)

self.client.connect('iot.eclipse.org', 1883, 60)
self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

for c in range(3):
await asyncio.sleep(5)
print("Publishing")
self.got_message = self.loop.create_future()
self.client.publish(topic, b'Hello' * 40000, qos=1)
msg = await self.got_message
print("Got response with {} bytes".format(len(msg)))
self.got_message = None

self.client.disconnect()
print("Disconnected: {}".format(await self.disconnected))


print("Starting")
loop = asyncio.get_event_loop()
loop.run_until_complete(AsyncMqttExample(loop).main())
loop.close()
print("Finished")
89 changes: 89 additions & 0 deletions examples/loop_select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python3

import socket
import uuid
import paho.mqtt.client as mqtt
from select import select
from time import time

client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4())
topic = client_id
print("Using client_id / topic: " + client_id)


class SelectMqttExample:
def __init__(self):
pass

def on_connect(self, client, userdata, flags, rc):
print("Subscribing")
client.subscribe(topic)

def on_message(self, client, userdata, msg):
if self.state not in {1, 3, 5}:
print("Got unexpected message: {}".format(msg.decode()))
return

print("Got message with len {}".format(len(msg.payload)))
self.state += 1
self.t = time()

def on_disconnect(self, client, userdata, rc):
self.disconnected = True, rc

def do_select(self):
sock = self.client.socket()
if not sock:
raise Exception("Socket is gone")

print("Selecting for reading" + (" and writing" if self.client.want_write() else ""))
r, w, e = select(
[sock],
[sock] if self.client.want_write() else [],
[],
1
)

if sock in r:
print("Socket is readable, calling loop_read")
self.client.loop_read()

if sock in w:
print("Socket is writable, calling loop_write")
self.client.loop_write()

self.client.loop_misc()

def main(self):
self.disconnected = (False, None)
self.t = time()
self.state = 0

self.client = mqtt.Client(client_id=client_id)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect

self.client.connect('iot.eclipse.org', 1883, 60)
print("Socket opened")
self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

while not self.disconnected[0]:
self.do_select()

if self.state in {0, 2, 4}:
if time() - self.t >= 5:
print("Publishing")
self.client.publish(topic, b'Hello' * 40000)
self.state += 1

if self.state == 6:
self.state += 1
self.client.disconnect()

print("Disconnected: {}".format(self.disconnected[1]))


print("Starting")
SelectMqttExample().main()
print("Finished")
Loading

0 comments on commit 68c165f

Please sign in to comment.