Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve support for external event loops. #235

Merged
merged 1 commit into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,46 @@ and ``MQTT_LOG_DEBUG``. The message itself is in ``buf``.
This may be used at the same time as the standard Python logging, which can be
enabled via the ``enable_logger`` method.

on_socket_open()
''''''''''''''''

::

on_socket_open(client, userdata, sock)

Called when the socket has been opened.
Use this to register the socket with an external event loop for reading.

on_socket_close()
'''''''''''''''''

::

on_socket_close(client, userdata, sock)

Called when the socket is about to be closed.
Use this to unregister a socket from an external event loop for reading.

on_socket_register_write()
''''''''''''''''''''''''''

::

on_socket_register_write(client, userdata, sock)

Called when a write operation to the socket failed because it would have blocked, e.g. output buffer full.
Use this to register the socket with an external event loop for writing.

on_socket_unregister_write()
''''''''''''''''''''''''''''

::

on_socket_unregister_write(client, userdata, sock)

Called when a write operation to the socket succeeded after it had previously failed.
Use this to unregister the socket from an external event loop for writing.

External event loop support
```````````````````````````

Expand Down Expand Up @@ -995,6 +1035,9 @@ socket()

Returns the socket object in use in the client to allow interfacing with other
event loops.
This call is particularly useful for select_ based loops. See ``examples/loop_select.py``.

.. _select: https://docs.python.org/3/library/select.html#select.select

want_write()
''''''''''''
Expand All @@ -1005,6 +1048,46 @@ want_write()

Returns true if there is data waiting to be written, to allow interfacing the
client with other event loops.
This call is particularly useful for select_ based loops. See ``examples/loop_select.py``.

.. _select: https://docs.python.org/3/library/select.html#select.select

state callbacks
'''''''''''''''

::

on_socket_open
on_socket_close
on_socket_register_write
on_socket_unregister_write

Use these callbacks to get notified about state changes in the socket.
This is particularly useful for event loops where you register or unregister a socket
for reading+writing. See ``examples/loop_asyncio.py`` for an example.

When the socket is opened, ``on_socket_open`` is called.
Register the socket with your event loop for reading.

When the socket is about to be closed, ``on_socket_close`` is called.
Unregister the socket from your event loop for reading.

When a write to the socket failed because it would have blocked, e.g. output buffer full,
``on_socket_register_write`` is called.
Register the socket with your event loop for writing.

When the next write to the socket succeeded, ``on_socket_unregister_write`` is called.
Unregister the socket from your event loop for writing.

The callbacks are always called in this order:

- ``on_socket_open``
- Zero or more times:

- ``on_socket_register_write``
- ``on_socket_unregister_write``

- ``on_socket_close``

Global helper functions
```````````````````````
Expand Down
108 changes: 108 additions & 0 deletions examples/loop_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env python3

import socket
import uuid
import paho.mqtt.client as mqtt
import asyncio

client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4())
topic = client_id
print("Using client_id / topic: " + client_id)


class AsyncioHelper:
def __init__(self, loop, client):
self.loop = loop
self.client = client
self.client.on_socket_open = self.on_socket_open
self.client.on_socket_close = self.on_socket_close
self.client.on_socket_register_write = self.on_socket_register_write
self.client.on_socket_unregister_write = self.on_socket_unregister_write

def on_socket_open(self, client, userdata, sock):
print("Socket opened")

def cb():
print("Socket is readable, calling loop_read")
client.loop_read()

self.loop.add_reader(sock, cb)
self.misc = self.loop.create_task(self.misc_loop())

def on_socket_close(self, client, userdata, sock):
print("Socket closed")
self.loop.remove_reader(sock)
self.misc.cancel()

def on_socket_register_write(self, client, userdata, sock):
print("Watching socket for writability.")

def cb():
print("Socket is writable, calling loop_write")
client.loop_write()

self.loop.add_writer(sock, cb)

def on_socket_unregister_write(self, client, userdata, sock):
print("Stop watching socket for writability.")
self.loop.remove_writer(sock)

async def misc_loop(self):
print("misc_loop started")
while self.client.loop_misc() == mqtt.MQTT_ERR_SUCCESS:
try:
await asyncio.sleep(1)
except asyncio.CancelledError:
break
print("misc_loop finished")


class AsyncMqttExample:
def __init__(self, loop):
self.loop = loop

def on_connect(self, client, userdata, flags, rc):
print("Subscribing")
client.subscribe(topic)

def on_message(self, client, userdata, msg):
if not self.got_message:
print("Got unexpected message: {}".format(msg.decode()))
else:
self.got_message.set_result(msg.payload)

def on_disconnect(self, client, userdata, rc):
self.disconnected.set_result(rc)

async def main(self):
self.disconnected = self.loop.create_future()
self.got_message = None

self.client = mqtt.Client(client_id=client_id)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect

aioh = AsyncioHelper(self.loop, self.client)

self.client.connect('iot.eclipse.org', 1883, 60)
self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

for c in range(3):
await asyncio.sleep(5)
print("Publishing")
self.got_message = self.loop.create_future()
self.client.publish(topic, b'Hello' * 40000, qos=1)
msg = await self.got_message
print("Got response with {} bytes".format(len(msg)))
self.got_message = None

self.client.disconnect()
print("Disconnected: {}".format(await self.disconnected))


print("Starting")
loop = asyncio.get_event_loop()
loop.run_until_complete(AsyncMqttExample(loop).main())
loop.close()
print("Finished")
89 changes: 89 additions & 0 deletions examples/loop_select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python3

import socket
import uuid
import paho.mqtt.client as mqtt
from select import select
from time import time

client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4())
topic = client_id
print("Using client_id / topic: " + client_id)


class SelectMqttExample:
def __init__(self):
pass

def on_connect(self, client, userdata, flags, rc):
print("Subscribing")
client.subscribe(topic)

def on_message(self, client, userdata, msg):
if self.state not in {1, 3, 5}:
print("Got unexpected message: {}".format(msg.decode()))
return

print("Got message with len {}".format(len(msg.payload)))
self.state += 1
self.t = time()

def on_disconnect(self, client, userdata, rc):
self.disconnected = True, rc

def do_select(self):
sock = self.client.socket()
if not sock:
raise Exception("Socket is gone")

print("Selecting for reading" + (" and writing" if self.client.want_write() else ""))
r, w, e = select(
[sock],
[sock] if self.client.want_write() else [],
[],
1
)

if sock in r:
print("Socket is readable, calling loop_read")
self.client.loop_read()

if sock in w:
print("Socket is writable, calling loop_write")
self.client.loop_write()

self.client.loop_misc()

def main(self):
self.disconnected = (False, None)
self.t = time()
self.state = 0

self.client = mqtt.Client(client_id=client_id)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect

self.client.connect('iot.eclipse.org', 1883, 60)
print("Socket opened")
self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

while not self.disconnected[0]:
self.do_select()

if self.state in {0, 2, 4}:
if time() - self.t >= 5:
print("Publishing")
self.client.publish(topic, b'Hello' * 40000)
self.state += 1

if self.state == 6:
self.state += 1
self.client.disconnect()

print("Disconnected: {}".format(self.disconnected[1]))


print("Starting")
SelectMqttExample().main()
print("Finished")
Loading