From bc9a94fad636044d0e3184eba92044ea61fc6b2b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 29 Apr 2021 14:16:48 -0500 Subject: [PATCH] Set correlation_id of the amqp trace message to correspond to the correlation_id of the incoming amqp trx message. For amqp_trace_plugin amqp trace messages do not set correlation_id since it does not correspond to an incoming amqp message. --- .../amqp_trace_plugin_impl.cpp | 17 +++++++++-------- .../amqp_trace_plugin_impl.hpp | 5 +++-- plugins/amqp_trx_plugin/amqp_trx_plugin.cpp | 15 +++++++++------ 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp b/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp index fbda5302548..1c409d8da48 100644 --- a/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp +++ b/plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp @@ -34,19 +34,19 @@ std::ostream& operator<<(std::ostream& osm, amqp_trace_plugin_impl::reliable_mod } // Can be called from any thread except reliable_amqp_publisher thread -void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string tid, int64_t error_code, std::string error_message ) { +void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string correlation_id, int64_t error_code, std::string error_message ) { try { // reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns amqp_trace->post_on_io_context( [&amqp_trace = *amqp_trace, mode=pub_reliable_mode, rk{std::move(routing_key)}, - tid{std::move(tid)}, error_code, em{std::move(error_message)}]() mutable { + cid{std::move(correlation_id)}, error_code, em{std::move(error_message)}]() mutable { transaction_trace_msg msg{transaction_trace_exception{error_code}}; std::get( msg ).error_message = std::move( em ); std::vector buf = convert_to_bin( msg ); if( mode == reliable_mode::queue) { - amqp_trace.publish_message_raw( rk, tid, std::move( buf ) ); + amqp_trace.publish_message_raw( rk, cid, std::move( buf ) ); } else { - amqp_trace.publish_message_direct( rk, tid, std::move( buf ), + amqp_trace.publish_message_direct( rk, cid, std::move( buf ), [mode]( const std::string& err ) { elog( "AMQP direct message error: ${e}", ("e", err) ); if( mode == reliable_mode::exit ) @@ -62,19 +62,20 @@ void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string void amqp_trace_plugin_impl::on_applied_transaction( const chain::transaction_trace_ptr& trace, const chain::packed_transaction_ptr& t ) { try { - publish_result( std::string(), t, trace ); + publish_result( std::string(), std::string{}, t, trace ); } FC_LOG_AND_DROP() } // called from application thread void amqp_trace_plugin_impl::publish_result( std::string routing_key, + std::string correlation_id, const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace ) { try { // reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns amqp_trace->post_on_io_context( - [&amqp_trace = *amqp_trace, trx, trace, rk=std::move(routing_key), mode=pub_reliable_mode]() { + [&amqp_trace = *amqp_trace, trx, trace, rk=std::move(routing_key), cid=std::move(correlation_id), mode=pub_reliable_mode]() { if( !trace->except ) { dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) ); } else { @@ -83,9 +84,9 @@ void amqp_trace_plugin_impl::publish_result( std::string routing_key, transaction_trace_msg msg{ eosio::state_history::convert( *trace ) }; std::vector buf = convert_to_bin( msg ); if( mode == reliable_mode::queue) { - amqp_trace.publish_message_raw( rk, trx->id(), std::move( buf ) ); + amqp_trace.publish_message_raw( rk, cid, std::move( buf ) ); } else { - amqp_trace.publish_message_direct( rk, trx->id(), std::move( buf ), + amqp_trace.publish_message_direct( rk, cid, std::move( buf ), [mode]( const std::string& err ) { elog( "AMQP direct message error: ${e}", ("e", err) ); if( mode == reliable_mode::exit ) diff --git a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp index 30ce106f11c..5591d62e685 100644 --- a/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp +++ b/plugins/amqp_trace_plugin/include/eosio/amqp_trace_plugin/amqp_trace_plugin_impl.hpp @@ -23,13 +23,14 @@ struct amqp_trace_plugin_impl : std::enable_shared_from_this>(std::istream& in, amqp_trace_plugin_impl::reliable_mode& m); diff --git a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp index 80934a1cc53..9d7c28d7f79 100644 --- a/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp +++ b/plugins/amqp_trx_plugin/amqp_trx_plugin.cpp @@ -82,11 +82,11 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this chain::packed_transaction_v0 v0; fc::raw::unpack(ds, v0); auto ptr = std::make_shared( std::move( v0 ), true ); - handle_message( delivery_tag, message.replyTo(), std::move( ptr ) ); + handle_message( delivery_tag, message.replyTo(), message.correlationID(), std::move( ptr ) ); } else if ( which == fc::unsigned_int(fc::get_index()) ) { auto ptr = std::make_shared(); fc::raw::unpack(ds, *ptr); - handle_message( delivery_tag, message.replyTo(), std::move( ptr ) ); + handle_message( delivery_tag, message.replyTo(), message.correlationID(), std::move( ptr ) ); } else { FC_THROW_EXCEPTION( fc::out_of_range_exception, "Invalid which ${w} for consume of transaction_type message", ("w", which) ); } @@ -121,7 +121,10 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this private: // called from amqp thread - void handle_message( const amqp_handler::delivery_tag_t& delivery_tag, const std::string& reply_to, chain::packed_transaction_ptr trx ) { + void handle_message( const amqp_handler::delivery_tag_t& delivery_tag, + const std::string& reply_to, + const std::string& correlation_id, + chain::packed_transaction_ptr trx ) { const auto& tid = trx->id(); dlog( "received packed_transaction ${id}", ("id", tid) ); @@ -130,7 +133,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this fc_add_tag(trx_span, "trx_id", tid); trx_queue_ptr->push( trx, - [my=shared_from_this(), token=trx_trace.get_token(), delivery_tag, reply_to, trx] + [my=shared_from_this(), token=trx_trace.get_token(), delivery_tag, reply_to, correlation_id, trx] (const std::variant& result) { auto trx_span = fc_create_span_from_token(token, "Processed"); fc_add_tag(trx_span, "trx_id", trx->id()); @@ -139,7 +142,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this if( std::holds_alternative(result) ) { auto& eptr = std::get(result); if( !reply_to.empty() ) { - my->trace_plug.publish_error( reply_to, trx->id().str(), eptr->code(), eptr->to_string() ); + my->trace_plug.publish_error( reply_to, correlation_id, eptr->code(), eptr->to_string() ); } fc_add_tag(trx_span, "error", eptr->to_string()); dlog( "accept_transaction ${id} exception: ${e}", ("id", trx->id())("e", eptr->to_string()) ); @@ -149,7 +152,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this } else { auto& trace = std::get(result); if( !reply_to.empty() ) { - my->trace_plug.publish_result( reply_to, trx, trace ); + my->trace_plug.publish_result( reply_to, correlation_id, trx, trace ); } fc_add_tag(trx_span, "block_num", trace->block_num); fc_add_tag(trx_span, "block_time", trace->block_time.to_time_point());