Skip to content

Commit

Permalink
LocalBroker: implement and add tests
Browse files Browse the repository at this point in the history
Local broker is new far layer for local-only communication.
  • Loading branch information
DavidB137 committed Oct 29, 2023
1 parent d508a07 commit af01ebf
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 2 deletions.
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ the strongest signal.
Far layer protocols are used by *bridges* as concentrators. They collect all
data published by clients and provide data from other sources.

There's just one far layer protocol: MQTT.
There are two far layer protocols: MQTT and local broker.
More will come later.

#### MQTT far layer
Expand All @@ -190,6 +190,20 @@ ESP-NOW[^espnow] it's lowercase MAC address without separators)

Topics for *subscribing* are not prepended or modified in any way.

#### Local broker far layer

Basically local MQTT-like broker.

Functions just like MQTT, supports wildcards, but is hosted on the *bridge*
itself.
Handy if you only need communication between nearby IoT devices over single
*bridge* (no IP connectivity required).

Default topic structure for *publishing* is `{PREFIX}/{ADDR}/{TOPIC}`
(see [MQTT topic structure](#mqtt-far-layer) above).

Topics for *subscribing* are not prepended or modified in any way.

### Message types

Message types are generic for current and any future protocols.
Expand Down
3 changes: 2 additions & 1 deletion include/common/spsp_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
#include "spsp_espnow.hpp"
#include "spsp_exception.hpp"
#include "spsp_layers.hpp"
#include "spsp_local_addr_mac.hpp"
#include "spsp_local_addr.hpp"
#include "spsp_local_addr_mac.hpp"
#include "spsp_local_broker.hpp"
#include "spsp_mqtt.hpp"
#include "spsp_node.hpp"
#include "spsp_timer.hpp"
Expand Down
80 changes: 80 additions & 0 deletions include/common/spsp_local_broker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* @file spsp_local_broker.hpp
* @author Dávid Benko (davidbenko@davidbenko.dev)
* @brief Local broker far layer for SPSP
*
* @copyright Copyright (c) 2023
*
*/

#pragma once

#include <mutex>
#include <string>

#include "spsp_layers.hpp"
#include "spsp_node.hpp"
#include "spsp_wildcard_trie.hpp"

namespace SPSP::FarLayers::LocalBroker
{
/**
* @brief Local broker far layer
*
* Acts as local MQTT server.
*/
class LocalBroker : public IFarLayer
{
std::mutex m_mutex;
SPSP::WildcardTrie<bool> m_subs; //!< Subscriptions
std::string m_topicPrefix; //!< Topic prefix for publishing

public:
/**
* @brief Constructs a new local broker object
*
* @param topicPrefix Topic prefix for publishing
*/
LocalBroker(const std::string topicPrefix = "spsp");

/**
* @brief Destroys local broker layer object
*
*/
~LocalBroker();

/**
* @brief Publishes message coming from node
*
* @param src Source address
* @param topic Topic
* @param payload Payload (data)
* @return true Delivery successful
* @return false Delivery failed
*/
bool publish(const std::string& src, const std::string& topic,
const std::string& payload);

/**
* @brief Subscribes to given topic
*
* Should be used by `INode` only!
*
* @param topic Topic
* @return true Subscribe successful
* @return false Subscribe failed
*/
bool subscribe(const std::string& topic);

/**
* @brief Unsubscribes from given topic
*
* Should be used by `INode` only!
*
* @param topic Topic
* @return true Unsubscribe successful
* @return false Unsubscribe failed
*/
bool unsubscribe(const std::string& topic);
};
} // namespace SPSP::FarLayers::LocalBroker
75 changes: 75 additions & 0 deletions src/common/spsp_local_broker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* @file spsp_local_broker.cpp
* @author Dávid Benko (davidbenko@davidbenko.dev)
* @brief Local broker far layer for SPSP
*
* @copyright Copyright (c) 2023
*
*/

#include <thread>

#include "spsp_local_broker.hpp"
#include "spsp_logger.hpp"

// Log tag
static const char* SPSP_LOG_TAG = "SPSP/Far/LocalBroker";

namespace SPSP::FarLayers::LocalBroker
{
LocalBroker::LocalBroker(const std::string topicPrefix)
: m_topicPrefix{topicPrefix}
{
SPSP_LOGI("Initialized");
}

LocalBroker::~LocalBroker()
{
SPSP_LOGI("Deinitialized");
}

bool LocalBroker::publish(const std::string& src, const std::string& topic,
const std::string& payload)
{
SPSP_LOGD("Publish: payload '%s' to topic '%s' from %s",
payload.c_str(), topic.c_str(), src.c_str());

std::string topicExtended = m_topicPrefix + "/" + src + "/" + topic;

// Check if node is subscribed to this topic
bool subscribed;
{
const std::scoped_lock lock(m_mutex);
subscribed = !m_subs.find(topicExtended).empty();
}

if (subscribed && this->nodeConnected()) {
// Send data back as received (parameters must be copied)
std::thread t([this, topicExtended, payload]() {
this->getNode()->receiveFar(topicExtended, payload);
});
t.detach();
}

return true;
}

bool LocalBroker::subscribe(const std::string& topic)
{
const std::scoped_lock lock(m_mutex);

SPSP_LOGD("Subscribe to topic '%s'", topic.c_str());

m_subs.insert(topic, true);
return true;
}

bool LocalBroker::unsubscribe(const std::string& topic)
{
const std::scoped_lock lock(m_mutex);

SPSP_LOGD("Unsubscribe from topic '%s'", topic.c_str());

return m_subs.remove(topic);
}
}
131 changes: 131 additions & 0 deletions test/tests/local_broker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#include <catch2/catch_test_macros.hpp>

#include <chrono>
#include <string>
#include <thread>

#include "spsp_local_broker.hpp"
#include "spsp_node.hpp"

using namespace SPSP;
using namespace std::chrono_literals;

const std::string SRC = "549b3d00da16ca2d";
const std::string TOPIC_PREFIX = "spsp";
const std::string TOPIC = "abc";
const std::string TOPIC_FOR_WILDCARD = "111/abc";
const std::string PAYLOAD = "123";
const std::string TOPIC_PUBLISH = TOPIC_PREFIX + "/" + SRC + "/" + TOPIC;
const std::string TOPIC_PUBLISH_WILDCARD = TOPIC_PREFIX + "/" + SRC + "/" + TOPIC_FOR_WILDCARD;
const std::string TOPIC_WILDCARD = TOPIC_PREFIX + "/" + SRC + "/+/" + TOPIC;

TEST_CASE("Check return values", "[LocalBroker]") {
FarLayers::LocalBroker::LocalBroker lb{TOPIC_PREFIX};

SECTION("Publish") {
REQUIRE(lb.publish(SRC, TOPIC, PAYLOAD));
}

SECTION("Subscribe") {
REQUIRE(lb.subscribe(TOPIC));
}

SECTION("Unsubscribe without preceding subscribe") {
REQUIRE(!lb.unsubscribe(TOPIC));
}

SECTION("Unsubscribe without preceding subscribe - wildcard") {
REQUIRE(!lb.unsubscribe(TOPIC + "/#"));
}

SECTION("Unsubscribe with preceding subscribe") {
REQUIRE(lb.subscribe(TOPIC));
REQUIRE(lb.unsubscribe(TOPIC));
}

SECTION("Unsubscribe with preceding subscribe - wildcard") {
REQUIRE(lb.subscribe(TOPIC + "/#"));
REQUIRE(lb.unsubscribe(TOPIC + "/#"));
}
}

TEST_CASE("Receive subscription data", "[LocalBroker]") {
class Node : IFarNode<FarLayers::LocalBroker::LocalBroker>
{
public:
bool called = false;
std::string receivedTopic;
std::string receivedPayload;

using IFarNode::IFarNode;

virtual bool publish(const std::string& topic,
const std::string& payload)
{
return true;
}

virtual bool subscribe(const std::string& topic, SubscribeCb cb)
{
return true;
}

virtual bool unsubscribe(const std::string& topic) {
return true;
}

virtual bool receiveFar(const std::string& topic,
const std::string& payload)
{
called = true;
receivedTopic = topic;
receivedPayload = payload;
return true;
}

virtual void resubscribeAll() {}
};

FarLayers::LocalBroker::LocalBroker lb{TOPIC_PREFIX};
Node node{&lb};

SECTION("Publish, don't receive") {
CHECK(lb.publish(SRC, TOPIC, PAYLOAD));
std::this_thread::sleep_for(10ms);
CHECK(!node.called);
}

SECTION("Subscribe, publish, receive") {
CHECK(lb.subscribe(TOPIC_PUBLISH));
CHECK(lb.publish(SRC, TOPIC, PAYLOAD));
std::this_thread::sleep_for(10ms);
CHECK(node.called);
CHECK(node.receivedTopic == TOPIC_PUBLISH);
CHECK(node.receivedPayload == PAYLOAD);
}

SECTION("Subscribe, publish, receive - wildcard") {
CHECK(lb.subscribe(TOPIC_WILDCARD));
CHECK(lb.publish(SRC, TOPIC_FOR_WILDCARD, PAYLOAD));
std::this_thread::sleep_for(10ms);
CHECK(node.called);
CHECK(node.receivedTopic == TOPIC_PUBLISH_WILDCARD);
CHECK(node.receivedPayload == PAYLOAD);
}

SECTION("Subscribe, unsubscribe, publish, don't receive") {
CHECK(lb.subscribe(TOPIC_PUBLISH));
CHECK(lb.unsubscribe(TOPIC_PUBLISH));
CHECK(lb.publish(SRC, TOPIC, PAYLOAD));
std::this_thread::sleep_for(10ms);
CHECK(!node.called);
}

SECTION("Subscribe, unsubscribe, publish, don't receive - wildcard") {
CHECK(lb.subscribe(TOPIC_WILDCARD));
CHECK(lb.unsubscribe(TOPIC_WILDCARD));
CHECK(lb.publish(SRC, TOPIC_FOR_WILDCARD, PAYLOAD));
std::this_thread::sleep_for(10ms);
CHECK(!node.called);
}
}

0 comments on commit af01ebf

Please sign in to comment.