Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10606 from EOSIO/EPE-1175-rodeos-streaming-routin…
Browse files Browse the repository at this point in the history
…g-key-str

rodeos: Add support for stream_wapper_v1 with std::string route 📦
  • Loading branch information
heifner authored Aug 10, 2021
2 parents e747262 + 7f91158 commit e0bada9
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 40 deletions.
13 changes: 11 additions & 2 deletions programs/rodeos/streamer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ struct streamer_plugin_impl : public streamer_t {
void stream_data(const char* data, uint64_t data_size) override {
eosio::input_stream bin(data, data_size);
stream_wrapper res = eosio::from_bin<stream_wrapper>(bin);
const auto& sw = std::get<stream_wrapper_v0>(res);
publish_to_streams(sw);
std::visit([&](const auto& sw) { publish_to_streams(sw); }, res);
}

void publish_to_streams(const stream_wrapper_v0& sw) {
std::string route;
for (const auto& stream : streams) {
route = sw.route.to_string();
if (stream->check_route(route)) {
stream->publish(sw.data, route);
}
}
}

void publish_to_streams(const stream_wrapper_v1& sw) {
for (const auto& stream : streams) {
if (stream->check_route(sw.route)) {
stream->publish(sw.data, sw.route);
Expand Down
7 changes: 6 additions & 1 deletion programs/rodeos/streamer_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ struct stream_wrapper_v0 {
std::vector<char> data;
};
EOSIO_REFLECT(stream_wrapper_v0, route, data);
using stream_wrapper = std::variant<stream_wrapper_v0>;
struct stream_wrapper_v1 {
std::string route;
std::vector<char> data;
};
EOSIO_REFLECT(stream_wrapper_v1, route, data);
using stream_wrapper = std::variant<stream_wrapper_v0, stream_wrapper_v1>;

class streamer_plugin : public appbase::plugin<streamer_plugin> {

Expand Down
16 changes: 8 additions & 8 deletions programs/rodeos/streams/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@
namespace b1 {

class logger : public stream_handler {
std::vector<eosio::name> routes_;

public:
logger(std::vector<eosio::name> routes) : routes_(std::move(routes)) { ilog("logger initialized"); }

const std::vector<eosio::name>& get_routes() const override { return routes_; }
explicit logger(std::vector<std::string> routes)
: stream_handler(std::move(routes)) {
ilog("logger initialized");
}

void publish(const std::vector<char>& data, const eosio::name& routing_key) override {
ilog("logger stream [${data_size}] >> ${data}", ("data", data)("data_size", data.size()));
void publish(const std::vector<char>& data, const std::string& routing_key) override {
ilog("logger stream ${r}: [${data_size}] >> ${data}",
("r", routing_key)("data", data)("data_size", data.size()));
}
};

inline void initialize_loggers(std::vector<std::unique_ptr<stream_handler>>& streams,
const std::vector<std::string>& loggers) {
for (const auto& routes_str : loggers) {
std::vector<eosio::name> routes = extract_routes(routes_str);
std::vector<std::string> routes = extract_routes(routes_str);
logger logger_streamer{ std::move(routes) };
streams.emplace_back(std::make_unique<logger>(std::move(logger_streamer)));
}
Expand Down
27 changes: 12 additions & 15 deletions programs/rodeos/streams/rabbitmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class rabbitmq : public stream_handler {
const bool publish_immediately_ = false;
const std::string exchange_name_;
const std::string queue_name_;
const std::vector<eosio::name> routes_;
// capture all messages per block and send as one amqp transaction
std::deque<std::pair<std::string, std::vector<char>>> queue_;

Expand All @@ -35,11 +34,11 @@ class rabbitmq : public stream_handler {
}

public:
rabbitmq(std::vector<eosio::name> routes, const AMQP::Address& address, bool publish_immediately, std::string queue_name)
: address_(address)
rabbitmq(std::vector<std::string> routes, const AMQP::Address& address, bool publish_immediately, std::string queue_name)
: stream_handler(std::move(routes))
, address_(address)
, publish_immediately_(publish_immediately)
, queue_name_( std::move( queue_name))
, routes_( std::move( routes))
{
ilog("Connecting to RabbitMQ address ${a} - Queue: ${q}...", ("a", address)( "q", queue_name_));
bool error = false;
Expand All @@ -52,12 +51,12 @@ class rabbitmq : public stream_handler {
init();
}

rabbitmq(std::vector<eosio::name> routes, const AMQP::Address& address, bool publish_immediately,
rabbitmq(std::vector<std::string> routes, const AMQP::Address& address, bool publish_immediately,
std::string exchange_name, std::string exchange_type)
: address_(address)
: stream_handler(std::move(routes))
, address_(address)
, publish_immediately_(publish_immediately)
, exchange_name_( std::move( exchange_name))
, routes_( std::move( routes))
{
ilog("Connecting to RabbitMQ address ${a} - Exchange: ${e}...", ("a", address)( "e", exchange_name_));
bool error = false;
Expand All @@ -70,8 +69,6 @@ class rabbitmq : public stream_handler {
init();
}

const std::vector<eosio::name>& get_routes() const override { return routes_; }

void start_block(uint32_t block_num) override {
queue_.clear();
}
Expand All @@ -83,7 +80,7 @@ class rabbitmq : public stream_handler {
}
}

void publish(const std::vector<char>& data, const eosio::name& routing_key) override {
void publish(const std::vector<char>& data, const std::string& routing_key) override {
if (exchange_name_.empty()) {
if( publish_immediately_ ) {
amqp_publisher_->publish_message_direct( queue_name_, data,
Expand All @@ -95,12 +92,12 @@ class rabbitmq : public stream_handler {
}
} else {
if( publish_immediately_ ) {
amqp_publisher_->publish_message_direct( routing_key.to_string(), data,
amqp_publisher_->publish_message_direct( routing_key, data,
[]( const std::string& err ) {
elog( "AMQP direct message error: ${e}", ("e", err) );
} );
} else {
queue_.emplace_back( std::make_pair( routing_key.to_string(), data ) );
queue_.emplace_back( std::make_pair( routing_key, data ) );
}
}
}
Expand Down Expand Up @@ -133,7 +130,7 @@ class rabbitmq : public stream_handler {
// amqp:///vhost//*
//
inline AMQP::Address parse_rabbitmq_address(const std::string& cmdline_arg, std::string& queue_name_or_exchange_spec,
std::vector<eosio::name>& routes) {
std::vector<std::string>& routes) {
// AMQP address starts with "amqp://" or "amqps://".
const auto double_slash_pos = cmdline_arg.find("//");
if (double_slash_pos == std::string::npos) {
Expand Down Expand Up @@ -171,7 +168,7 @@ inline void initialize_rabbits_queue(std::vector<std::unique_ptr<stream_handler>
const boost::filesystem::path& p) {
for (const std::string& rabbit : rabbits) {
std::string queue_name;
std::vector<eosio::name> routes;
std::vector<std::string> routes;

AMQP::Address address = parse_rabbitmq_address(rabbit, queue_name, routes);

Expand All @@ -189,7 +186,7 @@ inline void initialize_rabbits_exchange(std::vector<std::unique_ptr<stream_handl
const boost::filesystem::path& p) {
for (const std::string& rabbit : rabbits) {
std::string exchange;
std::vector<eosio::name> routes;
std::vector<std::string> routes;

AMQP::Address address = parse_rabbitmq_address(rabbit, exchange, routes);

Expand Down
21 changes: 13 additions & 8 deletions programs/rodeos/streams/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,34 @@ struct streamer_t {

class stream_handler {
public:
explicit stream_handler(std::vector<std::string> routes)
: routes_(std::move(routes)) {}

virtual ~stream_handler() {}
virtual const std::vector<eosio::name>& get_routes() const = 0;
virtual void start_block(uint32_t block_num) {};
virtual void publish(const std::vector<char>& data, const eosio::name& routing_key) = 0;
virtual void publish(const std::vector<char>& data, const std::string& routing_key) = 0;
virtual void stop_block(uint32_t block_num) {}

bool check_route(const eosio::name& stream_route) {
if (get_routes().size() == 0) {
bool check_route(const std::string& stream_route) {
if (routes_.size() == 0) {
return true;
}

for (const auto& name : get_routes()) {
for (const auto& name : routes_) {
if (name == stream_route) {
return true;
}
}

return false;
}

private:
std::vector<std::string> routes_;
};

inline std::vector<eosio::name> extract_routes(const std::string& routes_str) {
std::vector<eosio::name> streaming_routes{};
inline std::vector<std::string> extract_routes(const std::string& routes_str) {
std::vector<std::string> streaming_routes{};
bool star = false;
std::string routings = routes_str;
while (routings.size() > 0) {
Expand All @@ -44,7 +49,7 @@ inline std::vector<eosio::name> extract_routes(const std::string& routes_str) {
std::string route = routings.substr(0, pos);
ilog("extracting route ${route}", ("route", route));
if (route != "*") {
streaming_routes.emplace_back(eosio::name(route));
streaming_routes.emplace_back(std::move(route));
} else {
star = true;
}
Expand Down
12 changes: 6 additions & 6 deletions programs/rodeos/tests/test_rodeos_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
using namespace eosio::literals;

static void parse_and_check(const std::string& cmdline_arg, const std::string& expected_address,
const std::string& expected_queue_name, const std::vector<eosio::name>& expected_routes) {
const std::string& expected_queue_name, const std::vector<std::string>& expected_routes) {
std::string queue_name;
std::vector<eosio::name> routes;
std::vector<std::string> routes;

auto amqp_address = b1::parse_rabbitmq_address(cmdline_arg, queue_name, routes);

Expand All @@ -27,19 +27,19 @@ BOOST_AUTO_TEST_CASE(rabbitmq_address_parsing) {

// Two slashes (/queue/routes)
parse_and_check("amqp://user:pass@host//", "amqp://user:pass@host/", "", {});
parse_and_check("amqp://user:pass@host//r1,r2", "amqp://user:pass@host/", "", { "r1"_n, "r2"_n });
parse_and_check("amqp://user:pass@host//r1,r2", "amqp://user:pass@host/", "", { "r1", "r2" });
parse_and_check("amqp://user:pass@host/queue/", "amqp://user:pass@host/", "queue", {});
parse_and_check("amqp://user:pass@host/queue/*", "amqp://user:pass@host/", "queue", {});
parse_and_check("amqp://user:pass@host/queue/r1", "amqp://user:pass@host/", "queue", { "r1"_n });
parse_and_check("amqp://user:pass@host/queue/r1,r2", "amqp://user:pass@host/", "queue", { "r1"_n, "r2"_n });
parse_and_check("amqp://user:pass@host/queue/r1", "amqp://user:pass@host/", "queue", { "r1" });
parse_and_check("amqp://user:pass@host/queue/r1,r2", "amqp://user:pass@host/", "queue", { "r1", "r2" });

// Three slashes (/vhost/queue/routes)
parse_and_check("amqps://user:pass@host/vhost/queue/*", "amqps://user:pass@host:5671/vhost", "queue", {});
parse_and_check("amqps://user:pass@host/vhost//*", "amqps://user:pass@host:5671/vhost", "", {});

// Check that amqp-cpp detects invalid AMQP addresses.
std::string queue_name;
std::vector<eosio::name> routes;
std::vector<std::string> routes;

BOOST_CHECK_EXCEPTION(
b1::parse_rabbitmq_address("user:pass@host", queue_name, routes), std::runtime_error,
Expand Down

0 comments on commit e0bada9

Please sign in to comment.