Skip to content

Commit

Permalink
Optimize MQTT subscription handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tbnobody committed Sep 23, 2024
1 parent d6a5fef commit 93b6e5a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 54 deletions.
24 changes: 23 additions & 1 deletion include/MqttHandleInverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <Hoymiles.h>
#include <TaskSchedulerDeclarations.h>
#include <espMqttClient.h>
#include <frozen/map.h>
#include <frozen/string.h>

class MqttHandleInverterClass {
public:
Expand All @@ -19,7 +21,6 @@ class MqttHandleInverterClass {
private:
void loop();
void publishField(std::shared_ptr<InverterAbstract> inv, const ChannelType_t type, const ChannelNum_t channel, const FieldId_t fieldId);
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, const size_t len, const size_t index, const size_t total);

Task _loopTask;

Expand All @@ -41,6 +42,27 @@ class MqttHandleInverterClass {
FLD_IRR,
FLD_Q
};

enum class Topic : unsigned {
LimitPersistentRelative,
LimitPersistentAbsolute,
LimitNonPersistentRelative,
LimitNonPersistentAbsolute,
Power,
Restart,
};

static constexpr frozen::string _cmdtopic = "+/cmd/";
static constexpr frozen::map<frozen::string, Topic, 6> _subscriptions = {
{ "limit_persistent_relative", Topic::LimitPersistentRelative },
{ "limit_persistent_absolute", Topic::LimitPersistentAbsolute },
{ "limit_nonpersistent_relative", Topic::LimitNonPersistentRelative },
{ "limit_nonpersistent_absolute", Topic::LimitNonPersistentAbsolute },
{ "power", Topic::Power },
{ "restart", Topic::Restart },
};

void onMqttMessage(Topic t, const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, const size_t len, const size_t index, const size_t total);
};

extern MqttHandleInverterClass MqttHandleInverter;
92 changes: 39 additions & 53 deletions src/MqttHandleInverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,6 @@
#include "MqttSettings.h"
#include <ctime>

#define TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE "limit_persistent_relative"
#define TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE "limit_persistent_absolute"
#define TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE "limit_nonpersistent_relative"
#define TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE "limit_nonpersistent_absolute"
#define TOPIC_SUB_POWER "power"
#define TOPIC_SUB_RESTART "restart"

#define PUBLISH_MAX_INTERVAL 60000

MqttHandleInverterClass MqttHandleInverter;
Expand Down Expand Up @@ -154,23 +147,19 @@ String MqttHandleInverterClass::getTopic(std::shared_ptr<InverterAbstract> inv,
return inv->serialString() + "/" + chanNum + "/" + chanName;
}

void MqttHandleInverterClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, const size_t len, const size_t index, const size_t total)
void MqttHandleInverterClass::onMqttMessage(Topic t, const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, const size_t len, const size_t index, const size_t total)
{
const CONFIG_T& config = Configuration.get();

char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics
strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char*

char* serial_str;
char* subtopic;
char* setting;
char* rest = &token_topic[strlen(config.Mqtt.Topic)];

serial_str = strtok_r(rest, "/", &rest);
subtopic = strtok_r(rest, "/", &rest);
setting = strtok_r(rest, "/", &rest);

if (serial_str == NULL || subtopic == NULL || setting == NULL) {
if (serial_str == NULL) {
return;
}

Expand All @@ -183,56 +172,56 @@ void MqttHandleInverterClass::onMqttMessage(const espMqttClientTypes::MessagePro
return;
}

// check if subtopic is unequal cmd
if (strcmp(subtopic, "cmd")) {
std::string strValue(reinterpret_cast<const char*>(payload), len);
float payload_val = -1;
try {
payload_val = std::stof(strValue);
} catch (std::invalid_argument const& e) {
MessageOutput.printf("MQTT handler: cannot parse payload of topic '%s' as float: %s\r\n",
topic, strValue.c_str());
return;
}

char* strlimit = new char[len + 1];
memcpy(strlimit, payload, len);
strlimit[len] = '\0';
const float payload_val = strtof(strlimit, NULL);
delete[] strlimit;

if (payload_val < 0) {
MessageOutput.printf("MQTT payload < 0 received --> ignoring\r\n");
return;
}

if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE)) {
switch (t) {
case Topic::LimitPersistentRelative:
// Set inverter limit relative persistent
MessageOutput.printf("Limit Persistent: %.1f %%\r\n", payload_val);
inv->sendActivePowerControlRequest(payload_val, PowerLimitControlType::RelativPersistent);
break;

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE)) {
case Topic::LimitPersistentAbsolute:
// Set inverter limit absolute persistent
MessageOutput.printf("Limit Persistent: %.1f W\r\n", payload_val);
inv->sendActivePowerControlRequest(payload_val, PowerLimitControlType::AbsolutPersistent);
break;

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE)) {
case Topic::LimitNonPersistentRelative:
// Set inverter limit relative non persistent
MessageOutput.printf("Limit Non-Persistent: %.1f %%\r\n", payload_val);
if (!properties.retain) {
inv->sendActivePowerControlRequest(payload_val, PowerLimitControlType::RelativNonPersistent);
} else {
MessageOutput.println("Ignored because retained");
}
break;

} else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE)) {
case Topic::LimitNonPersistentAbsolute:
// Set inverter limit absolute non persistent
MessageOutput.printf("Limit Non-Persistent: %.1f W\r\n", payload_val);
if (!properties.retain) {
inv->sendActivePowerControlRequest(payload_val, PowerLimitControlType::AbsolutNonPersistent);
} else {
MessageOutput.println("Ignored because retained");
}
break;

} else if (!strcmp(setting, TOPIC_SUB_POWER)) {
case Topic::Power:
// Turn inverter on or off
MessageOutput.printf("Set inverter power to: %d\r\n", static_cast<int32_t>(payload_val));
inv->sendPowerControlRequest(static_cast<int32_t>(payload_val) > 0);
break;

} else if (!strcmp(setting, TOPIC_SUB_RESTART)) {
case Topic::Restart:
// Restart inverter
MessageOutput.printf("Restart inverter\r\n");
if (!properties.retain && payload_val == 1) {
Expand All @@ -245,29 +234,26 @@ void MqttHandleInverterClass::onMqttMessage(const espMqttClientTypes::MessagePro

void MqttHandleInverterClass::subscribeTopics()
{
using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
using std::placeholders::_4;
using std::placeholders::_5;
using std::placeholders::_6;

const String topic = MqttSettings.getPrefix();
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_POWER), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_RESTART), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
String const& prefix = MqttSettings.getPrefix();

auto subscribe = [&prefix, this](char const* subTopic, Topic t) {
String fullTopic(prefix + _cmdtopic.data() + subTopic);
MqttSettings.subscribe(fullTopic.c_str(), 0,
std::bind(&MqttHandleInverterClass::onMqttMessage, this, t,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5, std::placeholders::_6));
};

for (auto const& s : _subscriptions) {
subscribe(s.first.data(), s.second);
}
}

void MqttHandleInverterClass::unsubscribeTopics()
{
const String topic = MqttSettings.getPrefix();
MqttSettings.unsubscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE));
MqttSettings.unsubscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE));
MqttSettings.unsubscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE));
MqttSettings.unsubscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE));
MqttSettings.unsubscribe(String(topic + "+/cmd/" + TOPIC_SUB_POWER));
MqttSettings.unsubscribe(String(topic + "+/cmd/" + TOPIC_SUB_RESTART));
String const& prefix = MqttSettings.getPrefix() + _cmdtopic.data();
for (auto const& s : _subscriptions) {
MqttSettings.unsubscribe(prefix + s.first.data());
}
}

0 comments on commit 93b6e5a

Please sign in to comment.