From 7f911587c8744322e75d28dd8b31a73baca990fc Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 9 Aug 2021 08:18:29 -0500 Subject: [PATCH] Add support for stream_wapper_v1 with std::string route --- programs/rodeos/streamer_plugin.cpp | 13 +++++++++-- programs/rodeos/streamer_plugin.hpp | 7 +++++- programs/rodeos/streams/logger.hpp | 16 +++++++------- programs/rodeos/streams/rabbitmq.hpp | 27 ++++++++++------------- programs/rodeos/streams/stream.hpp | 21 +++++++++++------- programs/rodeos/tests/test_rodeos_cli.cpp | 12 +++++----- 6 files changed, 56 insertions(+), 40 deletions(-) diff --git a/programs/rodeos/streamer_plugin.cpp b/programs/rodeos/streamer_plugin.cpp index ed6662b10b0..1015f6c514a 100644 --- a/programs/rodeos/streamer_plugin.cpp +++ b/programs/rodeos/streamer_plugin.cpp @@ -28,11 +28,20 @@ struct streamer_plugin_impl : public streamer_t { void stream_data(const char* data, uint64_t data_size) override { eosio::input_stream bin(data, data_size); stream_wrapper res = eosio::from_bin(bin); - const auto& sw = std::get(res); - publish_to_streams(sw); + std::visit([&](const auto& sw) { publish_to_streams(sw); }, res); } void publish_to_streams(const stream_wrapper_v0& sw) { + std::string route; + for (const auto& stream : streams) { + route = sw.route.to_string(); + if (stream->check_route(route)) { + stream->publish(sw.data, route); + } + } + } + + void publish_to_streams(const stream_wrapper_v1& sw) { for (const auto& stream : streams) { if (stream->check_route(sw.route)) { stream->publish(sw.data, sw.route); diff --git a/programs/rodeos/streamer_plugin.hpp b/programs/rodeos/streamer_plugin.hpp index 7be488bddae..e90c902e517 100644 --- a/programs/rodeos/streamer_plugin.hpp +++ b/programs/rodeos/streamer_plugin.hpp @@ -13,7 +13,12 @@ struct stream_wrapper_v0 { std::vector data; }; EOSIO_REFLECT(stream_wrapper_v0, route, data); -using stream_wrapper = std::variant; +struct stream_wrapper_v1 { + std::string route; + std::vector data; +}; +EOSIO_REFLECT(stream_wrapper_v1, route, data); +using stream_wrapper = std::variant; class streamer_plugin : public appbase::plugin { diff --git a/programs/rodeos/streams/logger.hpp b/programs/rodeos/streams/logger.hpp index 08e89373b31..8c383828081 100644 --- a/programs/rodeos/streams/logger.hpp +++ b/programs/rodeos/streams/logger.hpp @@ -6,22 +6,22 @@ namespace b1 { class logger : public stream_handler { - std::vector routes_; - public: - logger(std::vector routes) : routes_(std::move(routes)) { ilog("logger initialized"); } - - const std::vector& get_routes() const override { return routes_; } + explicit logger(std::vector routes) + : stream_handler(std::move(routes)) { + ilog("logger initialized"); + } - void publish(const std::vector& data, const eosio::name& routing_key) override { - ilog("logger stream [${data_size}] >> ${data}", ("data", data)("data_size", data.size())); + void publish(const std::vector& data, const std::string& routing_key) override { + ilog("logger stream ${r}: [${data_size}] >> ${data}", + ("r", routing_key)("data", data)("data_size", data.size())); } }; inline void initialize_loggers(std::vector>& streams, const std::vector& loggers) { for (const auto& routes_str : loggers) { - std::vector routes = extract_routes(routes_str); + std::vector routes = extract_routes(routes_str); logger logger_streamer{ std::move(routes) }; streams.emplace_back(std::make_unique(std::move(logger_streamer))); } diff --git a/programs/rodeos/streams/rabbitmq.hpp b/programs/rodeos/streams/rabbitmq.hpp index db79fd0d1f4..c177a683f6f 100644 --- a/programs/rodeos/streams/rabbitmq.hpp +++ b/programs/rodeos/streams/rabbitmq.hpp @@ -19,7 +19,6 @@ class rabbitmq : public stream_handler { const bool publish_immediately_ = false; const std::string exchange_name_; const std::string queue_name_; - const std::vector routes_; // capture all messages per block and send as one amqp transaction std::deque>> queue_; @@ -35,11 +34,11 @@ class rabbitmq : public stream_handler { } public: - rabbitmq(std::vector routes, const AMQP::Address& address, bool publish_immediately, std::string queue_name) - : address_(address) + rabbitmq(std::vector routes, const AMQP::Address& address, bool publish_immediately, std::string queue_name) + : stream_handler(std::move(routes)) + , address_(address) , publish_immediately_(publish_immediately) , queue_name_( std::move( queue_name)) - , routes_( std::move( routes)) { ilog("Connecting to RabbitMQ address ${a} - Queue: ${q}...", ("a", address)( "q", queue_name_)); bool error = false; @@ -52,12 +51,12 @@ class rabbitmq : public stream_handler { init(); } - rabbitmq(std::vector routes, const AMQP::Address& address, bool publish_immediately, + rabbitmq(std::vector routes, const AMQP::Address& address, bool publish_immediately, std::string exchange_name, std::string exchange_type) - : address_(address) + : stream_handler(std::move(routes)) + , address_(address) , publish_immediately_(publish_immediately) , exchange_name_( std::move( exchange_name)) - , routes_( std::move( routes)) { ilog("Connecting to RabbitMQ address ${a} - Exchange: ${e}...", ("a", address)( "e", exchange_name_)); bool error = false; @@ -70,8 +69,6 @@ class rabbitmq : public stream_handler { init(); } - const std::vector& get_routes() const override { return routes_; } - void start_block(uint32_t block_num) override { queue_.clear(); } @@ -83,7 +80,7 @@ class rabbitmq : public stream_handler { } } - void publish(const std::vector& data, const eosio::name& routing_key) override { + void publish(const std::vector& data, const std::string& routing_key) override { if (exchange_name_.empty()) { if( publish_immediately_ ) { amqp_publisher_->publish_message_direct( queue_name_, data, @@ -95,12 +92,12 @@ class rabbitmq : public stream_handler { } } else { if( publish_immediately_ ) { - amqp_publisher_->publish_message_direct( routing_key.to_string(), data, + amqp_publisher_->publish_message_direct( routing_key, data, []( const std::string& err ) { elog( "AMQP direct message error: ${e}", ("e", err) ); } ); } else { - queue_.emplace_back( std::make_pair( routing_key.to_string(), data ) ); + queue_.emplace_back( std::make_pair( routing_key, data ) ); } } } @@ -133,7 +130,7 @@ class rabbitmq : public stream_handler { // amqp:///vhost//* // inline AMQP::Address parse_rabbitmq_address(const std::string& cmdline_arg, std::string& queue_name_or_exchange_spec, - std::vector& routes) { + std::vector& routes) { // AMQP address starts with "amqp://" or "amqps://". const auto double_slash_pos = cmdline_arg.find("//"); if (double_slash_pos == std::string::npos) { @@ -171,7 +168,7 @@ inline void initialize_rabbits_queue(std::vector const boost::filesystem::path& p) { for (const std::string& rabbit : rabbits) { std::string queue_name; - std::vector routes; + std::vector routes; AMQP::Address address = parse_rabbitmq_address(rabbit, queue_name, routes); @@ -189,7 +186,7 @@ inline void initialize_rabbits_exchange(std::vector routes; + std::vector routes; AMQP::Address address = parse_rabbitmq_address(rabbit, exchange, routes); diff --git a/programs/rodeos/streams/stream.hpp b/programs/rodeos/streams/stream.hpp index 14cccbcfe97..b41f0de527c 100644 --- a/programs/rodeos/streams/stream.hpp +++ b/programs/rodeos/streams/stream.hpp @@ -13,18 +13,20 @@ struct streamer_t { class stream_handler { public: + explicit stream_handler(std::vector routes) + : routes_(std::move(routes)) {} + virtual ~stream_handler() {} - virtual const std::vector& get_routes() const = 0; virtual void start_block(uint32_t block_num) {}; - virtual void publish(const std::vector& data, const eosio::name& routing_key) = 0; + virtual void publish(const std::vector& data, const std::string& routing_key) = 0; virtual void stop_block(uint32_t block_num) {} - bool check_route(const eosio::name& stream_route) { - if (get_routes().size() == 0) { + bool check_route(const std::string& stream_route) { + if (routes_.size() == 0) { return true; } - for (const auto& name : get_routes()) { + for (const auto& name : routes_) { if (name == stream_route) { return true; } @@ -32,10 +34,13 @@ class stream_handler { return false; } + +private: + std::vector routes_; }; -inline std::vector extract_routes(const std::string& routes_str) { - std::vector streaming_routes{}; +inline std::vector extract_routes(const std::string& routes_str) { + std::vector streaming_routes{}; bool star = false; std::string routings = routes_str; while (routings.size() > 0) { @@ -44,7 +49,7 @@ inline std::vector extract_routes(const std::string& routes_str) { std::string route = routings.substr(0, pos); ilog("extracting route ${route}", ("route", route)); if (route != "*") { - streaming_routes.emplace_back(eosio::name(route)); + streaming_routes.emplace_back(std::move(route)); } else { star = true; } diff --git a/programs/rodeos/tests/test_rodeos_cli.cpp b/programs/rodeos/tests/test_rodeos_cli.cpp index c507ef536f0..b21225f9fab 100644 --- a/programs/rodeos/tests/test_rodeos_cli.cpp +++ b/programs/rodeos/tests/test_rodeos_cli.cpp @@ -6,9 +6,9 @@ using namespace eosio::literals; static void parse_and_check(const std::string& cmdline_arg, const std::string& expected_address, - const std::string& expected_queue_name, const std::vector& expected_routes) { + const std::string& expected_queue_name, const std::vector& expected_routes) { std::string queue_name; - std::vector routes; + std::vector routes; auto amqp_address = b1::parse_rabbitmq_address(cmdline_arg, queue_name, routes); @@ -27,11 +27,11 @@ BOOST_AUTO_TEST_CASE(rabbitmq_address_parsing) { // Two slashes (/queue/routes) parse_and_check("amqp://user:pass@host//", "amqp://user:pass@host/", "", {}); - parse_and_check("amqp://user:pass@host//r1,r2", "amqp://user:pass@host/", "", { "r1"_n, "r2"_n }); + parse_and_check("amqp://user:pass@host//r1,r2", "amqp://user:pass@host/", "", { "r1", "r2" }); parse_and_check("amqp://user:pass@host/queue/", "amqp://user:pass@host/", "queue", {}); parse_and_check("amqp://user:pass@host/queue/*", "amqp://user:pass@host/", "queue", {}); - parse_and_check("amqp://user:pass@host/queue/r1", "amqp://user:pass@host/", "queue", { "r1"_n }); - parse_and_check("amqp://user:pass@host/queue/r1,r2", "amqp://user:pass@host/", "queue", { "r1"_n, "r2"_n }); + parse_and_check("amqp://user:pass@host/queue/r1", "amqp://user:pass@host/", "queue", { "r1" }); + parse_and_check("amqp://user:pass@host/queue/r1,r2", "amqp://user:pass@host/", "queue", { "r1", "r2" }); // Three slashes (/vhost/queue/routes) parse_and_check("amqps://user:pass@host/vhost/queue/*", "amqps://user:pass@host:5671/vhost", "queue", {}); @@ -39,7 +39,7 @@ BOOST_AUTO_TEST_CASE(rabbitmq_address_parsing) { // Check that amqp-cpp detects invalid AMQP addresses. std::string queue_name; - std::vector routes; + std::vector routes; BOOST_CHECK_EXCEPTION( b1::parse_rabbitmq_address("user:pass@host", queue_name, routes), std::runtime_error,