Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

amqp_trx_plugin - transaction in, transaction_trace out #9181

Merged
merged 67 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
bb48710
WIP: initial outline of rabbitmq_trx_plugin
heifner May 18, 2020
96d520e
WIP: first cut at publish
heifner May 18, 2020
176d598
WIP: Add publish of drop of trx"
heifner May 18, 2020
b8654a2
WIP: Adding support to cleos
heifner May 19, 2020
d5e591d
Merge remote-tracking branch 'origin/develop' into rabbitmq-trx-dev
heifner May 22, 2020
a5d81c1
Simplify unpacking of message
heifner May 22, 2020
5e27754
Change logging and add quit() on error
heifner May 22, 2020
2b1f167
Add simple support for trx/trace rabbitmq
heifner May 22, 2020
3abc4dd
Merge remote-tracking branch 'origin/develop' into rabbitmq-trx-dev
heifner May 26, 2020
08e4a4f
Make rabbitmq exchange configurable
heifner May 26, 2020
ee0bb75
Implement rabbitmq_publish_all_traces option
heifner May 26, 2020
5627b0a
Use nomenclature AMQP instead of RabbitMQ
heifner May 27, 2020
a5b3f02
Add ability to test with rabbitmq
heifner May 28, 2020
36ca869
destroy amqp before thread_pool to fix crash. ack consumed traces.
heifner May 29, 2020
1e7d097
Better shutdown logic
heifner May 29, 2020
7d28362
Add integration test for rabbitmq
heifner May 29, 2020
d94fac9
Merge remote-tracking branch 'origin/develop' into rabbitmq-trx-dev
heifner May 29, 2020
5080366
Add new amqp_trx logger to example logging.json
heifner May 29, 2020
1be90c7
Turn on debug level logging for launcher for trace_api and ampq_trx l…
heifner May 29, 2020
dc19fed
Add support for empty "" default value
heifner May 29, 2020
ef410ef
Revert "Merge pull request #9134 from EOSIO/test-framework-archiving"
heifner May 29, 2020
f8116cc
Revert "Revert "Merge pull request #9134 from EOSIO/test-framework-ar…
heifner Jun 1, 2020
0dd804d
Populate amqp connection string and conditionally run rabbitmq test
parrinc Jun 3, 2020
8a4b06d
Merge branch 'develop-boxed' into rabbitmq-trx-dev
heifner Jun 4, 2020
754a9ee
Merge branch 'develop-boxed' into rabbitmq-trx-dev
heifner Jun 5, 2020
ef104ae
Drain application queue before disconnecting from transction singal
heifner Jun 5, 2020
299f841
Re-work shutdown logic
heifner Jun 5, 2020
e7a4d16
Don't ack message as other consumers may be interested in the trace
heifner Jun 5, 2020
14f08dd
Temp rm app().quit()
heifner Jun 9, 2020
98c993f
Add some info level logging
heifner Jun 10, 2020
7b01617
ioservice for send
heifner Jun 10, 2020
48026b1
Temp logging of publish result
heifner Jun 10, 2020
4cf9c9b
Revert "ioservice for send"
heifner Jun 11, 2020
eb53f95
Add cerr of amqp publish
heifner Jun 11, 2020
4491a8b
Remove debug output
heifner Jun 11, 2020
cd24672
Initial checkin of amqp_trace_plugin
heifner Jun 11, 2020
c662157
Use amqp_trace_plugin for publish of traces. Address peer review comm…
heifner Jun 11, 2020
8c8c24d
Add new amqp_trace_plugin needed for test
heifner Jun 11, 2020
ea3ca24
trx queue
heifner Jun 11, 2020
44f932d
Make amqp_trace_plugin depend on amqp_trx_plugin
heifner Jun 11, 2020
f08bbc2
Correct check for if amqp is in use
heifner Jun 11, 2020
56db06e
Guard agaisnt getting report of trace twice
heifner Jun 11, 2020
9e177e7
Stop thread pool before destroying amqp objects
heifner Jun 12, 2020
7665a7d
Enable debug level logging of default logger
heifner Jun 12, 2020
7420f95
Add extra debug logging
heifner Jun 12, 2020
a2644a8
Use diff thread pool (io_service) for each amqp as mixing causing issues
heifner Jun 12, 2020
0120831
Work around duplicate message handling in amqp integration test
heifner Jun 12, 2020
5d43e92
Fix description
heifner Jun 16, 2020
d18d39b
Move into same scope as eosio::amqp
heifner Jun 16, 2020
b9b43ec
add rabbit to cicd images
Jun 16, 2020
d03eea1
Merge remote-tracking branch 'origin/develop-boxed' into rabbitmq-trx…
heifner Jun 17, 2020
8b46cd0
Check state of amqp_trace_plugin instead of setting up dependency
heifner Jun 17, 2020
381069d
Remove duplicate code amqp_publish_hander
heifner Jun 17, 2020
7e79625
Verify queue ready before returning
heifner Jun 17, 2020
84ffede
Modify cleos with amqp to only submit trx and not try to report trace
heifner Jun 17, 2020
cd28210
Call declare queue on amqp thread
heifner Jun 18, 2020
3f111d3
Use raw promise to avoid race condition
heifner Jun 18, 2020
fc78974
Always startup amqp_trace_plugin before amqp_trx_plugin if specified
heifner Jun 18, 2020
259da4c
Handle shutdown in case of exception
heifner Jun 18, 2020
0de2a11
Use weak ptr to avoid keeping impl alive after shutdown
heifner Jun 18, 2020
f910787
No coordination between http_plugin posted tasks and amqp_trx_plugin …
heifner Jun 18, 2020
bfd69da
Merge remote-tracking branch 'origin/develop-boxed' into rabbitmq-trx…
heifner Jun 19, 2020
6366fe1
Make sure thread_pool is stopped if exception before amqp destroyed
heifner Jun 19, 2020
6b88873
Refactor amqp handler to encapsoluate thread and provide safer interface
heifner Jun 19, 2020
b8f7acb
Merge remote-tracking branch 'origin/develop' into rabbitmq-trx-dev
heifner Jun 19, 2020
aa67fb5
Merge remote-tracking branch 'origin/develop-boxed' into rabbitmq-trx…
heifner Jun 19, 2020
e3c26d3
Peer review changes and fix problem with multiple calls to stop()
heifner Jun 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .cicd/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ set -eo pipefail
. ./.cicd/helpers/general.sh
mkdir -p $BUILD_DIR
CMAKE_EXTRAS="-DCMAKE_BUILD_TYPE='Release' -DENABLE_MULTIVERSION_PROTOCOL_TEST=true -DENABLE_UNITY_BUILD=ON"
if [[ "$BUILDKITE" == 'true' ]]; then
SECRETS_DIR='/System/Volumes/Data/Network/NAS/MAC_FLEET/ANKA/secrets'
AMQP_CONNECTION_STRING=${AMQP_CONNECTION_STRING:-"$(cat $SECRETS_DIR/amqp-connection-string.txt)"}
CMAKE_EXTRAS="$CMAKE_EXTRAS -DAMQP_CONN_STR='"$AMQP_CONNECTION_STRING"'"
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
fi
if [[ "$(uname)" == 'Darwin' && $FORCE_LINUX != true ]]; then
# You can't use chained commands in execute
if [[ "$GITHUB_ACTIONS" == 'true' ]]; then
Expand Down Expand Up @@ -59,4 +64,4 @@ else # Linux
COMMANDS="$PRE_COMMANDS && $COMMANDS"
echo "$ docker run $ARGS $(buildkite-intrinsics) $FULL_TAG bash -c \"$COMMANDS\""
eval docker run $ARGS $(buildkite-intrinsics) $FULL_TAG bash -c \"$COMMANDS\"
fi
fi
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ add_subdirectory(db_size_api_plugin)
add_subdirectory(login_plugin)
add_subdirectory(test_control_plugin)
add_subdirectory(test_control_api_plugin)
add_subdirectory(amqp_trx_plugin)

