Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Separate amqp_trx_plugin from amqp_trace_plugin #10141

Merged
merged 5 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class amqp_handler {
// delivery_tag type of consume, use for ack/reject
using delivery_tag_t = uint64_t;
// called from amqp thread on consume of message
using on_consume_t = std::function<void(const delivery_tag_t& delivery_tag, const char* buf, size_t s)>;
using on_consume_t = std::function<void(const AMQP::Message& message, delivery_tag_t delivery_tag, bool redelivered)>;

/// @param address AMQP address
/// @param name AMQP routing key
Expand Down Expand Up @@ -229,9 +229,8 @@ class amqp_handler {
on_error( message );
first_connect_.set();
} );
consumer.onReceived( [&](const AMQP::Message& message, const delivery_tag_t& delivery_tag, bool redelivered) {
on_consume_( delivery_tag, message.body(), message.bodySize() );
} );
static_assert( std::is_same_v<on_consume_t, AMQP::MessageCallback>, "AMQP::MessageCallback interface changed" );
consumer.onReceived( on_consume_ );
}

// called from amqp thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ class reliable_amqp_publisher {
/// \param data message to send
void publish_message_raw(std::vector<char>&& data);

/// \param routing_key if empty() uses class provided default routing_key
/// \param correlation_id if not empty() sets as correlation id of the message envelope
/// \param data message to send
void publish_message_raw(const std::string& correlation_id, std::vector<char>&& data);
void publish_message_raw(const std::string& routing_key, const std::string& correlation_id, std::vector<char>&& data);

/// Publish messages. May be called from any thread.
/// \param queue set of messages to send in one transaction <routing_key, message_data>
Expand Down
16 changes: 9 additions & 7 deletions libraries/amqp/reliable_amqp_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct reliable_amqp_publisher_impl {
~reliable_amqp_publisher_impl();
void pump_queue();
void publish_message_raw(std::vector<char>&& data);
void publish_message_raw(const std::string& correlation_id, std::vector<char>&& data);
void publish_message_raw(const std::string& routing_key, const std::string& correlation_id, std::vector<char>&& data);
void publish_messages_raw(std::deque<std::pair<std::string, std::vector<char>>>&& queue);
void publish_message_direct(const std::string& routing_key, const std::string& correlation_id,
std::vector<char> data, reliable_amqp_publisher::error_callback_t on_error);
Expand Down Expand Up @@ -213,17 +213,19 @@ void reliable_amqp_publisher_impl::publish_message_raw(std::vector<char>&& data)
pump_queue();
}

