Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10618 from EOSIO/EPE-1192-dont-declar-queue
Browse files Browse the repository at this point in the history
amqp_trx_plugin don't declare queue 📦
  • Loading branch information
heifner authored Aug 16, 2021
2 parents db07480 + dcb7eba commit c12d505
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 126 deletions.
269 changes: 159 additions & 110 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <amqpcpp.h>
#include <condition_variable>


namespace eosio {

/// Designed to work with internal io_service running on a dedicated thread.
Expand All @@ -24,61 +23,147 @@ class amqp_handler {
// called from amqp thread on consume of message
using on_consume_t = std::function<void(const AMQP::Message& message, delivery_tag_t delivery_tag, bool redelivered)>;

/// Blocks until connection successful or error
/// @param address AMQP address
/// @param name AMQP routing key
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
amqp_handler( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp_handler( address, std::move(name), "", "", std::move(on_err), std::move(on_consume))
{}
amqp_handler( const std::string& address, on_error_t on_err )
: first_connect_()
, thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread
, amqp_connection_( thread_pool_.get_executor(), address,
[this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} )
, on_error_( std::move( on_err ) )
{
ilog( "Connecting to AMQP address ${a} ...", ("a", amqp_connection_.address()) );

/// @param address AMQP address
/// @param exchange_name AMQP exchange to send message to
/// @param exchange_type AMQP exchange type
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
amqp_handler( const std::string& address, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp_handler( address, "", std::move(exchange_name), std::move(exchange_type), std::move(on_err), std::move(on_consume))
{}
wait();
}

/// Declare a durable exchange, blocks until successful or error
/// @param exchange_name AMQP exchange to declare
/// @param exchange_type AMQP exchange type, empty (direct), "fanout", or "direct" anything else calls on_err
void declare_exchange(std::string exchange_name, const std::string& exchange_type) {
auto type = AMQP::direct;
if( !exchange_type.empty() ) {
if (exchange_type == "fanout") {
type = AMQP::fanout;
} else if (exchange_type != "direct") {
on_error( "AMQP unsupported exchange type: " + exchange_type );
return;
}
}

start_cond cond;
boost::asio::post( thread_pool_.get_executor(),[this, &cond, en=exchange_name, type]() {
try {
if( !channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", amqp_connection_.address()) );
on_error( "AMQP not connected to channel" );
return;
}

auto& exchange = channel_->declareExchange( en, type, AMQP::durable);
exchange.onSuccess( [this, &cond, en]() {
dlog( "AMQP declare exchange successful, exchange ${e}, for ${a}",
("e", en)("a", amqp_connection_.address()) );
cond.set();
} );
exchange.onError([this, &cond, en](const char* error_message) {
elog( "AMQP unable to declare exchange ${e}, for ${a}", ("e", en)("a", amqp_connection_.address()) );
on_error( std::string("AMQP Queue error: ") + error_message );
cond.set();
});
return;
} FC_LOG_AND_DROP()
cond.set();
} );

if( !cond.wait() ) {
elog( "AMQP timeout declaring exchange: ${q} for ${a}", ("q", exchange_name)("a", amqp_connection_.address()) );
on_error( "AMQP timeout declaring exchange: " + exchange_name );
}
}

/// Declare a durable queue, blocks until successful or error
/// @param queue_name AMQP queue name to declare
void declare_queue(std::string queue_name) {
start_cond cond;
boost::asio::post( thread_pool_.get_executor(), [this, &cond, qn=queue_name]() mutable {
try {
if( !channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", amqp_connection_.address()) );
on_error( "AMQP not connected to channel" );
return;
}

auto& queue = channel_->declareQueue( qn, AMQP::durable );
queue.onSuccess(
[this, &cond]( const std::string& name, uint32_t message_count, uint32_t consumer_count ) {
dlog( "AMQP queue ${q}, messages: ${mc}, consumers: ${cc}, for ${a}",
("q", name)("mc", message_count)("cc", consumer_count)("a", amqp_connection_.address()) );
cond.set();
} );
queue.onError( [this, &cond, qn]( const char* error_message ) {
elog( "AMQP error declaring queue ${q} for ${a}", ("q", qn)("a", amqp_connection_.address()) );
on_error( error_message );
cond.set();
} );
return;
} FC_LOG_AND_DROP()
cond.set();
} );

if( !cond.wait() ) {
elog( "AMQP timeout declaring queue: ${q} for ${a}", ("q", queue_name)("a", amqp_connection_.address()) );
on_error( "AMQP timeout declaring queue: " + queue_name );
}
}

/// drain queue and stop thread
~amqp_handler() {
stop();
}

/// publish to AMQP address with routing key name
// on_error() called if not connected
void publish( std::string exchange, std::string correlation_id, std::string reply_to, std::vector<char> buf ) {
/// publish to AMQP, on_error() called if not connected
void publish( std::string exchange, std::string queue_name, std::string correlation_id, std::string reply_to,
std::vector<char> buf ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
buf=std::move(buf)]() mutable {
[my=this, exchange=std::move(exchange), qn=std::move(queue_name),
cid=std::move(correlation_id), rt=std::move(reply_to), buf=std::move(buf)]() mutable {
try {
if( !my->channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", my->amqp_connection_.address()) );
my->on_error( "AMQP not connected to channel" );
return;
}

AMQP::Envelope env( buf.data(), buf.size() );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move( rt ) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
my->on_error( "AMQP Unable to publish: " + cid );
my->channel_->publish( exchange, qn, env, 0 );
} FC_LOG_AND_DROP()
} );
}

/// publish to AMQP calling f() -> std::vector<char> on amqp thread
// on_error() called if not connected
template<typename Func>
void publish( std::string exchange, std::string correlation_id, std::string reply_to, Func f ) {
void publish( std::string exchange, std::string queue_name, std::string correlation_id, std::string reply_to, Func f ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
f=std::move(f)]() mutable {
[my=this, exchange=std::move(exchange), qn=std::move(queue_name),
cid=std::move(correlation_id), rt=std::move(reply_to), f=std::move(f)]() mutable {
try {
if( !my->channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", my->amqp_connection_.address()) );
my->on_error( "AMQP not connected to channel" );
return;
}

std::vector<char> buf = f();
AMQP::Envelope env( buf.data(), buf.size() );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move(rt) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
my->on_error( "AMQP Unable to publish: " + cid );
my->channel_->publish( exchange, qn, env, 0 );
} FC_LOG_AND_DROP()
} );
}

Expand Down Expand Up @@ -141,110 +226,77 @@ class amqp_handler {
}
}

void start_consume(bool recover=true) {
boost::asio::post( thread_pool_.get_executor(), [&]() {
consuming_ = true;
init_consume(recover);
} );
/// Start consuming from provided queue, does not block.
/// Only expected to be called once as changing queue is not supported, will call on_err if called more than once
/// @param queue_name name of queue to consume from, must pre-exist
/// @param on_consume callback for consume on routing key name, called from amqp thread.
/// user required to ack/reject delivery_tag for each callback.
/// @param recover if true recover all messages that were not yet acked
// asks the server to redeliver all unacknowledged messages on the channel
// zero or more messages may be redelivered
void start_consume(std::string queue_name, on_consume_t on_consume, bool recover) {
boost::asio::post( thread_pool_.get_executor(),
[this, qn{std::move(queue_name)}, on_consume{std::move(on_consume)}, recover]() mutable {
try {
if( on_consume_ ) {
on_error("AMQP already consuming from: " + queue_name_ + ", unable to consume from: " + qn);
return;
} else if( !on_consume ) {
on_error("AMQP nullptr passed for on_consume callback");
return;
}
queue_name_ = std::move(qn);
on_consume_ = std::move(on_consume);
init_consume(recover);
} FC_LOG_AND_DROP()
} );
}
private:

amqp_handler(const std::string& address, std::string name, std::string exchange_name, std::string exchange_type,
on_error_t on_err, on_consume_t on_consume)
: first_connect_()
, thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread
, amqp_connection_( thread_pool_.get_executor(), address, [this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} )
, name_( std::move( name ) )
, exchange_name_( std::move( exchange_name ) )
, exchange_type_( std::move( exchange_type ) )
, on_error_( std::move( on_err ) )
, on_consume_( std::move( on_consume ) )
{
AMQP::Address amqp_address( address );
ilog( "Connecting to AMQP address ${a} - Queue: ${q}...", ("a", amqp_address)("q", name_) );

wait();
}
private:

// called from non-amqp thread
void wait() {
if( !first_connect_.wait() ) {
boost::asio::post( thread_pool_.get_executor(), [this](){
on_error( "AMQP timeout connecting and declaring queue" );
} );
elog( "AMQP timeout connecting to: ${a}", ("a", amqp_connection_.address()) );
on_error( "AMQP timeout connecting" );
}
}

// called from amqp thread
void channel_ready(AMQP::Channel* c) {
dlog( "AMQP Channel ready: ${id}", ("id", c ? c->id() : 0) );
ilog( "AMQP Channel ready: ${id}, for ${a}", ("id", c ? c->id() : 0)("a", amqp_connection_.address()) );
channel_ = c;
init();
init_consume(true);
first_connect_.set();
}

// called from amqp thread
void channel_failed() {
wlog( "AMQP connection failed." );
elog( "AMQP connection failed to: ${a}", ("a", amqp_connection_.address()) );
channel_ = nullptr;
}

// called from amqp thread
void init() {
if(!channel_) return;
if( !exchange_type_.empty() ) {
auto type = AMQP::direct;
if (exchange_type_ == "fanout") {
type = AMQP::fanout;
} else if (exchange_type_ != "direct") {
on_error( "Unsupported exchange type: " + exchange_type_ );
first_connect_.set();
return;
}

auto& exchange = channel_->declareExchange( exchange_name_, type, AMQP::durable);
exchange.onSuccess( [this]() {
dlog( "AMQP declare exchange Successfully! Exchange ${e}", ("e", exchange_name_) );
init_consume(true);
first_connect_.set();
} );
exchange.onError([this](const char* error_message) {
on_error( std::string("AMQP Queue error: ") + error_message );
first_connect_.set();
});
} else {
auto& queue = channel_->declareQueue( name_, AMQP::durable );
queue.onSuccess( [&]( const std::string& name, uint32_t messagecount, uint32_t consumercount ) {
dlog( "AMQP Connected Successfully! Queue ${q} - Messages: ${mc} - Consumers: ${cc}",
("q", name)( "mc", messagecount )( "cc", consumercount ) );
init_consume(true);
first_connect_.set();
} );
queue.onError( [&]( const char* error_message ) {
on_error( error_message );
first_connect_.set();
} );
}
}

// called from amqp thread
void init_consume(bool recover) {
if( channel_ && on_consume_ && consuming_ ) {
if( channel_ && on_consume_ ) {
if (recover) {
channel_->recover(AMQP::requeue)
.onSuccess([&](){dlog("successfully started channel recovery");})
.onError([&](const char* message){wlog("channel recovery failed ${e}", ("e", message));});
.onSuccess( [&]() { dlog( "successfully started channel recovery" ); } )
.onError( [&]( const char* message ) {
elog( "channel recovery failed ${e}", ("e", message) );
on_error( "AMQP channel recovery failed" );
} );
}

auto &consumer = channel_->consume(name_);
auto &consumer = channel_->consume(queue_name_);
consumer.onSuccess([&](const std::string &consumer_tag) {
dlog("consume started: ${tag}", ("tag", consumer_tag));
ilog("consume started: ${tag}, for ${a}", ("tag", consumer_tag)("a", amqp_connection_.address()));
});
consumer.onError([&](const char *message) {
wlog("consume failed: ${e}", ("e", message));
elog("consume failed: ${e}, for ${a}", ("e", message)("a", amqp_connection_.address()));
on_error(message);
});
static_assert(std::is_same_v<on_consume_t, AMQP::MessageCallback>,
"AMQP::MessageCallback interface changed");
static_assert(std::is_same_v<on_consume_t, AMQP::MessageCallback>, "AMQP::MessageCallback interface changed");
consumer.onReceived(on_consume_);
}
}
Expand Down Expand Up @@ -286,12 +338,9 @@ class amqp_handler {
eosio::chain::named_thread_pool thread_pool_;
single_channel_retrying_amqp_connection amqp_connection_;
AMQP::Channel* channel_ = nullptr; // null when not connected
std::string name_;
std::string exchange_name_;
std::string exchange_type_;
on_error_t on_error_;
on_consume_t on_consume_;
bool consuming_ = false;
std::string queue_name_;
};

} // namespace eosio
Loading

0 comments on commit c12d505

Please sign in to comment.