forked from eclipse/paho.mqtt.python
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Improve support for external event loops.
Add four more callbacks and documentation to make it possible to use external event loops other than select(). This could e.g. be asyncio, twisted or gevent. * on_socket_open - called when a new socket was opened. --> watch read * on_socket_close - called when the socket is about to be closed. --> unwatch read * on_socket_register_write - called when mqtt wants to write. --> watch write * on_socket_unregister_write - called when mqtt is finished writing. --> unwatch write Add examples how to write external event loops: * loop_select.py - select() based event loop, works without the other changes. * loop_asyncio.py - asyncio based event loop, utilizes the new callbacks. This should also fix the issues eclipse#72 and eclipse#147. Signed-off-by: Jörn Heissler <eclipse.org@wulf.eu.org>
- Loading branch information
Jörn Heissler
committed
Oct 8, 2017
1 parent
129ab07
commit dfe1fe0
Showing
4 changed files
with
494 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import socket | ||
import uuid | ||
import paho.mqtt.client as mqtt | ||
import asyncio | ||
|
||
client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4()) | ||
topic = client_id | ||
print("Using client_id / topic: " + client_id) | ||
|
||
|
||
class AsyncioHelper: | ||
def __init__(self, loop, client): | ||
self.loop = loop | ||
self.client = client = client | ||
self.client.on_socket_open = self.on_socket_open | ||
self.client.on_socket_close = self.on_socket_close | ||
self.client.on_socket_register_write = self.on_socket_register_write | ||
self.client.on_socket_unregister_write = self.on_socket_unregister_write | ||
|
||
def on_socket_open(self, client, userdata, sock): | ||
print("Socket opened") | ||
|
||
def cb(): | ||
print("Socket is readable, calling loop_read") | ||
client.loop_read() | ||
|
||
self.loop.add_reader(sock, cb) | ||
|
||
def on_socket_close(self, client, userdata, sock): | ||
print("Socket closed") | ||
self.loop.remove_reader(sock) | ||
|
||
def on_socket_register_write(self, client, userdata, sock): | ||
print("Watching socket for writability.") | ||
|
||
def cb(): | ||
print("Socket is writable, calling loop_write") | ||
client.loop_write() | ||
|
||
self.loop.add_writer(sock, cb) | ||
|
||
def on_socket_unregister_write(self, client, userdata, sock): | ||
print("Stop watching socket for writability.") | ||
self.loop.remove_writer(sock) | ||
|
||
async def misc_loop(self): | ||
while True: | ||
self.client.loop_misc() | ||
await asyncio.sleep(1) | ||
|
||
def start(self): | ||
self.misc = self.loop.create_task(self.misc_loop()) | ||
|
||
def stop(self): | ||
self.misc.cancel() | ||
|
||
|
||
class AsyncMqttExample: | ||
def __init__(self, loop): | ||
self.loop = loop | ||
|
||
def on_connect(self, client, userdata, flags, rc): | ||
print("Subscribing") | ||
client.subscribe(topic) | ||
|
||
def on_message(self, client, userdata, msg): | ||
if not self.got_message: | ||
print("Got unexpected message: {}".format(msg.decode())) | ||
else: | ||
self.got_message.set_result(msg.payload) | ||
|
||
def on_disconnect(self, client, userdata, rc): | ||
self.disconnected.set_result(rc) | ||
|
||
async def main(self): | ||
self.disconnected = self.loop.create_future() | ||
self.got_message = None | ||
|
||
self.client = mqtt.Client(client_id=client_id) | ||
self.client.on_connect = self.on_connect | ||
self.client.on_message = self.on_message | ||
self.client.on_disconnect = self.on_disconnect | ||
|
||
aioh = AsyncioHelper(self.loop, self.client) | ||
aioh.start() | ||
|
||
self.client.connect('iot.eclipse.org', 1883, 60) | ||
self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) | ||
|
||
for c in range(3): | ||
await asyncio.sleep(5) | ||
print("Publishing") | ||
self.got_message = self.loop.create_future() | ||
self.client.publish(topic, b'Hello' * 40000, qos=1) | ||
msg = await self.got_message | ||
print("Got response with {} bytes".format(len(msg))) | ||
self.got_message = None | ||
|
||
self.client.disconnect() | ||
print("Disconnected: {}".format(await self.disconnected)) | ||
aioh.stop() | ||
|
||
|
||
print("Starting") | ||
loop = asyncio.get_event_loop() | ||
loop.run_until_complete(AsyncMqttExample(loop).main()) | ||
loop.close() | ||
print("Finished") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import socket | ||
import uuid | ||
import paho.mqtt.client as mqtt | ||
from select import select | ||
from time import time | ||
|
||
client_id = 'paho-mqtt-python/issue72/' + str(uuid.uuid4()) | ||
topic = client_id | ||
print("Using client_id / topic: " + client_id) | ||
|
||
|
||
class SelectMqttExample: | ||
def __init__(self): | ||
pass | ||
|
||
def on_connect(self, client, userdata, flags, rc): | ||
print("Subscribing") | ||
client.subscribe(topic) | ||
|
||
def on_message(self, client, userdata, msg): | ||
if self.state not in {1,3,5}: | ||
print("Got unexpected message: {}".format(msg.decode())) | ||
else: | ||
print("Got message with len {}".format(len(msg.payload))) | ||
self.state += 1 | ||
self.t = time() | ||
|
||
def on_disconnect(self, client, userdata, rc): | ||
self.disconnected = True, rc | ||
|
||
def do_select(self): | ||
sock = self.client.socket() | ||
if not sock: | ||
raise Exception("Socket is gone") | ||
|
||
print("Selecting for reading" + (" and writing" if self.client.want_write() else "")) | ||
r, w, e = select( | ||
[sock], | ||
[sock] if self.client.want_write() else [], | ||
[], | ||
1 | ||
) | ||
|
||
if sock in r: | ||
print("Socket is readable, calling loop_read") | ||
self.client.loop_read() | ||
|
||
if sock in w: | ||
print("Socket is writable, calling loop_write") | ||
self.client.loop_write() | ||
|
||
self.client.loop_misc() | ||
|
||
def main(self): | ||
self.disconnected = (False, None) | ||
self.t = time() | ||
self.state = 0 | ||
|
||
self.client = mqtt.Client(client_id=client_id) | ||
self.client.on_connect = self.on_connect | ||
self.client.on_message = self.on_message | ||
self.client.on_disconnect = self.on_disconnect | ||
|
||
self.client.connect('iot.eclipse.org', 1883, 60) | ||
print("Socket opened") | ||
self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048) | ||
|
||
while not self.disconnected[0]: | ||
self.do_select() | ||
|
||
if self.state in {0,2,4}: | ||
if time() - self.t >= 5: | ||
print("Publishing") | ||
self.client.publish(topic, b'Hello' * 40000) | ||
self.state += 1 | ||
|
||
if self.state == 6: | ||
self.state += 1 | ||
self.client.disconnect() | ||
|
||
print("Disconnected: {}".format(self.disconnected[1])) | ||
|
||
print("Starting") | ||
SelectMqttExample().main() | ||
print("Finished") |
Oops, something went wrong.