Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

ack amqp input trx after execution #9303

Merged
merged 1 commit into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ class amqp {
public:
// called from amqp thread on errors
using on_error_t = std::function<void(const std::string& err)>;
// delivery_tag type of consume, use for ack/reject
using delivery_tag_t = uint64_t;
// called from amqp thread on consume of message
// return true for ack, false for reject
using on_consume_t = std::function<bool(const char* buf, size_t s)>;
using on_consume_t = std::function<void(const delivery_tag_t& delivery_tag, const char* buf, size_t s)>;

/// @param address AMQP address
/// @param name AMQP routing key
/// @param on_err callback for errors, called from amqp thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
amqp( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr )
: thread_pool_( "ampq", 1) // amqp is not thread safe, use only one thread
, name_( std::move( name ) )
Expand Down Expand Up @@ -84,6 +86,26 @@ class amqp {
}
}

/// ack consume message
void ack( const delivery_tag_t& delivery_tag ) {
boost::asio::post( *handler_->amqp_strand(),
[my = this, delivery_tag]() {
try {
my->channel_->ack( delivery_tag );
} FC_LOG_AND_DROP()
} );
}

// reject consume message
void reject( const delivery_tag_t& delivery_tag ) {
boost::asio::post( *handler_->amqp_strand(),
[my = this, delivery_tag]() {
try {
my->channel_->reject( delivery_tag );
} FC_LOG_AND_DROP()
} );
}

private:
// called amqp thread
void init( AMQP::TcpConnection* c ) {
Expand Down Expand Up @@ -118,13 +140,8 @@ class amqp {
on_error( message );
connected_.set_value();
} );
consumer.onReceived( [&](const AMQP::Message& message, uint64_t delivery_tag, bool redelivered) {
bool r = on_consume_( message.body(), message.bodySize() );
if( r ) {
channel_->ack( delivery_tag );
} else {
channel_->reject( delivery_tag );
}
consumer.onReceived( [&](const AMQP::Message& message, const delivery_tag_t& delivery_tag, bool redelivered) {
on_consume_( delivery_tag, message.body(), message.bodySize() );
} );
}

Expand Down
25 changes: 14 additions & 11 deletions plugins/amqp_trx_plugin/amqp_trx_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl>
std::atomic<uint32_t> trx_in_progress_size{0};

// called from amqp thread
bool consume_message( const char* buf, size_t s ) {
void consume_message( const eosio::amqp::delivery_tag_t& delivery_tag, const char* buf, size_t s ) {
try {
fc::datastream<const char*> ds( buf, s );
fc::unsigned_int which;
Expand All @@ -40,29 +40,31 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl>
chain::packed_transaction_v0 v0;
fc::raw::unpack(ds, v0);
auto ptr = std::make_shared<chain::packed_transaction>( std::move( v0 ), true );
handle_message( std::move( ptr ) );
handle_message( delivery_tag, std::move( ptr ) );
} else if ( which == fc::unsigned_int(transaction_msg::tag<chain::packed_transaction>::value) ) {
auto ptr = std::make_shared<chain::packed_transaction>();
fc::raw::unpack(ds, *ptr);
handle_message( std::move( ptr ) );
handle_message( delivery_tag, std::move( ptr ) );
} else {
FC_THROW_EXCEPTION( fc::out_of_range_exception, "Invalid which ${w} for consume of transaction_type message", ("w", which) );
}
return true;
return;
} FC_LOG_AND_DROP()
return false;

amqp_trx->reject( delivery_tag );
}

private:

// called from amqp thread
void handle_message( chain::packed_transaction_ptr trx ) {
void handle_message( const eosio::amqp::delivery_tag_t& delivery_tag, chain::packed_transaction_ptr trx ) {
const auto& tid = trx->id();
dlog( "received packed_transaction ${id}", ("id", tid) );

auto trx_in_progress = trx_in_progress_size.load();
if( trx_in_progress > def_max_trx_in_progress_size ) {
wlog( "Dropping trx, too many trx in progress ${s} bytes", ("s", trx_in_progress) );
amqp_trx->reject( delivery_tag );
if( trace_plug ) {
std::string err = "Dropped trx, too many trx in progress " + std::to_string( trx_in_progress ) + " bytes";
trace_plug->publish_error( trx->id().str(), chain::tx_resource_exhaustion::code_enum::code_value, std::move(err) );
Expand All @@ -71,9 +73,10 @@ struct amqp_trx_plugin_impl : std::enable_shared_from_this<amqp_trx_plugin_impl>
}

trx_in_progress_size += trx->get_estimated_size();
app().post( priority::medium_low, [my=shared_from_this(), trx{std::move(trx)}]() {
app().post( priority::medium_low, [my=shared_from_this(), delivery_tag, trx{std::move(trx)}]() {
my->chain_plug->accept_transaction( trx,
[my, trx](const fc::static_variant<fc::exception_ptr, chain::transaction_trace_ptr>& result) mutable {
[my, delivery_tag, trx](const fc::static_variant<fc::exception_ptr, chain::transaction_trace_ptr>& result) mutable {
my->amqp_trx->ack( delivery_tag );
// publish to trace plugin as execptions are not reported via controller signal applied_transaction
if( result.contains<chain::exception_ptr>() ) {
auto& eptr = result.get<chain::exception_ptr>();
Expand Down Expand Up @@ -137,9 +140,9 @@ void amqp_trx_plugin::plugin_startup() {
elog( "amqp error: ${e}", ("e", err) );
app().quit();
},
[&]( const char* buf, size_t s ) {
if( app().is_quiting() ) return false;
return my->consume_message( buf, s );
[&]( const eosio::amqp::delivery_tag_t& delivery_tag, const char* buf, size_t s ) {
if( app().is_quiting() ) return; // leave non-ack
my->consume_message( delivery_tag, buf, s );
}
);

Expand Down