Skip to content

Commit

Permalink
Synchronize sendMessage/send #1494
Browse files Browse the repository at this point in the history
  • Loading branch information
icraggs committed Sep 3, 2024
1 parent 07b773f commit 24b3439
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
32 changes: 30 additions & 2 deletions src/MQTTAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,13 @@ int MQTTAsync_reconnect(MQTTAsync handle)
}


int MQTTAsync_inCallback()
{
thread_id_type thread_id = Paho_thread_getid();
return thread_id == sendThread_id || thread_id == receiveThread_id;
}


int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, const int* qos, MQTTAsync_responseOptions* response)
{
MQTTAsyncs* m = handle;
Expand All @@ -989,6 +996,8 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, con
int msgid = 0;

FUNC_ENTRY;
if (!MQTTAsync_inCallback())
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0)
Expand Down Expand Up @@ -1092,6 +1101,8 @@ int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char* const* topic, con
rc = PAHO_MEMORY_ERROR;

exit:
if (!MQTTAsync_inCallback())
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
Expand All @@ -1116,6 +1127,8 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M
int msgid = 0;

FUNC_ENTRY;
if (!MQTTAsync_inCallback())
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0)
Expand Down Expand Up @@ -1180,6 +1193,8 @@ int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char* const* topic, M
rc = MQTTAsync_addCommand(unsub, sizeof(unsub));

exit:
if (!MQTTAsync_inCallback())
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
Expand All @@ -1204,6 +1219,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
int msgid = 0;

FUNC_ENTRY;
if (!MQTTAsync_inCallback())
MQTTAsync_lock_mutex(mqttasync_mutex);
if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0)
Expand Down Expand Up @@ -1287,6 +1304,8 @@ int MQTTAsync_send(MQTTAsync handle, const char* destinationName, int payloadlen
rc = MQTTAsync_addCommand(pub, sizeof(pub));

exit:
if (!MQTTAsync_inCallback())
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
Expand Down Expand Up @@ -1324,10 +1343,19 @@ int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const M

int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions* options)
{
int rc = 0;

FUNC_ENTRY;
if (!MQTTAsync_inCallback())
MQTTAsync_lock_mutex(mqttasync_mutex);
if (options != NULL && (strncmp(options->struct_id, "MQTD", 4) != 0 || options->struct_version < 0 || options->struct_version > 1))
return MQTTASYNC_BAD_STRUCTURE;
rc = MQTTASYNC_BAD_STRUCTURE;
else
return MQTTAsync_disconnect1(handle, options, 0);
rc = MQTTAsync_disconnect1(handle, options, 0);
if (!MQTTAsync_inCallback())
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}


Expand Down
12 changes: 0 additions & 12 deletions src/MQTTAsyncUtils.c
Original file line number Diff line number Diff line change
Expand Up @@ -2814,19 +2814,9 @@ int MQTTAsync_assignMsgId(MQTTAsyncs* m)
{
int start_msgid;
int msgid;
thread_id_type thread_id = 0;
int locked = 0;

/* need to check: commands list and response list for a client */
FUNC_ENTRY;
/* We might be called in a callback. In which case, this mutex will be already locked. */
thread_id = Paho_thread_getid();
if (thread_id != sendThread_id && thread_id != receiveThread_id)
{
MQTTAsync_lock_mutex(mqttasync_mutex);
locked = 1;
}

/* Fetch last message ID in locked state */
start_msgid = m->c->msgID;
msgid = start_msgid;
Expand All @@ -2847,8 +2837,6 @@ int MQTTAsync_assignMsgId(MQTTAsyncs* m)
MQTTAsync_unlock_mutex(mqttcommand_mutex);
if (msgid != 0)
m->c->msgID = msgid;
if (locked)
MQTTAsync_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(msgid);
return msgid;
}
Expand Down

0 comments on commit 24b3439

Please sign in to comment.