diff --git a/README.md b/README.md index bc558dd..845aa13 100644 --- a/README.md +++ b/README.md @@ -166,7 +166,7 @@ the strongest signal. Far layer protocols are used by *bridges* as concentrators. They collect all data published by clients and provide data from other sources. -There's just one far layer protocol: MQTT. +There are two far layer protocols: MQTT and local broker. More will come later. #### MQTT far layer @@ -190,6 +190,20 @@ ESP-NOW[^espnow] it's lowercase MAC address without separators) Topics for *subscribing* are not prepended or modified in any way. +#### Local broker far layer + +Basically local MQTT-like broker. + +Functions just like MQTT, supports wildcards, but is hosted on the *bridge* +itself. +Handy if you only need communication between nearby IoT devices over single +*bridge* (no IP connectivity required). + +Default topic structure for *publishing* is `{PREFIX}/{ADDR}/{TOPIC}` +(see [MQTT topic structure](#mqtt-far-layer) above). + +Topics for *subscribing* are not prepended or modified in any way. + ### Message types Message types are generic for current and any future protocols. diff --git a/include/common/spsp_common.hpp b/include/common/spsp_common.hpp index 61fd8d2..7454b10 100644 --- a/include/common/spsp_common.hpp +++ b/include/common/spsp_common.hpp @@ -14,8 +14,9 @@ #include "spsp_espnow.hpp" #include "spsp_exception.hpp" #include "spsp_layers.hpp" -#include "spsp_local_addr_mac.hpp" #include "spsp_local_addr.hpp" +#include "spsp_local_addr_mac.hpp" +#include "spsp_local_broker.hpp" #include "spsp_mqtt.hpp" #include "spsp_node.hpp" #include "spsp_timer.hpp" diff --git a/include/common/spsp_local_broker.hpp b/include/common/spsp_local_broker.hpp new file mode 100644 index 0000000..de7e2af --- /dev/null +++ b/include/common/spsp_local_broker.hpp @@ -0,0 +1,80 @@ +/** + * @file spsp_local_broker.hpp + * @author Dávid Benko (davidbenko@davidbenko.dev) + * @brief Local broker far layer for SPSP + * + * @copyright Copyright (c) 2023 + * + */ + +#pragma once + +#include +#include + +#include "spsp_layers.hpp" +#include "spsp_node.hpp" +#include "spsp_wildcard_trie.hpp" + +namespace SPSP::FarLayers::LocalBroker +{ + /** + * @brief Local broker far layer + * + * Acts as local MQTT server. + */ + class LocalBroker : public IFarLayer + { + std::mutex m_mutex; + SPSP::WildcardTrie m_subs; //!< Subscriptions + std::string m_topicPrefix; //!< Topic prefix for publishing + + public: + /** + * @brief Constructs a new local broker object + * + * @param topicPrefix Topic prefix for publishing + */ + LocalBroker(const std::string topicPrefix = "spsp"); + + /** + * @brief Destroys local broker layer object + * + */ + ~LocalBroker(); + + /** + * @brief Publishes message coming from node + * + * @param src Source address + * @param topic Topic + * @param payload Payload (data) + * @return true Delivery successful + * @return false Delivery failed + */ + bool publish(const std::string& src, const std::string& topic, + const std::string& payload); + + /** + * @brief Subscribes to given topic + * + * Should be used by `INode` only! + * + * @param topic Topic + * @return true Subscribe successful + * @return false Subscribe failed + */ + bool subscribe(const std::string& topic); + + /** + * @brief Unsubscribes from given topic + * + * Should be used by `INode` only! + * + * @param topic Topic + * @return true Unsubscribe successful + * @return false Unsubscribe failed + */ + bool unsubscribe(const std::string& topic); + }; +} // namespace SPSP::FarLayers::LocalBroker diff --git a/src/common/spsp_local_broker.cpp b/src/common/spsp_local_broker.cpp new file mode 100644 index 0000000..7d1d4ae --- /dev/null +++ b/src/common/spsp_local_broker.cpp @@ -0,0 +1,75 @@ +/** + * @file spsp_local_broker.cpp + * @author Dávid Benko (davidbenko@davidbenko.dev) + * @brief Local broker far layer for SPSP + * + * @copyright Copyright (c) 2023 + * + */ + +#include + +#include "spsp_local_broker.hpp" +#include "spsp_logger.hpp" + +// Log tag +static const char* SPSP_LOG_TAG = "SPSP/Far/LocalBroker"; + +namespace SPSP::FarLayers::LocalBroker +{ + LocalBroker::LocalBroker(const std::string topicPrefix) + : m_topicPrefix{topicPrefix} + { + SPSP_LOGI("Initialized"); + } + + LocalBroker::~LocalBroker() + { + SPSP_LOGI("Deinitialized"); + } + + bool LocalBroker::publish(const std::string& src, const std::string& topic, + const std::string& payload) + { + SPSP_LOGD("Publish: payload '%s' to topic '%s' from %s", + payload.c_str(), topic.c_str(), src.c_str()); + + std::string topicExtended = m_topicPrefix + "/" + src + "/" + topic; + + // Check if node is subscribed to this topic + bool subscribed; + { + const std::scoped_lock lock(m_mutex); + subscribed = !m_subs.find(topicExtended).empty(); + } + + if (subscribed && this->nodeConnected()) { + // Send data back as received (parameters must be copied) + std::thread t([this, topicExtended, payload]() { + this->getNode()->receiveFar(topicExtended, payload); + }); + t.detach(); + } + + return true; + } + + bool LocalBroker::subscribe(const std::string& topic) + { + const std::scoped_lock lock(m_mutex); + + SPSP_LOGD("Subscribe to topic '%s'", topic.c_str()); + + m_subs.insert(topic, true); + return true; + } + + bool LocalBroker::unsubscribe(const std::string& topic) + { + const std::scoped_lock lock(m_mutex); + + SPSP_LOGD("Unsubscribe from topic '%s'", topic.c_str()); + + return m_subs.remove(topic); + } +} diff --git a/test/tests/local_broker.cpp b/test/tests/local_broker.cpp new file mode 100644 index 0000000..d1bf2d5 --- /dev/null +++ b/test/tests/local_broker.cpp @@ -0,0 +1,131 @@ +#include + +#include +#include +#include + +#include "spsp_local_broker.hpp" +#include "spsp_node.hpp" + +using namespace SPSP; +using namespace std::chrono_literals; + +const std::string SRC = "549b3d00da16ca2d"; +const std::string TOPIC_PREFIX = "spsp"; +const std::string TOPIC = "abc"; +const std::string TOPIC_FOR_WILDCARD = "111/abc"; +const std::string PAYLOAD = "123"; +const std::string TOPIC_PUBLISH = TOPIC_PREFIX + "/" + SRC + "/" + TOPIC; +const std::string TOPIC_PUBLISH_WILDCARD = TOPIC_PREFIX + "/" + SRC + "/" + TOPIC_FOR_WILDCARD; +const std::string TOPIC_WILDCARD = TOPIC_PREFIX + "/" + SRC + "/+/" + TOPIC; + +TEST_CASE("Check return values", "[LocalBroker]") { + FarLayers::LocalBroker::LocalBroker lb{TOPIC_PREFIX}; + + SECTION("Publish") { + REQUIRE(lb.publish(SRC, TOPIC, PAYLOAD)); + } + + SECTION("Subscribe") { + REQUIRE(lb.subscribe(TOPIC)); + } + + SECTION("Unsubscribe without preceding subscribe") { + REQUIRE(!lb.unsubscribe(TOPIC)); + } + + SECTION("Unsubscribe without preceding subscribe - wildcard") { + REQUIRE(!lb.unsubscribe(TOPIC + "/#")); + } + + SECTION("Unsubscribe with preceding subscribe") { + REQUIRE(lb.subscribe(TOPIC)); + REQUIRE(lb.unsubscribe(TOPIC)); + } + + SECTION("Unsubscribe with preceding subscribe - wildcard") { + REQUIRE(lb.subscribe(TOPIC + "/#")); + REQUIRE(lb.unsubscribe(TOPIC + "/#")); + } +} + +TEST_CASE("Receive subscription data", "[LocalBroker]") { + class Node : IFarNode + { + public: + bool called = false; + std::string receivedTopic; + std::string receivedPayload; + + using IFarNode::IFarNode; + + virtual bool publish(const std::string& topic, + const std::string& payload) + { + return true; + } + + virtual bool subscribe(const std::string& topic, SubscribeCb cb) + { + return true; + } + + virtual bool unsubscribe(const std::string& topic) { + return true; + } + + virtual bool receiveFar(const std::string& topic, + const std::string& payload) + { + called = true; + receivedTopic = topic; + receivedPayload = payload; + return true; + } + + virtual void resubscribeAll() {} + }; + + FarLayers::LocalBroker::LocalBroker lb{TOPIC_PREFIX}; + Node node{&lb}; + + SECTION("Publish, don't receive") { + CHECK(lb.publish(SRC, TOPIC, PAYLOAD)); + std::this_thread::sleep_for(10ms); + CHECK(!node.called); + } + + SECTION("Subscribe, publish, receive") { + CHECK(lb.subscribe(TOPIC_PUBLISH)); + CHECK(lb.publish(SRC, TOPIC, PAYLOAD)); + std::this_thread::sleep_for(10ms); + CHECK(node.called); + CHECK(node.receivedTopic == TOPIC_PUBLISH); + CHECK(node.receivedPayload == PAYLOAD); + } + + SECTION("Subscribe, publish, receive - wildcard") { + CHECK(lb.subscribe(TOPIC_WILDCARD)); + CHECK(lb.publish(SRC, TOPIC_FOR_WILDCARD, PAYLOAD)); + std::this_thread::sleep_for(10ms); + CHECK(node.called); + CHECK(node.receivedTopic == TOPIC_PUBLISH_WILDCARD); + CHECK(node.receivedPayload == PAYLOAD); + } + + SECTION("Subscribe, unsubscribe, publish, don't receive") { + CHECK(lb.subscribe(TOPIC_PUBLISH)); + CHECK(lb.unsubscribe(TOPIC_PUBLISH)); + CHECK(lb.publish(SRC, TOPIC, PAYLOAD)); + std::this_thread::sleep_for(10ms); + CHECK(!node.called); + } + + SECTION("Subscribe, unsubscribe, publish, don't receive - wildcard") { + CHECK(lb.subscribe(TOPIC_WILDCARD)); + CHECK(lb.unsubscribe(TOPIC_WILDCARD)); + CHECK(lb.publish(SRC, TOPIC_FOR_WILDCARD, PAYLOAD)); + std::this_thread::sleep_for(10ms); + CHECK(!node.called); + } +}