Skip to content

Commit

Permalink
Merge pull request wolfSSL#329 from dgarske/cleanup_mtnb
Browse files Browse the repository at this point in the history
Add API to check if message is active (non-blocking only)
  • Loading branch information
lealem47 authored Apr 14, 2023
2 parents 7e8cf39 + 8ce55d6 commit b3aef24
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
7 changes: 5 additions & 2 deletions examples/multithread/multithread.c
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,11 @@ static int TestIsDone(int rc, MQTTCtx* mqttCtx)
/* check if we are in test mode and done */
wm_SemLock(&mtLock);
if ((rc == 0 || rc == MQTT_CODE_CONTINUE) && mqttCtx->test_mode &&
mNumMsgsDone == NUM_PUB_TASKS && mNumMsgsRecvd == NUM_PUB_TASKS &&
mqttCtx->client.msg.stat.read == MQTT_MSG_BEGIN) {
mNumMsgsDone == NUM_PUB_TASKS && mNumMsgsRecvd == NUM_PUB_TASKS
#ifdef WOLFMQTT_NONBLOCK
&& !MqttClient_IsMessageActive(&mqttCtx->client, NULL)
#endif
) {
wm_SemUnlock(&mtLock);
mqtt_stop_set();
isDone = 1; /* done */
Expand Down
42 changes: 42 additions & 0 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2486,6 +2486,7 @@ int MqttClient_WaitMessage(MqttClient *client, int timeout_ms)
return MqttClient_WaitMessage_ex(client, &client->msg, timeout_ms);
}

#if defined(WOLFMQTT_MULTITHREAD) || defined(WOLFMQTT_NONBLOCK)
int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg)
{
int rc = MQTT_CODE_SUCCESS;
Expand Down Expand Up @@ -2557,6 +2558,47 @@ int MqttClient_CancelMessage(MqttClient *client, MqttObject* msg)
#endif
return rc;
}
#endif /* WOLFMQTT_MULTITHREAD || WOLFMQTT_NONBLOCK */

#ifdef WOLFMQTT_NONBLOCK
static inline int IsMessageActive(MqttObject *msg)
{
return (msg->stat.read != MQTT_MSG_BEGIN ||
msg->stat.write != MQTT_MSG_BEGIN);
}

int MqttClient_IsMessageActive(
MqttClient *client,
MqttObject *msg)
{
int rc;

/* must supply either client or msg */
if (client == NULL && msg == NULL) {
return MQTT_TRACE_ERROR(MQTT_CODE_ERROR_BAD_ARG);
}

/* if msg is null then client->msg is used */
if ((client != NULL && &client->msg == msg) || msg == NULL) {
#ifdef WOLFMQTT_MULTITHREAD
rc = wm_SemLock(&client->lockClient);
if (rc == 0)
#endif
{
rc = IsMessageActive(&client->msg);
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}
}
else {
rc = IsMessageActive(msg);
}
return rc;
}


#endif /* WOLFMQTT_NONBLOCK */


int MqttClient_NetConnect(MqttClient *client, const char* host,
Expand Down
20 changes: 18 additions & 2 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,10 @@ WOLFMQTT_API int MqttClient_WaitMessage(
*/
WOLFMQTT_API int MqttClient_WaitMessage_ex(
MqttClient *client,
MqttObject* msg,
MqttObject *msg,
int timeout_ms);

#if defined(WOLFMQTT_MULTITHREAD) || defined(WOLFMQTT_NONBLOCK)
/*! \brief In a multi-threaded and non-blocking mode this allows you to
cancel an MQTT object that was previously submitted.
* \note This is a blocking function that will wait for MqttNet.read
Expand All @@ -487,7 +488,22 @@ WOLFMQTT_API int MqttClient_WaitMessage_ex(
*/
WOLFMQTT_API int MqttClient_CancelMessage(
MqttClient *client,
MqttObject* msg);
MqttObject *msg);
#endif

#ifdef WOLFMQTT_NONBLOCK
/*! \brief In a non-blocking mode this checks if the message has a read
or write pending (state is not MQTT_MSG_BEGIN).
* \note This function assumes caller owns the object
* \param client Pointer to MqttClient structure
* \param msg Pointer to MqttObject structure
* \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_*
(see enum MqttPacketResponseCodes)
*/
WOLFMQTT_API int MqttClient_IsMessageActive(
MqttClient *client,
MqttObject *msg);
#endif /* WOLFMQTT_NONBLOCK */

/*! \brief Performs network connect with TLS (if use_tls is non-zero value)
* Will perform the MqttTlsCb callback if use_tls is non-zero value
Expand Down

0 comments on commit b3aef24

Please sign in to comment.