Skip to content

Commit

Permalink
add auto_ack mode, so it can be turned off to give application contro…
Browse files Browse the repository at this point in the history
…l over when acknowledgements are sent.
  • Loading branch information
petersilva committed Mar 3, 2021
1 parent 225ab37 commit 6225aff
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
7 changes: 7 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ transport
set to "websockets" to send MQTT over WebSockets. Leave at the default of
"tcp" to use raw TCP.

auto_ack
set to False to let application acknowledge messages using the ack(message.mid)
entry point, instead of letting the library take care of it on receipt.

Constructor Example
...................
Expand Down Expand Up @@ -898,6 +901,10 @@ userdata
message
an instance of MQTTMessage. This is a class with members ``topic``, ``payload``, ``qos``, ``retain``.

If the Client is set with auto_ack=False, then MQTT acknowledgements are under
caller control. For each message received, call client.ack( *message.mid* ) when
appropriate.

On Message Example
..................

Expand Down
32 changes: 30 additions & 2 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ def on_connect(client, userdata, flags, rc, properties=None):
"""

def __init__(self, client_id="", clean_session=None, userdata=None,
protocol=MQTTv311, transport="tcp"):
protocol=MQTTv311, transport="tcp", auto_ack=True):
"""client_id is the unique client id string used when connecting to the
broker. If client_id is zero length or None, then the behaviour is
defined by which protocol version is in use. If using MQTT v3.1.1, then
Expand Down Expand Up @@ -545,6 +545,13 @@ def __init__(self, client_id="", clean_session=None, userdata=None,
Set transport to "websockets" to use WebSockets as the transport
mechanism. Set to "tcp" to use raw TCP, which is the default.
Normally, when a message is received, the library automatically
acknowledges immediately. auto_ack=False allows the application to
acknowledge receipt after it has completed processing of a message
using a the ack() method. This addresses vulnerabilty to message loss
if applications fails while processing a message, or while it pending
locally.
"""

if protocol == MQTTv5:
Expand All @@ -561,6 +568,7 @@ def __init__(self, client_id="", clean_session=None, userdata=None,
if transport.lower() not in ('websockets', 'tcp'):
raise ValueError(
'transport must be "websockets" or "tcp", not %s' % transport)
self._auto_ack = auto_ack
self._transport = transport.lower()
self._protocol = protocol
self._userdata = userdata
Expand Down Expand Up @@ -3216,7 +3224,10 @@ def _handle_publish(self):
self._handle_on_message(message)
return MQTT_ERR_SUCCESS
elif message.qos == 1:
rc = self._send_puback(message.mid)
if self._auto_ack:
rc = self._send_puback(message.mid)
else:
rc = MQTT_ERR_SUCCESS
self._handle_on_message(message)
return rc
elif message.qos == 2:
Expand All @@ -3228,6 +3239,23 @@ def _handle_publish(self):
else:
return MQTT_ERR_PROTOCOL

def ack( self, mid ):
"""
send an acknowledgement for a given message id. (stored in message.mid )
only useful in QoS=1 and auto_ack=False
"""
if self._auto_ack :
return MQTT_ERR_SUCCESS
return self._send_puback(mid)

def auto_ack( self, on=True ):
"""
The paho library normally acknowledges messages as soon as they are delivered to the caller.
If auto_ack is turned off, then the caller can manually acknowledge messages once application
processing is complete.
"""
self._auto_ack = on

def _handle_pubrel(self):
if self._protocol == MQTTv5:
if self._in_packet['remaining_length'] < 2:
Expand Down

0 comments on commit 6225aff

Please sign in to comment.