Skip to content

Commit

Permalink
Fix: switch context when processing DPL MQTT requests (#572)
Browse files Browse the repository at this point in the history
MQTT message callbacks are executed in the MQTT thread context. when
processing topics that control the DPL, we must avoid executing methods
that are not thread-safe. this change binds the methods to be called to
the respective parameters and executes them in the TaskScheduler
context, such that they no longer need to be thread-safe.
  • Loading branch information
schlimmchen authored Dec 31, 2023
1 parent ef1aec3 commit 8f5c487
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
8 changes: 8 additions & 0 deletions include/MqttHandlePowerLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "Configuration.h"
#include <espMqttClient.h>
#include <TaskSchedulerDeclarations.h>
#include <mutex>
#include <deque>
#include <functional>

class MqttHandlePowerLimiterClass {
public:
Expand All @@ -18,6 +21,11 @@ class MqttHandlePowerLimiterClass {
uint32_t _lastPublishStats;
uint32_t _lastPublish;

// MQTT callbacks to process updates on subscribed topics are executed in
// the MQTT thread's context. we use this queue to switch processing the
// user requests into the main loop's context (TaskScheduler context).
mutable std::mutex _mqttMutex;
std::deque<std::function<void()>> _mqttCallbacks;
};

extern MqttHandlePowerLimiterClass MqttHandlePowerLimiter;
32 changes: 20 additions & 12 deletions src/MqttHandlePowerLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ void MqttHandlePowerLimiterClass::init(Scheduler& scheduler)

void MqttHandlePowerLimiterClass::loop()
{
if (!MqttSettings.getConnected() ) {
std::unique_lock<std::mutex> mqttLock(_mqttMutex);

const CONFIG_T& config = Configuration.get();

if (!config.PowerLimiter.Enabled) {
_mqttCallbacks.clear();
return;
}

const CONFIG_T& config = Configuration.get();
for (auto& callback : _mqttCallbacks) { callback(); }
_mqttCallbacks.clear();

mqttLock.unlock();

if (!MqttSettings.getConnected() ) { return; }

if ((millis() - _lastPublish) > (config.Mqtt.PublishInterval * 1000) ) {
auto val = static_cast<unsigned>(PowerLimiter.getMode());
Expand All @@ -53,13 +63,6 @@ void MqttHandlePowerLimiterClass::loop()
void MqttHandlePowerLimiterClass::onCmdMode(const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
{
const CONFIG_T& config = Configuration.get();

// ignore messages if PowerLimiter is disabled
if (!config.PowerLimiter.Enabled) {
return;
}

std::string strValue(reinterpret_cast<const char*>(payload), len);
int intValue = -1;
try {
Expand All @@ -71,19 +74,24 @@ void MqttHandlePowerLimiterClass::onCmdMode(const espMqttClientTypes::MessagePro
return;
}

std::lock_guard<std::mutex> mqttLock(_mqttMutex);

using Mode = PowerLimiterClass::Mode;
switch (static_cast<Mode>(intValue)) {
case Mode::UnconditionalFullSolarPassthrough:
MessageOutput.println("Power limiter unconditional full solar PT");
PowerLimiter.setMode(Mode::UnconditionalFullSolarPassthrough);
_mqttCallbacks.push_back(std::bind(&PowerLimiterClass::setMode,
&PowerLimiter, Mode::UnconditionalFullSolarPassthrough));
break;
case Mode::Disabled:
MessageOutput.println("Power limiter disabled (override)");
PowerLimiter.setMode(Mode::Disabled);
_mqttCallbacks.push_back(std::bind(&PowerLimiterClass::setMode,
&PowerLimiter, Mode::Disabled));
break;
case Mode::Normal:
MessageOutput.println("Power limiter normal operation");
PowerLimiter.setMode(Mode::Normal);
_mqttCallbacks.push_back(std::bind(&PowerLimiterClass::setMode,
&PowerLimiter, Mode::Normal));
break;
default:
MessageOutput.printf("PowerLimiter - unknown mode %d\r\n", intValue);
Expand Down

0 comments on commit 8f5c487

Please sign in to comment.