Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10141 from EOSIO/amqp-trx-trace-split
Browse files Browse the repository at this point in the history
Separate amqp_trx_plugin from amqp_trace_plugin
  • Loading branch information
heifner authored Mar 19, 2021
2 parents 7878958 + 210bb18 commit bfdba24
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 137 deletions.
7 changes: 3 additions & 4 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class amqp_handler {
// delivery_tag type of consume, use for ack/reject
using delivery_tag_t = uint64_t;
// called from amqp thread on consume of message
using on_consume_t = std::function<void(const delivery_tag_t& delivery_tag, const char* buf, size_t s)>;
using on_consume_t = std::function<void(const AMQP::Message& message, delivery_tag_t delivery_tag, bool redelivered)>;

/// @param address AMQP address
/// @param name AMQP routing key
Expand Down Expand Up @@ -229,9 +229,8 @@ class amqp_handler {
on_error( message );
first_connect_.set();
} );
consumer.onReceived( [&](const AMQP::Message& message, const delivery_tag_t& delivery_tag, bool redelivered) {
on_consume_( delivery_tag, message.body(), message.bodySize() );
} );
static_assert( std::is_same_v<on_consume_t, AMQP::MessageCallback>, "AMQP::MessageCallback interface changed" );
consumer.onReceived( on_consume_ );
}

// called from amqp thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ class reliable_amqp_publisher {
/// \param data message to send
void publish_message_raw(std::vector<char>&& data);

/// \param routing_key if empty() uses class provided default routing_key
/// \param correlation_id if not empty() sets as correlation id of the message envelope
/// \param data message to send
void publish_message_raw(const std::string& correlation_id, std::vector<char>&& data);
void publish_message_raw(const std::string& routing_key, const std::string& correlation_id, std::vector<char>&& data);

/// Publish messages. May be called from any thread.
/// \param queue set of messages to send in one transaction <routing_key, message_data>
Expand Down
16 changes: 9 additions & 7 deletions libraries/amqp/reliable_amqp_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct reliable_amqp_publisher_impl {
~reliable_amqp_publisher_impl();
void pump_queue();
void publish_message_raw(std::vector<char>&& data);
void publish_message_raw(const std::string& correlation_id, std::vector<char>&& data);
void publish_message_raw(const std::string& routing_key, const std::string& correlation_id, std::vector<char>&& data);
void publish_messages_raw(std::deque<std::pair<std::string, std::vector<char>>>&& queue);
void publish_message_direct(const std::string& routing_key, const std::string& correlation_id,
std::vector<char> data, reliable_amqp_publisher::error_callback_t on_error);
Expand Down Expand Up @@ -213,17 +213,19 @@ void reliable_amqp_publisher_impl::publish_message_raw(std::vector<char>&& data)
pump_queue();
}

void reliable_amqp_publisher_impl::publish_message_raw(const std::string& correlation_id, std::vector<char>&& data) {
void reliable_amqp_publisher_impl::publish_message_raw(const std::string& routing_key,
const std::string& correlation_id,
std::vector<char>&& data) {
if(!ctx.get_executor().running_in_this_thread()) {
boost::asio::post(user_submitted_work_strand, [this, d=std::move(data), id=correlation_id]() mutable {
publish_message_raw(id, std::move(d));
boost::asio::post(user_submitted_work_strand, [this, d=std::move(data), rk=routing_key, id=correlation_id]() mutable {
publish_message_raw(rk, id, std::move(d));
});
return;
}

verify_max_queue_size();

message_deque.emplace_back(amqp_message{0, "", correlation_id, std::move(data)});
message_deque.emplace_back(amqp_message{0, routing_key, correlation_id, std::move(data)});
pump_queue();
}

Expand Down Expand Up @@ -283,8 +285,8 @@ void reliable_amqp_publisher::publish_message_raw(std::vector<char>&& data) {
my->publish_message_raw( std::move( data ) );
}

void reliable_amqp_publisher::publish_message_raw(const std::string& correlation_id, std::vector<char>&& data) {
my->publish_message_raw( correlation_id, std::move( data ) );
void reliable_amqp_publisher::publish_message_raw(const std::string& routing_key, const std::string& correlation_id, std::vector<char>&& data) {
my->publish_message_raw( routing_key, correlation_id, std::move( data ) );
}

void reliable_amqp_publisher::publish_messages_raw(std::deque<std::pair<std::string, std::vector<char>>>&& queue) {
Expand Down
77 changes: 30 additions & 47 deletions plugins/amqp_trace_plugin/amqp_trace_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ amqp_trace_plugin::amqp_trace_plugin()
amqp_trace_plugin::~amqp_trace_plugin() {}

void amqp_trace_plugin::publish_error( std::string tid, int64_t error_code, std::string error_message ) {
my->publish_error( std::move(tid), error_code, std::move(error_message) );
my->publish_error( std::string(), std::move(tid), error_code, std::move(error_message) );
}

void amqp_trace_plugin::set_program_options(options_description& cli, options_description& cfg) {
Expand Down Expand Up @@ -56,57 +56,40 @@ void amqp_trace_plugin::plugin_initialize(const variables_map& options) {
}

void amqp_trace_plugin::plugin_startup() {
if( !my->started ) {
handle_sighup();
try {
ilog( "Starting amqp_trace_plugin" );
my->started = true;

const boost::filesystem::path trace_data_dir_path = appbase::app().data_dir() / "amqp_trace_plugin";
const boost::filesystem::path trace_data_file_path = trace_data_dir_path / "trace.bin";
if( my->pub_reliable_mode != amqp_trace_plugin_impl::reliable_mode::queue ) {
EOS_ASSERT( !fc::exists(trace_data_file_path), chain::plugin_config_exception,
"Existing queue file when amqp-trace-reliable-mode != 'queue': ${f}",
("f", trace_data_file_path.generic_string()) );
} else if( auto resmon_plugin = app().find_plugin<resource_monitor_plugin>() ) {
resmon_plugin->monitor_directory( trace_data_dir_path );
}

my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name, trace_data_file_path,
[]( const std::string& err ) {
elog( "AMQP fatal error: ${e}", ("e", err) );
appbase::app().quit();
} );

auto chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" );

applied_transaction_connection.emplace(
chain_plug->chain().applied_transaction.connect(
[me = my]( std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t ) {
me->on_applied_transaction( std::get<0>( t ), std::get<1>( t ) );
} ) );

} catch( ... ) {
// always want plugin_shutdown even on exception
plugin_shutdown();
throw;
}
ilog( "Starting amqp_trace_plugin" );

const boost::filesystem::path trace_data_dir_path = appbase::app().data_dir() / "amqp_trace_plugin";
const boost::filesystem::path trace_data_file_path = trace_data_dir_path / "trace.bin";
if( my->pub_reliable_mode != amqp_trace_plugin_impl::reliable_mode::queue ) {
EOS_ASSERT( !fc::exists( trace_data_file_path ), chain::plugin_config_exception,
"Existing queue file when amqp-trace-reliable-mode != 'queue': ${f}",
("f", trace_data_file_path.generic_string()) );
} else if( auto resmon_plugin = app().find_plugin<resource_monitor_plugin>() ) {
resmon_plugin->monitor_directory( trace_data_dir_path );
}
}

void amqp_trace_plugin::plugin_shutdown() {
if( my->started ) {
try {
dlog( "shutdown.." );
my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name,
trace_data_file_path,
[]( const std::string& err ) {
elog( "AMQP fatal error: ${e}", ("e", err) );
appbase::app().quit();
} );

applied_transaction_connection.reset();
auto chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" );

dlog( "exit amqp_trace_plugin" );
}
FC_CAPTURE_AND_RETHROW()
my->started = false;
}
applied_transaction_connection.emplace(
chain_plug->chain().applied_transaction.connect(
[me = my]( std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t ) {
me->on_applied_transaction( std::get<0>( t ), std::get<1>( t ) );
} ) );

}

void amqp_trace_plugin::plugin_shutdown() {
applied_transaction_connection.reset();
dlog( "exit amqp_trace_plugin" );
}

void amqp_trace_plugin::handle_sighup() {
Expand Down
19 changes: 10 additions & 9 deletions plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ std::ostream& operator<<(std::ostream& osm, amqp_trace_plugin_impl::reliable_mod
}

// Can be called from any thread except reliable_amqp_publisher thread
void amqp_trace_plugin_impl::publish_error( std::string tid, int64_t error_code, std::string error_message ) {
void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string tid, int64_t error_code, std::string error_message ) {
try {
// reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns
amqp_trace->post_on_io_context(
[&amqp_trace = *amqp_trace, mode=pub_reliable_mode,
[&amqp_trace = *amqp_trace, mode=pub_reliable_mode, rk{std::move(routing_key)},
tid{std::move(tid)}, error_code, em{std::move(error_message)}]() mutable {
transaction_trace_msg msg{transaction_trace_exception{error_code}};
std::get<transaction_trace_exception>( msg ).error_message = std::move( em );
std::vector<char> buf = convert_to_bin( msg );
if( mode == reliable_mode::queue) {
amqp_trace.publish_message_raw( tid, std::move( buf ) );
amqp_trace.publish_message_raw( rk, tid, std::move( buf ) );
} else {
amqp_trace.publish_message_direct( std::string(), tid, std::move( buf ),
amqp_trace.publish_message_direct( rk, tid, std::move( buf ),
[mode]( const std::string& err ) {
elog( "AMQP direct message error: ${e}", ("e", err) );
if( mode == reliable_mode::exit )
Expand All @@ -62,18 +62,19 @@ void amqp_trace_plugin_impl::publish_error( std::string tid, int64_t error_code,
void amqp_trace_plugin_impl::on_applied_transaction( const chain::transaction_trace_ptr& trace,
const chain::packed_transaction_ptr& t ) {
try {
publish_result( t, trace );
publish_result( std::string(), t, trace );
}
FC_LOG_AND_DROP()
}

// called from application thread
void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr& trx,
void amqp_trace_plugin_impl::publish_result( std::string routing_key,
const chain::packed_transaction_ptr& trx,
const chain::transaction_trace_ptr& trace ) {
try {
// reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns
amqp_trace->post_on_io_context(
[&amqp_trace = *amqp_trace, trx, trace, mode=pub_reliable_mode]() {
[&amqp_trace = *amqp_trace, trx, trace, rk=std::move(routing_key), mode=pub_reliable_mode]() {
if( !trace->except ) {
dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) );
} else {
Expand All @@ -82,9 +83,9 @@ void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr
transaction_trace_msg msg{ eosio::state_history::convert( *trace ) };
std::vector<char> buf = convert_to_bin( msg );
if( mode == reliable_mode::queue) {
amqp_trace.publish_message_raw( trx->id(), std::move( buf ) );
amqp_trace.publish_message_raw( rk, trx->id(), std::move( buf ) );
} else {
amqp_trace.publish_message_direct( std::string(), trx->id(), std::move( buf ),
amqp_trace.publish_message_direct( rk, trx->id(), std::move( buf ),
[mode]( const std::string& err ) {
elog( "AMQP direct message error: ${e}", ("e", err) );
if( mode == reliable_mode::exit )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@ struct amqp_trace_plugin_impl : std::enable_shared_from_this<amqp_trace_plugin_i
std::string amqp_trace_address;
std::string amqp_trace_queue_name;
std::string amqp_trace_exchange;
reliable_mode pub_reliable_mode;
bool started = false;
reliable_mode pub_reliable_mode = reliable_mode::queue;

public:

// called from any thread
void publish_error( std::string tid, int64_t error_code, std::string error_message );
void publish_error( std::string routing_key, std::string tid, int64_t error_code, std::string error_message );

// called on application thread
void on_applied_transaction(const chain::transaction_trace_ptr& trace, const chain::packed_transaction_ptr& t);

private:

// called from application thread
void publish_result( const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace );
// called from any thread
void publish_result( std::string routing_key, const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace );
};

std::istream& operator>>(std::istream& in, amqp_trace_plugin_impl::reliable_mode& m);
Expand Down
Loading

0 comments on commit bfdba24

Please sign in to comment.