Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

amqp_trx_plugin don't declare queue 📦 #10618

Merged
merged 5 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 159 additions & 110 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <amqpcpp.h>
#include <condition_variable>


namespace eosio {

/// Designed to work with internal io_service running on a dedicated thread.
Expand All @@ -24,61 +23,147 @@ class amqp_handler {
// called from amqp thread on consume of message
using on_consume_t = std::function<void(const AMQP::Message& message, delivery_tag_t delivery_tag, bool redelivered)>;

/// Blocks until connection successful or error
/// @param address AMQP address
/// @param name AMQP routing key
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
amqp_handler( const std::string& address, std::string name, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp_handler( address, std::move(name), "", "", std::move(on_err), std::move(on_consume))
{}
amqp_handler( const std::string& address, on_error_t on_err )
: first_connect_()
, thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread
, amqp_connection_( thread_pool_.get_executor(), address,
[this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} )
, on_error_( std::move( on_err ) )
{
ilog( "Connecting to AMQP address ${a} ...", ("a", amqp_connection_.address()) );

/// @param address AMQP address
/// @param exchange_name AMQP exchange to send message to
/// @param exchange_type AMQP exchange type
/// @param on_err callback for errors, called from amqp thread or caller thread, can be nullptr
/// @param on_consume callback for consume on routing key name, called from amqp thread, null if no consume needed.
/// user required to ack/reject delivery_tag for each callback.
amqp_handler( const std::string& address, std::string exchange_name, std::string exchange_type, on_error_t on_err, on_consume_t on_consume = nullptr )
: amqp_handler( address, "", std::move(exchange_name), std::move(exchange_type), std::move(on_err), std::move(on_consume))
{}
wait();
}

/// Declare a durable exchange, blocks until successful or error
/// @param exchange_name AMQP exchange to declare
/// @param exchange_type AMQP exchange type, empty (direct), "fanout", or "direct" anything else calls on_err
void declare_exchange(std::string exchange_name, const std::string& exchange_type) {
auto type = AMQP::direct;
if( !exchange_type.empty() ) {
if (exchange_type == "fanout") {
type = AMQP::fanout;
} else if (exchange_type != "direct") {
on_error( "AMQP unsupported exchange type: " + exchange_type );
return;
}
}

start_cond cond;
boost::asio::post( thread_pool_.get_executor(),[this, &cond, en=exchange_name, type]() {
try {
if( !channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", amqp_connection_.address()) );
on_error( "AMQP not connected to channel" );
return;
}

auto& exchange = channel_->declareExchange( en, type, AMQP::durable);
exchange.onSuccess( [this, &cond, en]() {
dlog( "AMQP declare exchange successful, exchange ${e}, for ${a}",
("e", en)("a", amqp_connection_.address()) );
cond.set();
} );
exchange.onError([this, &cond, en](const char* error_message) {
elog( "AMQP unable to declare exchange ${e}, for ${a}", ("e", en)("a", amqp_connection_.address()) );
on_error( std::string("AMQP Queue error: ") + error_message );
cond.set();
});
return;
} FC_LOG_AND_DROP()
cond.set();
} );

if( !cond.wait() ) {
elog( "AMQP timeout declaring exchange: ${q} for ${a}", ("q", exchange_name)("a", amqp_connection_.address()) );
on_error( "AMQP timeout declaring exchange: " + exchange_name );
}
}

/// Declare a durable queue, blocks until successful or error
/// @param queue_name AMQP queue name to declare
void declare_queue(std::string queue_name) {
start_cond cond;
boost::asio::post( thread_pool_.get_executor(), [this, &cond, qn=queue_name]() mutable {
try {
if( !channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", amqp_connection_.address()) );
on_error( "AMQP not connected to channel" );
return;
}

auto& queue = channel_->declareQueue( qn, AMQP::durable );
queue.onSuccess(
[this, &cond]( const std::string& name, uint32_t message_count, uint32_t consumer_count ) {
dlog( "AMQP queue ${q}, messages: ${mc}, consumers: ${cc}, for ${a}",
("q", name)("mc", message_count)("cc", consumer_count)("a", amqp_connection_.address()) );
cond.set();
} );
queue.onError( [this, &cond, qn]( const char* error_message ) {
elog( "AMQP error declaring queue ${q} for ${a}", ("q", qn)("a", amqp_connection_.address()) );
on_error( error_message );
cond.set();
} );
return;
} FC_LOG_AND_DROP()
cond.set();
} );

if( !cond.wait() ) {
elog( "AMQP timeout declaring queue: ${q} for ${a}", ("q", queue_name)("a", amqp_connection_.address()) );
on_error( "AMQP timeout declaring queue: " + queue_name );
}
}

/// drain queue and stop thread
~amqp_handler() {
stop();
}

/// publish to AMQP address with routing key name
// on_error() called if not connected
void publish( std::string exchange, std::string correlation_id, std::string reply_to, std::vector<char> buf ) {
/// publish to AMQP, on_error() called if not connected
void publish( std::string exchange, std::string queue_name, std::string correlation_id, std::string reply_to,
std::vector<char> buf ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
buf=std::move(buf)]() mutable {
[my=this, exchange=std::move(exchange), qn=std::move(queue_name),
cid=std::move(correlation_id), rt=std::move(reply_to), buf=std::move(buf)]() mutable {
try {
if( !my->channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", my->amqp_connection_.address()) );
my->on_error( "AMQP not connected to channel" );
return;
}

AMQP::Envelope env( buf.data(), buf.size() );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move( rt ) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
my->on_error( "AMQP Unable to publish: " + cid );
my->channel_->publish( exchange, qn, env, 0 );
} FC_LOG_AND_DROP()
} );
}

/// publish to AMQP calling f() -> std::vector<char> on amqp thread
// on_error() called if not connected
template<typename Func>
void publish( std::string exchange, std::string correlation_id, std::string reply_to, Func f ) {
void publish( std::string exchange, std::string queue_name, std::string correlation_id, std::string reply_to, Func f ) {
boost::asio::post( thread_pool_.get_executor(),
[my=this, exchange=std::move(exchange), cid=std::move(correlation_id), rt=std::move(reply_to),
f=std::move(f)]() mutable {
[my=this, exchange=std::move(exchange), qn=std::move(queue_name),
cid=std::move(correlation_id), rt=std::move(reply_to), f=std::move(f)]() mutable {
try {
if( !my->channel_ ) {
elog( "AMQP not connected to channel ${a}", ("a", my->amqp_connection_.address()) );
my->on_error( "AMQP not connected to channel" );
return;
}

std::vector<char> buf = f();
AMQP::Envelope env( buf.data(), buf.size() );
if(!cid.empty()) env.setCorrelationID( std::move( cid ) );
if(!rt.empty()) env.setReplyTo( std::move(rt) );
if( my->channel_ )
my->channel_->publish( exchange, my->name_, env, 0 );
else
my->on_error( "AMQP Unable to publish: " + cid );
my->channel_->publish( exchange, qn, env, 0 );
} FC_LOG_AND_DROP()
} );
}

Expand Down Expand Up @@ -141,110 +226,77 @@ class amqp_handler {
}
}

void start_consume(bool recover=true) {
boost::asio::post( thread_pool_.get_executor(), [&]() {
consuming_ = true;
init_consume(recover);
} );
/// Start consuming from provided queue, does not block.
/// Only expected to be called once as changing queue is not supported, will call on_err if called more than once
/// @param queue_name name of queue to consume from, must pre-exist
/// @param on_consume callback for consume on routing key name, called from amqp thread.
/// user required to ack/reject delivery_tag for each callback.
/// @param recover if true recover all messages that were not yet acked
// asks the server to redeliver all unacknowledged messages on the channel
// zero or more messages may be redelivered
void start_consume(std::string queue_name, on_consume_t on_consume, bool recover) {
boost::asio::post( thread_pool_.get_executor(),
[this, qn{std::move(queue_name)}, on_consume{std::move(on_consume)}, recover]() mutable {
try {
if( on_consume_ ) {
on_error("AMQP already consuming from: " + queue_name_ + ", unable to consume from: " + qn);
return;
} else if( !on_consume ) {
on_error("AMQP nullptr passed for on_consume callback");
return;
}
queue_name_ = std::move(qn);
on_consume_ = std::move(on_consume);
init_consume(recover);
} FC_LOG_AND_DROP()
} );
}
private:

amqp_handler(const std::string& address, std::string name, std::string exchange_name, std::string exchange_type,
on_error_t on_err, on_consume_t on_consume)
: first_connect_()
, thread_pool_( "ampq", 1 ) // amqp is not thread safe, use only one thread
, amqp_connection_( thread_pool_.get_executor(), address, [this](AMQP::Channel* c){channel_ready(c);}, [this](){channel_failed();} )
, name_( std::move( name ) )
, exchange_name_( std::move( exchange_name ) )
, exchange_type_( std::move( exchange_type ) )
, on_error_( std::move( on_err ) )
, on_consume_( std::move( on_consume ) )
{
AMQP::Address amqp_address( address );
ilog( "Connecting to AMQP address ${a} - Queue: ${q}...", ("a", amqp_address)("q", name_) );

wait();
}
private:

// called from non-amqp thread
void wait() {
if( !first_connect_.wait() ) {
boost::asio::post( thread_pool_.get_executor(), [this](){
on_error( "AMQP timeout connecting and declaring queue" );
} );
elog( "AMQP timeout connecting to: ${a}", ("a", amqp_connection_.address()) );
on_error( "AMQP timeout connecting" );
}
}

// called from amqp thread
void channel_ready(AMQP::Channel* c) {
dlog( "AMQP Channel ready: ${id}", ("id", c ? c->id() : 0) );
ilog( "AMQP Channel ready: ${id}, for ${a}", ("id", c ? c->id() : 0)("a", amqp_connection_.address()) );
channel_ = c;
init();
init_consume(true);
first_connect_.set();
}

// called from amqp thread
void channel_failed() {
wlog( "AMQP connection failed." );
elog( "AMQP connection failed to: ${a}", ("a", amqp_connection_.address()) );
channel_ = nullptr;
}

// called from amqp thread
void init() {
if(!channel_) return;
if( !exchange_type_.empty() ) {
auto type = AMQP::direct;
if (exchange_type_ == "fanout") {
type = AMQP::fanout;
} else if (exchange_type_ != "direct") {
on_error( "Unsupported exchange type: " + exchange_type_ );
first_connect_.set();
return;
}

auto& exchange = channel_->declareExchange( exchange_name_, type, AMQP::durable);
exchange.onSuccess( [this]() {
dlog( "AMQP declare exchange Successfully! Exchange ${e}", ("e", exchange_name_) );
init_consume(true);
first_connect_.set();
} );
exchange.onError([this](const char* error_message) {
on_error( std::string("AMQP Queue error: ") + error_message );
first_connect_.set();
});
} else {
auto& queue = channel_->declareQueue( name_, AMQP::durable );
queue.onSuccess( [&]( const std::string& name, uint32_t messagecount, uint32_t consumercount ) {
dlog( "AMQP Connected Successfully! Queue ${q} - Messages: ${mc} - Consumers: ${cc}",
("q", name)( "mc", messagecount )( "cc", consumercount ) );
init_consume(true);
first_connect_.set();
} );
queue.onError( [&]( const char* error_message ) {
on_error( error_message );
first_connect_.set();
} );
}
}

// called from amqp thread
void init_consume(bool recover) {
if( channel_ && on_consume_ && consuming_ ) {
if( channel_ && on_consume_ ) {
if (recover) {
channel_->recover(AMQP::requeue)
.onSuccess([&](){dlog("successfully started channel recovery");})
.onError([&](const char* message){wlog("channel recovery failed ${e}", ("e", message));});
.onSuccess( [&]() { dlog( "successfully started channel recovery" ); } )
.onError( [&]( const char* message ) {
elog( "channel recovery failed ${e}", ("e", message) );
on_error( "AMQP channel recovery failed" );
} );
}

auto &consumer = channel_->consume(name_);
auto &consumer = channel_->consume(queue_name_);
consumer.onSuccess([&](const std::string &consumer_tag) {
dlog("consume started: ${tag}", ("tag", consumer_tag));
ilog("consume started: ${tag}, for ${a}", ("tag", consumer_tag)("a", amqp_connection_.address()));
});
consumer.onError([&](const char *message) {
wlog("consume failed: ${e}", ("e", message));
elog("consume failed: ${e}, for ${a}", ("e", message)("a", amqp_connection_.address()));
on_error(message);
});
static_assert(std::is_same_v<on_consume_t, AMQP::MessageCallback>,
"AMQP::MessageCallback interface changed");
static_assert(std::is_same_v<on_consume_t, AMQP::MessageCallback>, "AMQP::MessageCallback interface changed");
consumer.onReceived(on_consume_);
}
}
Expand Down Expand Up @@ -286,12 +338,9 @@ class amqp_handler {
eosio::chain::named_thread_pool thread_pool_;
single_channel_retrying_amqp_connection amqp_connection_;
AMQP::Channel* channel_ = nullptr; // null when not connected
std::string name_;
std::string exchange_name_;
std::string exchange_type_;
on_error_t on_error_;
on_consume_t on_consume_;
bool consuming_ = false;
std::string queue_name_;
};

} // namespace eosio
Loading