Skip to content

Commit

Permalink
refactored MqttPubSub.h
Browse files Browse the repository at this point in the history
  • Loading branch information
theelims committed Feb 9, 2024
1 parent 6864b5d commit 3c7932c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 124 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.
### Changed

- more generic board definition in platformio.ini (#20)
- refactored MqttPubSub.h into a single class to improve readability

### Fixed

Expand Down
194 changes: 70 additions & 124 deletions lib/framework/MqttPubSub.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* https://github.com/theelims/ESP32-sveltekit
*
* Copyright (C) 2018 - 2023 rjwats
* Copyright (C) 2023 theelims
* Copyright (C) 2023 - 2024 theelims
*
* All Rights Reserved. This software may be modified and distributed under
* the terms of the LGPL v3 license. See the LICENSE file for details.
Expand All @@ -21,52 +21,61 @@
#define MQTT_ORIGIN_ID "mqtt"

template <class T>
class MqttConnector
class MqttPubSub
{
protected:
StatefulService<T> *_statefulService;
PsychicMqttClient *_mqttClient;
size_t _bufferSize;

MqttConnector(StatefulService<T> *statefulService, PsychicMqttClient *mqttClient, size_t bufferSize) : _statefulService(statefulService),
_mqttClient(mqttClient),
_bufferSize(bufferSize)
{
_mqttClient->onConnect(std::bind(&MqttConnector::onConnect, this));
}

virtual void onConnect() = 0;

public:
inline PsychicMqttClient *getMqttClient() const
MqttPubSub(JsonStateReader<T> stateReader,
JsonStateUpdater<T> stateUpdater,
StatefulService<T> *statefulService,
PsychicMqttClient *mqttClient,
const String &pubTopic = "",
const String &subTopic = "",
bool retain = false,
size_t bufferSize = DEFAULT_BUFFER_SIZE) : _stateReader(stateReader),
_stateUpdater(stateUpdater),
_statefulService(statefulService),
_mqttClient(mqttClient),
_pubTopic(pubTopic),
_subTopic(subTopic),
_retain(retain),
_bufferSize(bufferSize)

{
return _mqttClient;
_statefulService->addUpdateHandler([&](const String &originId)
{ publish(); },
false);

_mqttClient->onConnect(std::bind(&MqttPubSub::onConnect, this));

_mqttClient->onMessage(std::bind(&MqttPubSub::onMqttMessage,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
}
};

template <class T>
class MqttPub : virtual public MqttConnector<T>
{
public:
MqttPub(JsonStateReader<T> stateReader,
StatefulService<T> *statefulService,
PsychicMqttClient *mqttClient,
const String &pubTopic = "",
bool retain = false,
size_t bufferSize = DEFAULT_BUFFER_SIZE) : MqttConnector<T>(statefulService, mqttClient, bufferSize),
_stateReader(stateReader),
_pubTopic(pubTopic),
_retain(retain)
void configureTopics(const String &pubTopic, const String &subTopic)
{
MqttConnector<T>::_statefulService->addUpdateHandler([&](const String &originId)
{ publish(); },
false);
setSubTopic(subTopic);
setPubTopic(pubTopic);
}

void setRetain(const bool retain)
void setSubTopic(const String &subTopic)
{
_retain = retain;
publish();
if (!_subTopic.equals(subTopic))
{
// unsubscribe from the existing topic if one was set
if (_subTopic.length() > 0)
{
_mqttClient->unsubscribe(_subTopic.c_str());
}
// set the new topic and re-configure the subscription
_subTopic = subTopic;
subscribe();
}
}

void setPubTopic(const String &pubTopic)
Expand All @@ -75,89 +84,44 @@ class MqttPub : virtual public MqttConnector<T>
publish();
}

protected:
virtual void onConnect()
void setRetain(const bool retain)
{
_retain = retain;
publish();
}

private:
JsonStateReader<T> _stateReader;
String _pubTopic;
bool _retain;

void publish()
{
if (_pubTopic.length() > 0 && MqttConnector<T>::_mqttClient->connected())
if (_pubTopic.length() > 0 && _mqttClient->connected())
{
// serialize to json doc
DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
DynamicJsonDocument json(_bufferSize);
JsonObject jsonObject = json.to<JsonObject>();
MqttConnector<T>::_statefulService->read(jsonObject, _stateReader);
_statefulService->read(jsonObject, _stateReader);

// serialize to string
String payload;
serializeJson(json, payload);

// publish the payload
MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, _retain, payload.c_str());
_mqttClient->publish(_pubTopic.c_str(), 0, _retain, payload.c_str());
}
}
};

template <class T>
class MqttSub : virtual public MqttConnector<T>
{
public:
MqttSub(JsonStateUpdater<T> stateUpdater,
StatefulService<T> *statefulService,
PsychicMqttClient *mqttClient,
const String &subTopic = "",
size_t bufferSize = DEFAULT_BUFFER_SIZE) : MqttConnector<T>(statefulService, mqttClient, bufferSize),
_stateUpdater(stateUpdater),
_subTopic(subTopic)
{
MqttConnector<T>::_mqttClient->onMessage(std::bind(&MqttSub::onMqttMessage,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
}

void setSubTopic(const String &subTopic)
PsychicMqttClient *getMqttClient()
{
if (!_subTopic.equals(subTopic))
{
// unsubscribe from the existing topic if one was set
if (_subTopic.length() > 0)
{
MqttConnector<T>::_mqttClient->unsubscribe(_subTopic.c_str());
}
// set the new topic and re-configure the subscription
_subTopic = subTopic;
subscribe();
}
return _mqttClient;
}

protected:
virtual void onConnect()
{
subscribe();
}

private:
StatefulService<T> *_statefulService;
PsychicMqttClient *_mqttClient;
int _bufferSize;
JsonStateUpdater<T> _stateUpdater;
JsonStateReader<T> _stateReader;
String _subTopic;

void subscribe()
{
if (_subTopic.length() > 0)
{
MqttConnector<T>::_mqttClient->subscribe(_subTopic.c_str(), 2);
}
}
String _pubTopic;
bool _retain;

void onMqttMessage(char *topic,
char *payload,
Expand All @@ -172,45 +136,27 @@ class MqttSub : virtual public MqttConnector<T>
}

// deserialize from string
DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
DynamicJsonDocument json(_bufferSize);
DeserializationError error = deserializeJson(json, payload);
if (!error && json.is<JsonObject>())
{
JsonObject jsonObject = json.as<JsonObject>();
MqttConnector<T>::_statefulService->update(jsonObject, _stateUpdater, MQTT_ORIGIN_ID);
_statefulService->update(jsonObject, _stateUpdater, MQTT_ORIGIN_ID);
}
}
};

template <class T>
class MqttPubSub : public MqttPub<T>, public MqttSub<T>
{
public:
MqttPubSub(JsonStateReader<T> stateReader,
JsonStateUpdater<T> stateUpdater,
StatefulService<T> *statefulService,
PsychicMqttClient *mqttClient,
const String &pubTopic = "",
const String &subTopic = "",
bool retain = false,
size_t bufferSize = DEFAULT_BUFFER_SIZE) : MqttConnector<T>(statefulService, mqttClient, bufferSize),
MqttPub<T>(stateReader, statefulService, mqttClient, pubTopic, retain, bufferSize),
MqttSub<T>(stateUpdater, statefulService, mqttClient, subTopic, bufferSize)
{
}

public:
void configureTopics(const String &pubTopic, const String &subTopic)
void onConnect()
{
MqttSub<T>::setSubTopic(subTopic);
MqttPub<T>::setPubTopic(pubTopic);
subscribe();
publish();
}

protected:
void onConnect()
void subscribe()
{
MqttSub<T>::onConnect();
MqttPub<T>::onConnect();
if (_subTopic.length() > 0)
{
_mqttClient->subscribe(_subTopic.c_str(), 2);
}
}
};

Expand Down

0 comments on commit 3c7932c

Please sign in to comment.