Skip to content

Commit

Permalink
Add support for handling MQTT commands
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Nov 10, 2020
1 parent 5ed3395 commit e92a23e
Showing 1 changed file with 102 additions and 22 deletions.
124 changes: 102 additions & 22 deletions main/app_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
#include "freertos/queue.h"
#include "freertos/task.h"

#include "assert.h"
#include "time.h"

#include "lwip/err.h"
#include "lwip/sys.h"

#include "time.h"

#include "ds18b20.h"
#include "owb.h"
#include "owb_rmt.h"
Expand Down Expand Up @@ -69,22 +68,33 @@ typedef struct {
psk_hint_key_t psk_hint;
char msg[256];
EventGroupHandle_t event;
const char *cmd_topic;
const char *resp_topic;
} mqtt_client;

typedef struct {
const char *const cmd;
bool (*handler)(mqtt_client *const client, const esp_mqtt_event_handle_t evt);
} mqtt_cmd_handler;

// tag for application logs
static const char *const kTag = "airmon";

// UNIX time when the system was booted
static time_t boot_timestamp = 0;

enum {
TIME_VALID = BIT0,
NET_CONNECTED = BIT1,
NET_DISCONNECTED = BIT2,
STATE_TIME_VALID = BIT0,
STATE_NET_CONNECTED = BIT1,
STATE_NET_DISCONNECTED = BIT2,
};

static EventGroupHandle_t state_evt;

static void wait_state(const EventBits_t bits) {
xEventGroupWaitBits(state_evt, bits, false, true, portMAX_DELAY);
}

enum {
MQTT_READY = BIT0,
MQTT_ACK = BIT1,
Expand All @@ -102,12 +112,55 @@ static time_t get_timestamp() {
}
}

static bool mqtt_is_broadcast_cmd(const esp_mqtt_event_handle_t evt) {
return strncmp(evt->topic, "cmd/*", evt->topic_len) == 0;
}

static bool mqtt_handle_ping(mqtt_client *const client,
const esp_mqtt_event_handle_t evt) {
const bool bcast = mqtt_is_broadcast_cmd(evt);
const char *const topic = bcast ? "response/*" : client->resp_topic;
const char *const resp = bcast ? CONFIG_DEV_NAME ": pong" : "pong";
return esp_mqtt_client_publish(client->handle, topic, resp, 0, 1, 0) != -1;
}

_Noreturn static bool mqtt_handle_restart(mqtt_client *const client,
const esp_mqtt_event_handle_t evt) {
esp_mqtt_client_publish(client->handle, evt->topic, "restarting", 0, 1, 0);
esp_restart();
}

static const mqtt_cmd_handler mqtt_handlers[] = {
{"ping", mqtt_handle_ping},
{"restart", mqtt_handle_restart},
};

static void mqtt_subscribe_to_commands(const mqtt_client *const client) {
esp_mqtt_client_subscribe(client->handle, "cmd/*", 2);
esp_mqtt_client_subscribe(client->handle, client->cmd_topic, 2);
}

static bool mqtt_handle_message(mqtt_client *const client,
const esp_mqtt_event_handle_t evt) {
const size_t num_handlers = sizeof(mqtt_handlers) / sizeof(*mqtt_handlers);

for (size_t i = 0; i < num_handlers; ++i) {
const mqtt_cmd_handler *const handler = &mqtt_handlers[i];
if (strncmp(handler->cmd, evt->data, evt->data_len) == 0) {
return handler->handler(client, evt);
}
}

return false;
}

