diff --git a/libraries/amqp/include/eosio/amqp/amqp_handler.hpp b/libraries/amqp/include/eosio/amqp/amqp_handler.hpp index ef5b6ac3b40..f55da066c6d 100644 --- a/libraries/amqp/include/eosio/amqp/amqp_handler.hpp +++ b/libraries/amqp/include/eosio/amqp/amqp_handler.hpp @@ -8,7 +8,6 @@ #include #include - namespace eosio { /// Designed to work with internal io_service running on a dedicated thread. @@ -24,61 +23,147 @@ class amqp_handler { // called from amqp thread on consume of message using on_consume_t = std::function; + /// Blocks until connection successful or error /// @param address AMQP address - /// @param name AMQP routing key /// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr - /// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed. - /// user required to ack/reject delivery_tag for each callback. - amqp_handler( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr ) - : amqp_handler( address, std::move(name), "", "", std::move(on_err), std::move(on_consume)) - {} + amqp_handler( const std::string& address, on_error_t on_err ) + : first_connect_() + , thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread + , amqp_connection_( thread_pool_.get_executor(), address, + [this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} ) + , on_error_( std::move( on_err ) ) + { + ilog( "Connecting to AMQP address ${a} ...", ("a", amqp_connection_.address()) ); - /// @param address AMQP address - /// @param exchange_name AMQP exchange to send message to - /// @param exchange_type AMQP exchange type - /// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr - /// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed. - /// user required to ack/reject delivery_tag for each callback. - amqp_handler( const std::string& address, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume = nullptr ) - : amqp_handler( address, "", std::move(exchange_name), std::move(exchange_type), std::move(on_err), std::move(on_consume)) - {} + wait(); + } + + /// Declare a durable exchange, blocks until successful or error + /// @param exchange_name AMQP exchange to declare + /// @param exchange_type AMQP exchange type, empty (direct), "fanout", or "direct" anything else calls on_err + void declare_exchange(std::string exchange_name, const std::string& exchange_type) { + auto type = AMQP::direct; + if( !exchange_type.empty() ) { + if (exchange_type == "fanout") { + type = AMQP::fanout; + } else if (exchange_type != "direct") { + on_error( "AMQP unsupported exchange type: " + exchange_type ); + return; + } + } + + start_cond cond; + boost::asio::post( thread_pool_.get_executor(),[this, &cond, en=exchange_name, type]() { + try { + if( !channel_ ) { + elog( "AMQP not connected to channel ${a}", ("a", amqp_connection_.address()) ); + on_error( "AMQP not connected to channel" ); + return; + } + + auto& exchange = channel_->declareExchange( en, type, AMQP::durable); + exchange.onSuccess( [this, &cond, en]() { + dlog( "AMQP declare exchange successful, exchange ${e}, for ${a}", + ("e", en)("a", amqp_connection_.address()) ); + cond.set(); + } ); + exchange.onError([this, &cond, en](const char* error_message) { + elog( "AMQP unable to declare exchange ${e}, for ${a}", ("e", en)("a", amqp_connection_.address()) ); + on_error( std::string("AMQP Queue error: ") + error_message ); + cond.set(); + }); + return; + } FC_LOG_AND_DROP() + cond.set(); + } ); + + if( !cond.wait() ) { + elog( "AMQP timeout declaring exchange: ${q} for ${a}", ("q", exchange_name)("a", amqp_connection_.address()) ); + on_error( "AMQP timeout declaring exchange: " + exchange_name ); + } + } + + /// Declare a durable queue, blocks until successful or error + /// @param queue_name AMQP queue name to declare + void declare_queue(std::string queue_name) { + start_cond cond; + boost::asio::post( thread_pool_.get_executor(), [this, &cond, qn=queue_name]() mutable { + try { + if( !channel_ ) { + elog( "AMQP not connected to channel ${a}", ("a", amqp_connection_.address()) ); + on_error( "AMQP not connected to channel" ); + return; + } + + auto& queue = channel_->declareQueue( qn, AMQP::durable ); + queue.onSuccess( + [this, &cond]( const std::string& name, uint32_t message_count, uint32_t consumer_count ) { + dlog( "AMQP queue ${q}, messages: ${mc}, consumers: ${cc}, for ${a}", + ("q", name)("mc", message_count)("cc", consumer_count)("a", amqp_connection_.address()) ); + cond.set(); + } ); + queue.onError( [this, &cond, qn]( const char* error_message ) { + elog( "AMQP error declaring queue ${q} for ${a}", ("q", qn)("a", amqp_connection_.address()) ); + on_error( error_message ); + cond.set(); + } ); + return; + } FC_LOG_AND_DROP() + cond.set(); + } ); + + if( !cond.wait() ) { + elog( "AMQP timeout declaring queue: ${q} for ${a}", ("q", queue_name)("a", amqp_connection_.address()) ); + on_error( "AMQP timeout declaring queue: " + queue_name ); + } + } /// drain queue and stop thread ~amqp_handler() { stop(); } - /// publish to AMQP address with routing key name - // on_error() called if not connected - void publish( std::string exchange, std::string correlation_id, std::string reply_to, std::vector buf ) { + /// publish to AMQP, on_error() called if not connected + void publish( std::string exchange, std::string queue_name, std::string correlation_id, std::string reply_to, + std::vector buf ) { boost::asio::post( thread_pool_.get_executor(), - [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to), - buf=std::move(buf)]() mutable { + [my=this, exchange=std::move(exchange), qn=std::move(queue_name), + cid=std::move(correlation_id), rt=std::move(reply_to), buf=std::move(buf)]() mutable { + try { + if( !my->channel_ ) { + elog( "AMQP not connected to channel ${a}", ("a", my->amqp_connection_.address()) ); + my->on_error( "AMQP not connected to channel" ); + return; + } + AMQP::Envelope env( buf.data(), buf.size() ); if(!cid.empty()) env.setCorrelationID( std::move( cid ) ); if(!rt.empty()) env.setReplyTo( std::move( rt ) ); - if( my->channel_ ) - my->channel_->publish( exchange, my->name_, env, 0 ); - else - my->on_error( "AMQP Unable to publish: " + cid ); + my->channel_->publish( exchange, qn, env, 0 ); + } FC_LOG_AND_DROP() } ); } /// publish to AMQP calling f() -> std::vector on amqp thread // on_error() called if not connected template - void publish( std::string exchange, std::string correlation_id, std::string reply_to, Func f ) { + void publish( std::string exchange, std::string queue_name, std::string correlation_id, std::string reply_to, Func f ) { boost::asio::post( thread_pool_.get_executor(), - [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to), - f=std::move(f)]() mutable { + [my=this, exchange=std::move(exchange), qn=std::move(queue_name), + cid=std::move(correlation_id), rt=std::move(reply_to), f=std::move(f)]() mutable { + try { + if( !my->channel_ ) { + elog( "AMQP not connected to channel ${a}", ("a", my->amqp_connection_.address()) ); + my->on_error( "AMQP not connected to channel" ); + return; + } + std::vector buf = f(); AMQP::Envelope env( buf.data(), buf.size() ); if(!cid.empty()) env.setCorrelationID( std::move( cid ) ); if(!rt.empty()) env.setReplyTo( std::move(rt) ); - if( my->channel_ ) - my->channel_->publish( exchange, my->name_, env, 0 ); - else - my->on_error( "AMQP Unable to publish: " + cid ); + my->channel_->publish( exchange, qn, env, 0 ); + } FC_LOG_AND_DROP() } ); } @@ -141,110 +226,77 @@ class amqp_handler { } } - void start_consume(bool recover=true) { - boost::asio::post( thread_pool_.get_executor(), [&]() { - consuming_ = true; - init_consume(recover); - } ); + /// Start consuming from provided queue, does not block. + /// Only expected to be called once as changing queue is not supported, will call on_err if called more than once + /// @param queue_name name of queue to consume from, must pre-exist + /// @param on_consume callback for consume on routing key name, called from amqp thread. + /// user required to ack/reject delivery_tag for each callback. + /// @param recover if true recover all messages that were not yet acked + // asks the server to redeliver all unacknowledged messages on the channel + // zero or more messages may be redelivered + void start_consume(std::string queue_name, on_consume_t on_consume, bool recover) { + boost::asio::post( thread_pool_.get_executor(), + [this, qn{std::move(queue_name)}, on_consume{std::move(on_consume)}, recover]() mutable { + try { + if( on_consume_ ) { + on_error("AMQP already consuming from: " + queue_name_ + ", unable to consume from: " + qn); + return; + } else if( !on_consume ) { + on_error("AMQP nullptr passed for on_consume callback"); + return; + } + queue_name_ = std::move(qn); + on_consume_ = std::move(on_consume); + init_consume(recover); + } FC_LOG_AND_DROP() + } ); } -private: - amqp_handler(const std::string& address, std::string name, std::string exchange_name, std::string exchange_type, - on_error_t on_err, on_consume_t on_consume) - : first_connect_() - , thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread - , amqp_connection_( thread_pool_.get_executor(), address, [this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} ) - , name_( std::move( name ) ) - , exchange_name_( std::move( exchange_name ) ) - , exchange_type_( std::move( exchange_type ) ) - , on_error_( std::move( on_err ) ) - , on_consume_( std::move( on_consume ) ) - { - AMQP::Address amqp_address( address ); - ilog( "Connecting to AMQP address ${a} - Queue: ${q}...", ("a", amqp_address)("q", name_) ); - - wait(); - } +private: // called from non-amqp thread void wait() { if( !first_connect_.wait() ) { - boost::asio::post( thread_pool_.get_executor(), [this](){ - on_error( "AMQP timeout connecting and declaring queue" ); - } ); + elog( "AMQP timeout connecting to: ${a}", ("a", amqp_connection_.address()) ); + on_error( "AMQP timeout connecting" ); } } // called from amqp thread void channel_ready(AMQP::Channel* c) { - dlog( "AMQP Channel ready: ${id}", ("id", c ? c->id() : 0) ); + ilog( "AMQP Channel ready: ${id}, for ${a}", ("id", c ? c->id() : 0)("a", amqp_connection_.address()) ); channel_ = c; - init(); + init_consume(true); + first_connect_.set(); } // called from amqp thread void channel_failed() { - wlog( "AMQP connection failed." ); + elog( "AMQP connection failed to: ${a}", ("a", amqp_connection_.address()) ); channel_ = nullptr; } - // called from amqp thread - void init() { - if(!channel_) return; - if( !exchange_type_.empty() ) { - auto type = AMQP::direct; - if (exchange_type_ == "fanout") { - type = AMQP::fanout; - } else if (exchange_type_ != "direct") { - on_error( "Unsupported exchange type: " + exchange_type_ ); - first_connect_.set(); - return; - } - - auto& exchange = channel_->declareExchange( exchange_name_, type, AMQP::durable); - exchange.onSuccess( [this]() { - dlog( "AMQP declare exchange Successfully! Exchange ${e}", ("e", exchange_name_) ); - init_consume(true); - first_connect_.set(); - } ); - exchange.onError([this](const char* error_message) { - on_error( std::string("AMQP Queue error: ") + error_message ); - first_connect_.set(); - }); - } else { - auto& queue = channel_->declareQueue( name_, AMQP::durable ); - queue.onSuccess( [&]( const std::string& name, uint32_t messagecount, uint32_t consumercount ) { - dlog( "AMQP Connected Successfully! Queue ${q} - Messages: ${mc} - Consumers: ${cc}", - ("q", name)( "mc", messagecount )( "cc", consumercount ) ); - init_consume(true); - first_connect_.set(); - } ); - queue.onError( [&]( const char* error_message ) { - on_error( error_message ); - first_connect_.set(); - } ); - } - } - // called from amqp thread void init_consume(bool recover) { - if( channel_ && on_consume_ && consuming_ ) { + if( channel_ && on_consume_ ) { if (recover) { channel_->recover(AMQP::requeue) - .onSuccess([&](){dlog("successfully started channel recovery");}) - .onError([&](const char* message){wlog("channel recovery failed ${e}", ("e", message));}); + .onSuccess( [&]() { dlog( "successfully started channel recovery" ); } ) + .onError( [&]( const char* message ) { + elog( "channel recovery failed ${e}", ("e", message) ); + on_error( "AMQP channel recovery failed" ); + } ); } - auto &consumer = channel_->consume(name_); + auto &consumer = channel_->consume(queue_name_); consumer.onSuccess([&](const std::string &consumer_tag) { - dlog("consume started: ${tag}", ("tag", consumer_tag)); + ilog("consume started: ${tag}, for ${a}", ("tag", consumer_tag)("a", amqp_connection_.address())); }); consumer.onError([&](const char *message) { - wlog("consume failed: ${e}", ("e", message)); + elog("consume failed: ${e}, for ${a}", ("e", message)("a", amqp_connection_.address())); on_error(message); }); - static_assert(std::is_same_v, - "AMQP::MessageCallback interface changed"); + static_assert(std::is_same_v, "AMQP::MessageCallback interface changed"); consumer.onReceived(on_consume_); } } @@ -286,12 +338,9 @@ class amqp_handler { eosio::chain::named_thread_pool thread_pool_; single_channel_retrying_amqp_connection amqp_connection_; AMQP::Channel* channel_ = nullptr; // null when not connected - std::string name_; - std::string exchange_name_; - std::string exchange_type_; on_error_t on_error_; on_consume_t on_consume_; - bool consuming_ = false; + std::string queue_name_; }; } // namespace eosio diff --git a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp index 88f7efdb792..0d421817ebd 100644 --- a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp +++ b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp @@ -111,7 +111,11 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this if (!prod_plugin->paused() || allow_speculative_execution) { if (!started_consuming) { ilog("Starting consuming amqp messages during on_block_start"); - amqp_trx->start_consume(true); + amqp_trx->start_consume(amqp_trx_queue, + [&]( const AMQP::Message& message, const amqp_handler::delivery_tag_t& delivery_tag, bool redelivered ) { + if( app().is_quiting() ) return; // leave non-ack + consume_message( message, delivery_tag, redelivered ); + }, true); started_consuming = true; } @@ -221,9 +225,9 @@ void amqp_trx_plugin::set_program_options(options_description& cli, options_desc auto op = cfg.add_options(); op("amqp-trx-address", bpo::value(), "AMQP address: Format: amqp://USER:PASSWORD@ADDRESS:PORT\n" - "Will consume from amqp-trx-queue-name ('trx') queue."); + "Will consume from amqp-trx-queue-name (amqp-trx-queue-name) queue."); op("amqp-trx-queue-name", bpo::value()->default_value("trx"), - "AMQP queue to consume transactions from."); + "AMQP queue to consume transactions from, must already exist."); op("amqp-trx-queue-size", bpo::value()->default_value(my->trx_processing_queue_size), "The maximum number of transactions to pull from the AMQP queue at any given time."); op("amqp-trx-speculative-execution", bpo::bool_switch()->default_value(false), @@ -232,6 +236,8 @@ void amqp_trx_plugin::set_program_options(options_description& cli, options_desc "AMQP ack when 'received' from AMQP, when 'executed', or when 'in_block' is produced that contains trx.\n" "Options: received, executed, in_block"); op("amqp-trx-trace-reliable-mode", bpo::value()->default_value(amqp_trace_plugin_impl::reliable_mode::queue), + "If AMQP reply-to header is set, transaction trace is sent to default exchange with routing key of the reply-to header.\n" + "If AMQP reply-to header is not set, then transaction trace is discarded.\n" "Reliable mode 'exit', exit application on any AMQP publish error.\n" "Reliable mode 'queue', queue transaction traces to send to AMQP on connection establishment.\n" "Reliable mode 'log', log an error and drop message when unable to directly publish to AMQP."); @@ -259,7 +265,7 @@ void amqp_trx_plugin::plugin_initialize(const variables_map& options) { my->trace_plug.amqp_trace_address = my->amqp_trx_address; my->trace_plug.amqp_trace_queue_name = ""; // not used, reply-to is used for each message - my->trace_plug.amqp_trace_exchange = ""; // not used, reply-to used for routing-key + my->trace_plug.amqp_trace_exchange = ""; // default exchange, reply-to used for routing-key my->trace_plug.pub_reliable_mode = options.at("amqp-trx-trace-reliable-mode").as(); my->chain_plug->enable_accept_transactions(); @@ -311,20 +317,20 @@ void amqp_trx_plugin::plugin_startup() { my->trx_queue_ptr->run(); - my->amqp_trx.emplace( my->amqp_trx_address, my->amqp_trx_queue, + my->amqp_trx.emplace( my->amqp_trx_address, []( const std::string& err ) { elog( "amqp error: ${e}", ("e", err) ); app().quit(); - }, - [&]( const AMQP::Message& message, const amqp_handler::delivery_tag_t& delivery_tag, bool redelivered ) { - if( app().is_quiting() ) return; // leave non-ack - my->consume_message( message, delivery_tag, redelivered ); } ); if (!my->prod_plugin->paused() || my->allow_speculative_execution) { ilog("Starting amqp consumption at startup."); - my->amqp_trx->start_consume(true); + my->amqp_trx->start_consume(my->amqp_trx_queue, + [&]( const AMQP::Message& message, const amqp_handler::delivery_tag_t& delivery_tag, bool redelivered ) { + if( app().is_quiting() ) return; // leave non-ack + my->consume_message( message, delivery_tag, redelivered ); + }, true); my->started_consuming = true; } } diff --git a/programs/cleos/main.cpp b/programs/cleos/main.cpp index 698f2c18cfc..325752315dd 100644 --- a/programs/cleos/main.cpp +++ b/programs/cleos/main.cpp @@ -163,6 +163,7 @@ string default_wallet_url = "unix://" + (determine_home_directory() / "eosio-wal string wallet_url; //to be set to default_wallet_url in main string amqp_address; string amqp_reply_to; +string amqp_queue_name = "trx"; bool no_verify = false; vector headers; @@ -443,14 +444,14 @@ fc::variant push_transaction( signed_transaction& trx, const std::vector(msg).id(); string id = tid.str(); - eosio::amqp_handler qp_trx( amqp_address, "trx", []( const std::string& err ) { + eosio::amqp_handler qp_trx( amqp_address, []( const std::string& err ) { std::cerr << "AMQP trx error: " << err << std::endl; exit( 1 ); } ); result = fc::mutable_variant_object() ( "transaction_id", id ) ( "status", "submitted" ); - qp_trx.publish( "", std::move( id ), amqp_reply_to, std::move( buf ) ); + qp_trx.publish( "", amqp_queue_name, std::move( id ), amqp_reply_to, std::move( buf ) ); return result; } else { try { @@ -2525,6 +2526,7 @@ int main( int argc, char** argv ) { app.add_option( "-u,--url", default_url, localized( "The http/https URL where ${n} is running", ("n", node_executable_name)), true ); app.add_option( "--wallet-url", wallet_url, localized("The http/https URL where ${k} is running", ("k", key_store_executable_name)), true ); app.add_option( "--amqp", amqp_address, localized("The ampq URL where AMQP is running amqp://USER:PASSWORD@ADDRESS:PORT"), false ); + app.add_option( "--amqp-queue-name", amqp_queue_name, localized("The ampq queue to send transaction to"), true ); app.add_option( "--amqp-reply-to", amqp_reply_to, localized("The ampq reply to string"), false ); app.add_option( "-r,--header", header_opt_callback, localized("Pass specific HTTP header; repeat this option to pass multiple headers")); diff --git a/programs/rodeos/streams/rabbitmq.hpp b/programs/rodeos/streams/rabbitmq.hpp index c177a683f6f..afb00bb8a16 100644 --- a/programs/rodeos/streams/rabbitmq.hpp +++ b/programs/rodeos/streams/rabbitmq.hpp @@ -41,12 +41,13 @@ class rabbitmq : public stream_handler { , queue_name_( std::move( queue_name)) { ilog("Connecting to RabbitMQ address ${a} - Queue: ${q}...", ("a", address)( "q", queue_name_)); - bool error = false; - eosio::amqp_handler declare_queue( address_, queue_name_, [&error](const std::string& err){ + std::atomic error = false; + eosio::amqp_handler declare_queue( address_, [&error](const std::string& err){ elog("AMQP Queue error: ${e}", ("e", err)); appbase::app().quit(); error = true; } ); + if( !error ) declare_queue.declare_queue(queue_name_); if( error ) return; init(); } @@ -59,12 +60,13 @@ class rabbitmq : public stream_handler { , exchange_name_( std::move( exchange_name)) { ilog("Connecting to RabbitMQ address ${a} - Exchange: ${e}...", ("a", address)( "e", exchange_name_)); - bool error = false; - eosio::amqp_handler declare_exchange( address_, exchange_name_, exchange_type, [&error](const std::string& err){ + std::atomic error = false; + eosio::amqp_handler declare_exchange( address_, [&error](const std::string& err){ elog("AMQP Exchange error: ${e}", ("e", err)); appbase::app().quit(); error = true; } ); + if( !error ) declare_exchange.declare_exchange(exchange_name_, exchange_type); if( error ) return; init(); } diff --git a/tests/Cluster.py b/tests/Cluster.py index bfd7e0e0bb2..f75e8d880d6 100644 --- a/tests/Cluster.py +++ b/tests/Cluster.py @@ -616,6 +616,13 @@ def doNodesHaveBlockNum(nodes, targetBlockNum, blockType, printCount): ret=Utils.waitForTruth(lam, timeout) return ret + @staticmethod + def createAMQPQueue(queueName): + Utils.runCmdReturnStr("rabbitmq-plugins enable rabbitmq_management") + cmd="curl -u guest:guest -H \"content-type:application/json\" -XPUT http://127.0.0.1:15672/api/queues/%%2F/%s -d'{\"durable\":true}' " % \ + (queueName) + return Utils.runCmdReturnStr(cmd) + @staticmethod def getClientVersion(verbose=False): """Returns client version (string)""" diff --git a/tests/nodeos_high_transaction_test.py b/tests/nodeos_high_transaction_test.py index 94cba574a7f..4d4fcc71a51 100755 --- a/tests/nodeos_high_transaction_test.py +++ b/tests/nodeos_high_transaction_test.py @@ -86,6 +86,7 @@ totalNodes=totalNodes, totalProducers=totalProducers, useBiosBootFile=False, topo="ring") else: + cluster.createAMQPQueue("trx") launched = cluster.launch(pnodes=totalProducerNodes, totalNodes=totalNodes, totalProducers=totalProducers, useBiosBootFile=False, topo="ring", diff --git a/tests/nodeos_run_test.py b/tests/nodeos_run_test.py index d38c8c48e32..23705cca9d8 100755 --- a/tests/nodeos_run_test.py +++ b/tests/nodeos_run_test.py @@ -73,6 +73,7 @@ specificExtraNodeosArgs={ 0 : " --backing-store=chainbase", 1 : " --backing-store=rocksdb" } else: + cluster.createAMQPQueue("trx") specificExtraNodeosArgs={ 0: "--backing-store=chainbase --plugin eosio::amqp_trx_plugin --amqp-trx-address %s --plugin eosio::amqp_trace_plugin --amqp-trace-address %s" % (amqpAddr, amqpAddr), 1 : " --backing-store=rocksdb" } if cluster.launch(totalNodes=3, prodCount=prodCount, onlyBios=onlyBios, dontBootstrap=dontBootstrap, specificExtraNodeosArgs=specificExtraNodeosArgs) is False: