Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Reliable amqp_trx_plugin & amqp_trace_plugin #10127

Merged
merged 11 commits into from
Mar 18, 2021
Merged
163 changes: 92 additions & 71 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
#pragma once

#include <eosio/amqp/retrying_amqp_connection.hpp>
#include <eosio/amqp/util.hpp>
#include <eosio/chain/thread_utils.hpp>
#include <fc/log/logger.hpp>
#include <fc/exception/exception.hpp>
#include <amqpcpp.h>
#include <amqpcpp/libboostasio.h>
#include <amqpcpp/linux_tcp.h>
#include <condition_variable>


namespace eosio {

/// Designed to work with internal io_service running on a dedicated thread.
/// All publish methods can be called from any thread, but should be sync'ed with stop() & destructor.
/// Calls on_consume from internal thread.
/// ack() & reject() & publish methods can be called from any thread, but should be sync'ed with stop() & destructor.
/// Constructor, stop(), destructor should be called from same thread.
class amqp {
class amqp_handler {
public:
// called from amqp thread or calling thread, but not concurrently
using on_error_t = std::function<void(const std::string& err)>;
Expand All @@ -28,8 +29,8 @@ class amqp {
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
amqp( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp( address, std::move(name), "", "", std::move(on_err), std::move(on_consume))
amqp_handler( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp_handler( address, std::move(name), "", "", std::move(on_err), std::move(on_consume))
{}

/// @param address AMQP address
Expand All @@ -38,54 +39,68 @@ class amqp {
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
amqp( const std::string& address, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp( address, "", std::move(exchange_name), std::move(exchange_type), std::move(on_err), std::move(on_consume))
amqp_handler( const std::string& address, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp_handler( address, "", std::move(exchange_name), std::move(exchange_type), std::move(on_err), std::move(on_consume))
{}

/// drain queue and stop thread
~amqp() {
~amqp_handler() {
stop();
}

/// publish to AMQP address with routing key name
// on_error() called if not connected
void publish( std::string exchange, std::string correlation_id, std::vector<char> buf ) {
boost::asio::post( *handler_->amqp_strand(),
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), buf=std::move(buf)]() {
AMQP::Envelope env( buf.data(), buf.size() );
env.setCorrelationID( cid );
my->channel_->publish( exchange, my->name_, env, 0 );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
my->on_error( "AMQP Unable to publish: " + cid );
} );
}

/// publish to AMQP calling f() -> std::vector<char> on amqp thread
// on_error() called if not connected
template<typename Func>
void publish( std::string exchange, std::string correlation_id, Func f ) {
boost::asio::post( *handler_->amqp_strand(),
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), f=std::move(f)]() {
std::vector<char> buf = f();
AMQP::Envelope env( buf.data(), buf.size() );
env.setCorrelationID( cid );
my->channel_->publish( exchange, my->name_, env, 0 );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
my->on_error( "AMQP Unable to publish: " + cid );
} );
}

/// ack consume message
/// @param multiple true if given delivery_tag and all previous should be ack'ed
void ack( const delivery_tag_t& delivery_tag, bool multiple = false ) {
boost::asio::post( *handler_->amqp_strand(),
boost::asio::post( thread_pool_.get_executor(),
[my = this, delivery_tag, multiple]() {
try {
my->channel_->ack( delivery_tag, multiple ? AMQP::multiple : 0 );
if( my->channel_ )
my->channel_->ack( delivery_tag, multiple ? AMQP::multiple : 0 );
else
my->on_error( "AMQP Unable to ack: " + std::to_string(delivery_tag) );
} FC_LOG_AND_DROP()
} );
}

// reject consume message
void reject( const delivery_tag_t& delivery_tag ) {
boost::asio::post( *handler_->amqp_strand(),
boost::asio::post( thread_pool_.get_executor(),
[my = this, delivery_tag]() {
try {
my->channel_->reject( delivery_tag );
if( my->channel_ )
my->channel_->reject( delivery_tag );
else
my->on_error( "AMQP Unable to reject: " + std::to_string(delivery_tag) );
} FC_LOG_AND_DROP()
} );
}
Expand All @@ -94,11 +109,11 @@ class amqp {
/// Not thread safe, call only once from constructor/destructor thread
/// Do not call from lambdas passed to publish or constructor, e.g. on_error
void stop() {
if( handler_ ) {
if( first_connect_.un_set() ) {
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
// drain amqp queue
std::promise<void> stop_promise;
auto future = stop_promise.get_future();
boost::asio::post( *handler_->amqp_strand(), [&]() {
boost::asio::post( thread_pool_.get_executor(), [&]() {
if( channel_ && channel_->usable() ) {
auto& cb = channel_->close();
cb.onFinalize( [&]() {
Expand All @@ -119,15 +134,16 @@ class amqp {
future.wait();

thread_pool_.stop();
handler_.reset();
}
}

private:

amqp(const std::string& address, std::string name, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume)
: thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread
, connected_future_( connected_.get_future() )
amqp_handler(const std::string& address, std::string name, std::string exchange_name, std::string exchange_type,
on_error_t on_err, on_consume_t on_consume)
: first_connect_()
, thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread
, amqp_connection_( thread_pool_.get_executor(), address, [this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} )
, name_( std::move( name ) )
, exchange_name_( std::move( exchange_name ) )
, exchange_type_( std::move( exchange_type ) )
Expand All @@ -137,23 +153,39 @@ class amqp {
AMQP::Address amqp_address( address );
ilog( "Connecting to AMQP address ${a} - Queue: ${q}...", ("a", amqp_address)("q", name_) );

handler_ = std::make_unique<amqp_handler>( *this, thread_pool_.get_executor() );
boost::asio::post( *handler_->amqp_strand(), [&]() {
connection_ = std::make_unique<AMQP::TcpConnection>( handler_.get(), amqp_address );
});
wait();
}

// called from non-amqp thread
// Waits on first_connect_ (bounded by start_cond::wait). If the wait reports failure,
// the timeout is reported through on_error() from a task posted to the amqp thread pool.
void wait() {
   const bool connected = first_connect_.wait();
   if( connected )
      return;

   boost::asio::post( thread_pool_.get_executor(), [this]() {
      on_error( "AMQP timeout connecting and declaring queue" );
   } );
}

// called from amqp thread
void init( AMQP::TcpConnection* c ) {
channel_ = std::make_unique<AMQP::TcpChannel>( c );
// Passed to amqp_connection_ as its channel-ready callback.
// Remembers the channel pointer and runs init() to declare the exchange/queue on it.
void channel_ready(AMQP::Channel* c) {
channel_ = c;
init();
}

// called from amqp thread
// Clears the channel pointer; code that checks channel_ before use then sees "not connected".
void channel_failed() {
channel_ = nullptr;
}

// called from amqp thread
void init() {
if(!channel_) return;
if( !exchange_type_.empty() ) {
auto type = AMQP::direct;
if (exchange_type_ == "fanout") {
type = AMQP::fanout;
} else if (exchange_type_ != "direct") {
on_error( "Unsupported exchange type: " + exchange_type_ );
connected_.set_value();
first_connect_.set();
return;
}

Expand All @@ -164,7 +196,7 @@ class amqp {
} );
exchange.onError([this](const char* error_message) {
on_error( std::string("AMQP Queue error: ") + error_message );
connected_.set_value();
first_connect_.set();
});
} else {
auto& queue = channel_->declareQueue( name_, AMQP::durable );
Expand All @@ -175,81 +207,70 @@ class amqp {
} );
queue.onError( [&]( const char* error_message ) {
on_error( error_message );
connected_.set_value();
first_connect_.set();
} );
}
}

// called from amqp thread
void init_consume() {
if( !on_consume_ ) {
connected_.set_value();
first_connect_.set();
return;
}

auto& consumer = channel_->consume( name_ );
consumer.onSuccess( [&]( const std::string& consumer_tag ) {
dlog( "consume started: ${tag}", ("tag", consumer_tag) );
connected_.set_value();
first_connect_.set();
} );
consumer.onError( [&]( const char* message ) {
wlog( "consume failed: ${e}", ("e", message) );
on_error( message );
connected_.set_value();
first_connect_.set();
} );
consumer.onReceived( [&](const AMQP::Message& message, const delivery_tag_t& delivery_tag, bool redelivered) {
on_consume_( delivery_tag, message.body(), message.bodySize() );
} );
}

// called from non-amqp thread
// Waits up to 10 seconds on connected_future_. On timeout the failure is reported by
// calling on_error() directly on the calling thread.
void wait() {
auto r = connected_future_.wait_for( std::chrono::seconds( 10 ) );
if( r == std::future_status::timeout ) {
on_error( "AMQP timeout declaring queue" );
}
}

// called from amqp thread
// Forwards the message to the user supplied on_error_ callback, if one was provided.
// mtx_ is held while the callback runs.
void on_error( const std::string& message ) {
   std::lock_guard<std::mutex> guard( mtx_ );
   if( !on_error_ )
      return;
   on_error_( message );
}

class amqp_handler : public AMQP::LibBoostAsioHandler {
private:
class start_cond {
private:
std::mutex mtx_;
bool started_ = false;
std::condition_variable start_cond_;
public:
explicit amqp_handler( amqp& impl, boost::asio::io_service& io_service )
: AMQP::LibBoostAsioHandler( io_service )
, amqp_(impl)
{}

void onError( AMQP::TcpConnection* connection, const char* message ) override {
elog( "amqp connection failed: ${m}", ("m", message) );
amqp_.on_error( message );
}

uint16_t onNegotiate( AMQP::TcpConnection* connection, uint16_t interval ) override {
return 0; // disable heartbeats
// Marks the condition as started and wakes one waiter.
void set() {
   {
      // Flip the flag under the mutex; the lock is released before notifying.
      std::scoped_lock guard( mtx_ );
      started_ = true;
   }
   start_cond_.notify_one();
}

void onReady(AMQP::TcpConnection* c) override {
amqp_.init( c );
bool wait() {
std::unique_lock<std::mutex> lk(mtx_);
return start_cond_.wait_for( lk, std::chrono::seconds( 10 ), [&]{ return started_; } );
}

strand_shared_ptr& amqp_strand() {
return _strand;
// Only unset on shutdown.
// Clears started_ under the mutex and returns the value it held beforehand.
bool un_set() {
   std::scoped_lock guard( mtx_ );
   const bool previous = started_;
   started_ = false;
   return previous;
}

amqp& amqp_;
};

private:
std::mutex mtx_;
start_cond first_connect_;
eosio::chain::named_thread_pool thread_pool_;
std::unique_ptr<amqp_handler> handler_;
std::unique_ptr<AMQP::TcpConnection> connection_;
std::unique_ptr<AMQP::TcpChannel> channel_;
std::promise<void> connected_;
std::future<void> connected_future_;
single_channel_retrying_amqp_connection amqp_connection_;
AMQP::Channel* channel_ = nullptr; // null when not connected
std::string name_;
std::string exchange_name_;
std::string exchange_type_;
Expand Down
21 changes: 17 additions & 4 deletions libraries/amqp/include/eosio/amqp/reliable_amqp_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@ namespace eosio {
* to already exist. Publishing to a non-existing exchange will effectively block
* reliable_amqp_publisher's queue until the exchange exists because the queue will be unable to make progress.
*
* At a large enough uncomfirmed queue depth (currently on the order of 1 million entries) reliable_amqp_publisher
* will start dropping messages after logging an error.
* At a large enough unconfirmed queue depth (currently on the order of 1 million entries) reliable_amqp_publisher
will call the on_fatal_error callback and then start dropping messages after logging an error.
*/

class reliable_amqp_publisher {
public:
// Called from amqp thread when unconfirmed queue depth about to be exceeded.
using error_callback_t = std::function<void(const std::string& err)>;

/// Create a reliable queue to the given server publishing to the given exchange
/// \param server_url server url as amqp://...
/// \param exchange the exchange to publish to
/// \param routing_key on published messages, used if no routing_key provided for publish_message.. calls
/// \param unconfirmed_path path to save/load unconfirmed message to be tried again after stop/start
/// \param on_fatal_error called from amqp thread when unconfirmed queue depth about to be exceeded.
/// \param message_id optional message id to send with each message
reliable_amqp_publisher(const std::string& server_url, const std::string& exchange, const std::string& routing_key,
const boost::filesystem::path& unconfirmed_path, const std::optional<std::string>& message_id = {});
const boost::filesystem::path& unconfirmed_path, error_callback_t on_fatal_error,
const std::optional<std::string>& message_id = {});

/// Publish a message. May be called from any thread.
/// \param t serializable object
Expand All @@ -39,17 +44,25 @@ class reliable_amqp_publisher {
publish_message_raw(std::move(v));
}

/// \param data message to send
void publish_message_raw(std::vector<char>&& data);

/// \param correlation_id if not empty() sets as correlation id of the message envelope
/// \param data message to send
void publish_message_raw(const std::string& correlation_id, std::vector<char>&& data);

/// Publish messages. May be called from any thread.
/// \param queue set of messages to send in one transaction <routing_key, message_data>
void publish_messages_raw(std::deque<std::pair<std::string, std::vector<char>>>&& queue);

/// Publish message directly to AMQP queue
/// Bypasses all reliable mechanisms and just does a simple AMQP publish. Does not queue or retry.
/// \param routing_key if empty() uses class provided default routing_key
/// \param correlation_id if not empty() sets as correlation id of the message envelope
/// \param data message to send
void publish_message_direct(const std::string& routing_key, std::vector<char> data);
/// \param on_error called from AMQP thread if unable to directly publish (e.g. not currently connected)
void publish_message_direct(const std::string& routing_key, const std::string& correlation_id,
std::vector<char> data, error_callback_t on_error);

/// reliable_amqp_publisher runs its own thread. In some cases it may be desirable to skip a needless thread jump
/// when performing work. This method will allow submission of work to reliable_amqp_publisher's thread.
Expand Down
Loading