# Forward variables to top level so packaging picks them up
set(CPACK_DEBIAN_PACKAGE_DEPENDS ${CPACK_DEBIAN_PACKAGE_DEPENDS} PARENT_SCOPE)
7 changes: 7 additions & 0 deletions plugins/amqp_trx_plugin/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
file(GLOB HEADERS "include/eosio/amqp_trx_plugin/*.hpp")
add_library( amqp_trx_plugin
amqp_trx_plugin.cpp
${HEADERS} )

target_link_libraries( amqp_trx_plugin chain_plugin appbase fc amqpcpp )
target_include_directories( amqp_trx_plugin PUBLIC include )
241 changes: 241 additions & 0 deletions plugins/amqp_trx_plugin/amqp_trx_plugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
#include <eosio/amqp_trx_plugin/amqp_trx_plugin.hpp>
#include <eosio/amqp_trx_plugin/amqp_handler.hpp>
#include <eosio/chain_plugin/chain_plugin.hpp>

#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/transaction.hpp>
#include <eosio/chain/thread_utils.hpp>

#include <boost/signals2/connection.hpp>

namespace {

static appbase::abstract_plugin& amqp_trx_plugin_ = appbase::app().register_plugin<eosio::amqp_trx_plugin>();

const fc::string logger_name{"amqp_trx"};
fc::logger logger;

constexpr auto def_max_trx_in_progress_size = 100*1024*1024; // 100 MB

} // anonymous

