Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10412 from EOSIO/cleos-reply-to
Browse files Browse the repository at this point in the history
Add support for specifying reply-to to amqp message 📦
  • Loading branch information
heifner authored Jun 3, 2021
2 parents 7b495a4 + 22b3bd1 commit a62dd83
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
18 changes: 11 additions & 7 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class amqp_handler {

/// @param address AMQP address
/// @param exchange_name AMQP exchange to send message to
/// @param exchange_type AMQP exhcnage type
/// @param exchange_type AMQP exchange type
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
Expand All @@ -50,11 +50,13 @@ class amqp_handler {

/// publish to AMQP address with routing key name
// on_error() called if not connected
void publish( std::string exchange, std::string correlation_id, std::vector<char> buf ) {
void publish( std::string exchange, std::string correlation_id, std::string reply_to, std::vector<char> buf ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), buf=std::move(buf)]() {
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
buf=std::move(buf)]() mutable {
AMQP::Envelope env( buf.data(), buf.size() );
env.setCorrelationID( cid );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move( rt ) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
Expand All @@ -65,12 +67,14 @@ class amqp_handler {
/// publish to AMQP calling f() -> std::vector<char> on amqp thread
// on_error() called if not connected
template<typename Func>
void publish( std::string exchange, std::string correlation_id, Func f ) {
void publish( std::string exchange, std::string correlation_id, std::string reply_to, Func f ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), f=std::move(f)]() {
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
f=std::move(f)]() mutable {
std::vector<char> buf = f();
AMQP::Envelope env( buf.data(), buf.size() );
env.setCorrelationID( cid );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move(rt) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
Expand Down
4 changes: 3 additions & 1 deletion programs/cleos/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ string default_url = "http://127.0.0.1:8888/";
string default_wallet_url = "unix://" + (determine_home_directory() / "eosio-wallet" / (string(key_store_executable_name) + ".sock")).string();
string wallet_url; //to be set to default_wallet_url in main
string amqp_address;
string amqp_reply_to;
bool no_verify = false;
vector<string> headers;

Expand Down Expand Up @@ -449,7 +450,7 @@ fc::variant push_transaction( signed_transaction& trx, const std::vector<public_
result = fc::mutable_variant_object()
( "transaction_id", id )
( "status", "submitted" );
qp_trx.publish( "", std::move( id ), std::move( buf ) );
qp_trx.publish( "", std::move( id ), amqp_reply_to, std::move( buf ) );
return result;
} else {
try {
Expand Down Expand Up @@ -2524,6 +2525,7 @@ int main( int argc, char** argv ) {
app.add_option( "-u,--url", default_url, localized( "The http/https URL where ${n} is running", ("n", node_executable_name)), true );
app.add_option( "--wallet-url", wallet_url, localized("The http/https URL where ${k} is running", ("k", key_store_executable_name)), true );
app.add_option( "--amqp", amqp_address, localized("The ampq URL where AMQP is running amqp://USER:PASSWORD@ADDRESS:PORT"), false );
app.add_option( "--amqp-reply-to", amqp_reply_to, localized("The ampq reply to string"), false );

app.add_option( "-r,--header", header_opt_callback, localized("Pass specific HTTP header; repeat this option to pass multiple headers"));
app.add_flag( "-n,--no-verify", no_verify, localized("Don't verify peer certificate when using HTTPS"));
Expand Down

0 comments on commit a62dd83

Please sign in to comment.