Prevent ack of received message until final #348
Comments
While this is possible, why would you want to do this? Imagine you block with a sleep(60) in on_message; then the qos=1 ack is not sent to the broker. The broker then keeps trying to send the message again and again, queuing the message in the library buffer. I don't think that behaviour is acceptable. The general problem is that you are trying to use protocol-level handshaking concepts to carry application-level signals, which I think is bad; we should not confuse the two.
@vrst37 it is common for many broker protocols to allow waiting with the ack until a message is processed. The general thinking is that a message is not persistently received until it is, e.g., stored. A broker should not resend a message while the connection is not lost. In MQTT 5 it is even forbidden to resend while the connection is alive.
@vrst37 I find that there is a risk of message loss even with QoS 1, because the PUBACK is sent back to the broker before the on_message callback function is called. If there is a power outage, hardware failure or other system failure after the PUBACK and before the on_message callback is called, the message will be lost, and there seems to be no way to avoid it. The Java implementation of the Eclipse Paho client does not send the acknowledgement until the messageArrived method returns cleanly.
Besides, I have also tried the mqtt.js client library, and it also sends the PUBACK after the on-message callback function returns. Other MQ products support a similar feature as well, e.g. RabbitMQ.
In the on_message function, programmers may want to write the message to a highly available store (Redis, an MQ, a database, etc.) so that workers can process it further, which is how reliable and scalable systems are built.
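To make the crash window concrete, here is a minimal toy model in plain Python. It does not use paho at all: ToyBroker, Crash, deliver and run are hypothetical names invented for this sketch, and the "broker" simply keeps a QoS 1 message until it is acknowledged. It only illustrates the ordering argument above, under the assumption that an unacknowledged message is redelivered after a reconnect.

```python
class ToyBroker:
    """Toy stand-in for a broker: keeps a QoS 1 message until it is acked."""

    def __init__(self, messages):
        self.pending = list(messages)

    def next_message(self):
        return self.pending[0] if self.pending else None

    def puback(self, message):
        self.pending.remove(message)


class Crash(Exception):
    """Simulates a power cut or process crash inside the handler."""


def deliver(broker, handler, ack_first):
    """Deliver one message; return True if the handler completed."""
    message = broker.next_message()
    if message is None:
        return False
    try:
        if ack_first:
            broker.puback(message)  # ack before the application saw it
            handler(message)
        else:
            handler(message)
            broker.puback(message)  # ack only after a clean return
    except Crash:
        return False
    return True


def run(ack_first):
    """Crash on the first delivery, then 'reconnect' and try again."""
    stored = []
    attempts = {"n": 0}

    def handler(message):
        attempts["n"] += 1
        if attempts["n"] == 1:
            raise Crash()
        stored.append(message)

    broker = ToyBroker(["reading-1"])
    deliver(broker, handler, ack_first)  # first attempt crashes
    deliver(broker, handler, ack_first)  # second attempt, after the reconnect
    return stored
```

With ack_first=True the message is gone from the toy broker before the handler crashes, so nothing is ever stored; with ack_first=False the second delivery stores it.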
MQTT is an application layer protocol on top of the TCP transport layer protocol. If the PUBACK is not tied to the client or the broker having handled the message, then TCP already provides reliable message delivery and the extra acknowledgement would not be needed. Just my two cents.
Coming from a RabbitMQ / Google Cloud PubSub background, I find it strange as well that MQTT messages are automatically ACK'ed, even if a problem occurred in the handler. |
I've looked at other Paho implementations, and it seems that the C implementation is the only (other) one that supports explicit acknowledgement, via the return value of the MQTTAsync_messageArrived function. |
I found this eclipse-paho/paho.mqtt.c#522 to be something worth looking at!
Please reconsider this. I would be more than happy to contribute code/tests.
Hello, this is really a drawback for the paho-mqtt library compared to other libraries. We currently have no way to guarantee that we will not lose any message. The proposed solution (processing before sending the ack message) would be perfect and doesn't seem complicated to implement. This is a requirement for paho-mqtt to be production-grade! We cannot afford the risk of losing a message due to network glitches or other failures.
Hi @chrismaes87 ,
If you read it, it sounds as if, when you cannot process a message, you can only be sure that the broker will re-send the PUBLISH packet IF you reconnect. Of course this applies only to publishes with QoS > 0. This means that if you do not acknowledge the PUBLISH with a PUBACK, the broker will store it, and it is only required to send the message to the client again when you reconnect. Please correct me if I am wrong. #FirstGitCommentEver :)
@munluk I already addressed this in my comment above. The reason not to send the ack early is the case where the client crashes or fails silently between receiving the message and handling it. When the client crashes or hangs, it will reconnect. You can't use this as a way to handle arbitrary exceptions, since you at least need to reconnect. But with this change you won't lose the message, unlike with today's implementation.
The MQTT specification says:
I personally find it annoying and misleading to bury this small fact in the specs, but that's the downside of a lightweight and telemetry-focused protocol. 🙈 Apart from this "non-normative example", the specification is extremely vague about the definition of "accepting ownership of a message". I would prefer the application, not the messaging library, to decide when the ownership of a message is accepted. A "manual ack" mode would be extremely valuable (like NATS Streaming has). I think a "manual ack" mode would not conflict with the specification, as one can easily include the processing of a message (e.g. storing it in a database) in the "taking ownership" process. I understand that this change would be somewhat revolutionary in the MQTT library landscape, but I see a clear need and no obvious downsides in adding it as an optional feature.
@linuxbasic, the manual acknowledging is built in the java version of the paho client 😉 https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/IMqttAsyncClient.java#L823 |
I have made a patch, 6225aff. It adds auto_ack as an argument to the init function, defaulting to True (existing behaviour). Messages already have the "mid" attribute defined. Just need to provide that as an argument to the new Also added an auto_ack(on=True) entry point to turn it off ... purpose of that was this bit of logic, in my client code:
To give the ability to test for presence of the feature, and use it if it is there. It is available in: https://github.com/MetPX/paho.mqtt.python |
With the current implementation, puback says that the message has reached an intermediate point between the server and the application. If we say that the paho library is the receiver, this conforms to the spec ("arrives at the receiver at least once"). However, I would argue that it does so in a very narrow way that is next to useless. That way, QoS 1 slightly increases the chance of delivering the message but it doesn't give any guarantees that are meaningful to the application, i.e. the message can still get lost before it has any effect that is visible to the application.
There is some evidence that the authors of MQTT Version 3.1.1 wanted the narrow interpretation, which only focusses on network problems: They talk about "data loss on some older TCP networks" and they don't forbid resending messages while the TCP connection is alive. Both of these changed in MQTT 5 (as has been pointed out by @HaraldGustafsson).
I would argue that the correct default behaviour would be to first call the handler and then puback the message. That way, we will handle it at least once (and maybe more than once if we are interrupted before the puback). Furthermore, the sender can use our puback messages to implement a sensible rate limit. While I strongly think that this is the correct default, it would be an incompatible change.
The argument is not so clear cut when the handler aborts with an exception: We would usually withhold the puback and propagate the error. This will most likely lead to a reconnect and the message will be processed again. This is a good reaction in many cases, e.g. if handling failed due to temporary connection problems with the database. It wouldn't make much sense to lie and puback because the next message will run into the same problem. However, this will hamper any further progress if the exception is due to a persistent problem, e.g. one message contains an emoji and the application is not prepared to handle this. We would be better off skipping this message.
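The distinction between temporary and persistent failures could be sketched roughly as follows. This is a toy illustration, not paho code: TransientError, PoisonMessage, process_then_ack and the dead_letters list are hypothetical names, and send_puback stands in for whatever the library would do to acknowledge. For simplicity the sketch returns False instead of propagating the error; a real client would presumably drop the connection at that point so that the message is redelivered.

```python
class TransientError(Exception):
    """Hypothetical: e.g. the database is temporarily unreachable."""


class PoisonMessage(Exception):
    """Hypothetical: this particular message can never be processed."""


def process_then_ack(message, handler, send_puback, dead_letters):
    """Call the handler first, then decide whether to acknowledge.

    Returns True if the message was acknowledged, False if the ack was withheld.
    """
    try:
        handler(message)
    except TransientError:
        # Withhold the puback: the sender still owns the message and can
        # redeliver it once the temporary problem is gone.
        return False
    except PoisonMessage:
        # Retrying would fail forever, so set the message aside and
        # acknowledge it to let the stream make progress.
        dead_letters.append(message)
    send_puback(message)
    return True
```

Which exceptions count as transient is an application decision, which is one argument for leaving the acknowledgement under application control.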
The approach taken by the PR is different: It doesn't change the default and it gives all the power to the application. It is also more powerful than simply changing the order of calls: The application may return from the callback and only later acknowledge the message. This is required for asyncio-mqtt, which puts the message into an asyncio queue to be handled later. Thus, the PR is necessary even if we decide to change the default behaviour (which I assume, we won't do). With that power, comes some responsibility: The application must make sure to always call ack() eventually. It is in a better position than the library because it can know which exceptions it can recover from. Still, this is no easy feat if the way to the usual ack() call involves several threads or tasks and has multiple failure points. For example, asyncio-mqtt will put messages into a bounded queue and it will drop messages if the queue is full. Unfortunately, some parts of the MQTT 3.1.1 approach are still present in MQTT 5 (and there are good arguments for keeping it that way):
The non-normative part says that the first one is for the benefit of the subscriber (i.e. the application) but it wouldn't be unthinkable that a server takes advantage of that guarantee to optimize its tracking of unacknowledged messages. I think we can add this requirement to the help text and let the application deal with it. They are in a better position to do so. For the second one: We either have to add a default on_message handler with |
I would add that callbacks need to return quickly to avoid message loss (learned that the hard way, yup!) I think callbacks are synchronous with the library. My guess would be there are many applications where ensuring a message is properly received is more involved than just running the initial (abbreviated) callback. |
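A rough sketch of the "return quickly, acknowledge later" pattern discussed above, using an asyncio queue. FakeClient and its ack(mid) method are hypothetical stand-ins for a client in a manual-ack mode, and the on_message signature here is simplified for the sketch; neither is taken from the paho API.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a client in manual-ack mode."""

    def __init__(self):
        self.acked = []

    def ack(self, mid):
        self.acked.append(mid)


def make_on_message(queue):
    def on_message(client, mid, payload):
        # Return quickly: only hand the message over to the worker.
        # put_nowait raises asyncio.QueueFull if the bounded queue is full.
        queue.put_nowait((client, mid, payload))

    return on_message


async def worker(queue, store):
    while True:
        client, mid, payload = await queue.get()
        try:
            store.append(payload)  # stands in for the slow, durable write
            client.ack(mid)        # acknowledge only after the write
        finally:
            queue.task_done()


async def demo():
    queue = asyncio.Queue(maxsize=10)
    store = []
    client = FakeClient()
    on_message = make_on_message(queue)
    task = asyncio.create_task(worker(queue, store))
    for mid, payload in enumerate([b"a", b"b", b"c"], start=1):
        on_message(client, mid, payload)
    await queue.join()  # wait until every queued message was handled
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return client.acked, store
```

The callback itself never blocks; the price is that the application now has to make sure every queued message eventually reaches ack(), including on the failure paths.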
Just wanted to +1 this issue. I am really surprised that there is not a single python mqtt client library that lets me not send an ACK to the broker when storage to the database fails. |
Just feel the need to point out later in that same section of the MQTT standard: "By default, a Server MUST treat every Topic as an Ordered Topic when it is forwarding messages on Non‑shared Subscriptions. [MQTT-4.6.0-6]. A Server MAY provide an administrative or other mechanism to allow one or more Topics to not be treated as an Ordered Topic." So the above stipulations about ordering apply to non-shared subscriptions. In my use case, all subscriptions are shared, so it does not apply, even in the standard. Also note that the next sentence gives permission for the server to permit out of order... without saying how to signal that to a client. |
fwiw... I'm also puzzled by the term "ordered Topic"... in a single connection, one could subscribe to multiple topics, either with multiple subscribe topics or with wildcards, it isn't clear to me that the broker is sending "an ordered Topic" at all. it is sending messages from many topics. It isn't clear to me that messages from different topics are considered part of the same "ordered Topic" or not. |
My meaning may be clearer thus... perhaps messages on topic A are 10x more expensive (in time) to process than topic B but 100x less frequent. So when you receive a message on A, in a stream of B, it makes sense to acknowledge 10 B's before the A you received, maximizing throughput. I read the spec as saying that the combined flow on A and B is a single ordered Topic, so it would insist that the A be acknowledged while the B's wait.
I would be interested in this, as apparently many others are. |
To my understanding this issue was fixed with release 1.6.0 in commit 9a4941e: the ack of a QoS = 1 message now occurs after the on_message callback. If the issue isn't fixed, feel free to reopen this issue.
I'm not sure if this is worth re-opening, but until #753 is merged, providing for application-controlled manual acknowledgements, I don't think all the cases that people are interested in related to this issue are addressed. The most common/obvious case is back-pressure... if something being used for persistence is temporarily unavailable, then we want to apply back-pressure to the broker, and stop receiving new messages. The calls to on_message are automatic, and the acknowledgement of messages happens after on_message is called without regard to success. The incoming queue of messages will be drained into the current consumer, regardless of its ability to process them. I don't know if such problems have been addressed in more recent code, but I had lots of issues with message loss when my on_message processing got complicated. In my usage of the library, on_message had to be extremely lightweight to avoid message loss. So having on_message not return as a means of controlling acknowledgements has not been an option in the past. This is especially problematic when using shared subscriptions where processing is OK on one participant, but broken in the other. Rather than There is no way for an application to apply back-pressure, that is, to get the source to wait while the consumer has some temporary issue, or for different participants to consume at different rates. There should be no reason to debate the need for manual application-level acknowledgements, since other language bindings (e.g. java) provide this same functionality. It has already been debated and accepted, and refusing to add it to the python binding is just hobbling python users when compared to users of other languages.
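The back-pressure idea can be illustrated with a small toy model, assuming a sender that stops once a fixed number of messages are unacknowledged (MQTT 5 has a Receive Maximum setting along these lines). ToyBroker, consume and store_available are hypothetical names made up for this sketch; no paho code is involved.

```python
from collections import deque


class ToyBroker:
    """Toy model: at most `receive_maximum` unacknowledged messages in flight."""

    def __init__(self, messages, receive_maximum):
        self.backlog = deque(messages)
        self.in_flight = set()
        self.receive_maximum = receive_maximum

    def send_available(self):
        """Return the messages the broker is allowed to send right now."""
        sent = []
        while self.backlog and len(self.in_flight) < self.receive_maximum:
            message = self.backlog.popleft()
            self.in_flight.add(message)
            sent.append(message)
        return sent

    def puback(self, message):
        self.in_flight.discard(message)


def consume(broker, store_available, rounds):
    """Acknowledge only while the store is up; return what was persisted."""
    persisted = []
    waiting = deque()
    for round_no in range(rounds):
        waiting.extend(broker.send_available())
        while waiting and store_available(round_no):
            message = waiting.popleft()
            persisted.append(message)  # durable write succeeded
            broker.puback(message)     # only now release the broker's slot
    return persisted
```

While the store is down the consumer withholds its acks, the in-flight window fills up, and the remaining messages stay in the broker's backlog instead of piling up in the client.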
Agree that manual ack could be added. I've closed the issue because the issue was about acking after on_message. Side note on back pressure: I used to do back pressure by not returning from the on_message callback, which... "works". It works because we don't ack the message, but it will cause the client to disconnect for not responding to keepalive, which is not perfect. Anyway, #753 is merged.
Hi,
Would it be possible to adapt paho so that qos=1 messages are not acknowledged until after the on_message call has been handled? I was thinking that in client.py: _handle_publish() we have the following code:
Could it be changed to:
And then allow _handle_on_message to pass on SomeSuitableException, which should be some Paho-defined exception. This would make it possible to not acknowledge a message if, for example, the callback fails to permanently store the message due to an error in that storage.