From fe2ddff20bd2121dd9596a70314d88ddd336c0ba Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 11 Mar 2021 08:06:26 -0600 Subject: [PATCH 01/11] Add ability to specify correlation id to publish_message_direct --- .../eosio/amqp/reliable_amqp_publisher.hpp | 3 ++- libraries/amqp/reliable_amqp_publisher.cpp | 15 +++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp index ed733a3261c..80c41f630db 100644 --- a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp +++ b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp @@ -48,8 +48,9 @@ class reliable_amqp_publisher { /// Publish message directly to AMQP queue /// Bypasses all reliable mechanisms and just does a simple AMQP publish. Does not queue or retry. /// \param routing_key if empty() uses class provided default routing_key + /// \param correlation_id if not empty() sets as correlation id of the message envelope /// \param data message to send - void publish_message_direct(const std::string& routing_key, std::vector data); + void publish_message_direct(const std::string& routing_key, const std::string& correlation_id, std::vector data); /// reliable_amqp_publisher runs its own thread. In some cases it may be desirable to skip a needless thread jump /// when performing work. This method will allow submission of work to reliable_amqp_publisher's thread. diff --git a/libraries/amqp/reliable_amqp_publisher.cpp b/libraries/amqp/reliable_amqp_publisher.cpp index 0da835bac60..fa364377893 100644 --- a/libraries/amqp/reliable_amqp_publisher.cpp +++ b/libraries/amqp/reliable_amqp_publisher.cpp @@ -30,7 +30,7 @@ struct reliable_amqp_publisher_impl { void pump_queue(); void publish_message_raw(std::vector&& data); void publish_messages_raw(std::deque>>&& queue); - void publish_message_direct(const std::string& routing_key, std::vector data); + void publish_message_direct(const std::string& routing_key, const std::string& correlation_id, std::vector data); bool verify_max_queue_size(); void channel_ready(AMQP::Channel* channel); @@ -224,10 +224,10 @@ void reliable_amqp_publisher_impl::publish_messages_raw(std::deque data) { +void reliable_amqp_publisher_impl::publish_message_direct(const std::string& rk, const std::string& correlation_id, std::vector data) { if(!ctx.get_executor().running_in_this_thread()) { - boost::asio::post(user_submitted_work_strand, [this, rk, d=std::move(data)]() mutable { - publish_message_direct(rk, std::move(d)); + boost::asio::post(user_submitted_work_strand, [this, rk, correlation_id, d=std::move(data)]() mutable { + publish_message_direct(rk, correlation_id, std::move(d)); }); return; } @@ -242,6 +242,8 @@ void reliable_amqp_publisher_impl::publish_message_direct(const std::string& rk, envelope.setPersistent(); if(message_id) envelope.setMessageID(*message_id); + if(!correlation_id.empty()) + envelope.setCorrelationID(correlation_id); channel->publish(exchange, rk.empty() ? routing_key : rk, envelope); } @@ -258,8 +260,9 @@ void reliable_amqp_publisher::publish_messages_raw(std::dequepublish_messages_raw( std::move( queue ) ); } -void reliable_amqp_publisher::publish_message_direct(const std::string& routing_key, std::vector data) { - my->publish_message_direct( routing_key, std::move(data) ); +void reliable_amqp_publisher::publish_message_direct(const std::string& routing_key, const std::string& correlation_id, + std::vector data) { + my->publish_message_direct( routing_key, correlation_id, std::move(data) ); } void reliable_amqp_publisher::post_on_io_context(std::function&& f) { From 213cad6cb6de8084812950ca8c5939a3ac106104 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 11 Mar 2021 08:06:44 -0600 Subject: [PATCH 02/11] Add ability to specify correlation id to publish_message_direct --- programs/rodeos/streams/rabbitmq.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/programs/rodeos/streams/rabbitmq.hpp b/programs/rodeos/streams/rabbitmq.hpp index 0bbc915b447..78867222816 100644 --- a/programs/rodeos/streams/rabbitmq.hpp +++ b/programs/rodeos/streams/rabbitmq.hpp @@ -79,13 +79,13 @@ class rabbitmq : public stream_handler { void publish(const std::vector& data, const eosio::name& routing_key) override { if (exchange_name_.empty()) { if( publish_immediately_ ) { - amqp_publisher_->publish_message_direct( queue_name_, data ); + amqp_publisher_->publish_message_direct( queue_name_, std::string(), data ); } else { queue_.emplace_back( std::make_pair( queue_name_, data ) ); } } else { if( publish_immediately_ ) { - amqp_publisher_->publish_message_direct( routing_key.to_string(), data ); + amqp_publisher_->publish_message_direct( routing_key.to_string(), std::string(), data ); } else { queue_.emplace_back( std::make_pair( routing_key.to_string(), data ) ); } From 3c2e80d9e08d758e4e779ec390b695dc9927855c Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 11 Mar 2021 08:07:35 -0600 Subject: [PATCH 03/11] Use reliable_amqp_publisher for retry capability on lost connection to amqp --- .../amqp_trace_plugin/amqp_trace_plugin.cpp | 13 ++------- .../amqp_trace_plugin_impl.cpp | 29 +++++++++++-------- .../amqp_trace_plugin_impl.hpp | 4 +-- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp index b2fe209d1fd..9719fe2f489 100644 --- a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp +++ b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp @@ -51,11 +51,9 @@ void amqp_trace_plugin::plugin_startup() { ilog( "Starting amqp_trace_plugin" ); my->started = true; - my->amqp_trace.emplace( my->amqp_trace_address, "trace", - []( const std::string& err ) { - elog( "amqp error: ${e}", ("e", err) ); - app().quit(); - } ); + const boost::filesystem::path trace_data_file_path = appbase::app().data_dir() / "amqp_trace_plugin" / "trace.bin"; + + my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, "trace", trace_data_file_path ); auto chain_plug = app().find_plugin(); EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" ); @@ -80,11 +78,6 @@ void amqp_trace_plugin::plugin_shutdown() { dlog( "shutdown.." ); applied_transaction_connection.reset(); - if( my->amqp_trace ) { - // use stop() instead of destroying amqp_trace as amqp_trx_plugin keeps a pointer to amqp_trace - // and needs to live until amqp_trx_plugin shutdown. - my->amqp_trace->stop(); - } dlog( "exit amqp_trace_plugin" ); } diff --git a/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp b/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp index 14b77eab8d0..e7f290ee57b 100644 --- a/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp +++ b/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp @@ -12,10 +12,13 @@ namespace eosio { // called from any thread void amqp_trace_plugin_impl::publish_error( std::string tid, int64_t error_code, std::string error_message ) { try { - transaction_trace_msg msg{transaction_trace_exception{error_code}}; - std::get( msg ).error_message = std::move( error_message ); - auto buf = convert_to_bin( msg ); - amqp_trace->publish( amqp_trace_exchange, tid, std::move( buf ) ); + // reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns + amqp_trace->post_on_io_context([&amqp_trace = *amqp_trace, tid{std::move(tid)}, error_code, em{std::move(error_message)}]() mutable { + transaction_trace_msg msg{transaction_trace_exception{error_code}}; + std::get( msg ).error_message = std::move( em ); + std::vector buf = convert_to_bin( msg ); + amqp_trace.publish_message_direct(std::string(), tid, std::move(buf)); + }); } FC_LOG_AND_DROP() } @@ -33,15 +36,17 @@ void amqp_trace_plugin_impl::on_applied_transaction( const chain::transaction_tr void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace ) { try { - if( !trace->except ) { - dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) ); - } else { - dlog( "trace except : ${m}", ("m", trace->except->to_string()) ); - } - amqp_trace->publish( amqp_trace_exchange, trx->id(), [trace]() { + // reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns + amqp_trace->post_on_io_context([&amqp_trace = *amqp_trace, trx, trace]() { + if( !trace->except ) { + dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) ); + } else { + dlog( "trace except : ${m}", ("m", trace->except->to_string()) ); + } transaction_trace_msg msg{ eosio::state_history::convert( *trace ) }; - return convert_to_bin( msg ); - } ); + std::vector buf = convert_to_bin( msg ); + amqp_trace.publish_message_direct(std::string(), trx->id(), std::move(buf)); + }); } FC_LOG_AND_DROP() } diff --git a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp index 052c3932ea6..a2832cd3c60 100644 --- a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp +++ b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp @@ -1,5 +1,5 @@ #pragma once -#include +#include #include #include @@ -7,7 +7,7 @@ namespace eosio { struct amqp_trace_plugin_impl : std::enable_shared_from_this { - std::optional amqp_trace; + std::optional amqp_trace; std::string amqp_trace_address; std::string amqp_trace_exchange; From dabc1141c123ead4d75632d200c0677076bb7f5a Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 11 Mar 2021 17:09:22 -0600 Subject: [PATCH 04/11] Reimplement amqp_handler via single_channel_retrying_amqp_connection to provide retry on connection loss --- .../amqp/include/eosio/amqp/amqp_handler.hpp | 163 ++++++++++-------- plugins/amqp_trx_plugin/amqp_trx_plugin.cpp | 10 +- programs/cleos/main.cpp | 2 +- programs/rodeos/streams/rabbitmq.hpp | 4 +- 4 files changed, 100 insertions(+), 79 deletions(-) diff --git a/libraries/amqp/include/eosio/amqp/amqp_handler.hpp b/libraries/amqp/include/eosio/amqp/amqp_handler.hpp index d24a638ba0d..32680702358 100644 --- a/libraries/amqp/include/eosio/amqp/amqp_handler.hpp +++ b/libraries/amqp/include/eosio/amqp/amqp_handler.hpp @@ -1,20 +1,21 @@ #pragma once +#include #include #include #include #include #include -#include -#include +#include namespace eosio { /// Designed to work with internal io_service running on a dedicated thread. -/// All publish methods can be called from any thread, but should be sync'ed with stop() & destructor. +/// Calls on_consume from internal thread. +/// ack() & reject() & publish methods can be called from any thread, but should be sync'ed with stop() & destructor. /// Constructor, stop(), destructor should be called from same thread. -class amqp { +class amqp_handler { public: // called from amqp thread or calling thread, but not concurrently using on_error_t = std::function; @@ -28,8 +29,8 @@ class amqp { /// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr /// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed. /// user required to ack/reject delivery_tag for each callback. - amqp( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr ) - : amqp( address, std::move(name), "", "", std::move(on_err), std::move(on_consume)) + amqp_handler( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr ) + : amqp_handler( address, std::move(name), "", "", std::move(on_err), std::move(on_consume)) {} /// @param address AMQP address @@ -38,54 +39,68 @@ class amqp { /// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr /// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed. /// user required to ack/reject delivery_tag for each callback. - amqp( const std::string& address, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume = nullptr ) - : amqp( address, "", std::move(exchange_name), std::move(exchange_type), std::move(on_err), std::move(on_consume)) + amqp_handler( const std::string& address, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume = nullptr ) + : amqp_handler( address, "", std::move(exchange_name), std::move(exchange_type), std::move(on_err), std::move(on_consume)) {} /// drain queue and stop thread - ~amqp() { + ~amqp_handler() { stop(); } /// publish to AMQP address with routing key name + // on_error() called if not connected void publish( std::string exchange, std::string correlation_id, std::vector buf ) { - boost::asio::post( *handler_->amqp_strand(), + boost::asio::post( thread_pool_.get_executor(), [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), buf=std::move(buf)]() { AMQP::Envelope env( buf.data(), buf.size() ); env.setCorrelationID( cid ); - my->channel_->publish( exchange, my->name_, env, 0 ); + if( my->channel_ ) + my->channel_->publish( exchange, my->name_, env, 0 ); + else + my->on_error( "AMQP Unable to publish: " + cid ); } ); } /// publish to AMQP calling f() -> std::vector on amqp thread + // on_error() called if not connected template void publish( std::string exchange, std::string correlation_id, Func f ) { - boost::asio::post( *handler_->amqp_strand(), + boost::asio::post( thread_pool_.get_executor(), [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), f=std::move(f)]() { std::vector buf = f(); AMQP::Envelope env( buf.data(), buf.size() ); env.setCorrelationID( cid ); - my->channel_->publish( exchange, my->name_, env, 0 ); + if( my->channel_ ) + my->channel_->publish( exchange, my->name_, env, 0 ); + else + my->on_error( "AMQP Unable to publish: " + cid ); } ); } /// ack consume message /// @param multiple true if given delivery_tag and all previous should be ack'ed void ack( const delivery_tag_t& delivery_tag, bool multiple = false ) { - boost::asio::post( *handler_->amqp_strand(), + boost::asio::post( thread_pool_.get_executor(), [my = this, delivery_tag, multiple]() { try { - my->channel_->ack( delivery_tag, multiple ? AMQP::multiple : 0 ); + if( my->channel_ ) + my->channel_->ack( delivery_tag, multiple ? AMQP::multiple : 0 ); + else + my->on_error( "AMQP Unable to ack: " + std::to_string(delivery_tag) ); } FC_LOG_AND_DROP() } ); } // reject consume message void reject( const delivery_tag_t& delivery_tag ) { - boost::asio::post( *handler_->amqp_strand(), + boost::asio::post( thread_pool_.get_executor(), [my = this, delivery_tag]() { try { - my->channel_->reject( delivery_tag ); + if( my->channel_ ) + my->channel_->reject( delivery_tag ); + else + my->on_error( "AMQP Unable to reject: " + std::to_string(delivery_tag) ); } FC_LOG_AND_DROP() } ); } @@ -94,11 +109,11 @@ class amqp { /// Not thread safe, call only once from constructor/destructor thread /// Do not call from lambda's passed to publish or constructor e.g. on_error void stop() { - if( handler_ ) { + if( first_connect_.un_set() ) { // drain amqp queue std::promise stop_promise; auto future = stop_promise.get_future(); - boost::asio::post( *handler_->amqp_strand(), [&]() { + boost::asio::post( thread_pool_.get_executor(), [&]() { if( channel_ && channel_->usable() ) { auto& cb = channel_->close(); cb.onFinalize( [&]() { @@ -119,15 +134,16 @@ class amqp { future.wait(); thread_pool_.stop(); - handler_.reset(); } } private: - amqp(const std::string& address, std::string name, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume) - : thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread - , connected_future_( connected_.get_future() ) + amqp_handler(const std::string& address, std::string name, std::string exchange_name, std::string exchange_type, + on_error_t on_err, on_consume_t on_consume) + : first_connect_() + , thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread + , amqp_connection_( thread_pool_.get_executor(), address, [this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} ) , name_( std::move( name ) ) , exchange_name_( std::move( exchange_name ) ) , exchange_type_( std::move( exchange_type ) ) @@ -137,23 +153,39 @@ class amqp { AMQP::Address amqp_address( address ); ilog( "Connecting to AMQP address ${a} - Queue: ${q}...", ("a", amqp_address)("q", name_) ); - handler_ = std::make_unique( *this, thread_pool_.get_executor() ); - boost::asio::post( *handler_->amqp_strand(), [&]() { - connection_ = std::make_unique( handler_.get(), amqp_address ); - }); wait(); } + // called from non-amqp thread + void wait() { + if( !first_connect_.wait() ) { + boost::asio::post( thread_pool_.get_executor(), [this](){ + on_error( "AMQP timeout connecting and declaring queue" ); + } ); + } + } + // called from amqp thread - void init( AMQP::TcpConnection* c ) { - channel_ = std::make_unique( c ); + void channel_ready(AMQP::Channel* c) { + channel_ = c; + init(); + } + + // called from amqp thread + void channel_failed() { + channel_ = nullptr; + } + + // called from amqp thread + void init() { + if(!channel_) return; if( !exchange_type_.empty() ) { auto type = AMQP::direct; if (exchange_type_ == "fanout") { type = AMQP::fanout; } else if (exchange_type_ != "direct") { on_error( "Unsupported exchange type: " + exchange_type_ ); - connected_.set_value(); + first_connect_.set(); return; } @@ -164,7 +196,7 @@ class amqp { } ); exchange.onError([this](const char* error_message) { on_error( std::string("AMQP Queue error: ") + error_message ); - connected_.set_value(); + first_connect_.set(); }); } else { auto& queue = channel_->declareQueue( name_, AMQP::durable ); @@ -175,7 +207,7 @@ class amqp { } ); queue.onError( [&]( const char* error_message ) { on_error( error_message ); - connected_.set_value(); + first_connect_.set(); } ); } } @@ -183,73 +215,62 @@ class amqp { // called from amqp thread void init_consume() { if( !on_consume_ ) { - connected_.set_value(); + first_connect_.set(); return; } auto& consumer = channel_->consume( name_ ); consumer.onSuccess( [&]( const std::string& consumer_tag ) { dlog( "consume started: ${tag}", ("tag", consumer_tag) ); - connected_.set_value(); + first_connect_.set(); } ); consumer.onError( [&]( const char* message ) { wlog( "consume failed: ${e}", ("e", message) ); on_error( message ); - connected_.set_value(); + first_connect_.set(); } ); consumer.onReceived( [&](const AMQP::Message& message, const delivery_tag_t& delivery_tag, bool redelivered) { on_consume_( delivery_tag, message.body(), message.bodySize() ); } ); } - // called from non-amqp thread - void wait() { - auto r = connected_future_.wait_for( std::chrono::seconds( 10 ) ); - if( r == std::future_status::timeout ) { - on_error( "AMQP timeout declaring queue" ); - } - } - + // called from amqp thread void on_error( const std::string& message ) { - std::lock_guard g(mtx_); if( on_error_ ) on_error_( message ); } - class amqp_handler : public AMQP::LibBoostAsioHandler { +private: + class start_cond { + private: + std::mutex mtx_; + bool started_ = false; + std::condition_variable start_cond_; public: - explicit amqp_handler( amqp& impl, boost::asio::io_service& io_service ) - : AMQP::LibBoostAsioHandler( io_service ) - , amqp_(impl) - {} - - void onError( AMQP::TcpConnection* connection, const char* message ) override { - elog( "amqp connection failed: ${m}", ("m", message) ); - amqp_.on_error( message ); - } - - uint16_t onNegotiate( AMQP::TcpConnection* connection, uint16_t interval ) override { - return 0; // disable heartbeats + void set() { + { + auto lock = std::scoped_lock(mtx_); + started_ = true; + } + start_cond_.notify_one(); } - - void onReady(AMQP::TcpConnection* c) override { - amqp_.init( c ); + bool wait() { + std::unique_lock lk(mtx_); + return start_cond_.wait_for( lk, std::chrono::seconds( 10 ), [&]{ return started_; } ); } - - strand_shared_ptr& amqp_strand() { - return _strand; + // only unset on shutdown, return prev value + bool un_set() { + auto lock = std::scoped_lock( mtx_ ); + bool s = started_; + started_ = false; + return s; } - - amqp& amqp_; }; private: - std::mutex mtx_; + start_cond first_connect_; eosio::chain::named_thread_pool thread_pool_; - std::unique_ptr handler_; - std::unique_ptr connection_; - std::unique_ptr channel_; - std::promise connected_; - std::future connected_future_; + single_channel_retrying_amqp_connection amqp_connection_; + AMQP::Channel* channel_ = nullptr; // null when not connected std::string name_; std::string exchange_name_; std::string exchange_type_; diff --git a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp index 0b9861b620f..9d3641bb2cd 100644 --- a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp +++ b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp @@ -56,11 +56,11 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this chain_plugin* chain_plug = nullptr; amqp_trace_plugin* trace_plug = nullptr; - std::optional amqp_trx; + std::optional amqp_trx; std::string amqp_trx_address; ack_mode acked = ack_mode::executed; - std::map tracked_delivery_tags; // block, highest delivery_tag for block + std::map tracked_delivery_tags; // block, highest delivery_tag for block uint32_t trx_processing_queue_size = 1000; bool allow_speculative_execution = false; std::shared_ptr> trx_queue_ptr; @@ -71,7 +71,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this // called from amqp thread - void consume_message( const eosio::amqp::delivery_tag_t& delivery_tag, const char* buf, size_t s ) { + void consume_message( const eosio::amqp_handler::delivery_tag_t& delivery_tag, const char* buf, size_t s ) { try { fc::datastream ds( buf, s ); fc::unsigned_int which; @@ -119,7 +119,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this private: // called from amqp thread - void handle_message( const eosio::amqp::delivery_tag_t& delivery_tag, chain::packed_transaction_ptr trx ) { + void handle_message( const eosio::amqp_handler::delivery_tag_t& delivery_tag, chain::packed_transaction_ptr trx ) { const auto& tid = trx->id(); dlog( "received packed_transaction ${id}", ("id", tid) ); @@ -250,7 +250,7 @@ void amqp_trx_plugin::plugin_startup() { elog( "amqp error: ${e}", ("e", err) ); app().quit(); }, - [&]( const eosio::amqp::delivery_tag_t& delivery_tag, const char* buf, size_t s ) { + [&]( const eosio::amqp_handler::delivery_tag_t& delivery_tag, const char* buf, size_t s ) { if( app().is_quiting() ) return; // leave non-ack my->consume_message( delivery_tag, buf, s ); } diff --git a/programs/cleos/main.cpp b/programs/cleos/main.cpp index f426971e0b5..f81f41fb84c 100644 --- a/programs/cleos/main.cpp +++ b/programs/cleos/main.cpp @@ -442,7 +442,7 @@ fc::variant push_transaction( signed_transaction& trx, const std::vector(msg).id(); string id = tid.str(); - eosio::amqp qp_trx( amqp_address, "trx", []( const std::string& err ) { + eosio::amqp_handler qp_trx( amqp_address, "trx", []( const std::string& err ) { std::cerr << "AMQP trx error: " << err << std::endl; exit( 1 ); } ); diff --git a/programs/rodeos/streams/rabbitmq.hpp b/programs/rodeos/streams/rabbitmq.hpp index 78867222816..38165821119 100644 --- a/programs/rodeos/streams/rabbitmq.hpp +++ b/programs/rodeos/streams/rabbitmq.hpp @@ -35,7 +35,7 @@ class rabbitmq : public stream_handler { { ilog("Connecting to RabbitMQ address ${a} - Queue: ${q}...", ("a", address)( "q", queue_name_)); bool error = false; - eosio::amqp declare_queue( address_, queue_name_, [&error](const std::string& err){ + eosio::amqp_handler declare_queue( address_, queue_name_, [&error](const std::string& err){ elog("AMQP Queue error: ${e}", ("e", err)); appbase::app().quit(); error = true; @@ -54,7 +54,7 @@ class rabbitmq : public stream_handler { { ilog("Connecting to RabbitMQ address ${a} - Exchange: ${e}...", ("a", address)( "e", exchange_name_)); bool error = false; - eosio::amqp declare_exchange( address_, exchange_name_, exchange_type, [&error](const std::string& err){ + eosio::amqp_handler declare_exchange( address_, exchange_name_, exchange_type, [&error](const std::string& err){ elog("AMQP Exchange error: ${e}", ("e", err)); appbase::app().quit(); error = true; From 1f3d09c3f579506f4a4b4dfb14821ea475ca2994 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 12 Mar 2021 08:37:08 -0600 Subject: [PATCH 05/11] Make amqp queue names configurable --- plugins/amqp_trace_plugin/amqp_trace_plugin.cpp | 8 ++++++-- .../eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp | 1 + plugins/amqp_trx_plugin/amqp_trx_plugin.cpp | 10 ++++++++-- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp index 9719fe2f489..421cc155edc 100644 --- a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp +++ b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp @@ -30,7 +30,9 @@ void amqp_trace_plugin::set_program_options(options_description& cli, options_de auto op = cfg.add_options(); op("amqp-trace-address", bpo::value(), "AMQP address: Format: amqp://USER:PASSWORD@ADDRESS:PORT\n" - "Will publish to 'trace' queue."); + "Will publish to amqp-trace-queue-name ('trace') queue."); + op("amqp-trace-queue-name", bpo::value()->default_value("trace"), + "AMQP queue to publish transaction traces."); op("amqp-trace-exchange", bpo::value()->default_value(""), "Existing AMQP exchange to send transaction trace messages."); } @@ -39,6 +41,8 @@ void amqp_trace_plugin::plugin_initialize(const variables_map& options) { try { EOS_ASSERT( options.count("amqp-trace-address"), chain::plugin_config_exception, "amqp-trace-address required" ); my->amqp_trace_address = options.at("amqp-trace-address").as(); + my->amqp_trace_queue_name = options.at("amqp-trace-queue-name").as(); + EOS_ASSERT( !my->amqp_trace_queue_name.empty(), chain::plugin_config_exception, "amqp-trace-queue-name required" ); my->amqp_trace_exchange = options.at("amqp-trace-exchange").as(); } FC_LOG_AND_RETHROW() @@ -53,7 +57,7 @@ void amqp_trace_plugin::plugin_startup() { const boost::filesystem::path trace_data_file_path = appbase::app().data_dir() / "amqp_trace_plugin" / "trace.bin"; - my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, "trace", trace_data_file_path ); + my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name, trace_data_file_path ); auto chain_plug = app().find_plugin(); EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" ); diff --git a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp index a2832cd3c60..f8cb33f5200 100644 --- a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp +++ b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp @@ -10,6 +10,7 @@ struct amqp_trace_plugin_impl : std::enable_shared_from_this amqp_trace; std::string amqp_trace_address; + std::string amqp_trace_queue_name; std::string amqp_trace_exchange; bool started = false; diff --git a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp index 9d3641bb2cd..a642df2a18c 100644 --- a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp +++ b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp @@ -59,6 +59,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this std::optional amqp_trx; std::string amqp_trx_address; + std::string amqp_trx_queue; ack_mode acked = ack_mode::executed; std::map tracked_delivery_tags; // block, highest delivery_tag for block uint32_t trx_processing_queue_size = 1000; @@ -176,7 +177,9 @@ void amqp_trx_plugin::set_program_options(options_description& cli, options_desc auto op = cfg.add_options(); op("amqp-trx-address", bpo::value(), "AMQP address: Format: amqp://USER:PASSWORD@ADDRESS:PORT\n" - "Will consume from 'trx' queue."); + "Will consume from amqp-trx-queue-name ('trx') queue."); + op("amqp-trx-queue-name", bpo::value()->default_value("trx"), + "AMQP queue to consume transactions from."); op("amqp-trx-queue-size", bpo::value()->default_value(my->trx_processing_queue_size), "The maximum number of transactions to pull from the AMQP queue at any given time."); op("amqp-trx-speculative-execution", bpo::bool_switch()->default_value(false), @@ -196,6 +199,9 @@ void amqp_trx_plugin::plugin_initialize(const variables_map& options) { EOS_ASSERT( options.count("amqp-trx-address"), chain::plugin_config_exception, "amqp-trx-address required" ); my->amqp_trx_address = options.at("amqp-trx-address").as(); + my->amqp_trx_queue = options.at("amqp-trx-queue-name").as(); + EOS_ASSERT( !my->amqp_trx_queue.empty(), chain::plugin_config_exception, "amqp-trx-queue-name required" ); + my->acked = options.at("amqp-trx-ack-mode").as(); my->trx_processing_queue_size = options.at("amqp-trx-queue-size").as(); @@ -245,7 +251,7 @@ void amqp_trx_plugin::plugin_startup() { my->trx_queue_ptr->run(); - my->amqp_trx.emplace( my->amqp_trx_address, "trx", + my->amqp_trx.emplace( my->amqp_trx_address, my->amqp_trx_queue, [](const std::string& err) { elog( "amqp error: ${e}", ("e", err) ); app().quit(); From dd8efc3014f9055b3100197fa5ff530790b209ac Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 12 Mar 2021 11:07:52 -0600 Subject: [PATCH 06/11] Add shutdown hook to reliable_amqp_publisher --- .../eosio/amqp/reliable_amqp_publisher.hpp | 11 +++++++--- libraries/amqp/reliable_amqp_publisher.cpp | 22 +++++++++++++++---- .../amqp_compressed_proof_plugin.cpp | 7 +++++- .../amqp_trace_plugin/amqp_trace_plugin.cpp | 6 ++++- .../amqp_witness_plugin.cpp | 8 ++++++- programs/rodeos/streams/rabbitmq.hpp | 12 ++++++++-- 6 files changed, 54 insertions(+), 12 deletions(-) diff --git a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp index 80c41f630db..f6f8f0ad37e 100644 --- a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp +++ b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp @@ -16,20 +16,25 @@ namespace eosio { * to already exist. Publishing to a non-existing exchange will effectively block * reliable_amqp_publisher's queue until the exchange exists because the queue will be unable to make progress. * - * At a large enough uncomfirmed queue depth (currently on the order of 1 million entries) reliable_amqp_publisher - * will start dropping messages after logging an error. + * At a large enough unconfirmed queue depth (currently on the order of 1 million entries) reliable_amqp_publisher + * will call fatal_error() and then start dropping messages after logging an error. */ class reliable_amqp_publisher { public: + // Called from amqp thread when unconfirmed queue depth about to be exceeded. + using error_callback_t = std::function; + /// Create a reliable queue to the given server publishing to the given exchange /// \param server_url server url as amqp://... /// \param exchange the exchange to publish to /// \param routing_key on published messages, used if no routing_key provided for publish_message.. calls /// \param unconfirmed_path path to save/load unconfirmed message to be tried again after stop/start + /// \param on_fatal_error called from amqp thread when unconfirmed queue depth about to be exceeded. /// \param message_id optional message id to send with each message reliable_amqp_publisher(const std::string& server_url, const std::string& exchange, const std::string& routing_key, - const boost::filesystem::path& unconfirmed_path, const std::optional& message_id = {}); + const boost::filesystem::path& unconfirmed_path, error_callback_t on_fatal_error, + const std::optional& message_id = {}); /// Publish a message. May be called from any thread. /// \param t serializable object diff --git a/libraries/amqp/reliable_amqp_publisher.cpp b/libraries/amqp/reliable_amqp_publisher.cpp index fa364377893..6a6f242bc89 100644 --- a/libraries/amqp/reliable_amqp_publisher.cpp +++ b/libraries/amqp/reliable_amqp_publisher.cpp @@ -25,7 +25,9 @@ struct reliable_amqp_publisher_handler; struct reliable_amqp_publisher_impl { reliable_amqp_publisher_impl(const std::string& url, const std::string& exchange, const std::string& routing_key, - const boost::filesystem::path& unconfirmed_path, const std::optional& message_id); + const boost::filesystem::path& unconfirmed_path, + reliable_amqp_publisher::error_callback_t on_fatal_error, + const std::optional& message_id); ~reliable_amqp_publisher_impl(); void pump_queue(); void publish_message_raw(std::vector&& data); @@ -52,6 +54,7 @@ struct reliable_amqp_publisher_impl { boost::asio::io_context ctx; single_channel_retrying_amqp_connection retrying_connection; + reliable_amqp_publisher::error_callback_t on_fatal_error; const boost::filesystem::path data_file_path; @@ -64,8 +67,11 @@ struct reliable_amqp_publisher_impl { }; reliable_amqp_publisher_impl::reliable_amqp_publisher_impl(const std::string& url, const std::string& exchange, const std::string& routing_key, - const boost::filesystem::path& unconfirmed_path, const std::optional& message_id) : + const boost::filesystem::path& unconfirmed_path, + reliable_amqp_publisher::error_callback_t on_fatal_error, + const std::optional& message_id) : retrying_connection(ctx, url, [this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();}), + on_fatal_error(std::move(on_fatal_error)), data_file_path(unconfirmed_path), exchange(exchange), routing_key(routing_key), message_id(message_id) { boost::system::error_code ec; @@ -79,6 +85,8 @@ reliable_amqp_publisher_impl::reliable_amqp_publisher_impl(const std::string& ur fc::raw::unpack(file, message_deque); if( !message_deque.empty() ) batch_num = message_deque.back().num; + ilog("AMQP existing persistent file ${f} loaded with ${c} unconfirmed messages for ${a} publishing to \"${e}\".", + ("f", data_file_path.generic_string())("a", retrying_connection.address())("e", exchange)); } FC_RETHROW_EXCEPTIONS(error, "Failed to load previously unconfirmed AMQP messages from ${f}", ("f", (fc::path)data_file_path)); } else { @@ -182,6 +190,10 @@ void reliable_amqp_publisher_impl::pump_queue() { bool reliable_amqp_publisher_impl::verify_max_queue_size() { constexpr unsigned max_queued_messages = 1u << 20u; + if(message_deque.size() > max_queued_messages - 1000) { + std::string err = "AMQP publishing to " + exchange + " has reached " + std::to_string(message_deque.size()) + " unconfirmed messages"; + on_fatal_error(err); + } if(message_deque.size() > max_queued_messages) { if(logged_exceeded_max_depth == false) elog("AMQP connection ${a} publishing to \"${e}\" has reached ${max} unconfirmed messages; dropping messages", @@ -249,8 +261,10 @@ void reliable_amqp_publisher_impl::publish_message_direct(const std::string& rk, reliable_amqp_publisher::reliable_amqp_publisher(const std::string& url, const std::string& exchange, const std::string& routing_key, - const boost::filesystem::path& unconfirmed_path, const std::optional& message_id) : - my(new reliable_amqp_publisher_impl(url, exchange, routing_key, unconfirmed_path, message_id)) {} + const boost::filesystem::path& unconfirmed_path, + reliable_amqp_publisher::error_callback_t on_fatal_error, + const std::optional& message_id) : + my(new reliable_amqp_publisher_impl(url, exchange, routing_key, unconfirmed_path, std::move(on_fatal_error), message_id)) {} void reliable_amqp_publisher::publish_message_raw(std::vector&& data) { my->publish_message_raw( std::move( data ) ); diff --git a/plugins/amqp_compressed_proof_plugin/amqp_compressed_proof_plugin.cpp b/plugins/amqp_compressed_proof_plugin/amqp_compressed_proof_plugin.cpp index 96a064e925c..edbd11ded48 100644 --- a/plugins/amqp_compressed_proof_plugin/amqp_compressed_proof_plugin.cpp +++ b/plugins/amqp_compressed_proof_plugin/amqp_compressed_proof_plugin.cpp @@ -253,7 +253,12 @@ void amqp_compressed_proof_plugin::plugin_initialize(const variables_map& option EOS_ASSERT(filtered_receivers.size(), chain::plugin_config_exception, "Cannot have empty filter list for compressed-proof"); const boost::filesystem::path amqp_unconfimed_file = dir / (file_prefix + "-unconfirmed-"s + name + ".bin"s); - reliable_amqp_publisher& publisher = my->publishers.emplace_back(tokens[2], tokens[3], "", amqp_unconfimed_file, "eosio.node.compressed_proof_v0"); + reliable_amqp_publisher& publisher = my->publishers.emplace_back(tokens[2], tokens[3], "", amqp_unconfimed_file, + [](const std::string& err){ + elog("AMQP fatal error: ${e}", ("e", err)); + appbase::app().quit(); + }, + "eosio.node.compressed_proof_v0"); //the presence of an empty set means any action on that receiver std::map> filter_on_names_and_actions; diff --git a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp index 421cc155edc..64b80aca46e 100644 --- a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp +++ b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp @@ -57,7 +57,11 @@ void amqp_trace_plugin::plugin_startup() { const boost::filesystem::path trace_data_file_path = appbase::app().data_dir() / "amqp_trace_plugin" / "trace.bin"; - my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name, trace_data_file_path ); + my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name, trace_data_file_path, + []( const std::string& err ) { + elog( "AMQP fatal error: ${e}", ("e", err) ); + appbase::app().quit(); + } ); auto chain_plug = app().find_plugin(); EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" ); diff --git a/plugins/amqp_witness_plugin/amqp_witness_plugin.cpp b/plugins/amqp_witness_plugin/amqp_witness_plugin.cpp index 7f51c6f7d3c..9400ddaf16e 100644 --- a/plugins/amqp_witness_plugin/amqp_witness_plugin.cpp +++ b/plugins/amqp_witness_plugin/amqp_witness_plugin.cpp @@ -44,7 +44,13 @@ void amqp_witness_plugin::plugin_startup() { if(boost::filesystem::exists(witness_data_file_path) && my->delete_previous) boost::filesystem::remove(witness_data_file_path); - my->rqueue = std::make_unique(my->amqp_server, my->exchange, my->routing_key, witness_data_file_path, "eosio.node.witness_v0"); + my->rqueue = std::make_unique(my->amqp_server, my->exchange, my->routing_key, + witness_data_file_path, + [](const std::string& err){ + elog("AMQP fatal error: ${e}", ("e", err)); + appbase::app().quit(); + }, + "eosio.node.witness_v0"); app().get_plugin().add_on_witness_sig([rqueue=my->rqueue.get()](const chain::block_state_ptr& bsp, const chain::signature_type& sig) { rqueue->post_on_io_context([rqueue, bsp, sig]() { diff --git a/programs/rodeos/streams/rabbitmq.hpp b/programs/rodeos/streams/rabbitmq.hpp index 38165821119..9509993aaf6 100644 --- a/programs/rodeos/streams/rabbitmq.hpp +++ b/programs/rodeos/streams/rabbitmq.hpp @@ -41,7 +41,11 @@ class rabbitmq : public stream_handler { error = true; } ); if( error ) return; - amqp_publisher_ = std::make_shared(address_, "", "", unconfirmed_path_); + amqp_publisher_ = std::make_shared(address_, "", "", unconfirmed_path_, + [](const std::string& err) { + elog("AMQP fatal error: ${e}", ("e", err)); + appbase::app().quit(); + }); } rabbitmq(std::vector routes, const AMQP::Address& address, bool publish_immediately, @@ -60,7 +64,11 @@ class rabbitmq : public stream_handler { error = true; } ); if( error ) return; - amqp_publisher_ = std::make_shared( address_, exchange_name_, "", unconfirmed_path_ ); + amqp_publisher_ = std::make_shared(address_, exchange_name_, "", unconfirmed_path_, + [](const std::string& err){ + elog("AMQP fatal error: ${e}", ("e", err)); + appbase::app().quit(); + }); } const std::vector& get_routes() const override { return routes_; } From 6c1d997247a0d7fdfc77b8e4ac3d270d7da0d323 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 12 Mar 2021 14:13:15 -0600 Subject: [PATCH 07/11] Add error callback for direct publish. Add support for correlation_id to queued messages. --- .../eosio/amqp/reliable_amqp_publisher.hpp | 9 +++- libraries/amqp/reliable_amqp_publisher.cpp | 51 ++++++++++++++----- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp index f6f8f0ad37e..08b5ad33ef8 100644 --- a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp +++ b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp @@ -44,8 +44,13 @@ class reliable_amqp_publisher { publish_message_raw(std::move(v)); } + /// \param data message to send void publish_message_raw(std::vector&& data); + /// \param correlation_id if not empty() sets as correlation id of the message envelope + /// \param data message to send + void publish_message_raw(const std::string& correlation_id, std::vector&& data); + /// Publish messages. May be called from any thread. /// \param queue set of messages to send in one transaction void publish_messages_raw(std::deque>>&& queue); @@ -55,7 +60,9 @@ class reliable_amqp_publisher { /// \param routing_key if empty() uses class provided default routing_key /// \param correlation_id if not empty() sets as correlation id of the message envelope /// \param data message to send - void publish_message_direct(const std::string& routing_key, const std::string& correlation_id, std::vector data); + /// \param on_error() call from AMQP thread if unable to directly publish (e.g. not currently connected) + void publish_message_direct(const std::string& routing_key, const std::string& correlation_id, + std::vector data, error_callback_t on_error); /// reliable_amqp_publisher runs its own thread. In some cases it may be desirable to skip a needless thread jump /// when performing work. This method will allow submission of work to reliable_amqp_publisher's thread. diff --git a/libraries/amqp/reliable_amqp_publisher.cpp b/libraries/amqp/reliable_amqp_publisher.cpp index 6a6f242bc89..8ee04349a9a 100644 --- a/libraries/amqp/reliable_amqp_publisher.cpp +++ b/libraries/amqp/reliable_amqp_publisher.cpp @@ -21,8 +21,6 @@ namespace eosio { -struct reliable_amqp_publisher_handler; - struct reliable_amqp_publisher_impl { reliable_amqp_publisher_impl(const std::string& url, const std::string& exchange, const std::string& routing_key, const boost::filesystem::path& unconfirmed_path, @@ -31,8 +29,10 @@ struct reliable_amqp_publisher_impl { ~reliable_amqp_publisher_impl(); void pump_queue(); void publish_message_raw(std::vector&& data); + void publish_message_raw(const std::string& correlation_id, std::vector&& data); void publish_messages_raw(std::deque>>&& queue); - void publish_message_direct(const std::string& routing_key, const std::string& correlation_id, std::vector data); + void publish_message_direct(const std::string& routing_key, const std::string& correlation_id, + std::vector data, reliable_amqp_publisher::error_callback_t on_error); bool verify_max_queue_size(); void channel_ready(AMQP::Channel* channel); @@ -46,6 +46,7 @@ struct reliable_amqp_publisher_impl { struct amqp_message { fc::unsigned_int num = 0; ///< unique numbers indicates amqp transaction set (reset on clean restart) std::string routing_key; + std::string correlation_id; std::vector data; }; std::deque message_deque; @@ -86,7 +87,7 @@ reliable_amqp_publisher_impl::reliable_amqp_publisher_impl(const std::string& ur if( !message_deque.empty() ) batch_num = message_deque.back().num; ilog("AMQP existing persistent file ${f} loaded with ${c} unconfirmed messages for ${a} publishing to \"${e}\".", - ("f", data_file_path.generic_string())("a", retrying_connection.address())("e", exchange)); + ("f", data_file_path.generic_string())("c",message_deque.size())("a", retrying_connection.address())("e", exchange)); } FC_RETHROW_EXCEPTIONS(error, "Failed to load previously unconfirmed AMQP messages from ${f}", ("f", (fc::path)data_file_path)); } else { @@ -167,6 +168,8 @@ void reliable_amqp_publisher_impl::pump_queue() { envelope.setPersistent(); if(message_id) envelope.setMessageID(*message_id); + if(!msg.correlation_id.empty()) + envelope.setCorrelationID(msg.correlation_id); channel->publish(exchange, msg.routing_key.empty() ? routing_key : msg.routing_key, envelope); prev = msg.num; @@ -214,7 +217,21 @@ void reliable_amqp_publisher_impl::publish_message_raw(std::vector&& data) if( !verify_max_queue_size() ) return; - message_deque.emplace_back(amqp_message{0, "", std::move(data)}); + message_deque.emplace_back(amqp_message{0, "", "", std::move(data)}); + pump_queue(); +} + +void reliable_amqp_publisher_impl::publish_message_raw(const std::string& correlation_id, std::vector&& data) { + if(!ctx.get_executor().running_in_this_thread()) { + boost::asio::post(user_submitted_work_strand, [this, d=std::move(data), id=correlation_id]() mutable { + publish_message_raw(id, std::move(d)); + }); + return; + } + + if( !verify_max_queue_size() ) return; + + message_deque.emplace_back(amqp_message{0, "", correlation_id, std::move(data)}); pump_queue(); } @@ -231,22 +248,26 @@ void reliable_amqp_publisher_impl::publish_messages_raw(std::deque data) { +void reliable_amqp_publisher_impl::publish_message_direct(const std::string& rk, const std::string& correlation_id, + std::vector data, + reliable_amqp_publisher::error_callback_t on_error) { if(!ctx.get_executor().running_in_this_thread()) { - boost::asio::post(user_submitted_work_strand, [this, rk, correlation_id, d=std::move(data)]() mutable { - publish_message_direct(rk, correlation_id, std::move(d)); + boost::asio::post(user_submitted_work_strand, + [this, rk, correlation_id, d=std::move(data), on_e=std::move(on_error)]() mutable { + publish_message_direct(rk, correlation_id, std::move(d), std::move(on_e)); }); return; } if(stopping || !channel) { - elog( "AMQP connection ${a} to ${e} not connected dropping message ${rk}", - ("a", retrying_connection.address())("e", exchange)("rk", rk)); + std::string err = "AMQP connection " + fc::variant(retrying_connection.address()).as_string() + + " to " + exchange + " not connected, dropping message " + rk; + on_error(err); return; } @@ -270,13 +291,17 @@ void reliable_amqp_publisher::publish_message_raw(std::vector&& data) { my->publish_message_raw( std::move( data ) ); } +void reliable_amqp_publisher::publish_message_raw(const std::string& correlation_id, std::vector&& data) { + my->publish_message_raw( correlation_id, std::move( data ) ); +} + void reliable_amqp_publisher::publish_messages_raw(std::deque>>&& queue) { my->publish_messages_raw( std::move( queue ) ); } void reliable_amqp_publisher::publish_message_direct(const std::string& routing_key, const std::string& correlation_id, - std::vector data) { - my->publish_message_direct( routing_key, correlation_id, std::move(data) ); + std::vector data, error_callback_t on_error) { + my->publish_message_direct( routing_key, correlation_id, std::move(data), std::move(on_error) ); } void reliable_amqp_publisher::post_on_io_context(std::function&& f) { From cb30f0e4a5338a37ffd047d7a6f6a391610c6a31 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 12 Mar 2021 14:15:17 -0600 Subject: [PATCH 08/11] Add configurable reliable mode to amqp_trace_plugin --- .../amqp_trace_plugin/amqp_trace_plugin.cpp | 21 ++++++- .../amqp_trace_plugin_impl.cpp | 55 +++++++++++++++++-- .../amqp_trace_plugin_impl.hpp | 10 ++++ programs/rodeos/streams/rabbitmq.hpp | 10 +++- 4 files changed, 86 insertions(+), 10 deletions(-) diff --git a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp index 64b80aca46e..cfac0f3a4d0 100644 --- a/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp +++ b/plugins/amqp_trace_plugin/amqp_trace_plugin.cpp @@ -1,7 +1,7 @@ #include #include -#include #include +#include #include #include @@ -18,7 +18,9 @@ static appbase::abstract_plugin& amqp_trace_plugin_ = appbase::app().register_pl namespace eosio { amqp_trace_plugin::amqp_trace_plugin() -: my(std::make_shared()) {} +: my(std::make_shared()) { + app().register_config_type(); +} amqp_trace_plugin::~amqp_trace_plugin() {} @@ -35,6 +37,10 @@ void amqp_trace_plugin::set_program_options(options_description& cli, options_de "AMQP queue to publish transaction traces."); op("amqp-trace-exchange", bpo::value()->default_value(""), "Existing AMQP exchange to send transaction trace messages."); + op("amqp-trace-reliable-mode", bpo::value()->default_value(amqp_trace_plugin_impl::reliable_mode::queue), + "Reliable mode 'exit', exit application on any AMQP publish error.\n" + "Reliable mode 'queue', queue transaction traces to send to AMQP on connection establishment.\n" + "Reliable mode 'log', log an error and drop message when unable to directly publish to AMQP."); } void amqp_trace_plugin::plugin_initialize(const variables_map& options) { @@ -44,6 +50,7 @@ void amqp_trace_plugin::plugin_initialize(const variables_map& options) { my->amqp_trace_queue_name = options.at("amqp-trace-queue-name").as(); EOS_ASSERT( !my->amqp_trace_queue_name.empty(), chain::plugin_config_exception, "amqp-trace-queue-name required" ); my->amqp_trace_exchange = options.at("amqp-trace-exchange").as(); + my->pub_reliable_mode = options.at("amqp-trace-reliable-mode").as(); } FC_LOG_AND_RETHROW() } @@ -55,7 +62,15 @@ void amqp_trace_plugin::plugin_startup() { ilog( "Starting amqp_trace_plugin" ); my->started = true; - const boost::filesystem::path trace_data_file_path = appbase::app().data_dir() / "amqp_trace_plugin" / "trace.bin"; + const boost::filesystem::path trace_data_dir_path = appbase::app().data_dir() / "amqp_trace_plugin"; + const boost::filesystem::path trace_data_file_path = trace_data_dir_path / "trace.bin"; + if( my->pub_reliable_mode != amqp_trace_plugin_impl::reliable_mode::queue ) { + EOS_ASSERT( !fc::exists(trace_data_file_path), chain::plugin_config_exception, + "Existing queue file when amqp-trace-reliable-mode != 'queue': ${f}", + ("f", trace_data_file_path.generic_string()) ); + } else if( auto resmon_plugin = app().find_plugin() ) { + resmon_plugin->monitor_directory( trace_data_dir_path ); + } my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name, trace_data_file_path, []( const std::string& err ) { diff --git a/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp b/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp index e7f290ee57b..3e9c5942eee 100644 --- a/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp +++ b/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp @@ -3,21 +3,56 @@ #include #include #include - #include #include +#include namespace eosio { +std::istream& operator>>(std::istream& in, amqp_trace_plugin_impl::reliable_mode& m) { + std::string s; + in >> s; + if( s == "exit" ) + m = amqp_trace_plugin_impl::reliable_mode::exit; + else if( s == "log" ) + m = amqp_trace_plugin_impl::reliable_mode::log; + else if( s == "queue" ) + m = amqp_trace_plugin_impl::reliable_mode::queue; + else + in.setstate( std::ios_base::failbit ); + return in; +} + +std::ostream& operator<<(std::ostream& osm, amqp_trace_plugin_impl::reliable_mode m) { + if( m == amqp_trace_plugin_impl::reliable_mode::exit ) + osm << "exit"; + else if( m == amqp_trace_plugin_impl::reliable_mode::log ) + osm << "log"; + else if( m == amqp_trace_plugin_impl::reliable_mode::queue ) + osm << "queue"; + return osm; +} + // called from any thread void amqp_trace_plugin_impl::publish_error( std::string tid, int64_t error_code, std::string error_message ) { try { // reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns - amqp_trace->post_on_io_context([&amqp_trace = *amqp_trace, tid{std::move(tid)}, error_code, em{std::move(error_message)}]() mutable { + amqp_trace->post_on_io_context( + [&amqp_trace = *amqp_trace, mode=pub_reliable_mode, + tid{std::move(tid)}, error_code, em{std::move(error_message)}]() mutable { transaction_trace_msg msg{transaction_trace_exception{error_code}}; std::get( msg ).error_message = std::move( em ); std::vector buf = convert_to_bin( msg ); - amqp_trace.publish_message_direct(std::string(), tid, std::move(buf)); + if( mode == reliable_mode::queue) { + amqp_trace.publish_message_raw( tid, std::move( buf ) ); + } else { + amqp_trace.publish_message_direct( std::string(), tid, std::move( buf ), + [mode]( const std::string& err ) { + elog( "AMQP direct message error: ${e}", ("e", err) ); + if( mode == reliable_mode::exit ) + appbase::app().quit(); + } ); + } }); } FC_LOG_AND_DROP() @@ -37,7 +72,8 @@ void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr const chain::transaction_trace_ptr& trace ) { try { // reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns - amqp_trace->post_on_io_context([&amqp_trace = *amqp_trace, trx, trace]() { + amqp_trace->post_on_io_context( + [&amqp_trace = *amqp_trace, trx, trace, mode=pub_reliable_mode]() { if( !trace->except ) { dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) ); } else { @@ -45,7 +81,16 @@ void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr } transaction_trace_msg msg{ eosio::state_history::convert( *trace ) }; std::vector buf = convert_to_bin( msg ); - amqp_trace.publish_message_direct(std::string(), trx->id(), std::move(buf)); + if( mode == reliable_mode::queue) { + amqp_trace.publish_message_raw( trx->id(), std::move( buf ) ); + } else { + amqp_trace.publish_message_direct( std::string(), trx->id(), std::move( buf ), + [mode]( const std::string& err ) { + elog( "AMQP direct message error: ${e}", ("e", err) ); + if( mode == reliable_mode::exit ) + appbase::app().quit(); + } ); + } }); } FC_LOG_AND_DROP() diff --git a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp index f8cb33f5200..3dda1cd6736 100644 --- a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp +++ b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp @@ -7,11 +7,18 @@ namespace eosio { struct amqp_trace_plugin_impl : std::enable_shared_from_this { + enum class reliable_mode { + exit, + log, + queue + }; + std::optional amqp_trace; std::string amqp_trace_address; std::string amqp_trace_queue_name; std::string amqp_trace_exchange; + reliable_mode pub_reliable_mode; bool started = false; public: @@ -28,4 +35,7 @@ struct amqp_trace_plugin_impl : std::enable_shared_from_this>(std::istream& in, amqp_trace_plugin_impl::reliable_mode& m); +std::ostream& operator<<(std::ostream& osm, amqp_trace_plugin_impl::reliable_mode m); + } // namespace eosio diff --git a/programs/rodeos/streams/rabbitmq.hpp b/programs/rodeos/streams/rabbitmq.hpp index 9509993aaf6..2ed57be53ec 100644 --- a/programs/rodeos/streams/rabbitmq.hpp +++ b/programs/rodeos/streams/rabbitmq.hpp @@ -87,13 +87,19 @@ class rabbitmq : public stream_handler { void publish(const std::vector& data, const eosio::name& routing_key) override { if (exchange_name_.empty()) { if( publish_immediately_ ) { - amqp_publisher_->publish_message_direct( queue_name_, std::string(), data ); + amqp_publisher_->publish_message_direct( queue_name_, std::string(), data, + []( const std::string& err ) { + elog( "AMQP direct message error: ${e}", ("e", err) ); + } ); } else { queue_.emplace_back( std::make_pair( queue_name_, data ) ); } } else { if( publish_immediately_ ) { - amqp_publisher_->publish_message_direct( routing_key.to_string(), std::string(), data ); + amqp_publisher_->publish_message_direct( routing_key.to_string(), std::string(), data, + []( const std::string& err ) { + elog( "AMQP direct message error: ${e}", ("e", err) ); + } ); } else { queue_.emplace_back( std::make_pair( routing_key.to_string(), data ) ); } From d29c8b31cc6774875904d4b46beddcbfd1eda4b6 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 12 Mar 2021 15:44:03 -0600 Subject: [PATCH 09/11] Add correlation_id to reflection --- libraries/amqp/reliable_amqp_publisher.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/amqp/reliable_amqp_publisher.cpp b/libraries/amqp/reliable_amqp_publisher.cpp index 8ee04349a9a..e84a17cb8eb 100644 --- a/libraries/amqp/reliable_amqp_publisher.cpp +++ b/libraries/amqp/reliable_amqp_publisher.cpp @@ -312,4 +312,4 @@ reliable_amqp_publisher::~reliable_amqp_publisher() = default; } -FC_REFLECT( eosio::reliable_amqp_publisher_impl::amqp_message, (num)(routing_key)(data) ) +FC_REFLECT( eosio::reliable_amqp_publisher_impl::amqp_message, (num)(routing_key)(correlation_id)(data) ) From 78a92e5b4a2a2cdd52c69b8561d3523a3cff7c90 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 17 Mar 2021 10:51:50 -0500 Subject: [PATCH 10/11] Modify reliable_amqp_publisher to call on_fatal_error instead of dropping messages when max queue size it hit --- .../eosio/amqp/reliable_amqp_publisher.hpp | 4 ++-- libraries/amqp/reliable_amqp_publisher.cpp | 24 +++++++------------ .../amqp_trace_plugin_impl.cpp | 2 +- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp index 08b5ad33ef8..e20a4a0896b 100644 --- a/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp +++ b/libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp @@ -17,7 +17,7 @@ namespace eosio { * reliable_amqp_publisher's queue until the exchange exists because the queue will be unable to make progress. * * At a large enough unconfirmed queue depth (currently on the order of 1 million entries) reliable_amqp_publisher - * will call fatal_error() and then start dropping messages after logging an error. + * will call on_fatal_error() but continue queuing messages. */ class reliable_amqp_publisher { @@ -30,7 +30,7 @@ class reliable_amqp_publisher { /// \param exchange the exchange to publish to /// \param routing_key on published messages, used if no routing_key provided for publish_message.. calls /// \param unconfirmed_path path to save/load unconfirmed message to be tried again after stop/start - /// \param on_fatal_error called from amqp thread when unconfirmed queue depth about to be exceeded. + /// \param on_fatal_error called from AMQP thread when unconfirmed queue depth is exceeded. /// \param message_id optional message id to send with each message reliable_amqp_publisher(const std::string& server_url, const std::string& exchange, const std::string& routing_key, const boost::filesystem::path& unconfirmed_path, error_callback_t on_fatal_error, diff --git a/libraries/amqp/reliable_amqp_publisher.cpp b/libraries/amqp/reliable_amqp_publisher.cpp index e84a17cb8eb..7ea79eac5c2 100644 --- a/libraries/amqp/reliable_amqp_publisher.cpp +++ b/libraries/amqp/reliable_amqp_publisher.cpp @@ -33,7 +33,7 @@ struct reliable_amqp_publisher_impl { void publish_messages_raw(std::deque>>&& queue); void publish_message_direct(const std::string& routing_key, const std::string& correlation_id, std::vector data, reliable_amqp_publisher::error_callback_t on_error); - bool verify_max_queue_size(); + void verify_max_queue_size(); void channel_ready(AMQP::Channel* channel); void channel_failed(); @@ -62,7 +62,6 @@ struct reliable_amqp_publisher_impl { const std::string exchange; const std::string routing_key; const std::optional message_id; - bool logged_exceeded_max_depth = false; boost::asio::strand user_submitted_work_strand = boost::asio::make_strand(ctx); }; @@ -177,7 +176,6 @@ void reliable_amqp_publisher_impl::pump_queue() { channel->commitTransaction().onSuccess([this](){ message_deque.erase(message_deque.begin(), message_deque.begin()+in_flight); - logged_exceeded_max_depth = false; }) .onFinalize([this]() { in_flight = 0; @@ -190,21 +188,15 @@ void reliable_amqp_publisher_impl::pump_queue() { }); } -bool reliable_amqp_publisher_impl::verify_max_queue_size() { +void reliable_amqp_publisher_impl::verify_max_queue_size() { constexpr unsigned max_queued_messages = 1u << 20u; - if(message_deque.size() > max_queued_messages - 1000) { + if(message_deque.size() > max_queued_messages) { + elog("AMQP connection ${a} publishing to \"${e}\" has reached ${max} unconfirmed messages", + ("a", retrying_connection.address())("e", exchange)("max", max_queued_messages)); std::string err = "AMQP publishing to " + exchange + " has reached " + std::to_string(message_deque.size()) + " unconfirmed messages"; on_fatal_error(err); } - if(message_deque.size() > max_queued_messages) { - if(logged_exceeded_max_depth == false) - elog("AMQP connection ${a} publishing to \"${e}\" has reached ${max} unconfirmed messages; dropping messages", - ("a", retrying_connection.address())("e", exchange)("max", max_queued_messages)); - logged_exceeded_max_depth = true; - return false; - } - return true; } void reliable_amqp_publisher_impl::publish_message_raw(std::vector&& data) { @@ -215,7 +207,7 @@ void reliable_amqp_publisher_impl::publish_message_raw(std::vector&& data) return; } - if( !verify_max_queue_size() ) return; + verify_max_queue_size(); message_deque.emplace_back(amqp_message{0, "", "", std::move(data)}); pump_queue(); @@ -229,7 +221,7 @@ void reliable_amqp_publisher_impl::publish_message_raw(const std::string& correl return; } - if( !verify_max_queue_size() ) return; + verify_max_queue_size(); message_deque.emplace_back(amqp_message{0, "", correlation_id, std::move(data)}); pump_queue(); @@ -243,7 +235,7 @@ void reliable_amqp_publisher_impl::publish_messages_raw(std::deque Date: Wed, 17 Mar 2021 13:07:38 -0500 Subject: [PATCH 11/11] Verify valid std::fuction before calling --- libraries/amqp/reliable_amqp_publisher.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/amqp/reliable_amqp_publisher.cpp b/libraries/amqp/reliable_amqp_publisher.cpp index 7ea79eac5c2..5181be4d10d 100644 --- a/libraries/amqp/reliable_amqp_publisher.cpp +++ b/libraries/amqp/reliable_amqp_publisher.cpp @@ -195,7 +195,7 @@ void reliable_amqp_publisher_impl::verify_max_queue_size() { elog("AMQP connection ${a} publishing to \"${e}\" has reached ${max} unconfirmed messages", ("a", retrying_connection.address())("e", exchange)("max", max_queued_messages)); std::string err = "AMQP publishing to " + exchange + " has reached " + std::to_string(message_deque.size()) + " unconfirmed messages"; - on_fatal_error(err); + if( on_fatal_error) on_fatal_error(err); } }