Skip to content

Commit

Permalink
Added ZMQ Transport XPUB/XSUB support
Browse files Browse the repository at this point in the history
  • Loading branch information
pkarneliuk committed Oct 1, 2023
1 parent 4a2e137 commit 8766070
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
22 changes: 16 additions & 6 deletions src/Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ class ZMQ final : public dlsm::Transport {
if (ptr) check(0 == zmq_ctx_term(ptr));
}} {
check(nullptr != context_);
// clang-format off
const auto opts = std::map<std::string_view, int>{
static const auto opts = std::map<std::string_view, int>{
// clang-format off
{"io_threads", ZMQ_IO_THREADS},
{"max_sockets", ZMQ_MAX_SOCKETS},
{"sched_policy", ZMQ_THREAD_SCHED_POLICY},
{"priority", ZMQ_THREAD_PRIORITY},
{"cpu_add", ZMQ_THREAD_AFFINITY_CPU_ADD},
{"cpu_rem", ZMQ_THREAD_AFFINITY_CPU_REMOVE},
{"ipv6", ZMQ_IPV6},
// clang-format on
};
// clang-format on

// Parse options and try to set their values to context
for (const auto& opt : dlsm::Str::ParseOpts(options)) try {
Expand Down Expand Up @@ -108,9 +108,11 @@ class ZMQ final : public dlsm::Transport {
// clang-format on
const auto& endpoint = opts_.required("endpoint");
switch (type_) {
case ZMQ_XPUB:
case ZMQ_PUB:
check(0 == zmq_bind(socket_.get(), endpoint.c_str()), "Bind to '" + endpoint + "'");
break;
case ZMQ_XSUB:
case ZMQ_SUB:
check(0 == zmq_connect(socket_.get(), endpoint.c_str()), "Connect to '" + endpoint + "'");
for (const auto& topic : subscribtions()) subscription<true>(topic);
Expand All @@ -129,9 +131,11 @@ class ZMQ final : public dlsm::Transport {
check(0 == zmq_getsockopt(socket_.get(), ZMQ_LAST_ENDPOINT, std::data(endpoint), &size));
endpoint.resize(size);
switch (type_) {
case ZMQ_XPUB:
case ZMQ_PUB:
check(0 == zmq_unbind(socket_.get(), std::data(endpoint)), "unbind(" + endpoint + ") ");
break;
case ZMQ_XSUB:
case ZMQ_SUB:
for (const auto& topic : subscribtions()) subscription<false>(topic);
check(0 == zmq_disconnect(socket_.get(), std::data(endpoint)), "disconnect(" + endpoint + ") ");
Expand Down Expand Up @@ -168,9 +172,15 @@ class ZMQ final : public dlsm::Transport {

template <bool Enable>
void subscription(const std::string& topic) {
const auto [opt, msg] = Enable ? std::pair{ZMQ_SUBSCRIBE, "Subscribe to '" + topic + "' "}
: std::pair{ZMQ_UNSUBSCRIBE, "Unsubscribe from '" + topic + "' "};
check(0 == zmq_setsockopt(socket_.get(), opt, std::data(topic), std::size(topic)), msg);
if (type_ == ZMQ_SUB) {
const auto [opt, msg] = Enable ? std::pair{ZMQ_SUBSCRIBE, "Subscribe to '" + topic + "' "}
: std::pair{ZMQ_UNSUBSCRIBE, "Unsubscribe from '" + topic + "' "};
check(0 == zmq_setsockopt(socket_.get(), opt, std::data(topic), std::size(topic)), msg);
} else if (type_ == ZMQ_XSUB) {
std::string msg(1, Enable ? '\x1' : '\x0');
msg.insert(1, topic);
send(msg.data(), msg.size());
}
}

std::vector<std::string> subscribtions() const { return dlsm::Str::split(opts_.get("subscribe", ""), "+"); }
Expand Down
49 changes: 47 additions & 2 deletions tests/unit/TestTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,24 @@ TEST(Transport, Construction) {
ThrowsMessage<std::invalid_argument>("Unexpected ZMQ Context option:thread=1 with:map::at"))
<< "Invalid configuration values";

EXPECT_THAT([] { dlsm::Transport::create("max_sockets=2,cpu_add=1,cpu_rem=1,ipv6=1,sched_policy=-1"); },
ThrowsMessage<std::invalid_argument>(
"Unexpected ZMQ Context option:sched_policy=-1 with: Error:Invalid argument"));

EXPECT_THAT(
[] { dlsm::Transport::create("priority=-1"); },
ThrowsMessage<std::invalid_argument>("Unexpected ZMQ Context option:priority=-1 with: Error:Invalid argument"));

EXPECT_THAT([&] { impl->endpoint("type=pub,endpoint=tcp://bad"); },
ThrowsMessage<dlsm::Transport::Exception>("Bind to 'tcp://bad' Error:Invalid argument"));

EXPECT_THAT([&] { impl->endpoint("type=sub,endpoint=tcp://bad"); },
ThrowsMessage<dlsm::Transport::Exception>("Connect to 'tcp://bad' Error:Invalid argument"));

EXPECT_THAT(
[&] { impl->endpoint("type=pub,endpoint=inproc://inproc-pub-sub,ipv6=25"); },
ThrowsMessage<std::invalid_argument>("Unexpected ZMQ Endpoint option:ipv6=25 with: Error:Invalid argument"));

EXPECT_THAT([&] { impl->endpoint("type=stream,endpoint=inproc://inproc-pub-sub"); },
ThrowsMessage<std::invalid_argument>("Endpoint type=stream is unsupported"));
}
Expand All @@ -33,8 +45,12 @@ TEST(Transport, ZMQPubSub) {
ASSERT_NE(impl, nullptr);

for (const auto& e : {"inproc://inproc-pub-sub"s, "ipc://@ipc-pub-sub"s, "tcp://127.0.0.1:5555"s}) {
auto pub = impl->endpoint("type=pub,endpoint=" + e);
auto sub = impl->endpoint("type=sub,endpoint=" + e);
auto pub = impl->endpoint("type=pub,endpoint=" + e +
",delay_ms=10"
",send_buf_size=256,send_hwm_msgs=10,send_timeout_ms=10");
auto sub = impl->endpoint("type=sub,endpoint=" + e +
",delay_ms=50,connect_timeout_ms=20"
",recv_buf_size=256,recv_hwm_msgs=10,recv_timeout_ms=10");

struct Payload {
int data = 42;
Expand Down Expand Up @@ -67,3 +83,32 @@ TEST(Transport, ZMQPubSub) {
}
}
}

TEST(Transport, ZMQXPubXSub) {
auto impl = dlsm::Transport::create("io_threads=1");
ASSERT_NE(impl, nullptr);

for (const auto& e : {"inproc://inproc-pub-sub"s, "ipc://@ipc-pub-sub"s, "tcp://127.0.0.1:5555"s}) {
auto pub = impl->endpoint("type=xpub,endpoint=" + e +
",delay_ms=10"
",send_buf_size=256,send_hwm_msgs=10,send_timeout_ms=10");
auto sub = impl->endpoint("type=xsub,endpoint=" + e +
",delay_ms=50,connect_timeout_ms=20,subscribe=abc"
",recv_buf_size=256,recv_hwm_msgs=10,recv_timeout_ms=10");

std::string subscribe(64, '\0');
const auto size = pub->recv(subscribe);
subscribe.resize(size);
EXPECT_EQ(subscribe,
"\x1"
"abc");

std::string expected = "abc ABCabca";
std::string received(expected.size(), ' ');

EXPECT_TRUE(pub->send("not abc"));
EXPECT_TRUE(pub->send(expected));
EXPECT_TRUE(sub->recv(received));
EXPECT_EQ(received, expected);
}
}

0 comments on commit 8766070

Please sign in to comment.