void reliable_amqp_publisher_impl::publish_message_raw(const std::string& correlation_id, std::vector<char>&& data) {
void reliable_amqp_publisher_impl::publish_message_raw(const std::string& routing_key,
const std::string& correlation_id,
std::vector<char>&& data) {
if(!ctx.get_executor().running_in_this_thread()) {
boost::asio::post(user_submitted_work_strand, [this, d=std::move(data), id=correlation_id]() mutable {
publish_message_raw(id, std::move(d));
boost::asio::post(user_submitted_work_strand, [this, d=std::move(data), rk=routing_key, id=correlation_id]() mutable {
publish_message_raw(rk, id, std::move(d));
});
return;
}

verify_max_queue_size();

message_deque.emplace_back(amqp_message{0, "", correlation_id, std::move(data)});
message_deque.emplace_back(amqp_message{0, routing_key, correlation_id, std::move(data)});
pump_queue();
}

Expand Down Expand Up @@ -283,8 +285,8 @@ void reliable_amqp_publisher::publish_message_raw(std::vector<char>&& data) {
my->publish_message_raw( std::move( data ) );
}

void reliable_amqp_publisher::publish_message_raw(const std::string& correlation_id, std::vector<char>&& data) {
my->publish_message_raw( correlation_id, std::move( data ) );
void reliable_amqp_publisher::publish_message_raw(const std::string& routing_key, const std::string& correlation_id, std::vector<char>&& data) {
my->publish_message_raw( routing_key, correlation_id, std::move( data ) );
}

void reliable_amqp_publisher::publish_messages_raw(std::deque<std::pair<std::string, std::vector<char>>>&& queue) {
Expand Down
77 changes: 30 additions & 47 deletions plugins/amqp_trace_plugin/amqp_trace_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ amqp_trace_plugin::amqp_trace_plugin()
amqp_trace_plugin::~amqp_trace_plugin() {}

void amqp_trace_plugin::publish_error( std::string tid, int64_t error_code, std::string error_message ) {
my->publish_error( std::move(tid), error_code, std::move(error_message) );
my->publish_error( std::string(), std::move(tid), error_code, std::move(error_message) );
}

void amqp_trace_plugin::set_program_options(options_description& cli, options_description& cfg) {
Expand Down Expand Up @@ -56,57 +56,40 @@ void amqp_trace_plugin::plugin_initialize(const variables_map& options) {
}

void amqp_trace_plugin::plugin_startup() {
if( !my->started ) {
handle_sighup();
try {
ilog( "Starting amqp_trace_plugin" );
my->started = true;

const boost::filesystem::path trace_data_dir_path = appbase::app().data_dir() / "amqp_trace_plugin";
const boost::filesystem::path trace_data_file_path = trace_data_dir_path / "trace.bin";
if( my->pub_reliable_mode != amqp_trace_plugin_impl::reliable_mode::queue ) {
EOS_ASSERT( !fc::exists(trace_data_file_path), chain::plugin_config_exception,
"Existing queue file when amqp-trace-reliable-mode != 'queue': ${f}",
("f", trace_data_file_path.generic_string()) );
} else if( auto resmon_plugin = app().find_plugin<resource_monitor_plugin>() ) {
resmon_plugin->monitor_directory( trace_data_dir_path );
}

my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name, trace_data_file_path,
[]( const std::string& err ) {
elog( "AMQP fatal error: ${e}", ("e", err) );
appbase::app().quit();
} );

auto chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" );

applied_transaction_connection.emplace(
chain_plug->chain().applied_transaction.connect(
[me = my]( std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t ) {
me->on_applied_transaction( std::get<0>( t ), std::get<1>( t ) );
} ) );

} catch( ... ) {
// always want plugin_shutdown even on exception
plugin_shutdown();
throw;
}
ilog( "Starting amqp_trace_plugin" );

const boost::filesystem::path trace_data_dir_path = appbase::app().data_dir() / "amqp_trace_plugin";
const boost::filesystem::path trace_data_file_path = trace_data_dir_path / "trace.bin";
if( my->pub_reliable_mode != amqp_trace_plugin_impl::reliable_mode::queue ) {
EOS_ASSERT( !fc::exists( trace_data_file_path ), chain::plugin_config_exception,
"Existing queue file when amqp-trace-reliable-mode != 'queue': ${f}",
("f", trace_data_file_path.generic_string()) );
} else if( auto resmon_plugin = app().find_plugin<resource_monitor_plugin>() ) {
resmon_plugin->monitor_directory( trace_data_dir_path );
}
}

void amqp_trace_plugin::plugin_shutdown() {
if( my->started ) {
try {
dlog( "shutdown.." );
my->amqp_trace.emplace( my->amqp_trace_address, my->amqp_trace_exchange, my->amqp_trace_queue_name,
trace_data_file_path,
[]( const std::string& err ) {
elog( "AMQP fatal error: ${e}", ("e", err) );
appbase::app().quit();
} );

applied_transaction_connection.reset();
auto chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" );

dlog( "exit amqp_trace_plugin" );
}
FC_CAPTURE_AND_RETHROW()
my->started = false;
}
applied_transaction_connection.emplace(
chain_plug->chain().applied_transaction.connect(
[me = my]( std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t ) {
me->on_applied_transaction( std::get<0>( t ), std::get<1>( t ) );
} ) );

}

void amqp_trace_plugin::plugin_shutdown() {
applied_transaction_connection.reset();
dlog( "exit amqp_trace_plugin" );
}

void amqp_trace_plugin::handle_sighup() {
Expand Down
19 changes: 10 additions & 9 deletions plugins/amqp_trace_plugin/amqp_trace_plugin_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ std::ostream& operator<<(std::ostream& osm, amqp_trace_plugin_impl::reliable_mod
}

// Can be called from any thread except reliable_amqp_publisher thread
void amqp_trace_plugin_impl::publish_error( std::string tid, int64_t error_code, std::string error_message ) {
void amqp_trace_plugin_impl::publish_error( std::string routing_key, std::string tid, int64_t error_code, std::string error_message ) {
try {
// reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns
amqp_trace->post_on_io_context(
[&amqp_trace = *amqp_trace, mode=pub_reliable_mode,
[&amqp_trace = *amqp_trace, mode=pub_reliable_mode, rk{std::move(routing_key)},
tid{std::move(tid)}, error_code, em{std::move(error_message)}]() mutable {
transaction_trace_msg msg{transaction_trace_exception{error_code}};
std::get<transaction_trace_exception>( msg ).error_message = std::move( em );
std::vector<char> buf = convert_to_bin( msg );
if( mode == reliable_mode::queue) {
amqp_trace.publish_message_raw( tid, std::move( buf ) );
amqp_trace.publish_message_raw( rk, tid, std::move( buf ) );
} else {
amqp_trace.publish_message_direct( std::string(), tid, std::move( buf ),
amqp_trace.publish_message_direct( rk, tid, std::move( buf ),
[mode]( const std::string& err ) {
elog( "AMQP direct message error: ${e}", ("e", err) );
if( mode == reliable_mode::exit )
Expand All @@ -62,18 +62,19 @@ void amqp_trace_plugin_impl::publish_error( std::string tid, int64_t error_code,
void amqp_trace_plugin_impl::on_applied_transaction( const chain::transaction_trace_ptr& trace,
const chain::packed_transaction_ptr& t ) {
try {
publish_result( t, trace );
publish_result( std::string(), t, trace );
}
FC_LOG_AND_DROP()
}

// called from application thread
void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr& trx,
void amqp_trace_plugin_impl::publish_result( std::string routing_key,
const chain::packed_transaction_ptr& trx,
const chain::transaction_trace_ptr& trace ) {
try {
// reliable_amqp_publisher ensures that any post_on_io_context() is called before its dtor returns
amqp_trace->post_on_io_context(
[&amqp_trace = *amqp_trace, trx, trace, mode=pub_reliable_mode]() {
[&amqp_trace = *amqp_trace, trx, trace, rk=std::move(routing_key), mode=pub_reliable_mode]() {
if( !trace->except ) {
dlog( "chain accepted transaction, bcast ${id}", ("id", trace->id) );
} else {
Expand All @@ -82,9 +83,9 @@ void amqp_trace_plugin_impl::publish_result( const chain::packed_transaction_ptr
transaction_trace_msg msg{ eosio::state_history::convert( *trace ) };
std::vector<char> buf = convert_to_bin( msg );
if( mode == reliable_mode::queue) {
amqp_trace.publish_message_raw( trx->id(), std::move( buf ) );
amqp_trace.publish_message_raw( rk, trx->id(), std::move( buf ) );
} else {
amqp_trace.publish_message_direct( std::string(), trx->id(), std::move( buf ),
amqp_trace.publish_message_direct( rk, trx->id(), std::move( buf ),
[mode]( const std::string& err ) {
elog( "AMQP direct message error: ${e}", ("e", err) );
if( mode == reliable_mode::exit )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@ struct amqp_trace_plugin_impl : std::enable_shared_from_this<amqp_trace_plugin_i
std::string amqp_trace_address;
std::string amqp_trace_queue_name;
std::string amqp_trace_exchange;
reliable_mode pub_reliable_mode;
bool started = false;
reliable_mode pub_reliable_mode = reliable_mode::queue;

public:

// called from any thread
void publish_error( std::string tid, int64_t error_code, std::string error_message );
void publish_error( std::string routing_key, std::string tid, int64_t error_code, std::string error_message );

// called on application thread
void on_applied_transaction(const chain::transaction_trace_ptr& trace, const chain::packed_transaction_ptr& t);

private:

// called from application thread
void publish_result( const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace );
// called from any thread
void publish_result( std::string routing_key, const chain::packed_transaction_ptr& trx, const chain::transaction_trace_ptr& trace );
};

std::istream& operator>>(std::istream& in, amqp_trace_plugin_impl::reliable_mode& m);
Expand Down
Loading