Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+ Abstract MqttClient Publish to PubSub #236

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions lib/framework/MqttPubSub.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,34 @@ class MqttPub : virtual public MqttConnector<T> {
StatefulService<T>* statefulService,
AsyncMqttClient* mqttClient,
const String& pubTopic = "",
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
MqttConnector<T>(statefulService, mqttClient, bufferSize), _stateReader(stateReader), _pubTopic(pubTopic) {
size_t bufferSize = DEFAULT_BUFFER_SIZE,
const bool retain = false) :
MqttConnector<T>(statefulService, mqttClient, bufferSize),
_stateReader(stateReader),
_pubTopic(pubTopic),
_retain(retain) {
MqttConnector<T>::_statefulService->addUpdateHandler([&](const String& originId) { publish(); }, false);
}

void setRetain(bool retain) {
_retain = retain;
}

void setPubTopic(const String& pubTopic) {
_pubTopic = pubTopic;
publish();
}

void publish(const char* topic, const char* payload){
MqttConnector<T>::_mqttClient->publish(topic, 0, _retain, payload);
}
void publish(const char* topic, const char* payload, bool retain){
MqttConnector<T>::_mqttClient->publish(topic, 0, retain, payload);
}
void publish(const char* topic, const char* payload, bool retain, uint8_t qos){
MqttConnector<T>::_mqttClient->publish(topic, qos, retain, payload);
}

protected:
virtual void onConnect() {
publish();
Expand All @@ -51,6 +69,7 @@ class MqttPub : virtual public MqttConnector<T> {
private:
JsonStateReader<T> _stateReader;
String _pubTopic;
bool _retain;

void publish() {
if (_pubTopic.length() > 0 && MqttConnector<T>::_mqttClient->connected()) {
Expand All @@ -64,7 +83,7 @@ class MqttPub : virtual public MqttConnector<T> {
serializeJson(json, payload);

// publish the payload
MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, false, payload.c_str());
MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, _retain, payload.c_str());
}
}
};
Expand Down Expand Up @@ -145,17 +164,44 @@ class MqttPubSub : public MqttPub<T>, public MqttSub<T> {
AsyncMqttClient* mqttClient,
const String& pubTopic = "",
const String& subTopic = "",
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
size_t bufferSize = DEFAULT_BUFFER_SIZE,
const bool retain = false) :
MqttConnector<T>(statefulService, mqttClient, bufferSize),
MqttPub<T>(stateReader, statefulService, mqttClient, pubTopic, bufferSize),
MqttPub<T>(stateReader, statefulService, mqttClient, pubTopic, bufferSize, retain),
MqttSub<T>(stateUpdater, statefulService, mqttClient, subTopic, bufferSize) {
}
MqttPubSub(JsonStateReader<T> stateReader,
JsonStateUpdater<T> stateUpdater,
StatefulService<T>* statefulService,
AsyncMqttClient* mqttClient,
const bool retain = false) :
MqttConnector<T>(statefulService, mqttClient, DEFAULT_BUFFER_SIZE),
MqttPub<T>(stateReader, statefulService, mqttClient, "", DEFAULT_BUFFER_SIZE, retain),
MqttSub<T>(stateUpdater, statefulService, mqttClient, "", DEFAULT_BUFFER_SIZE) {
}

public:
void configureTopics(const String& pubTopic, const String& subTopic) {
MqttSub<T>::setSubTopic(subTopic);
MqttPub<T>::setPubTopic(pubTopic);
}
void configureTopics(const String& pubTopic, const String& subTopic, const bool retain) {
MqttSub<T>::setSubTopic(subTopic);
MqttPub<T>::setPubTopic(pubTopic);
MqttPub<T>::setRetain(retain);
}
void configureRetain(bool retain) {
MqttPub<T>::setRetain(retain);
}
void publish(const char* topic, const char* payload) {
MqttPub<T>::publish(topic, payload);
}
void publish(const char* topic, const char* payload, bool retain) {
MqttPub<T>::publish(topic, payload, retain);
}
void publish(const char* topic, const char* payload, bool retain, uint8_t qos) {
MqttPub<T>::publish(topic, payload, retain, qos);
}

protected:
void onConnect() {
Expand Down
5 changes: 3 additions & 2 deletions src/LightStateService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ LightStateService::LightStateService(AsyncWebServer* server,
LIGHT_SETTINGS_ENDPOINT_PATH,
securityManager,
AuthenticationPredicates::IS_AUTHENTICATED),
_mqttPubSub(LightState::haRead, LightState::haUpdate, this, mqttClient),
_mqttPubSub(LightState::haRead, LightState::haUpdate, this, mqttClient, true),
_webSocket(LightState::read,
LightState::update,
this,
Expand Down Expand Up @@ -67,7 +67,8 @@ void LightStateService::registerConfig() {

String payload;
serializeJson(doc, payload);
_mqttClient->publish(configTopic.c_str(), 0, false, payload.c_str());

_mqttPubSub.publish(configTopic.c_str(), payload.c_str());

_mqttPubSub.configureTopics(pubTopic, subTopic);
}