namespace eosio {

using boost::signals2::scoped_connection;

struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl> {

chain_plugin* chain_plug = nullptr;
// use thread pool even though only one thread currently since it provides simple interface for ioc
std::optional<eosio::chain::named_thread_pool> thread_pool;
std::optional<amqp> amqp_trx;
std::optional<amqp> amqp_trace;
std::optional<scoped_connection> applied_transaction_connection;

std::string amqp_trx_address;
std::string amqp_trx_exchange;
bool amqp_trx_publish_all_traces = false;
std::atomic<uint32_t> trx_in_progress_size{0};
std::atomic<bool> shutting_down = false;

// called from amqp thread
bool consume_message( const char* buf, size_t s ) {
try {
fc::datastream<const char*> ds( buf, s );
transaction_msg msg;
fc::raw::unpack(ds, msg);
if( msg.contains<chain::packed_transaction_v0>() ) {
auto ptr = std::make_shared<chain::packed_transaction>( std::move( msg.get<chain::packed_transaction_v0>() ), true );
handle_message( std::move( ptr ) );
} else if( msg.contains<chain::packed_transaction>() ) {
auto ptr = std::make_shared<chain::packed_transaction>( std::move( msg.get<chain::packed_transaction>() ) );
handle_message( std::move( ptr ) );
} else {
FC_THROW_EXCEPTION( fc::out_of_range_exception, "Invalid which ${w} for consume of transaction_type message",
("w", msg.which()) );
}
return true;
} FC_LOG_AND_DROP()
return false;
}

// only called if amqp-trx-publish-all-traces=true
// called on application thread
void on_applied_transaction(const chain::transaction_trace_ptr& trace, const chain::packed_transaction_ptr& t) {
try {
boost::asio::post( thread_pool->get_executor(), [my=shared_from_this(), trace, t]() {
my->publish_result( t, trace );
} );
} FC_LOG_AND_DROP()
}

private:

// called from amqp thread
void handle_message( chain::packed_transaction_ptr trx ) {
const auto& tid = trx->id();
fc_dlog( logger, "received packed_transaction ${id}", ("id", tid) );

auto trx_in_progress = trx_in_progress_size.load();
if( trx_in_progress > def_max_trx_in_progress_size ) {
fc_wlog( logger, "Dropping trx, too many trx in progress ${s} bytes", ("s", trx_in_progress) );
transaction_trace_msg msg{ transaction_trace_exception{ chain::tx_resource_exhaustion::code_enum::code_value } };
msg.get<transaction_trace_exception>().error_message =
"Dropped trx, too many trx in progress " + std::to_string( trx_in_progress ) + " bytes";
auto buf = fc::raw::pack( msg );
amqp_trace->publish( amqp_trx_exchange, tid.str(), buf.data(), buf.size() );
return;
}

trx_in_progress_size += trx->get_estimated_size();
app().post( priority::medium_low, [my=shared_from_this(), trx{std::move(trx)}]() {
my->chain_plug->accept_transaction( trx,
[my, trx](const fc::static_variant<fc::exception_ptr, chain::transaction_trace_ptr>& result) mutable {
// if amqp-trx-publish-all-traces=true and a transaction trace then no need to publish here as already published
if( my->amqp_trx_publish_all_traces && result.contains<chain::transaction_trace_ptr>() ) {
my->trx_in_progress_size -= trx->get_estimated_size();
} else {
boost::asio::post( my->thread_pool->get_executor(), [my, trx = std::move( trx ), result = result]() {
my->publish_result( trx, result );
my->trx_in_progress_size -= trx->get_estimated_size();
} );
}
} );
} );
}

// called from amqp thread
void publish_result( const chain::packed_transaction_ptr& trx,
const fc::static_variant<fc::exception_ptr, chain::transaction_trace_ptr>& result ) {

try {
if( result.contains<fc::exception_ptr>() ) {
auto& ex = *result.get<fc::exception_ptr>();
std::string err = ex.to_string();
fc_dlog( logger, "bad packed_transaction : ${e}", ("e", err) );
transaction_trace_exception tex{ ex.code() };
fc::unsigned_int which = transaction_trace_msg::tag<transaction_trace_exception>::value;
// TODO; use fc::datastream<std::vector<char>> when available
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
uint32_t payload_size = fc::raw::pack_size( which );
payload_size += fc::raw::pack_size( tex.error_code );
payload_size += fc::raw::pack_size( err );
std::vector<char> buf( payload_size );
fc::datastream<char*> ds( buf.data(), payload_size );
fc::raw::pack( ds, which );
fc::raw::pack( ds, tex.error_code );
fc::raw::pack( ds, err );
amqp_trace->publish( amqp_trx_exchange, trx->id(), buf.data(), buf.size() );

} else {
const auto& trace = result.get<chain::transaction_trace_ptr>();
if( !trace->except ) {
fc_dlog( logger, "chain accepted transaction, bcast ${id}", ("id", trace->id) );
} else {
fc_dlog( logger, "trace except : ${m}", ("m", trace->except->to_string()) );
}
fc::unsigned_int which = transaction_trace_msg::tag<chain::transaction_trace>::value;
// TODO; use fc::datastream<std::vector<char>> when available
uint32_t payload_size = fc::raw::pack_size( which );
payload_size += fc::raw::pack_size( *trace );
std::vector<char> buf( payload_size );
fc::datastream<char*> ds( buf.data(), payload_size );
fc::raw::pack( ds, which );
fc::raw::pack( ds, *trace );
amqp_trace->publish( amqp_trx_exchange, trx->id(), buf.data(), buf.size() );
}
} FC_LOG_AND_DROP()
}

};

amqp_trx_plugin::amqp_trx_plugin()
: my(std::make_shared<amqp_trx_plugin_impl>()) {}

amqp_trx_plugin::~amqp_trx_plugin() {}

void amqp_trx_plugin::set_program_options(options_description& cli, options_description& cfg) {
auto op = cfg.add_options();
op("amqp-trx-address", bpo::value<std::string>(),
"AMQP address: Format: amqp://USER:PASSWORD@ADDRESS:PORT\n"
"Will consume from 'trx' queue and publish to 'trace' queue.");
op("amqp-trx-exchange", bpo::value<std::string>()->default_value(""),
"Existing AMQP exchange to send transaction trace messages.");
op("amqp-trx-publish-all-traces", bpo::bool_switch()->default_value(false),
"If specified then all traces will be published; otherwise only traces for consumed 'trx' queue transactions.");
}

void amqp_trx_plugin::plugin_initialize(const variables_map& options) {
try {
my->chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( my->chain_plug, chain::missing_chain_plugin_exception, "chain_plugin required" );

EOS_ASSERT( options.count("amqp-trx-address"), chain::plugin_config_exception, "amqp-trx-address required" );
my->amqp_trx_address = options.at("amqp-trx-address").as<std::string>();
my->amqp_trx_exchange = options.at("amqp-trx-exchange").as<std::string>();
my->amqp_trx_publish_all_traces = options.at("amqp-trx-publish-all-traces").as<bool>();
}
FC_LOG_AND_RETHROW()
}

void amqp_trx_plugin::plugin_startup() {
handle_sighup();
try {

my->thread_pool.emplace( "amqp_t", 1 );

my->amqp_trx.emplace( logger, my->thread_pool->get_executor(), my->amqp_trx_address, "trx" );
my->amqp_trace.emplace( logger, my->thread_pool->get_executor(), my->amqp_trx_address, "trace" );

auto& consumer = my->amqp_trx->consume();
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
consumer.onSuccess( []( const std::string& consumer_tag ) {
fc_dlog( logger, "consume started: ${tag}", ("tag", consumer_tag) );
} );
consumer.onError( []( const char* message ) {
fc_wlog( logger, "consume failed: ${e}", ("e", message) );
} );
consumer.onReceived( [my=my](const AMQP::Message& message, uint64_t delivery_tag, bool redelivered) {
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
if( my->shutting_down ) return;
if( my->consume_message( message.body(), message.bodySize() ) ) {
my->amqp_trx->ack( delivery_tag );
} else {
my->amqp_trx->reject( delivery_tag );
}
} );

if( my->amqp_trx_publish_all_traces ) {
auto& chain_plug = app().get_plugin<chain_plugin>();
my->applied_transaction_connection.emplace(
chain_plug.chain().applied_transaction.connect(
[&]( std::tuple<const chain::transaction_trace_ptr&, const chain::packed_transaction_ptr&> t ) {
my->on_applied_transaction( std::get<0>( t ), std::get<1>( t ) );
} ) );
}

} catch( ... ) {
// always want plugin_shutdown even on exception
plugin_shutdown();
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
throw;
}
}

void amqp_trx_plugin::plugin_shutdown() {
try {
fc_dlog( logger, "shutdown.." );
my->shutting_down = true; // stop receiving transactions to consume
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
my->applied_transaction_connection.reset();
// drain queue so all traces are published
app().post( priority::lowest, [me = my](){} );
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
if( my->thread_pool ) {
my->thread_pool->stop();
}
app().post( priority::lowest, [me = my](){} ); // keep my pointer alive until queue is drained
fc_dlog( logger, "exit shutdown" );
}
FC_CAPTURE_AND_RETHROW()
}

void amqp_trx_plugin::handle_sighup() {
fc::logger::update( logger_name, logger );
}

} // namespace eosio
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
#include <amqpcpp.h>
#include <amqpcpp/libboostasio.h>
#include <amqpcpp/linux_tcp.h>

