
AsyncClient: Possible race in MQTTAsync_assignMsgId, results in same msgid used in multiple messages #867

Closed
mtrensch opened this issue Apr 29, 2020 · 4 comments · Fixed by #868

@mtrensch
Contributor

mtrensch commented Apr 29, 2020

Describe the bug
I have a project that publishes about 100 QoS 1/2 messages at once from multiple threads every 250 ms. After some time, the memory and CPU usage of the receiver thread rise.

Looking at the internal list "m->responses", I see that old msgids are still present even though a PUBCOMP has already been received.
It seems that the same msgid is used for multiple messages.

Looking at the function "MQTTAsync_assignMsgId" shows a possible race when it is not called from an internal thread. The previous msgid is fetched before the mutex is locked, and the new value is assigned inside the locked section. If thread B was blocked on the mutex while thread A stored a new msgid, "m->c->msgID" can be overwritten with the stale value. Since the new message is not yet in the command list, the check against messages in the command queue does not help.
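
The following is a minimal standalone sketch (my own illustration, not the paho code itself) that mimics this ordering: the previous msgid is read before the mutex is taken, so two threads can start from the same snapshot and hand out the same id.

/* Standalone sketch of the racy ordering (illustrative only, not the library code).
 * Build with: gcc -pthread race_sketch.c */
#include <pthread.h>
#include <stdio.h>

#define MAX_MSG_ID 65535
#define ITERATIONS 10000

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int shared_msgid = 0;              /* stands in for m->c->msgID */

static int assign_msgid_racy(void)
{
    int start = shared_msgid;             /* (1) snapshot taken OUTSIDE the lock */
    pthread_mutex_lock(&lock);            /* (2) lock taken too late */
    int id = (start == MAX_MSG_ID) ? 1 : start + 1;
    shared_msgid = id;                    /* (3) may overwrite an id another thread just stored */
    pthread_mutex_unlock(&lock);
    return id;
}

static void* worker(void* arg)
{
    (void)arg;
    for (int i = 0; i < ITERATIONS; i++)
        assign_msgid_racy();
    return NULL;
}

int main(void)
{
    pthread_t a, b;
    pthread_create(&a, NULL, worker, NULL);
    pthread_create(&b, NULL, worker, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    /* With correct locking the final value would be 2 * ITERATIONS = 20000;
     * the racy version typically ends lower because the same id was handed out twice. */
    printf("final msgid: %d\n", shared_msgid);
    return 0;
}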

To Reproduce
I currently don't have a reproduction setup, but I think the description above explains the problem well.
I added a log statement that fires when a message from the response queue does not match the incoming PUBCOMP (see https://github.com/mtrensch/paho.mqtt.c/blob/3148fe2d5f4b87e16266dfe559c0764e16ca0546/src/MQTTAsync.c#L4126).

@@ -4123,12 +4127,16 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
                                                Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
                                                (*(m->dc))(m->dcContext, msgid);
                                        }
+                                       Log(LOG_ERROR, -1, "*** pubcomp,ack,rec with msgid %d, type=%d. QueueSize=%d",
+                                           msgid, pack->header.bits.type, m->responses->count);
+
                                        /* use the msgid to find the callback to be called */
                                        while (ListNextElement(m->responses, &current))
                                        {
                                                MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
                                                if (command->command.token == msgid)
                                                {
+                                                       Log(LOG_ERROR, -1, "*** Found message %p (callback=%p)", command, command->command.onSuccess);
                                                        if (!ListDetach(m->responses, command)) /* then remove the response from the list */
                                                                Log(LOG_ERROR, -1, "Publish command not removed from command list");
                                                        if (command->command.onSuccess)
@@ -4171,6 +4179,8 @@ static MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
                                                        }
                                                        MQTTAsync_freeCommand(command);
                                                        break;
+                                               } else {
+                                                       Log(LOG_ERROR, -1, "*** Skipped message (%d!=%d)", command->command.token, msgid);
                                                }
                                        }
                                        if (mqttversion >= MQTTVERSION_5)

This produced the following protocol log (I stripped unnecessary debugging output)

Trace : 4, 20200429 110545.379 11 091225983063f806 -> CONNECT version 3 clean: 0 (0)
Trace : 4, 20200429 110545.381 11 091225983063f806 <- CONNACK rc: 0
Trace : 4, 20200429 110545.692 11 091225983063f806 -> PUBLISH msgid: 1 qos: 2 retained: 0 rc 0 payload len(687):
Trace : 4, 20200429 110545.692 11 091225983063f806 -> PUBLISH msgid: 2 qos: 2 retained: 0 rc 0 payload len(752):
Trace : 4, 20200429 110545.694 11 091225983063f806 -> PUBLISH msgid: 3 qos: 2 retained: 0 rc 0 payload len(750):
Trace : 4, 20200429 110545.695 11 091225983063f806 -> PUBLISH msgid: 4 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.696 11 091225983063f806 <- PUBREC msgid: 1
Trace : 4, 20200429 110545.697 11 091225983063f806 -> PUBREL msgid: 1 (0)
Trace : 4, 20200429 110545.698 11 091225983063f806 <- PUBREC msgid: 2
Trace : 4, 20200429 110545.698 11 091225983063f806 -> PUBREL msgid: 2 (0)
Trace : 4, 20200429 110545.698 11 091225983063f806 -> PUBLISH msgid: 5 qos: 2 retained: 0 rc 0 payload len(752):
Trace : 4, 20200429 110545.708 11 091225983063f806 -> PUBLISH msgid: 6 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.709 11 091225983063f806 -> PUBLISH msgid: 7 qos: 2 retained: 0 rc 0 payload len(753):
Trace : 4, 20200429 110545.709 11 091225983063f806 <- PUBREC msgid: 3
Trace : 4, 20200429 110545.709 11 091225983063f806 -> PUBREL msgid: 3 (0)
Trace : 4, 20200429 110545.709 11 091225983063f806 <- PUBREC msgid: 4
Trace : 4, 20200429 110545.709 11 091225983063f806 -> PUBREL msgid: 4 (0)
Trace : 4, 20200429 110545.710 11 091225983063f806 <- PUBCOMP msgid:1
Trace : 4, 20200429 110545.710 11 091225983063f806 <- PUBCOMP msgid:2
Trace : 4, 20200429 110545.711 11 091225983063f806 <- PUBREC msgid: 5
Trace : 4, 20200429 110545.711 11 091225983063f806 -> PUBREL msgid: 5 (0)
Trace : 4, 20200429 110545.712 11 091225983063f806 <- PUBREC msgid: 6
Trace : 4, 20200429 110545.712 11 091225983063f806 -> PUBREL msgid: 6 (0)
Trace : 4, 20200429 110545.712 11 091225983063f806 -> PUBLISH msgid: 9 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.713 11 091225983063f806 -> PUBLISH msgid: 10 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.714 11 091225983063f806 -> PUBLISH msgid: 11 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.714 11 091225983063f806 -> PUBLISH msgid: 12 qos: 2 retained: 0 rc 0 payload len(753):
Trace : 4, 20200429 110545.714 11 091225983063f806 -> PUBLISH msgid: 13 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.715 11 091225983063f806 <- PUBREC msgid: 7
Trace : 4, 20200429 110545.715 11 091225983063f806 -> PUBREL msgid: 7 (0)
Trace : 4, 20200429 110545.715 11 091225983063f806 <- PUBCOMP msgid:3
Trace : 4, 20200429 110545.716 11 091225983063f806 <- PUBCOMP msgid:4
Trace : 4, 20200429 110545.717 11 091225983063f806 <- PUBCOMP msgid:5
Trace : 4, 20200429 110545.717 11 091225983063f806 <- PUBCOMP msgid:6
Trace : 4, 20200429 110545.718 11 091225983063f806 <- PUBREC msgid: 9
Trace : 4, 20200429 110545.718 11 091225983063f806 -> PUBREL msgid: 9 (0)
Trace : 4, 20200429 110545.718 11 091225983063f806 <- PUBREC msgid: 10
Trace : 4, 20200429 110545.718 11 091225983063f806 -> PUBREL msgid: 10 (0)
Trace : 4, 20200429 110545.718 11 091225983063f806 <- PUBREC msgid: 11
Trace : 4, 20200429 110545.718 11 091225983063f806 -> PUBREL msgid: 11 (0)
Trace : 4, 20200429 110545.718 11 091225983063f806 -> PUBLISH msgid: 14 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.719 11 091225983063f806 -> PUBLISH msgid: 14 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.720 11 091225983063f806 -> PUBLISH msgid: 15 qos: 2 retained: 0 rc 0 payload len(753):
Trace : 4, 20200429 110545.720 11 091225983063f806 -> PUBLISH msgid: 16 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.721 11 091225983063f806 -> PUBLISH msgid: 17 qos: 2 retained: 0 rc 0 payload len(753):
Trace : 4, 20200429 110545.721 11 091225983063f806 <- PUBREC msgid: 12
Trace : 4, 20200429 110545.721 11 091225983063f806 -> PUBREL msgid: 12 (0)
Trace : 4, 20200429 110545.721 11 091225983063f806 -> PUBLISH msgid: 18 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.722 11 091225983063f806 <- PUBREC msgid: 13
Trace : 4, 20200429 110545.722 11 091225983063f806 -> PUBREL msgid: 13 (0)
Trace : 4, 20200429 110545.722 11 091225983063f806 <- PUBCOMP msgid:7
Trace : 4, 20200429 110545.722 11 091225983063f806 -> PUBLISH msgid: 19 qos: 2 retained: 0 rc 0 payload len(753):
Trace : 4, 20200429 110545.722 11 091225983063f806 -> PUBLISH msgid: 8 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.723 11 091225983063f806 <- PUBCOMP msgid:9
Trace : 4, 20200429 110545.723 11 091225983063f806 <- PUBCOMP msgid:10
Trace : 4, 20200429 110545.723 11 091225983063f806 -> PUBLISH msgid: 20 qos: 2 retained: 0 rc 0 payload len(753):
Trace : 4, 20200429 110545.724 11 091225983063f806 -> PUBLISH msgid: 21 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.724 11 091225983063f806 -> PUBLISH msgid: 22 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.724 11 091225983063f806 <- PUBCOMP msgid:11
Trace : 4, 20200429 110545.725 11 091225983063f806 -> PUBLISH msgid: 23 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.726 11 091225983063f806 <- PUBREC msgid: 14
Trace : 4, 20200429 110545.726 11 091225983063f806 -> PUBREL msgid: 14 (0)
Trace : 4, 20200429 110545.726 11 091225983063f806 <- PUBREC msgid: 14
Trace : 4, 20200429 110545.726 11 091225983063f806 -> PUBLISH msgid: 24 qos: 2 retained: 0 rc 0 payload len(688):
Trace : 4, 20200429 110545.728 11 091225983063f806 <- PUBREC msgid: 15
Trace : 4, 20200429 110545.728 11 091225983063f806 -> PUBREL msgid: 15 (0)
Trace : 4, 20200429 110545.729 11 091225983063f806 <- PUBREC msgid: 16
Trace : 4, 20200429 110545.730 11 091225983063f806 -> PUBREL msgid: 16 (0)
Trace : 4, 20200429 110545.731 11 091225983063f806 <- PUBREC msgid: 17
Trace : 4, 20200429 110545.731 11 091225983063f806 -> PUBREL msgid: 17 (0)
Trace : 4, 20200429 110545.731 11 091225983063f806 <- PUBCOMP msgid:12
Trace : 4, 20200429 110545.733 11 091225983063f806 <- PUBREC msgid: 18
Trace : 4, 20200429 110545.733 11 091225983063f806 -> PUBREL msgid: 18 (0)
Trace : 4, 20200429 110545.733 11 091225983063f806 <- PUBCOMP msgid:13
Trace : 4, 20200429 110545.735 11 091225983063f806 <- PUBREC msgid: 19
Trace : 4, 20200429 110545.735 11 091225983063f806 -> PUBREL msgid: 19 (0)
Trace : 4, 20200429 110545.736 11 091225983063f806 <- PUBREC msgid: 8
Trace : 4, 20200429 110545.737 11 091225983063f806 -> PUBREL msgid: 8 (0)
Trace : 4, 20200429 110545.738 11 091225983063f806 <- PUBREC msgid: 20
Trace : 4, 20200429 110545.738 11 091225983063f806 -> PUBREL msgid: 20 (0)
Trace : 4, 20200429 110545.740 11 091225983063f806 <- PUBREC msgid: 21
Trace : 4, 20200429 110545.740 11 091225983063f806 -> PUBREL msgid: 21 (0)
Trace : 4, 20200429 110545.741 11 091225983063f806 <- PUBREC msgid: 22
Trace : 4, 20200429 110545.741 11 091225983063f806 -> PUBREL msgid: 22 (0)
Trace : 4, 20200429 110545.742 11 091225983063f806 <- PUBREC msgid: 23
Trace : 4, 20200429 110545.742 11 091225983063f806 -> PUBREL msgid: 23 (0)
Trace : 4, 20200429 110545.743 11 091225983063f806 <- PUBCOMP msgid:14
Trace : 4, 20200429 110545.744 11 091225983063f806 <- PUBREC msgid: 24
Trace : 4, 20200429 110545.744 11 091225983063f806 -> PUBREL msgid: 24 (0)
Trace : 4, 20200429 110545.745 11 091225983063f806 <- PUBCOMP msgid:15
Trace : 4, 20200429 110545.745 *** Skipped message (14!=15)

Expected behavior
I would expect that all active messages have a unique msgid.

Environment (please complete the following information):

  • OS: Linux
  • Version: 1.3.2

Additional context
This affects multi-threaded environments where a single client is used from multiple threads. At QoS 0 it does not cause any harm, but at higher QoS levels the command queue keeps growing and is never cleaned up.
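
For context, here is a rough sketch of the kind of application pattern that hits this path (my own example, not taken from the project: broker URI, client id, topic, payload, and thread count are placeholders, and error handling is trimmed). Several application threads publish QoS 2 messages through one shared MQTTAsync handle, so MQTTAsync_assignMsgId is entered concurrently from threads other than the library's internal ones.

/* Sketch of a multi-threaded publisher sharing one async client handle. */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "MQTTAsync.h"

static MQTTAsync client;

static void* publisher(void* arg)
{
    long tid = (long)(intptr_t)arg;
    char payload[64];
    snprintf(payload, sizeof(payload), "hello from thread %ld", tid);

    for (int i = 0; i < 100; i++)
    {
        MQTTAsync_message msg = MQTTAsync_message_initializer;
        MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
        msg.payload = payload;
        msg.payloadlen = (int)strlen(payload);
        msg.qos = 2;
        msg.retained = 0;
        if (MQTTAsync_sendMessage(client, "test/topic", &msg, &opts) != MQTTASYNC_SUCCESS)
            fprintf(stderr, "send failed in thread %ld\n", tid);
    }
    return NULL;
}

int main(void)
{
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    pthread_t threads[4];

    MQTTAsync_create(&client, "tcp://localhost:1883", "race-demo",
                     MQTTCLIENT_PERSISTENCE_NONE, NULL);
    MQTTAsync_connect(client, &conn_opts);
    sleep(1);                     /* crude wait for the connection; a real program would use callbacks */

    for (long t = 0; t < 4; t++)  /* several threads share the same client handle */
        pthread_create(&threads[t], NULL, publisher, (void*)(intptr_t)t);
    for (int t = 0; t < 4; t++)
        pthread_join(threads[t], NULL);

    MQTTAsync_destroy(&client);
    return 0;
}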

I have now signed the ECLA and opened pull request #868.

mtrensch added a commit to mtrensch/paho.mqtt.c that referenced this issue Apr 29, 2020
…ultiple times, if one client uses multiple threads

Fixes eclipse-paho#867

Signed-off-by: Michael Trensch <mtrensch@gmail.com>
@icraggs icraggs added this to the 1.3.3 milestone Apr 29, 2020
@icraggs
Contributor

icraggs commented May 1, 2020

I see the problem. Thanks very much for the PR. Were you able to verify that the change fixed the problem?

@icraggs icraggs added the bug label May 1, 2020
@mtrensch
Contributor Author

mtrensch commented May 2, 2020

After applying the fix my problem was gone. A long-term test with paho.mqtt.c 1.3.2 plus only this pull request has now been running for 44 hours. Paho's receiver thread is still at 3% CPU, my whole program stays at 22% CPU, and memory usage has remained at 0.2% over the full 44 hours. So memory and CPU usage no longer rise and the problem is gone. Before applying the pull request, my program died after 3-4 hours, with CPU and memory usage rising during that period.

When I am back at work, I can try to provide a program/test that demonstrates the problem and verifies the fix, if you like. But I am not sure a reliable test is possible, as the race only happens occasionally and the affected functions cannot be accessed directly.

@icraggs
Contributor

icraggs commented May 2, 2020

You don't need to do any more, unless you really want to. I can see that it's a problem; I missed the initializations when the lock was added. I just wanted to get an idea of whether there were any other problems lurking. Thanks again.

@mtrensch
Contributor Author

mtrensch commented May 2, 2020

You are welcome.
I will not try to build a test case, as I am not convinced it would be easy to do. The problem is, as you say, obvious, and I don't see any other obvious problems in the code path I am using. My long-term test has not shown any problems so far.

It was not easy to detect, as I searched in the wrong location at first, but the library's good (protocol) tracing mechanism helped a lot.

icraggs added a commit that referenced this issue May 2, 2020
Fetch clients messageid in locked state to prevent using same msgid for different messages (Issue #867)
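
For reference, a minimal sketch of the corrected ordering the commit message describes (illustrative only, not the verbatim PR #868 change; it reuses the lock, shared_msgid and MAX_MSG_ID declarations from the standalone sketch earlier in this issue): the current msgid is read only after the mutex protecting it is held, so a concurrent caller can no longer start from a stale snapshot.

/* Sketch of the fixed ordering: snapshot the counter inside the locked section. */
static int assign_msgid_fixed(void)
{
    pthread_mutex_lock(&lock);            /* lock first */
    int start = shared_msgid;             /* snapshot read while holding the lock */
    int id = (start == MAX_MSG_ID) ? 1 : start + 1;
    shared_msgid = id;
    pthread_mutex_unlock(&lock);
    return id;
}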