From e92a23e7f9cb4c5fc4e4777bddb0f136ddf1c168 Mon Sep 17 00:00:00 2001 From: Kirill Isakov Date: Tue, 10 Nov 2020 19:36:57 +0600 Subject: [PATCH] Add support for handling MQTT commands --- main/app_main.c | 124 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 102 insertions(+), 22 deletions(-) diff --git a/main/app_main.c b/main/app_main.c index e5ba656..c1917b0 100644 --- a/main/app_main.c +++ b/main/app_main.c @@ -13,12 +13,11 @@ #include "freertos/queue.h" #include "freertos/task.h" -#include "assert.h" -#include "time.h" - #include "lwip/err.h" #include "lwip/sys.h" +#include "time.h" + #include "ds18b20.h" #include "owb.h" #include "owb_rmt.h" @@ -69,8 +68,15 @@ typedef struct { psk_hint_key_t psk_hint; char msg[256]; EventGroupHandle_t event; + const char *cmd_topic; + const char *resp_topic; } mqtt_client; +typedef struct { + const char *const cmd; + bool (*handler)(mqtt_client *const client, const esp_mqtt_event_handle_t evt); +} mqtt_cmd_handler; + // tag for application logs static const char *const kTag = "airmon"; @@ -78,13 +84,17 @@ static const char *const kTag = "airmon"; static time_t boot_timestamp = 0; enum { - TIME_VALID = BIT0, - NET_CONNECTED = BIT1, - NET_DISCONNECTED = BIT2, + STATE_TIME_VALID = BIT0, + STATE_NET_CONNECTED = BIT1, + STATE_NET_DISCONNECTED = BIT2, }; static EventGroupHandle_t state_evt; +static void wait_state(const EventBits_t bits) { + xEventGroupWaitBits(state_evt, bits, false, true, portMAX_DELAY); +} + enum { MQTT_READY = BIT0, MQTT_ACK = BIT1, @@ -102,12 +112,55 @@ static time_t get_timestamp() { } } +static bool mqtt_is_broadcast_cmd(const esp_mqtt_event_handle_t evt) { + return strncmp(evt->topic, "cmd/*", evt->topic_len) == 0; +} + +static bool mqtt_handle_ping(mqtt_client *const client, + const esp_mqtt_event_handle_t evt) { + const bool bcast = mqtt_is_broadcast_cmd(evt); + const char *const topic = bcast ? "response/*" : client->resp_topic; + const char *const resp = bcast ? CONFIG_DEV_NAME ": pong" : "pong"; + return esp_mqtt_client_publish(client->handle, topic, resp, 0, 1, 0) != -1; +} + +_Noreturn static bool mqtt_handle_restart(mqtt_client *const client, + const esp_mqtt_event_handle_t evt) { + esp_mqtt_client_publish(client->handle, evt->topic, "restarting", 0, 1, 0); + esp_restart(); +} + +static const mqtt_cmd_handler mqtt_handlers[] = { + {"ping", mqtt_handle_ping}, + {"restart", mqtt_handle_restart}, +}; + +static void mqtt_subscribe_to_commands(const mqtt_client *const client) { + esp_mqtt_client_subscribe(client->handle, "cmd/*", 2); + esp_mqtt_client_subscribe(client->handle, client->cmd_topic, 2); +} + +static bool mqtt_handle_message(mqtt_client *const client, + const esp_mqtt_event_handle_t evt) { + const size_t num_handlers = sizeof(mqtt_handlers) / sizeof(*mqtt_handlers); + + for (size_t i = 0; i < num_handlers; ++i) { + const mqtt_cmd_handler *const handler = &mqtt_handlers[i]; + if (strncmp(handler->cmd, evt->data, evt->data_len) == 0) { + return handler->handler(client, evt); + } + } + + return false; +} + static esp_err_t mqtt_event_handler(const esp_mqtt_event_handle_t evt) { mqtt_client *const client = evt->user_context; switch (evt->event_id) { case MQTT_EVENT_CONNECTED: xEventGroupSetBits(client->event, MQTT_READY); + mqtt_subscribe_to_commands(client); ESP_LOGI(kTag, "mqtt connected"); break; @@ -121,6 +174,17 @@ static esp_err_t mqtt_event_handler(const esp_mqtt_event_handle_t evt) { ESP_LOGD(kTag, "mqtt broker received message %d", evt->msg_id); break; + case MQTT_EVENT_DATA: + ESP_LOGI(kTag, "mqtt message received (id %d)", evt->msg_id); + if (!mqtt_handle_message(client, evt)) { + ESP_LOGI(kTag, "could not handle message (id %d)", evt->msg_id); + } + break; + + case MQTT_EVENT_SUBSCRIBED: + ESP_LOGI(kTag, "mqtt subscription successful (%d)", evt->msg_id); + break; + case MQTT_EVENT_ERROR: ESP_LOGE(kTag, "mqtt error %d", evt->error_handle->error_type); break; @@ -138,7 +202,7 @@ static size_t hex_str_to_bytes(const char *const str, uint8_t *const buf, const size_t hex_len = strlen(str); const size_t psk_len = hex_len / 2; - assert(hex_len % 2 == 0); + configASSERT(hex_len % 2 == 0); for (size_t i = 0; i < psk_len; ++i) { char b[3] = {0, 0, 0}; @@ -173,6 +237,9 @@ static mqtt_client *mqtt_client_create(const char *const broker_uri, client->psk_hint.key = psk; *((size_t *)&(client->psk_hint.key_size)) = psk_len; + client->cmd_topic = "cmd/" CONFIG_DEV_NAME; + client->resp_topic = "response/" CONFIG_DEV_NAME; + const esp_mqtt_client_config_t conf = {.uri = CONFIG_MQTT_BROKER_URI, .event_handle = mqtt_event_handler, .user_context = client, @@ -180,11 +247,24 @@ static mqtt_client *mqtt_client_create(const char *const broker_uri, .keepalive = 30}; client->handle = esp_mqtt_client_init(&conf); - ESP_ERROR_CHECK(esp_mqtt_client_start(client->handle)); + if (client->handle == NULL) { + ESP_LOGE(kTag, "esp_mqtt_client_init failed"); + goto cleanup; + } + + if (esp_mqtt_client_start(client->handle) != ESP_OK) { + ESP_LOGE(kTag, "esp_mqtt_client_start failed"); + goto cleanup; + } client->event = xEventGroupCreate(); return client; + +cleanup: + free(psk); + free(client); + return NULL; } static void mqtt_client_free(mqtt_client *const client) { @@ -202,13 +282,13 @@ static void mqtt_client_free(mqtt_client *const client) { static void time_sync_notification() { if (boot_timestamp == 0) { boot_timestamp = time(NULL); - xEventGroupSetBits(state_evt, TIME_VALID); + xEventGroupSetBits(state_evt, STATE_TIME_VALID); } ESP_LOGI(kTag, "sntp time update finished"); } static void task_sntp_update() { - xEventGroupWaitBits(state_evt, NET_CONNECTED, false, false, portMAX_DELAY); + wait_state(STATE_NET_CONNECTED); sntp_setoperatingmode(SNTP_OPMODE_POLL); sntp_setservername(0, "pool.ntp.org"); @@ -326,12 +406,12 @@ _Noreturn void task_collect_temps(const sensor_config *const config) { static void handle_ip_event(void *const arg, const esp_event_base_t event_base, const int32_t event_id, void *const event_data) { - assert(event_base == IP_EVENT); + configASSERT(event_base == IP_EVENT); switch (event_id) { case IP_EVENT_STA_GOT_IP: { - xEventGroupClearBits(state_evt, NET_DISCONNECTED); - xEventGroupSetBits(state_evt, NET_CONNECTED); + xEventGroupClearBits(state_evt, STATE_NET_DISCONNECTED); + xEventGroupSetBits(state_evt, STATE_NET_CONNECTED); const ip_event_got_ip_t *const evt = event_data; ESP_LOGI(kTag, "got ip %d.%d.%d.%d", IP2STR(&evt->ip_info.ip)); @@ -339,8 +419,8 @@ static void handle_ip_event(void *const arg, const esp_event_base_t event_base, } case IP_EVENT_STA_LOST_IP: - xEventGroupClearBits(state_evt, NET_CONNECTED); - xEventGroupSetBits(state_evt, NET_DISCONNECTED); + xEventGroupClearBits(state_evt, STATE_NET_CONNECTED); + xEventGroupSetBits(state_evt, STATE_NET_DISCONNECTED); ESP_LOGI(kTag, "lost ip"); break; @@ -353,7 +433,7 @@ static void handle_ip_event(void *const arg, const esp_event_base_t event_base, static void handle_wifi_event(void *const arg, const esp_event_base_t event_base, const int32_t event_id, void *const event_data) { - assert(event_base == WIFI_EVENT); + configASSERT(event_base == WIFI_EVENT); switch (event_id) { case WIFI_EVENT_STA_START: @@ -422,7 +502,7 @@ static QueueHandle_t make_measurement_queue() { sizeof(measurement_queue.buffer) / item_size, item_size, (uint8_t *)measurement_queue.buffer, &measurement_queue.queue); - assert(queue); + configASSERT(queue); return queue; } @@ -513,7 +593,7 @@ static void mqtt_client_wait_ready(mqtt_client *const client) { xEventGroupWaitBits(client->event, MQTT_READY, false, false, portMAX_DELAY); } -static void restart_periph(const periph_module_t module) { +static void restart_peripheral(const periph_module_t module) { periph_module_disable(module); periph_module_enable(module); } @@ -522,8 +602,8 @@ _Noreturn void app_main() { app_init_log(); // reset peripherals in case of prior crash - restart_periph(PERIPH_RMT_MODULE); - restart_periph(PERIPH_UART0_MODULE); + restart_peripheral(PERIPH_RMT_MODULE); + restart_peripheral(PERIPH_UART0_MODULE); state_evt = xEventGroupCreate(); @@ -536,7 +616,7 @@ _Noreturn void app_main() { start_temp_tasks(ms_queue); start_pm_task(ms_queue); - xEventGroupWaitBits(state_evt, NET_CONNECTED, false, false, portMAX_DELAY); + wait_state(STATE_NET_CONNECTED); mqtt_client *const client = mqtt_client_create(CONFIG_MQTT_BROKER_URI, CONFIG_MQTT_PSK); @@ -546,7 +626,7 @@ _Noreturn void app_main() { esp_restart(); } - xEventGroupWaitBits(state_evt, TIME_VALID, false, false, portMAX_DELAY); + wait_state(STATE_TIME_VALID); for (measurement ms;;) { mqtt_client_wait_ready(client);