diff --git a/conf/janus.eventhandler.mqttevh.jcfg.sample b/conf/janus.eventhandler.mqttevh.jcfg.sample index 5d273dbe41..85d77e0559 100644 --- a/conf/janus.eventhandler.mqttevh.jcfg.sample +++ b/conf/janus.eventhandler.mqttevh.jcfg.sample @@ -16,6 +16,7 @@ general: { # plain (no indentation) or compact (no indentation and no spaces) url = "tcp://localhost:1883" # The URL of the MQTT server. Only tcp supported at this time. + #mqtt_version = "3.1.1" # Protocol version. Available values: 3.1, 3.1.1 (default), 5. client_id = "janus.example.com" # Janus client id. You have to configure a unique ID (default: guest). #keep_alive_interval = 20 # Keep connection for N seconds (default: 30) #cleansession = 0 # Clean session flag (default: off) @@ -46,4 +47,7 @@ general: { #tls_client_key = "/path/to/key.pem" #tls_ciphers #tls_version + + # These options work with MQTT 5 only. + #add_user_properties = () # List of user property ["key", "value"] pairs to add. } diff --git a/events/janus_mqttevh.c b/events/janus_mqttevh.c index 29bf519531..7b994de9f1 100644 --- a/events/janus_mqttevh.c +++ b/events/janus_mqttevh.c @@ -46,7 +46,6 @@ json_t *janus_mqttevh_handle_request(json_t *request); static int janus_mqttevh_send_message(void *context, const char *topic, json_t *message); static void *janus_mqttevh_handler(void *data); - /* Event handler setup */ static janus_eventhandler janus_mqttevh = JANUS_EVENTHANDLER_INIT ( @@ -92,27 +91,31 @@ static GThread *handler_thread; static volatile gint initialized = 0, stopping = 0; /* JSON serialization options */ -#define DEFAULT_ADDPLUGIN 1 -#define DEFAULT_ADDEVENT 1 -#define DEFAULT_KEEPALIVE 30 -#define DEFAULT_CLEANSESSION 0 /* Off */ -#define DEFAULT_TIMEOUT 30 -#define DEFAULT_DISCONNECT_TIMEOUT 100 -#define DEFAULT_QOS 0 -#define DEFAULT_RETAIN 0 -#define DEFAULT_CONNECT_STATUS "{\"event\": \"connected\", \"eventhandler\": \""JANUS_MQTTEVH_PACKAGE"\"}" -#define DEFAULT_DISCONNECT_STATUS "{\"event\": \"disconnected\"}" -#define DEFAULT_WILL_RETAIN 1 -#define DEFAULT_WILL_QOS 0 -#define DEFAULT_BASETOPIC "/janus/events" -#define DEFAULT_MQTTURL "tcp://localhost:1883" -#define DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER - -#define DEFAULT_TLS_ENABLE FALSE -#define DEFAULT_TLS_VERIFY_PEER FALSE -#define DEFAULT_TLS_VERIFY_HOST FALSE - -static size_t json_format = DEFAULT_JSON_FORMAT; +#define JANUS_MQTTEVH_DEFAULT_ADDPLUGIN 1 +#define JANUS_MQTTEVH_DEFAULT_ADDEVENT 1 +#define JANUS_MQTTEVH_DEFAULT_KEEPALIVE 30 +#define JANUS_MQTTEVH_DEFAULT_CLEANSESSION 0 /* Off */ +#define JANUS_MQTTEVH_DEFAULT_TIMEOUT 30 +#define JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT 100 +#define JANUS_MQTTEVH_DEFAULT_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_RETAIN 0 +#define JANUS_MQTTEVH_DEFAULT_CONNECT_STATUS "{\"event\": \"connected\", \"eventhandler\": \""JANUS_MQTTEVH_PACKAGE"\"}" +#define JANUS_MQTTEVH_DEFAULT_DISCONNECT_STATUS "{\"event\": \"disconnected\"}" +#define JANUS_MQTTEVH_DEFAULT_WILL_RETAIN 1 +#define JANUS_MQTTEVH_DEFAULT_WILL_QOS 0 +#define JANUS_MQTTEVH_DEFAULT_BASETOPIC "/janus/events" +#define JANUS_MQTTEVH_DEFAULT_MQTTURL "tcp://localhost:1883" +#define JANUS_MQTTEVH_DEFAULT_JSON_FORMAT JSON_INDENT(3) | JSON_PRESERVE_ORDER +#define JANUS_MQTTEVH_DEFAULT_TLS_ENABLE FALSE +#define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_PEER FALSE +#define JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_HOST FALSE + +#define JANUS_MQTTEVH_VERSION_3_1 "3.1" +#define JANUS_MQTTEVH_VERSION_3_1_1 "3.1.1" +#define JANUS_MQTTEVH_VERSION_5 "5" +#define JANUS_MQTTEVH_VERSION_DEFAULT JANUS_MQTTEVH_VERSION_3_1_1 + +static size_t json_format = JANUS_MQTTEVH_DEFAULT_JSON_FORMAT; /* Parameter validation (for tweaking via Admin API) */ @@ -142,6 +145,7 @@ typedef struct janus_mqttevh_context { /* Connection data - authentication and url */ struct { + int mqtt_version; int keep_alive_interval; int cleansession; char *client_id; @@ -161,6 +165,9 @@ typedef struct janus_mqttevh_context { char *disconnect_status; int qos; int retain; + #ifdef MQTTVERSION_5 + GArray *add_user_properties; + #endif } publish; /* If we loose connection, the will is our last publish */ @@ -182,21 +189,53 @@ typedef struct janus_mqttevh_context { } tls; } janus_mqttevh_context; +#ifdef MQTTVERSION_5 +/* MQTT 5 specific types */ +typedef struct janus_mqttevh_set_add_user_property_user_data { + GArray *acc; + janus_config *config; +} janus_mqttevh_set_add_user_property_user_data; +#endif + /* Event handler methods */ static void janus_mqttevh_client_connection_lost(void *context, char *cause); static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx); +static int janus_mqttevh_client_disconnect(janus_mqttevh_context *ctx); +static void janus_mqttevh_client_destroy_context(janus_mqttevh_context **ctx); +/* MQTT v3.x interface callbacks */ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_successData *response); static void janus_mqttevh_client_connect_failure(void *context, MQTTAsync_failureData *response); -static int janus_mqttevh_client_reconnect(janus_mqttevh_context *ctx); -static void janus_mqttevh_client_reconnect_success(void *context, MQTTAsync_successData *response); -static void janus_mqttevh_client_reconnect_failure(void *context, MQTTAsync_failureData *response); -static int janus_mqttevh_client_disconnect(janus_mqttevh_context *ctx); static void janus_mqttevh_client_disconnect_success(void *context, MQTTAsync_successData *response); static void janus_mqttevh_client_disconnect_failure(void *context, MQTTAsync_failureData *response); +static void janus_mqttevh_client_publish_message_success(void *context, MQTTAsync_successData *response); +static void janus_mqttevh_client_publish_message_failure(void *context, MQTTAsync_failureData *response); static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload); -static void janus_mqttevh_client_publish_janus_success(void *context, MQTTAsync_successData *response); -static void janus_mqttevh_client_publish_janus_failure(void *context, MQTTAsync_failureData *response); -static void janus_mqttevh_client_destroy_context(janus_mqttevh_context **ctx); +int janus_mqttevh_client_get_response_code(MQTTAsync_failureData *response); +#ifdef MQTTVERSION_5 +/* MQTT v5 interface callbacks */ +static void janus_mqttevh_client_connect_success5(void *context, MQTTAsync_successData5 *response); +static void janus_mqttevh_client_connect_failure5(void *context, MQTTAsync_failureData5 *response); +static void janus_mqttevh_client_disconnect_success5(void *context, MQTTAsync_successData5 *response); +static void janus_mqttevh_client_disconnect_failure5(void *context, MQTTAsync_failureData5 *response); +static void janus_mqttevh_client_publish_message_success5(void *context, MQTTAsync_successData5 *response); +static void janus_mqttevh_client_publish_message_failure5(void *context, MQTTAsync_failureData5 *response); +static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload, MQTTProperties *properties); +int janus_mqttevh_client_get_response_code5(MQTTAsync_failureData5 *response); +#endif +/* MQTT version independent callback implementations */ +static void janus_mqttevh_client_connect_success_impl(void *context); +static void janus_mqttevh_client_connect_failure_impl(void *context, int rc); +static void janus_mqttevh_client_disconnect_success_impl(void *context); +static void janus_mqttevh_client_disconnect_failure_impl(void *context, int rc); +static void janus_mqttevh_client_publish_message_success_impl(void *context); +static void janus_mqttevh_client_publish_message_failure_impl(void *context, int rc); +int janus_mqttevh_client_publish_message_wrap(void *context, const char *topic, int retain, char *payload); + +#ifdef MQTTVERSION_5 +/* MQTT 5 specific functions */ +void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *properties); +void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_ptr); +#endif /* We only handle a single connection */ static janus_mqttevh_context *context = NULL; @@ -267,8 +306,7 @@ static int janus_mqttevh_send_message(void *context, const char *topic, json_t * /* Ok, lets' get rid of the message */ json_decref(message); - rc = janus_mqttevh_client_publish_message(ctx, topic, ctx->publish.retain, payload); - + rc = janus_mqttevh_client_publish_message_wrap(context, topic, ctx->publish.retain, payload); if(rc != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_WARN, "Can't publish to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc); } @@ -280,6 +318,26 @@ static int janus_mqttevh_send_message(void *context, const char *topic, json_t * return 0; } +int janus_mqttevh_client_publish_message_wrap(void *context, const char *topic, int retain, char *payload) { + int rc = 0; + janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; + +#ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5) { + MQTTProperties properties = MQTTProperties_initializer; + janus_mqttevh_add_properties(ctx->publish.add_user_properties, &properties); + rc = janus_mqttevh_client_publish_message5(ctx, topic, retain, payload, &properties); + MQTTProperties_free(&properties); + } else { + rc = janus_mqttevh_client_publish_message(ctx, topic, retain, payload); + } +#else + rc = janus_mqttevh_client_publish_message(ctx, topic, retain, payload); +#endif + + return rc; +} + static void janus_mqttevh_client_connection_lost(void *context, char *cause) { /* Notify handlers about this transport being gone */ @@ -290,17 +348,31 @@ static void janus_mqttevh_client_connection_lost(void *context, char *cause) { /* Set up connection to MQTT broker */ static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx) { - int rc; - MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; - options.keepAliveInterval = ctx->connect.keep_alive_interval; + +#ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5) { + MQTTAsync_connectOptions options5 = MQTTAsync_connectOptions_initializer5; + options = options5; + options.cleanstart = ctx->connect.cleansession; + options.onSuccess5 = janus_mqttevh_client_connect_success5; + options.onFailure5 = janus_mqttevh_client_connect_failure5; + } else { + options.cleansession = ctx->connect.cleansession; + options.onSuccess = janus_mqttevh_client_connect_success; + options.onFailure = janus_mqttevh_client_connect_failure; + } +#else options.cleansession = ctx->connect.cleansession; + options.onSuccess = janus_mqttevh_client_connect_success; + options.onFailure = janus_mqttevh_client_connect_failure; +#endif + + options.MQTTVersion = ctx->connect.mqtt_version; options.username = ctx->connect.username; options.password = ctx->connect.password; options.automaticReconnect = TRUE; - options.onSuccess = janus_mqttevh_client_connect_success; - options.onFailure = janus_mqttevh_client_connect_failure; - options.context = ctx; + options.keepAliveInterval = ctx->connect.keep_alive_interval; MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer; if(ctx->will.enabled) { @@ -308,16 +380,25 @@ static int janus_mqttevh_client_connect(janus_mqttevh_context *ctx) { willOptions.message = ctx->publish.disconnect_status; willOptions.retained = ctx->will.retain; willOptions.qos = ctx->will.qos; - options.will = &willOptions; } - rc = MQTTAsync_connect(ctx->client, &options); - return rc; + options.context = ctx; + return MQTTAsync_connect(ctx->client, &options); } -/* Callback for succesful connection to MQTT broker */ +/* Callback for successful connection to MQTT broker */ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_successData *response) { + janus_mqttevh_client_connect_success_impl(context); +} + +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_connect_success5(void *context, MQTTAsync_successData5 *response) { + janus_mqttevh_client_connect_success_impl(context); +} +#endif + +static void janus_mqttevh_client_connect_success_impl(void *context) { JANUS_LOG(LOG_INFO, "MQTT EVH client has been successfully connected to the broker\n"); janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; @@ -328,7 +409,7 @@ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_succes /* Using LWT's retain for initial status message because * we need to ensure we overwrite LWT if it's retained. */ - int rc = janus_mqttevh_client_publish_message(ctx, topicbuf, ctx->will.retain, ctx->publish.connect_status); + int rc = janus_mqttevh_client_publish_message_wrap(context, topicbuf, ctx->will.retain, ctx->publish.connect_status); if(rc != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_WARN, "Can't publish to MQTT topic: %s, return code: %d\n", topicbuf, rc); @@ -337,58 +418,20 @@ static void janus_mqttevh_client_connect_success(void *context, MQTTAsync_succes /* Callback for MQTT broker connection failure */ static void janus_mqttevh_client_connect_failure(void *context, MQTTAsync_failureData *response) { - int rc = response ? response->code : 0; - - /* Notify handlers about this transport failure */ - JANUS_LOG(LOG_ERR, "MQTT EVH client has failed connecting to the broker, return code: %d. Reconnecting...\n", rc); - + int rc = janus_mqttevh_client_get_response_code(response); + janus_mqttevh_client_connect_failure_impl(context, rc); } -/* MQTT broker Reconnect function */ -static int janus_mqttevh_client_reconnect(janus_mqttevh_context *ctx) { - JANUS_LOG(LOG_INFO, "MQTT EVH client reconnecting to %s. Reconnecting...\n", ctx->connect.url); - - MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; - options.onSuccess = janus_mqttevh_client_reconnect_success; - options.onFailure = janus_mqttevh_client_reconnect_failure; - options.context = ctx; - options.timeout = ctx->disconnect.timeout; - - return MQTTAsync_disconnect(ctx->client, &options); -} - -/* Callback for successful reconnection to MQTT broker */ -static void janus_mqttevh_client_reconnect_success(void *context, MQTTAsync_successData *response) { - janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; - - JANUS_LOG(LOG_WARN, "MQTT EVH client has been disconnected from %s. Reconnecting...\n", ctx->connect.url); - - int rc = janus_mqttevh_client_connect(context); - if(rc != MQTTASYNC_SUCCESS) { - const char *error; - switch(rc) { - case 1: error = "Connection refused - protocol version"; - break; - case 2: error = "Connection refused - identifier rejected"; - break; - case 3: error = "Connection refused - server unavailable"; - break; - case 4: error = "Connection refused - bad credentials"; - break; - case 5: error = "Connection refused - not authroized"; - break; - default: error = "Connection refused - unknown error"; - break; - } - JANUS_LOG(LOG_FATAL, "Can't connect to MQTT broker, return code: %d (%s)\n", rc, error); - return; - } +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_connect_failure5(void *context, MQTTAsync_failureData5 *response) { + int rc = janus_mqttevh_client_get_response_code5(response); + janus_mqttevh_client_connect_failure_impl(context, rc); } +#endif -/* Callback for MQTT broker reconnect failure */ -static void janus_mqttevh_client_reconnect_failure(void *context, MQTTAsync_failureData *response) { - int rc = response ? response->code : 0; - JANUS_LOG(LOG_ERR, "MQTT EVH client failed reconnecting to MQTT broker, return code: %d\n", rc); +static void janus_mqttevh_client_connect_failure_impl(void *contexts, int rc) { + /* Notify handlers about this transport failure */ + JANUS_LOG(LOG_ERR, "MQTT EVH client has failed connecting to the broker, return code: %d. Reconnecting...\n", rc); } /* Disconnect from MQTT broker */ @@ -399,23 +442,44 @@ static int janus_mqttevh_client_disconnect(janus_mqttevh_context *ctx) { /* Using LWT's retain for disconnect status message because * we need to ensure we overwrite LWT if it's retained. */ - int rc = janus_mqttevh_client_publish_message(ctx, topicbuf, 1, ctx->publish.disconnect_status); + int rc = janus_mqttevh_client_publish_message_wrap(context, topicbuf, 1, ctx->publish.disconnect_status); if(rc != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_WARN, "Can't publish to MQTT topic: %s, return code: %d\n", topicbuf, rc); } MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; - options.onSuccess = janus_mqttevh_client_disconnect_success; - options.onFailure = janus_mqttevh_client_disconnect_failure; options.context = ctx; options.timeout = ctx->disconnect.timeout; + +#ifdef MQTTVERSION_5 + if(ctx->connect.mqtt_version == MQTTVERSION_5) { + options.onSuccess5 = janus_mqttevh_client_disconnect_success5; + options.onFailure5 = janus_mqttevh_client_disconnect_failure5; + } else { + options.onSuccess = janus_mqttevh_client_disconnect_success; + options.onFailure = janus_mqttevh_client_disconnect_failure; + } +#else + options.onSuccess = janus_mqttevh_client_disconnect_success; + options.onFailure = janus_mqttevh_client_disconnect_failure; +#endif + return MQTTAsync_disconnect(ctx->client, &options); } /* Callback for succesful MQTT disconnect */ static void janus_mqttevh_client_disconnect_success(void *context, MQTTAsync_successData *response) { + janus_mqttevh_client_disconnect_success_impl(context); +} +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_disconnect_success5(void *context, MQTTAsync_successData5 *response) { + janus_mqttevh_client_disconnect_success_impl(context); +} +#endif + +static void janus_mqttevh_client_disconnect_success_impl(void *context) { /* Notify handlers about this transport being gone */ janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; @@ -425,11 +489,20 @@ static void janus_mqttevh_client_disconnect_success(void *context, MQTTAsync_suc /* Callback for MQTT disconnect failure */ void janus_mqttevh_client_disconnect_failure(void *context, MQTTAsync_failureData *response) { - int rc = response ? response->code : 0; - janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; + int rc = janus_mqttevh_client_get_response_code(response); + janus_mqttevh_client_disconnect_failure_impl(context, rc); +} - JANUS_LOG(LOG_ERR, "Can't disconnect from MQTT EVH broker %s, return code: %d\n", ctx->connect.url, rc); +#ifdef MQTTVERSION_5 +void janus_mqttevh_client_disconnect_failure5(void *context, MQTTAsync_failureData5 *response) { + int rc = janus_mqttevh_client_get_response_code5(response); + janus_mqttevh_client_disconnect_failure_impl(context, rc); +} +#endif +void janus_mqttevh_client_disconnect_failure_impl(void *context, int rc) { + janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; + JANUS_LOG(LOG_ERR, "Can't disconnect from MQTT EVH broker %s, return code: %d\n", ctx->connect.url, rc); janus_mqttevh_client_destroy_context(&ctx); } @@ -437,14 +510,10 @@ void janus_mqttevh_client_disconnect_failure(void *context, MQTTAsync_failureDat /* Publish mqtt message using paho * Payload is a string. JSON objects should be stringified before calling this function. */ -static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload) -{ +static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload) { int rc; - MQTTAsync_responseOptions options; - memset(&options, 0, sizeof(MQTTAsync_responseOptions)); MQTTAsync_message msg = MQTTAsync_message_initializer; - msg.payload = payload; msg.payloadlen = strlen(payload); msg.qos = ctx->publish.qos; @@ -455,9 +524,37 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons payload = (char *)NULL; */ + MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; + options.context = ctx; + options.onSuccess = janus_mqttevh_client_publish_message_success; + options.onFailure = janus_mqttevh_client_publish_message_failure; + + rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); + if(rc == MQTTASYNC_SUCCESS) { + JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + } else { + JANUS_LOG(LOG_WARN, "FAILURE: MQTT EVH message propably not sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); + } + + return rc; +} + +#ifdef MQTTVERSION_5 +static int janus_mqttevh_client_publish_message5(janus_mqttevh_context *ctx, const char *topic, int retain, char *payload, MQTTProperties *properties) { + int rc; + + MQTTAsync_message msg = MQTTAsync_message_initializer; + msg.payload = payload; + msg.payloadlen = strlen(payload); + msg.qos = ctx->publish.qos; + msg.retained = retain; + msg.properties = MQTTProperties_copy(properties); + + MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.context = ctx; - options.onSuccess = janus_mqttevh_client_publish_janus_success; - options.onFailure = janus_mqttevh_client_publish_janus_failure; + options.onSuccess5 = janus_mqttevh_client_publish_message_success5; + options.onFailure5 = janus_mqttevh_client_publish_message_failure5; + rc = MQTTAsync_sendMessage(ctx->client, topic, &msg, &options); if(rc == MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_HUGE, "MQTT EVH message sent to topic %s on %s. Result %d\n", topic, ctx->connect.url, rc); @@ -467,9 +564,20 @@ static int janus_mqttevh_client_publish_message(janus_mqttevh_context *ctx, cons return rc; } +#endif /* Callback for successful MQTT publish */ -static void janus_mqttevh_client_publish_janus_success(void *context, MQTTAsync_successData *response) { +static void janus_mqttevh_client_publish_message_success(void *context, MQTTAsync_successData *response) { + janus_mqttevh_client_publish_message_success_impl(context); +} + +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_publish_message_success5(void *context, MQTTAsync_successData5 *response) { + janus_mqttevh_client_publish_message_success_impl(context); +} +#endif + +static void janus_mqttevh_client_publish_message_success_impl(void *context) { janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; JANUS_LOG(LOG_HUGE, "MQTT EVH client has successfully published to MQTT base topic: %s\n", ctx->publish.topic); } @@ -477,13 +585,23 @@ static void janus_mqttevh_client_publish_janus_success(void *context, MQTTAsync_ /* Callback for MQTT publish failure * Should we bring message into queue? Right now, we just drop it. */ -static void janus_mqttevh_client_publish_janus_failure(void *context, MQTTAsync_failureData *response) { +static void janus_mqttevh_client_publish_message_failure(void *context, MQTTAsync_failureData *response) { + int rc = janus_mqttevh_client_get_response_code(response); + janus_mqttevh_client_publish_message_failure_impl(context, rc); +} + +#ifdef MQTTVERSION_5 +static void janus_mqttevh_client_publish_message_failure5(void *context, MQTTAsync_failureData5 *response) { + int rc = janus_mqttevh_client_get_response_code5(response); + janus_mqttevh_client_publish_message_failure_impl(context, rc); +} +#endif + +static void janus_mqttevh_client_publish_message_failure_impl(void *context, int rc) { janus_mqttevh_context *ctx = (janus_mqttevh_context *)context; - int rc = response ? response->code : 0; JANUS_LOG(LOG_ERR, "MQTT EVH client has failed publishing to MQTT topic: %s, return code: %d\n", ctx->publish.topic, rc); } - /* Destroy Janus MQTT event handler session context */ static void janus_mqttevh_client_destroy_context(janus_mqttevh_context **ptr) { JANUS_LOG(LOG_INFO, "About to destroy MQTT EVH context...\n"); @@ -572,21 +690,21 @@ static int janus_mqttevh_init(const char *config_path) { /* Set default values */ /* Strings are set to default values later */ - ctx->addplugin = DEFAULT_ADDPLUGIN; - ctx->addevent = DEFAULT_ADDEVENT; - ctx->publish.qos = DEFAULT_QOS; - ctx->publish.retain = DEFAULT_RETAIN; + ctx->addplugin = JANUS_MQTTEVH_DEFAULT_ADDPLUGIN; + ctx->addevent = JANUS_MQTTEVH_DEFAULT_ADDEVENT; + ctx->publish.qos = JANUS_MQTTEVH_DEFAULT_QOS; + ctx->publish.retain = JANUS_MQTTEVH_DEFAULT_RETAIN; ctx->connect.username = NULL; ctx->connect.password = NULL; - ctx->disconnect.timeout = DEFAULT_TIMEOUT; + ctx->disconnect.timeout = JANUS_MQTTEVH_DEFAULT_TIMEOUT; ctx->will.enabled = FALSE; - ctx->will.qos = DEFAULT_WILL_QOS; - ctx->will.retain = DEFAULT_WILL_RETAIN; + ctx->will.qos = JANUS_MQTTEVH_DEFAULT_WILL_QOS; + ctx->will.retain = JANUS_MQTTEVH_DEFAULT_WILL_RETAIN; - ctx->tls.enable = DEFAULT_TLS_ENABLE; - ctx->tls.verify_peer = DEFAULT_TLS_VERIFY_PEER; - ctx->tls.verify_host = DEFAULT_TLS_VERIFY_HOST; + ctx->tls.enable = JANUS_MQTTEVH_DEFAULT_TLS_ENABLE; + ctx->tls.verify_peer = JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_PEER; + ctx->tls.verify_host = JANUS_MQTTEVH_DEFAULT_TLS_VERIFY_HOST; /* Setup the event handler, if required */ janus_config_item *item = janus_config_get(config, config_general, janus_config_type_item, "enabled"); @@ -599,7 +717,26 @@ static int janus_mqttevh_init(const char *config_path) { /* MQTT URL */ url_item = janus_config_get(config, config_general, janus_config_type_item, "url"); - ctx->connect.url= g_strdup((url_item && url_item->value) ? url_item->value : DEFAULT_MQTTURL); + ctx->connect.url= g_strdup((url_item && url_item->value) ? url_item->value : JANUS_MQTTEVH_DEFAULT_MQTTURL); + + janus_config_item *mqtt_version = janus_config_get(config, config_general, janus_config_type_item, "mqtt_version"); + const char *mqtt_version_str = (mqtt_version && mqtt_version->value) ? mqtt_version->value : JANUS_MQTTEVH_VERSION_DEFAULT; + + if(strcmp(mqtt_version_str, JANUS_MQTTEVH_VERSION_3_1) == 0) { + ctx->connect.mqtt_version = MQTTVERSION_3_1; + } else if(strcmp(mqtt_version_str, JANUS_MQTTEVH_VERSION_3_1_1) == 0) { + ctx->connect.mqtt_version = MQTTVERSION_3_1_1; + } else if(strcmp(mqtt_version_str, JANUS_MQTTEVH_VERSION_5) == 0) { +#ifdef MQTTVERSION_5 + ctx->connect.mqtt_version = MQTTVERSION_5; +#else + JANUS_LOG(LOG_FATAL, "Using MQTT v5 requires compilation with Paho >= 1.3.0\n"); + goto error; +#endif + } else { + JANUS_LOG(LOG_FATAL, "Unknown MQTT version\n"); + goto error; + } janus_config_item *client_id_item = janus_config_get(config, config_general, janus_config_type_item, "client_id"); @@ -633,7 +770,7 @@ static int janus_mqttevh_init(const char *config_path) { json_format = JSON_COMPACT | JSON_PRESERVE_ORDER; } else { JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", json_item->value); - json_format = DEFAULT_JSON_FORMAT; + json_format = JANUS_MQTTEVH_DEFAULT_JSON_FORMAT; } } @@ -645,32 +782,32 @@ static int janus_mqttevh_init(const char *config_path) { /* Connect configuration */ keep_alive_interval_item = janus_config_get(config, config_general, janus_config_type_item, "keep_alive_interval"); ctx->connect.keep_alive_interval = (keep_alive_interval_item && keep_alive_interval_item->value) ? - atoi(keep_alive_interval_item->value) : DEFAULT_KEEPALIVE; + atoi(keep_alive_interval_item->value) : JANUS_MQTTEVH_DEFAULT_KEEPALIVE; if(ctx->connect.keep_alive_interval < 0) { JANUS_LOG(LOG_ERR, "Invalid keep-alive value: %s (falling back to default)\n", keep_alive_interval_item->value); - ctx->connect.keep_alive_interval = DEFAULT_KEEPALIVE; + ctx->connect.keep_alive_interval = JANUS_MQTTEVH_DEFAULT_KEEPALIVE; } cleansession_item = janus_config_get(config, config_general, janus_config_type_item, "cleansession"); ctx->connect.cleansession = (cleansession_item && cleansession_item->value) ? - atoi(cleansession_item->value) : DEFAULT_CLEANSESSION; + atoi(cleansession_item->value) : JANUS_MQTTEVH_DEFAULT_CLEANSESSION; if(ctx->connect.cleansession < 0) { JANUS_LOG(LOG_ERR, "Invalid clean-session value: %s (falling back to default)\n", cleansession_item->value); - ctx->connect.cleansession = DEFAULT_CLEANSESSION; + ctx->connect.cleansession = JANUS_MQTTEVH_DEFAULT_CLEANSESSION; } /* Disconnect configuration */ disconnect_timeout_item = janus_config_get(config, config_general, janus_config_type_item, "disconnect_timeout"); ctx->disconnect.timeout = (disconnect_timeout_item && disconnect_timeout_item->value) ? - atoi(disconnect_timeout_item->value) : DEFAULT_DISCONNECT_TIMEOUT; + atoi(disconnect_timeout_item->value) : JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT; if(ctx->disconnect.timeout < 0) { JANUS_LOG(LOG_ERR, "Invalid disconnect-timeout value: %s (falling back to default)\n", disconnect_timeout_item->value); - ctx->disconnect.timeout = DEFAULT_DISCONNECT_TIMEOUT; + ctx->disconnect.timeout = JANUS_MQTTEVH_DEFAULT_DISCONNECT_TIMEOUT; } topic_item = janus_config_get(config, config_general, janus_config_type_item, "topic"); if(!topic_item || !topic_item->value) { - ctx->publish.topic = g_strdup(DEFAULT_BASETOPIC); + ctx->publish.topic = g_strdup(JANUS_MQTTEVH_DEFAULT_BASETOPIC); } else { ctx->publish.topic = g_strdup(topic_item->value); } @@ -698,14 +835,14 @@ static int janus_mqttevh_init(const char *config_path) { if(connect_status_item && connect_status_item->value) { ctx->publish.connect_status = g_strdup(connect_status_item->value); } else { - ctx->publish.connect_status = g_strdup(DEFAULT_CONNECT_STATUS); + ctx->publish.connect_status = g_strdup(JANUS_MQTTEVH_DEFAULT_CONNECT_STATUS); } disconnect_status_item = janus_config_get(config, config_general, janus_config_type_item, "disconnect_status"); if(disconnect_status_item && disconnect_status_item->value) { ctx->publish.disconnect_status = g_strdup(disconnect_status_item->value); } else { - ctx->publish.disconnect_status = g_strdup(DEFAULT_DISCONNECT_STATUS); + ctx->publish.disconnect_status = g_strdup(JANUS_MQTTEVH_DEFAULT_DISCONNECT_STATUS); } /* LWT config */ @@ -782,24 +919,62 @@ static int janus_mqttevh_init(const char *config_path) { } } +#ifdef MQTTVERSION_5 + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + /* MQTT 5 specific configuration */ + janus_config_array *add_user_properties_array = janus_config_get(config, config_general, janus_config_type_array, "add_user_properties"); + if(add_user_properties_array) { + GList *add_user_properties_array_items = janus_config_get_arrays(config, add_user_properties_array); + if(add_user_properties_array_items != NULL) { + int add_user_properties_array_len = g_list_length(add_user_properties_array_items); + if(add_user_properties_array_len > 0) { + ctx->publish.add_user_properties = g_array_sized_new(FALSE, FALSE, sizeof(MQTTProperty), add_user_properties_array_len); + + janus_mqttevh_set_add_user_property_user_data user_data = { + ctx->publish.add_user_properties, + config + }; + + g_list_foreach( + add_user_properties_array_items, + (GFunc)janus_mqttevh_set_add_user_property, + (gpointer)&user_data + ); + } + } + } + } +#endif + if(!janus_mqtt_evh_enabled) { JANUS_LOG(LOG_WARN, "MQTT event handler support disabled, giving up\n"); goto error; } /* Create a MQTT client */ - res = MQTTAsync_create( + MQTTAsync_createOptions create_options = MQTTAsync_createOptions_initializer; + +#ifdef MQTTVERSION_5 + if (ctx->connect.mqtt_version == MQTTVERSION_5) { + create_options.MQTTVersion = MQTTVERSION_5; + } +#endif + + res = MQTTAsync_createWithOptions( &ctx->client, ctx->connect.url, ctx->connect.client_id, MQTTCLIENT_PERSISTENCE_NONE, - NULL); + NULL, + &create_options + ); if(res != MQTTASYNC_SUCCESS) { JANUS_LOG(LOG_FATAL, "Can't setup library for connection to MQTT broker %s: error %d creating client...\n", ctx->connect.url, res); goto error; } + /* Set callbacks. We should not really subscribe to anything but nevertheless */ res = MQTTAsync_setCallbacks(ctx->client, ctx, @@ -812,6 +987,7 @@ static int janus_mqttevh_init(const char *config_path) { ctx->connect.url, res); goto error; } + JANUS_LOG(LOG_INFO, "Event handler: About to connect to MQTT broker %s: ...\n", ctx->connect.url); @@ -1005,3 +1181,52 @@ static void *janus_mqttevh_handler(void *data) { JANUS_LOG(LOG_VERB, "Leaving MQTTEventHandler handler thread\n"); return NULL; } + +int janus_mqttevh_client_get_response_code(MQTTAsync_failureData *response) { + return response ? response->code : 0; +} + +#ifdef MQTTVERSION_5 +int janus_mqttevh_client_get_response_code5(MQTTAsync_failureData5 *response) { + return response ? response->code : 0; +} + +void janus_mqttevh_add_properties(GArray *user_properties, MQTTProperties *properties) { + if(user_properties == NULL || user_properties->len == 0) { + return; + } + + for(uint i = 0; i < user_properties->len; i++) { + MQTTProperty *property = &g_array_index(user_properties, MQTTProperty, i); + int rc = MQTTProperties_add(properties, property); + if(rc != 0) { + JANUS_LOG(LOG_ERR, "Failed to user properties to MQTT response\n"); + } + } +} + +void janus_mqttevh_set_add_user_property(gpointer item_ptr, gpointer user_data_ptr) { + janus_config_item *item = (janus_config_item*)item_ptr; + if(item->value != NULL) { + return; + } + + janus_mqttevh_set_add_user_property_user_data *user_data = (janus_mqttevh_set_add_user_property_user_data*)user_data_ptr; + GList *key_value = janus_config_get_items(user_data->config, item); + if(key_value == NULL || g_list_length(key_value) != 2) { + JANUS_LOG(LOG_ERR, "Expected a key-value pair\n"); + return; + } + + janus_config_item *key_item = (janus_config_item*)g_list_first(key_value)->data; + janus_config_item *value_item = (janus_config_item*)g_list_last(key_value)->data; + + MQTTProperty property; + property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; + property.value.data.data = g_strdup(key_item->value); + property.value.data.len = strlen(key_item->value); + property.value.value.data = g_strdup(value_item->value); + property.value.value.len = strlen(value_item->value); + g_array_append_val(user_data->acc, property); +} +#endif