Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add support for specifying reply-to to amqp message 📦 #10412

Merged
merged 1 commit into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class amqp_handler {

/// @param address AMQP address
/// @param exchange_name AMQP exchange to send message to
/// @param exchange_type AMQP exhcnage type
/// @param exchange_type AMQP exchange type
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
Expand All @@ -50,11 +50,13 @@ class amqp_handler {

/// publish to AMQP address with routing key name
// on_error() called if not connected
void publish( std::string exchange, std::string correlation_id, std::vector<char> buf ) {
void publish( std::string exchange, std::string correlation_id, std::string reply_to, std::vector<char> buf ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), buf=std::move(buf)]() {
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
buf=std::move(buf)]() mutable {
AMQP::Envelope env( buf.data(), buf.size() );
env.setCorrelationID( cid );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move( rt ) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
Expand All @@ -65,12 +67,14 @@ class amqp_handler {
/// publish to AMQP calling f() -> std::vector<char> on amqp thread
// on_error() called if not connected
template<typename Func>
void publish( std::string exchange, std::string correlation_id, Func f ) {
void publish( std::string exchange, std::string correlation_id, std::string reply_to, Func f ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), f=std::move(f)]() {
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
f=std::move(f)]() mutable {
std::vector<char> buf = f();
AMQP::Envelope env( buf.data(), buf.size() );
env.setCorrelationID( cid );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move(rt) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
Expand Down
4 changes: 3 additions & 1 deletion programs/cleos/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ string default_url = "http://127.0.0.1:8888/";
string default_wallet_url = "unix://" + (determine_home_directory() / "eosio-wallet" / (string(key_store_executable_name) + ".sock")).string();
string wallet_url; //to be set to default_wallet_url in main
string amqp_address;
string amqp_reply_to;
bool no_verify = false;
vector<string> headers;

Expand Down Expand Up @@ -449,7 +450,7 @@ fc::variant push_transaction( signed_transaction& trx, const std::vector<public_
result = fc::mutable_variant_object()
( "transaction_id", id )
( "status", "submitted" );
qp_trx.publish( "", std::move( id ), std::move( buf ) );
qp_trx.publish( "", std::move( id ), amqp_reply_to, std::move( buf ) );
return result;
} else {
try {
Expand Down Expand Up @@ -2524,6 +2525,7 @@ int main( int argc, char** argv ) {
app.add_option( "-u,--url", default_url, localized( "The http/https URL where ${n} is running", ("n", node_executable_name)), true );
app.add_option( "--wallet-url", wallet_url, localized("The http/https URL where ${k} is running", ("k", key_store_executable_name)), true );
app.add_option( "--amqp", amqp_address, localized("The ampq URL where AMQP is running amqp://USER:PASSWORD@ADDRESS:PORT"), false );
app.add_option( "--amqp-reply-to", amqp_reply_to, localized("The ampq reply to string"), false );

app.add_option( "-r,--header", header_opt_callback, localized("Pass specific HTTP header; repeat this option to pass multiple headers"));
app.add_flag( "-n,--no-verify", no_verify, localized("Don't verify peer certificate when using HTTPS"));
Expand Down