#include <fc/log/logger.hpp>

namespace eosio {

class amqp_handler : public AMQP::LibBoostAsioHandler {
public:
explicit amqp_handler( boost::asio::io_service& io_service ) : AMQP::LibBoostAsioHandler( io_service ) {}

void onError( AMQP::TcpConnection* connection, const char* message ) override {
elog( "amqp connection failed: ${m}", ("m", message) );
}

uint16_t onNegotiate( AMQP::TcpConnection* connection, uint16_t interval ) override {
return 0; // disable heartbeats
}
};

class amqp {
fc::logger logger_;
std::unique_ptr<amqp_handler> handler_;
std::unique_ptr<AMQP::TcpConnection> connection_;
std::unique_ptr<AMQP::TcpChannel> channel_;
std::string name_;

public:
amqp( fc::logger logger, boost::asio::io_service& io_service, const std::string& address, std::string name )
: logger_(std::move( logger ) )
, name_( std::move( name ) )
{
AMQP::Address amqp_address( address );
fc_dlog( logger_, "Connecting to AMQP address ${a} - Queue: ${q}...",
("a", std::string( amqp_address ))( "q", name_ ) );

handler_ = std::make_unique<amqp_handler>( io_service );
connection_ = std::make_unique<AMQP::TcpConnection>( handler_.get(), amqp_address );
channel_ = std::make_unique<AMQP::TcpChannel>( connection_.get() );
declare_queue();
}

void publish( const string& exchange, const std::string& correlation_id, const char* data, size_t data_size ) {
AMQP::Envelope env( data, data_size );
env.setCorrelationID( correlation_id );
channel_->publish( exchange, name_, env, 0 );
}

auto& consume() { return channel_->consume( name_ ); }

void ack( uint64_t delivery_tag ) {
channel_->ack( delivery_tag );
}

void reject( uint64_t delivery_tag ) {
channel_->reject( delivery_tag );
}

private:
void declare_queue() {
auto& queue = channel_->declareQueue( name_, AMQP::durable );
queue.onSuccess( [this]( const std::string& name, uint32_t messagecount, uint32_t consumercount ) {
fc_dlog( logger_, "AMQP Connected Successfully!\n Queue ${q} - Messages: ${mc} - Consumers: ${cc}",
("q", name)( "mc", messagecount )( "cc", consumercount ) );
} );
queue.onError( [this]( const char* error_message ) {
std::string err = "AMQP Queue error: " + std::string( error_message );
fc_elog( logger_, err );
app().quit();
} );
}
};

} // namespace eosio
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once
#include <eosio/chain_plugin/chain_plugin.hpp>
#include <appbase/application.hpp>

namespace eosio {

// consume message types
using transaction_msg = fc::static_variant<chain::packed_transaction_v0, chain::packed_transaction>;

// publish message types
struct transaction_trace_exception {
int64_t error_code; ///< fc::exception code()
std::string error_message;
};
using transaction_trace_msg = fc::static_variant<transaction_trace_exception, chain::transaction_trace>;

class amqp_trx_plugin : public appbase::plugin<amqp_trx_plugin> {

public:
APPBASE_PLUGIN_REQUIRES((chain_plugin))

amqp_trx_plugin();
virtual ~amqp_trx_plugin();

virtual void set_program_options(options_description& cli, options_description& cfg) override;
void plugin_initialize(const variables_map& options);
void plugin_startup();
void plugin_shutdown();
void handle_sighup() override;

private:
std::shared_ptr<struct amqp_trx_plugin_impl> my;
};

} // namespace eosio

FC_REFLECT( eosio::transaction_trace_exception, (error_code)(error_message) );
Loading