Skip to content

Commit

Permalink
Move MQTT command handling out of client
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Nov 17, 2020
1 parent 9158340 commit a475fd4
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 138 deletions.
2 changes: 1 addition & 1 deletion firmware/main/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
idf_component_register(
SRCS app_main.cc time.cc mqtt.cc settings.cc state.cc pms.cc measurement.cc dallas.cc net.cc
SRCS app_main.cc time.cc mqtt.cc settings.cc state.cc pms.cc measurement.cc dallas.cc net.cc commands.cc
INCLUDE_DIRS "."
EMBED_TXTFILES "ca.pem"
)
Expand Down
7 changes: 4 additions & 3 deletions firmware/main/app_main.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include "ca.hh"
#include "commands.hh"
#include "common.hh"
#include "dallas.hh"
#include "device_config.hh"
Expand All @@ -16,9 +18,6 @@

State *appState = nullptr;

// CA certificate for MQTT TLS and OTA updates
extern const char caPemStart[] asm("_binary_ca_pem_start");

static void initLog() {
#ifdef DEBUG
esp_log_level_set("*", ESP_LOG_DEBUG);
Expand Down Expand Up @@ -75,6 +74,8 @@ extern "C" [[noreturn]] void app_main() {
mqtt::Client client{appSettings.mqtt.broker, caPemStart,
appSettings.mqtt.username, appSettings.mqtt.password};

initCommandHandler(client);

appState->wait(AppState::STATE_TIME_VALID);

for (char buf[256];;) {
Expand Down
4 changes: 4 additions & 0 deletions firmware/main/ca.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#pragma once

// CA certificate for MQTT TLS and OTA updates
extern const char caPemStart[] asm("_binary_ca_pem_start");
119 changes: 119 additions & 0 deletions firmware/main/commands.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#include "ca.hh"
#include "common.hh"
#include "mqtt.hh"
#include "settings.hh"
#include "utils.hh"
#include <algorithm>
#include <esp_http_client.h>
#include <esp_https_ota.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>

static bool handlePing(mqtt::Client &client, const mqtt::Message &msg) {
return client.send(msg.respTopic, appSettings.devName);
}

[[noreturn]] static bool handleRestart(mqtt::Client &client,
const mqtt::Message &msg) {
client.send(msg.respTopic, "restarting");
esp_restart();
}

static bool handleWriteSetting(mqtt::Client &client, const mqtt::Message &msg,
const std::vector<std::string> &args) {
if (args.size() != 3) {
client.send(msg.respTopic, "usage: setting/set name_spaces value_also");
return false;
}
const esp_err_t err = appSettings.write(args[1].c_str(), args[2].c_str());
if (err == ESP_OK) {
client.send(msg.respTopic, "setting set");
} else {
client.send(msg.respTopic, "setting write failed");
}
return true;
}

static bool handleOta(mqtt::Client &client, const mqtt::Message &msg,
const std::vector<std::string> &args) {
if (args.size() != 2) {
client.send(msg.respTopic, "usage: ota https://server/path.bin");
return false;
}

const esp_http_client_config_t config{
.url = args[1].c_str(),
.cert_pem = caPemStart,
};

const esp_err_t ret = esp_https_ota(&config);

if (ret == ESP_OK) {
ESP_LOGI(logTag, "OTA update finished");
client.send(msg.respTopic, "OTA success");
esp_restart();
}

ESP_LOGE(logTag, "could not perform OTA update: 0x%x", ret);
client.send(msg.respTopic, "OTA failed");

return false;
}

static bool handleUnknown(mqtt::Client &client, const mqtt::Message &msg) {
client.send(msg.respTopic, "unknown command");
return false;
}

static bool handleMessage(mqtt::Client &client, const mqtt::Message &msg) {
std::vector<std::string> tokens;

const auto end = msg.data.cend();
for (auto begin = msg.data.cbegin(); begin != end;) {
const auto nextSpace = std::find_if(begin, end, isspace);
if (nextSpace != begin) {
tokens.push_back(std::string{begin, nextSpace});
if (nextSpace == end) {
break;
}
}
begin = std::find_if(nextSpace, end, [](char c) { return !isspace(c); });
}

if (tokens.empty()) {
client.send(msg.topic, "no command specified");
return false;
}

const std::string &command = tokens.front();

if (command == "ota") {
return handleOta(client, msg, tokens);
}
if (command == "ping") {
return handlePing(client, msg);
}
if (command == "restart") {
return handleRestart(client, msg);
}
if (command == "setting/set") {
return handleWriteSetting(client, msg, tokens);
}
return handleUnknown(client, msg);
}

[[noreturn]] void taskCommandHandler(void *const arg) {
mqtt::Client &client = *reinterpret_cast<mqtt::Client *>(arg);

while (true) {
const mqtt::Message msg = client.receive();
if (!handleMessage(client, msg)) {
ESP_LOGE(logTag, "could not handle message");
}
}
}

void initCommandHandler(mqtt::Client &client) {
xTaskCreate(taskCommandHandler, "mqtt_cmd_handler", KiB(2), &client, 4,
nullptr);
}
5 changes: 5 additions & 0 deletions firmware/main/commands.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#pragma once

#include "mqtt.hh"

void initCommandHandler(mqtt::Client &client);
136 changes: 18 additions & 118 deletions firmware/main/mqtt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,8 @@
#include "common.hh"
#include "settings.hh"
#include "utils.hh"
#include <esp_http_client.h>
#include <esp_https_ota.h>
#include <esp_log.h>

static bool isBroadcastCmd(const esp_mqtt_event_handle_t evt) {
return strncmp(evt->topic, "cmd/*", evt->topic_len) == 0;
}

namespace mqtt {

esp_err_t Client::handleEvent(const esp_mqtt_event_handle_t evt) {
Expand All @@ -18,7 +12,8 @@ esp_err_t Client::handleEvent(const esp_mqtt_event_handle_t evt) {
switch (evt->event_id) {
case MQTT_EVENT_CONNECTED:
client.setState(MqttState::Ready);
client.subscribeToCommands();
client.subscribe("cmd/*", 2);
client.subscribe(client.cmdTopic.c_str(), 2);
ESP_LOGI(logTag, "mqtt connected");
break;

Expand All @@ -31,12 +26,16 @@ esp_err_t Client::handleEvent(const esp_mqtt_event_handle_t evt) {
ESP_LOGD(logTag, "mqtt broker received message %d", evt->msg_id);
break;

case MQTT_EVENT_DATA:
case MQTT_EVENT_DATA: {
ESP_LOGI(logTag, "mqtt message received (id %d)", evt->msg_id);
if (!client.handleMessage(evt)) {
ESP_LOGI(logTag, "could not handle message (id %d)", evt->msg_id);
}
auto *const msg = new Message{
.id = evt->msg_id,
.topic = std::string{evt->topic, static_cast<unsigned>(evt->topic_len)},
.data = std::string{evt->data, static_cast<unsigned>(evt->data_len)}};
msg->respTopic = msg->isBroadcast() ? "response/*" : client.respTopic;
client.msgQueue.put(msg);
break;
}

case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(logTag, "mqtt subscription successful (%d)", evt->msg_id);
Expand Down Expand Up @@ -86,9 +85,9 @@ void Client::waitReady() const {
false, portMAX_DELAY);
}

bool Client::send(const char *const topic, const char *const data) {
bool Client::send(const std::string &topic, const char *const data) {
for (int result = -1; result < 0;) {
result = esp_mqtt_client_publish(handle, topic, data, 0, 1, 1);
result = esp_mqtt_client_publish(handle, topic.c_str(), data, 0, 1, 1);
if (result < 0) {
ESP_LOGI(logTag, "mqtt publish failed, retrying");
vTaskDelay(secToTicks(5));
Expand All @@ -98,111 +97,6 @@ bool Client::send(const char *const topic, const char *const data) {
return true;
}

bool Client::handlePing(const esp_mqtt_event_handle_t evt,
const char *const respTopic) {
const char *data;
char buf[64];

if (isBroadcastCmd(evt)) {
snprintf(buf, sizeof(buf), "pong: %s", appSettings.devName);
data = buf;
} else {
data = "pong";
}

return send(respTopic, data);
}

bool Client::handleUnknown(const char *const respTopic) {
send(respTopic, "unknown command");
return false;
}

[[noreturn]] bool Client::handleRestart(const char *const respTopic) {
send(respTopic, "restarting");
esp_restart();
}

bool Client::handleWriteSetting(const char *const respTopic,
const std::vector<std::string> &args) {
if (args.size() != 3) {
send(respTopic, "usage: setting/set name_without_spaces value_too");
return false;
}
const esp_err_t err = appSettings.write(args[1].c_str(), args[2].c_str());
if (err == ESP_OK) {
send(respTopic, "setting set");
} else {
send(respTopic, "setting write failed");
}
return true;
}

bool Client::handleOta(const char *const respTopic,
const std::vector<std::string> &args) {
if (args.size() != 2) {
send(respTopic, "usage: ota https://server/path.bin");
return false;
}

const esp_http_client_config_t config{
.url = args[1].c_str(),
.cert_pem = cert,
};

const esp_err_t ret = esp_https_ota(&config);

if (ret == ESP_OK) {
ESP_LOGI(logTag, "OTA update finished");
send(respTopic, "OTA success");
esp_restart();
}

ESP_LOGE(logTag, "could not perform OTA update: 0x%x", ret);
send(respTopic, "OTA failed");

return false;
}

bool Client::handleMessage(const esp_mqtt_event_handle_t evt) {
const char *const topic =
isBroadcastCmd(evt) ? "response/*" : respTopic.c_str();

char cmd[evt->data_len + 1];
memcpy(cmd, evt->data, evt->data_len);
cmd[sizeof(cmd) - 1] = '\0';

std::vector<std::string> tokens;

for (const char *tok = strtok(cmd, " \t"); tok != nullptr;
tok = strtok(nullptr, " \t")) {
tokens.push_back(tok);
}

if (tokens.size() < 1) {
send(topic, "no command specified");
return false;
}

const std::string &command = tokens[0];

if (command == "ota") {
return handleOta(topic, tokens);
}
if (command == "ping") {
return handlePing(evt, topic);
}
if (command == "restart") {
return handleRestart(topic);
}
return handleUnknown(topic);
}

void Client::subscribeToCommands() {
esp_mqtt_client_subscribe(handle, "cmd/*", 2);
esp_mqtt_client_subscribe(handle, cmdTopic.c_str(), 2);
}

void Client::setState(const MqttState bits) {
xEventGroupSetBits(event, static_cast<EventBits_t>(bits));
}
Expand All @@ -211,4 +105,10 @@ void Client::clearState(const MqttState bits) {
xEventGroupClearBits(event, static_cast<EventBits_t>(bits));
}

bool Client::subscribe(const char *topic, int qos) {
return esp_mqtt_client_subscribe(handle, topic, qos) != ESP_OK;
}

Message Client::receive() { return *msgQueue.take(); }

} // namespace mqtt
Loading

0 comments on commit a475fd4

Please sign in to comment.