static esp_err_t mqtt_event_handler(const esp_mqtt_event_handle_t evt) {
mqtt_client *const client = evt->user_context;

switch (evt->event_id) {
case MQTT_EVENT_CONNECTED:
xEventGroupSetBits(client->event, MQTT_READY);
mqtt_subscribe_to_commands(client);
ESP_LOGI(kTag, "mqtt connected");
break;

Expand All @@ -121,6 +174,17 @@ static esp_err_t mqtt_event_handler(const esp_mqtt_event_handle_t evt) {
ESP_LOGD(kTag, "mqtt broker received message %d", evt->msg_id);
break;

case MQTT_EVENT_DATA:
ESP_LOGI(kTag, "mqtt message received (id %d)", evt->msg_id);
if (!mqtt_handle_message(client, evt)) {
ESP_LOGI(kTag, "could not handle message (id %d)", evt->msg_id);
}
break;

case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(kTag, "mqtt subscription successful (%d)", evt->msg_id);
break;

case MQTT_EVENT_ERROR:
ESP_LOGE(kTag, "mqtt error %d", evt->error_handle->error_type);
break;
Expand All @@ -138,7 +202,7 @@ static size_t hex_str_to_bytes(const char *const str, uint8_t *const buf,
const size_t hex_len = strlen(str);
const size_t psk_len = hex_len / 2;

assert(hex_len % 2 == 0);
configASSERT(hex_len % 2 == 0);

for (size_t i = 0; i < psk_len; ++i) {
char b[3] = {0, 0, 0};
Expand Down Expand Up @@ -173,18 +237,34 @@ static mqtt_client *mqtt_client_create(const char *const broker_uri,
client->psk_hint.key = psk;
*((size_t *)&(client->psk_hint.key_size)) = psk_len;

client->cmd_topic = "cmd/" CONFIG_DEV_NAME;
client->resp_topic = "response/" CONFIG_DEV_NAME;

const esp_mqtt_client_config_t conf = {.uri = CONFIG_MQTT_BROKER_URI,
.event_handle = mqtt_event_handler,
.user_context = client,
.psk_hint_key = &client->psk_hint,
.keepalive = 30};

client->handle = esp_mqtt_client_init(&conf);
ESP_ERROR_CHECK(esp_mqtt_client_start(client->handle));
if (client->handle == NULL) {
ESP_LOGE(kTag, "esp_mqtt_client_init failed");
goto cleanup;
}

if (esp_mqtt_client_start(client->handle) != ESP_OK) {
ESP_LOGE(kTag, "esp_mqtt_client_start failed");
goto cleanup;
}

client->event = xEventGroupCreate();

return client;

cleanup:
free(psk);
free(client);
return NULL;
}

static void mqtt_client_free(mqtt_client *const client) {
Expand All @@ -202,13 +282,13 @@ static void mqtt_client_free(mqtt_client *const client) {
static void time_sync_notification() {
if (boot_timestamp == 0) {
boot_timestamp = time(NULL);
xEventGroupSetBits(state_evt, TIME_VALID);
xEventGroupSetBits(state_evt, STATE_TIME_VALID);
}
ESP_LOGI(kTag, "sntp time update finished");
}

static void task_sntp_update() {
xEventGroupWaitBits(state_evt, NET_CONNECTED, false, false, portMAX_DELAY);
wait_state(STATE_NET_CONNECTED);

sntp_setoperatingmode(SNTP_OPMODE_POLL);
sntp_setservername(0, "pool.ntp.org");
Expand Down Expand Up @@ -326,21 +406,21 @@ _Noreturn void task_collect_temps(const sensor_config *const config) {

static void handle_ip_event(void *const arg, const esp_event_base_t event_base,
const int32_t event_id, void *const event_data) {
assert(event_base == IP_EVENT);
configASSERT(event_base == IP_EVENT);

switch (event_id) {
case IP_EVENT_STA_GOT_IP: {
xEventGroupClearBits(state_evt, NET_DISCONNECTED);
xEventGroupSetBits(state_evt, NET_CONNECTED);
xEventGroupClearBits(state_evt, STATE_NET_DISCONNECTED);
xEventGroupSetBits(state_evt, STATE_NET_CONNECTED);

const ip_event_got_ip_t *const evt = event_data;
ESP_LOGI(kTag, "got ip %d.%d.%d.%d", IP2STR(&evt->ip_info.ip));
break;
}

case IP_EVENT_STA_LOST_IP:
xEventGroupClearBits(state_evt, NET_CONNECTED);
xEventGroupSetBits(state_evt, NET_DISCONNECTED);
xEventGroupClearBits(state_evt, STATE_NET_CONNECTED);
xEventGroupSetBits(state_evt, STATE_NET_DISCONNECTED);
ESP_LOGI(kTag, "lost ip");
break;

Expand All @@ -353,7 +433,7 @@ static void handle_ip_event(void *const arg, const esp_event_base_t event_base,
static void handle_wifi_event(void *const arg,
const esp_event_base_t event_base,
const int32_t event_id, void *const event_data) {
assert(event_base == WIFI_EVENT);
configASSERT(event_base == WIFI_EVENT);

switch (event_id) {
case WIFI_EVENT_STA_START:
Expand Down Expand Up @@ -422,7 +502,7 @@ static QueueHandle_t make_measurement_queue() {
sizeof(measurement_queue.buffer) / item_size, item_size,
(uint8_t *)measurement_queue.buffer, &measurement_queue.queue);

assert(queue);
configASSERT(queue);

return queue;
}
Expand Down Expand Up @@ -513,7 +593,7 @@ static void mqtt_client_wait_ready(mqtt_client *const client) {
xEventGroupWaitBits(client->event, MQTT_READY, false, false, portMAX_DELAY);
}

static void restart_periph(const periph_module_t module) {
static void restart_peripheral(const periph_module_t module) {
periph_module_disable(module);
periph_module_enable(module);
}
Expand All @@ -522,8 +602,8 @@ _Noreturn void app_main() {
app_init_log();

// reset peripherals in case of prior crash
restart_periph(PERIPH_RMT_MODULE);
restart_periph(PERIPH_UART0_MODULE);
restart_peripheral(PERIPH_RMT_MODULE);
restart_peripheral(PERIPH_UART0_MODULE);

state_evt = xEventGroupCreate();

Expand All @@ -536,7 +616,7 @@ _Noreturn void app_main() {
start_temp_tasks(ms_queue);
start_pm_task(ms_queue);

xEventGroupWaitBits(state_evt, NET_CONNECTED, false, false, portMAX_DELAY);
wait_state(STATE_NET_CONNECTED);

mqtt_client *const client =
mqtt_client_create(CONFIG_MQTT_BROKER_URI, CONFIG_MQTT_PSK);
Expand All @@ -546,7 +626,7 @@ _Noreturn void app_main() {
esp_restart();
}

xEventGroupWaitBits(state_evt, TIME_VALID, false, false, portMAX_DELAY);
wait_state(STATE_TIME_VALID);

for (measurement ms;;) {
mqtt_client_wait_ready(client);
Expand Down

0 comments on commit e92a23e

Please sign in to comment.