diff --git a/libraries/amqp/include/eosio/amqp/amqp_handler.hpp b/libraries/amqp/include/eosio/amqp/amqp_handler.hpp index 2983d7fddb4..b6df986a657 100644 --- a/libraries/amqp/include/eosio/amqp/amqp_handler.hpp +++ b/libraries/amqp/include/eosio/amqp/amqp_handler.hpp @@ -35,7 +35,7 @@ class amqp_handler { /// @param address AMQP address /// @param exchange_name AMQP exchange to send message to - /// @param exchange_type AMQP exhcnage type + /// @param exchange_type AMQP exchange type /// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr /// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed. /// user required to ack/reject delivery_tag for each callback. @@ -50,11 +50,13 @@ class amqp_handler { /// publish to AMQP address with routing key name // on_error() called if not connected - void publish( std::string exchange, std::string correlation_id, std::vector buf ) { + void publish( std::string exchange, std::string correlation_id, std::string reply_to, std::vector buf ) { boost::asio::post( thread_pool_.get_executor(), - [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), buf=std::move(buf)]() { + [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to), + buf=std::move(buf)]() mutable { AMQP::Envelope env( buf.data(), buf.size() ); - env.setCorrelationID( cid ); + if(!cid.empty()) env.setCorrelationID( std::move( cid ) ); + if(!rt.empty()) env.setReplyTo( std::move( rt ) ); if( my->channel_ ) my->channel_->publish( exchange, my->name_, env, 0 ); else @@ -65,12 +67,14 @@ class amqp_handler { /// publish to AMQP calling f() -> std::vector on amqp thread // on_error() called if not connected template - void publish( std::string exchange, std::string correlation_id, Func f ) { + void publish( std::string exchange, std::string correlation_id, std::string reply_to, Func f ) { boost::asio::post( thread_pool_.get_executor(), - [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), f=std::move(f)]() { + [my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to), + f=std::move(f)]() mutable { std::vector buf = f(); AMQP::Envelope env( buf.data(), buf.size() ); - env.setCorrelationID( cid ); + if(!cid.empty()) env.setCorrelationID( std::move( cid ) ); + if(!rt.empty()) env.setReplyTo( std::move(rt) ); if( my->channel_ ) my->channel_->publish( exchange, my->name_, env, 0 ); else diff --git a/programs/cleos/main.cpp b/programs/cleos/main.cpp index 3edfb1674da..698f2c18cfc 100644 --- a/programs/cleos/main.cpp +++ b/programs/cleos/main.cpp @@ -162,6 +162,7 @@ string default_url = "http://127.0.0.1:8888/"; string default_wallet_url = "unix://" + (determine_home_directory() / "eosio-wallet" / (string(key_store_executable_name) + ".sock")).string(); string wallet_url; //to be set to default_wallet_url in main string amqp_address; +string amqp_reply_to; bool no_verify = false; vector headers; @@ -449,7 +450,7 @@ fc::variant push_transaction( signed_transaction& trx, const std::vector