Skip to content

Commit

Permalink
WIP Poller and tests apps
Browse files Browse the repository at this point in the history
  • Loading branch information
pkarneliuk committed May 6, 2024
1 parent 1459869 commit f15e00f
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 42 deletions.
48 changes: 48 additions & 0 deletions include/impl/Node.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <nlohmann/json.hpp>
#include "impl/Logger.hpp"
#include "impl/Transport.hpp"

using namespace std::literals;

namespace dlsm {

class Node {
public:

using Transport = dlsm::Transport<dlsm::ZMQ>;

const nlohmann::json config_;
dlsm::Logger log_;
Transport transport_;
Transport::Pub pub_;
Transport::Sub sub_;
Transport::Poller poller_;

Node(nlohmann::json config)
: config_(std::move(config))
, log_{config_.value("log","debug:stdout:")}
, transport_{config_.value("transport","io_threads=1")}
, pub_{transport_.pub(config_.at("commands").get<std::string>())}
, sub_{transport_.sub(config_.at("commands").get<std::string>())}
, poller_{transport_.poller()}
{
LOG_INFO("Started config {}", config_.dump(4));
}
Node(std::filesystem::path config) : Node{nlohmann::json::parse(std::ifstream{config.string()})} {}

void send() {
auto tosend = "1234"s;
pub_.send(tosend);
}

void recv() {
auto torecv = "000011111111"s;
sub_.recv(torecv);

LOG_INFO("Started recv {}", torecv);
}
};

} // namespace dlsm
86 changes: 63 additions & 23 deletions include/impl/Transport.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <cstddef>
#include <cstring>
#include <memory>
#include <span>
#include <string_view>
#include <type_traits>

namespace dlsm {

template <typename C, typename T = typename C::value_type>
concept DataSize = requires(C c) {
{ std::data(c) };
{ std::size(c) };
};

template <typename T>
concept PayloadPOD = std::is_trivially_destructible_v<T> && std::is_standard_layout_v<T> && sizeof(T) <= 1024;

Expand All @@ -29,18 +38,19 @@ struct ZMQ;
template <typename Runtime>
class Transport {
public:
using Data = std::span<std::byte>;
struct Pub {
class Impl;
std::unique_ptr<Impl> p;

~Pub();
void* loan(const std::uint32_t payload);
void publish(void* payload);
void release(void* payload);
Data loan(const std::size_t length);
void publish(Data payload);
void release(Data payload);

bool send(const void* data, const std::size_t length) {
if (void* payload = loan(static_cast<std::uint32_t>(length)); payload) {
std::memcpy(payload, data, length);
if (auto payload = loan(length); !payload.empty()) {
std::memcpy(payload.data(), data, length);
publish(payload);
return true;
}
Expand Down Expand Up @@ -70,19 +80,18 @@ class Transport {
template <typename T>
struct Sample {
Pub& pub_;
T* sample_;
Data sample_;

Sample(Pub& p) : pub_{p}, sample_{reinterpret_cast<T*>(p.loan(sizeof(T)))} {}
Sample(Pub& p) : pub_{p}, sample_{p.loan(sizeof(T))} {}
~Sample() {
if (sample_) pub_.release(sample_);
if (!sample_.empty()) pub_.release(sample_);
}

operator bool() const { return sample_ != nullptr; }
T& value() { return *sample_; }
operator bool() const { return !sample_.empty(); }
T& value() { return *reinterpret_cast<T*>(sample_.data()); }
void publish() {
if (sample_) {
if (!sample_.empty()) {
pub_.publish(sample_);
sample_ = nullptr;
}
}
};
Expand All @@ -98,12 +107,12 @@ class Transport {
std::unique_ptr<Impl> p;

~Sub();
const void* take();
void release(const void* payload);
Data take();
void release(Data payload);

bool recv(void* data, std::size_t& length) {
if (const void* payload = take(); payload) {
std::memcpy(data, payload, length);
if (Data payload = take(); !payload.empty()) {
std::memcpy(data, payload.data(), length);
release(payload);
return true;
}
Expand All @@ -117,8 +126,14 @@ class Transport {
}
template <dlsm::PayloadContiguous Contiguous>
std::size_t recv(Contiguous& c) {
std::size_t size = std::size(c) * sizeof(typename Contiguous::value_type);
return recv(std::data(c), size) ? size : 0UL;
if (Data payload = take(); !payload.empty()) {
const auto n = payload.size() / sizeof(typename Contiguous::value_type);
c.resize(n);
std::memcpy(std::data(c), payload.data(), n * sizeof(typename Contiguous::value_type));
release(payload);
return n;
}
return 0;
}
template <dlsm::PayloadSequence Sequence>
std::size_t recv(Sequence& c) {
Expand All @@ -136,15 +151,15 @@ class Transport {
template <typename T>
struct Sample {
Sub& sub_;
const T* sample_;
Data sample_;

Sample(Sub& s) : sub_{s}, sample_{reinterpret_cast<const T*>(sub_.take())} {}
Sample(Sub& s) : sub_{s}, sample_{sub_.take()} {}
~Sample() {
if (sample_) sub_.release(sample_);
if (!sample_.empty()) sub_.release(sample_);
}

operator bool() const { return sample_ != nullptr; }
const T& value() { return *sample_; }
operator bool() const { return !sample_.empty(); }
const T& value() { return *reinterpret_cast<const T*>(sample_.data()); }
};

template <typename T>
Expand All @@ -153,12 +168,37 @@ class Transport {
}
};

struct Poller {
class Impl;
std::unique_ptr<Impl> p;


struct Timer {
using Ptr = std::unique_ptr<Timer>;
struct Handler {
virtual ~Handler() = default;
virtual void handler(Timer& timer) = 0;
};
virtual ~Timer() = default;
virtual bool active() = 0;
virtual void cancel() = 0;
virtual void reset() = 0;
virtual void set(std::chrono::milliseconds interval) = 0;
};

~Poller();

Timer::Ptr timer(Timer::Handler& handler, std::chrono::milliseconds interval);
std::size_t once(std::chrono::milliseconds timeout = 0);
};

const std::unique_ptr<Runtime> ptr_;

Transport(std::string_view options);
~Transport();

Pub pub(std::string_view options);
Sub sub(std::string_view options);
Poller poller();
};
} // namespace dlsm
4 changes: 3 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
find_package(nlohmann_json REQUIRED)
find_package(spdlog REQUIRED)
find_package(ZeroMQ REQUIRED)

file(GLOB_RECURSE PUBLIC_HDRS ../include/dlsm/*.hpp ${VERSION_HPP})
file(GLOB_RECURSE INTERNAL_HDRS ../include/impl/*.hpp)

add_library (dlsm STATIC ${INTERNAL_HDRS} ${PUBLIC_HDRS})
target_precompile_headers (dlsm PRIVATE ${INTERNAL_HDRS} ${PUBLIC_HDRS})
# target_precompile_headers (dlsm PRIVATE ${INTERNAL_HDRS} ${PUBLIC_HDRS})
target_include_directories(dlsm PUBLIC
"$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/gen/include>"
"$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/include>"
Expand All @@ -26,6 +27,7 @@ target_link_libraries (dlsm
Options
$<TARGET_NAME_IF_EXISTS:Coverage>
fbs
nlohmann_json::nlohmann_json
spdlog::spdlog
atomic
PRIVATE
Expand Down
Loading

0 comments on commit f15e00f

Please sign in to comment.