Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10316 from EOSIO/amqp-correlation-id
Browse files Browse the repository at this point in the history
amqp_trx_plugin correlation_id
  • Loading branch information
heifner authored Apr 30, 2021
2 parents 1a50c03 + c9f500e commit 76e2726
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
17 changes: 9 additions & 8 deletions plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ std::ostream& operator<<(std::ostream& osm, amqp_trace_plugin_impl::reliable_mod
}

// Can be called from any thread except reliable_amqp_publisher thread
void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string tid, int64_t error_code, std::string error_message ) {
void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string correlation_id, int64_t error_code, std::string error_message ) {
try {
// reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns
amqp_trace->post_on_io_context(
[&amqp_trace = *amqp_trace, mode=pub_reliable_mode, rk{std::move(routing_key)},
tid{std::move(tid)}, error_code, em{std::move(error_message)}]() mutable {
cid{std::move(correlation_id)}, error_code, em{std::move(error_message)}]() mutable {
transaction_trace_msg msg{transaction_trace_exception{error_code}};
std::get<transaction_trace_exception>( msg ).error_message = std::move( em );
std::vector<char> buf = convert_to_bin( msg );
if( mode == reliable_mode::queue) {
amqp_trace.publish_message_raw( rk, tid, std::move( buf ) );
amqp_trace.publish_message_raw( rk, cid, std::move( buf ) );
} else {
amqp_trace.publish_message_direct( rk, tid, std::move( buf ),
amqp_trace.publish_message_direct( rk, cid, std::move( buf ),
[mode]( const std::string& err ) {
elog( "AMQP direct message error: ${e}", ("e", err) );
if( mode == reliable_mode::exit )
Expand All @@ -62,19 +62,20 @@ void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string
void amqp_trace_plugin_impl::on_applied_transaction( const chain::transaction_trace_ptr& trace,
const chain::packed_transaction_ptr& t ) {
try {
publish_result( std::string(), t, trace );
publish_result( std::string(), std::string{}, t, trace );
}
FC_LOG_AND_DROP()
}

// called from application thread
void amqp_trace_plugin_impl::publish_result( std::string routing_key,
std::string correlation_id,
const chain::packed_transaction_ptr& trx,
const chain::transaction_trace_ptr& trace ) {
try {
// reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns
amqp_trace->post_on_io_context(
[&amqp_trace = *amqp_trace, trx, trace, rk=std::move(routing_key), mode=pub_reliable_mode]() {
[&amqp_trace = *amqp_trace, trx, trace, rk=std::move(routing_key), cid=std::move(correlation_id), mode=pub_reliable_mode]() {
if( !trace->except ) {
dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) );
} else {
Expand All @@ -83,9 +84,9 @@ void amqp_trace_plugin_impl::publish_result( std::string routing_key,
transaction_trace_msg msg{ eosio::state_history::convert( *trace ) };
std::vector<char> buf = convert_to_bin( msg );
if( mode == reliable_mode::queue) {
amqp_trace.publish_message_raw( rk, trx->id(), std::move( buf ) );
amqp_trace.publish_message_raw( rk, cid, std::move( buf ) );
} else {
amqp_trace.publish_message_direct( rk, trx->id(), std::move( buf ),
amqp_trace.publish_message_direct( rk, cid, std::move( buf ),
[mode]( const std::string& err ) {
elog( "AMQP direct message error: ${e}", ("e", err) );
if( mode == reliable_mode::exit )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ struct amqp_trace_plugin_impl : std::enable_shared_from_this<amqp_trace_plugin_i
public:

// called from any thread
void publish_error( std::string routing_key, std::string tid, int64_t error_code, std::string error_message );
void publish_error( std::string routing_key, std::string correlation_id, int64_t error_code, std::string error_message );

// called on application thread
void on_applied_transaction(const chain::transaction_trace_ptr& trace, const chain::packed_transaction_ptr& t);

// called from any thread
void publish_result( std::string routing_key, const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace );
void publish_result( std::string routing_key, std::string correlation_id,
const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace );
};

std::istream& operator>>(std::istream& in, amqp_trace_plugin_impl::reliable_mode& m);
Expand Down
17 changes: 10 additions & 7 deletions plugins/amqp_trx_plugin/amqp_trx_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl>
chain::packed_transaction_v0 v0;
fc::raw::unpack(ds, v0);
auto ptr = std::make_shared<chain::packed_transaction>( std::move( v0 ), true );
handle_message( delivery_tag, message.replyTo(), std::move( ptr ) );
handle_message( delivery_tag, message.replyTo(), message.correlationID(), std::move( ptr ) );
} else if ( which == fc::unsigned_int(fc::get_index<transaction_msg, chain::packed_transaction>()) ) {
auto ptr = std::make_shared<chain::packed_transaction>();
fc::raw::unpack(ds, *ptr);
handle_message( delivery_tag, message.replyTo(), std::move( ptr ) );
handle_message( delivery_tag, message.replyTo(), message.correlationID(), std::move( ptr ) );
} else {
FC_THROW_EXCEPTION( fc::out_of_range_exception, "Invalid which ${w} for consume of transaction_type message", ("w", which) );
}
Expand Down Expand Up @@ -121,7 +121,10 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl>
private:

// called from amqp thread
void handle_message( const amqp_handler::delivery_tag_t& delivery_tag, const std::string& reply_to, chain::packed_transaction_ptr trx ) {
void handle_message( const amqp_handler::delivery_tag_t& delivery_tag,
const std::string& reply_to,
const std::string& correlation_id,
chain::packed_transaction_ptr trx ) {
const auto& tid = trx->id();
dlog( "received packed_transaction ${id}", ("id", tid) );

Expand All @@ -130,16 +133,16 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl>
fc_add_tag(trx_span, "trx_id", tid);

trx_queue_ptr->push( trx,
[my=shared_from_this(), token=trx_trace.get_token(), delivery_tag, reply_to, trx]
(const std::variant<fc::exception_ptr, chain::transaction_trace_ptr>& result) {
[my=shared_from_this(), token=trx_trace.get_token(), delivery_tag, reply_to, correlation_id, trx]
(const std::variant<fc::exception_ptr, chain::transaction_trace_ptr>& result) mutable {
auto trx_span = fc_create_span_from_token(token, "Processed");
fc_add_tag(trx_span, "trx_id", trx->id());

// publish to trace plugin as exceptions are not reported via controller signal applied_transaction
if( std::holds_alternative<chain::exception_ptr>(result) ) {
auto& eptr = std::get<chain::exception_ptr>(result);
if( !reply_to.empty() ) {
my->trace_plug.publish_error( reply_to, trx->id().str(), eptr->code(), eptr->to_string() );
my->trace_plug.publish_error( std::move(reply_to), std::move(correlation_id), eptr->code(), eptr->to_string() );
}
fc_add_tag(trx_span, "error", eptr->to_string());
dlog( "accept_transaction ${id} exception: ${e}", ("id", trx->id())("e", eptr->to_string()) );
Expand All @@ -149,7 +152,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl>
} else {
auto& trace = std::get<chain::transaction_trace_ptr>(result);
if( !reply_to.empty() ) {
my->trace_plug.publish_result( reply_to, trx, trace );
my->trace_plug.publish_result( std::move(reply_to), std::move(correlation_id), trx, trace );
}
fc_add_tag(trx_span, "block_num", trace->block_num);
fc_add_tag(trx_span, "block_time", trace->block_time.to_time_point());
Expand Down

0 comments on commit 76e2726

Please sign in to comment.