Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

retrying_amqp_connection: commonize more amqp connection management stuff #9294

Merged
merged 9 commits into from
Jul 17, 2020
1 change: 1 addition & 0 deletions libraries/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_subdirectory( abieos )
add_subdirectory( rocksdb EXCLUDE_FROM_ALL )
add_subdirectory( chain_kv )
add_subdirectory( se-helpers )
add_subdirectory( retrying_amqp_connection )
add_subdirectory( reliable_amqp_publisher )

set(USE_EXISTING_SOFTFLOAT ON CACHE BOOL "use pre-exisiting softfloat lib")
Expand Down
1 change: 1 addition & 0 deletions libraries/fc/include/fc/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class fwd {
template<typename U> fwd( U&& u );
template<typename U, typename V> fwd( U&& u, V&& v );
template<typename U, typename V, typename X, typename Y> fwd( U&& u, V&& v, X&&, Y&& );
template<typename U, typename V, typename X, typename Y, typename Z> fwd( U&& u, V&& v, X&&, Y&&, Z&& );
swatanabe-b1 marked this conversation as resolved.
Show resolved Hide resolved
fwd();

fwd( const fwd& f );
Expand Down
7 changes: 6 additions & 1 deletion libraries/fc/include/fc/fwd_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ namespace fc {
check_size<sizeof(T),sizeof(_store)>();
new (this) T( fc::forward<U>(u), fc::forward<V>(v), fc::forward<X>(x), fc::forward<Y>(y) );
}

template<typename T,unsigned int S,typename A>
template<typename U,typename V,typename X,typename Y,typename Z>
fwd<T,S,A>::fwd( U&& u, V&& v, X&& x, Y&& y, Z&& z ) {
check_size<sizeof(T),sizeof(_store)>();
new (this) T( fc::forward<U>(u), fc::forward<V>(v), fc::forward<X>(x), fc::forward<Y>(y), fc::forward<Z>(z) );
}

template<typename T,unsigned int S,typename A>
fwd<T,S,A>::fwd() {
Expand Down
3 changes: 3 additions & 0 deletions libraries/retrying_amqp_connection/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_library(retrying_amqp_connection retrying_amqp_connection.cpp)
target_include_directories(retrying_amqp_connection PUBLIC include)
target_link_libraries(retrying_amqp_connection fc amqpcpp)
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include <amqpcpp.h>

#include <boost/asio.hpp>

#include <fc/log/logger.hpp>
#include <fc/exception/exception.hpp>

/*
* A retrying_amqp_connection manages a connection to an AMQP server that will retry the connection
* on failure. Most users should consider single_channel_retrying_amqp_connection instead, which additionally
* manages a single channel.
*/

namespace eosio {
struct retrying_amqp_connection {
using connection_ready_callback_t = std::function<void(AMQP::Connection*)>;
using connection_failed_callback_t = std::function<void()>;

/// \param io_context a strand is created on this io_context for all asio operatoins
/// \param address AMQP address to connect to
/// \param ready a callback when the AMQP connection has been established
/// \param failed a callback when the AMQP connection has failed after being established; should no longer use the AMQP::Connection* after this callback
/// \param logger logger to send logging to
retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready,
connection_failed_callback_t failed, fc::logger logger = fc::logger::get());

const AMQP::Address& address() const;

boost::asio::io_context::strand& strand();

~retrying_amqp_connection();

private:
struct impl;
constexpr static size_t fwd_size = 66424;
fc::fwd<impl,fwd_size> my;
heifner marked this conversation as resolved.
Show resolved Hide resolved
};

struct single_channel_retrying_amqp_connection {
using channel_ready_callback_t = std::function<void(AMQP::Channel*)>;
using failed_callback_t = std::function<void()>;

/// \param io_context a strand is created on this io_context for all asio operatoins
/// \param address AMQP address to connect to
/// \param ready a callback when the AMQP channel has been established, do NOT set the .onError() for the passed AMQP::Channel
/// \param failed a callback when the AMQP channel has failed after being established; should no longer use the AMQP::Channel* within or after this callback
/// \param logger logger to send logging to
single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready,
failed_callback_t failed, fc::logger logger = fc::logger::get());

const AMQP::Address& address() const;

~single_channel_retrying_amqp_connection();

private:
struct impl;
constexpr static size_t fwd_size = 66608;
fc::fwd<impl,fwd_size> my;
};

}

namespace fc {
void to_variant(const AMQP::Address& a, fc::variant& v);
}
278 changes: 278 additions & 0 deletions libraries/retrying_amqp_connection/retrying_amqp_connection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
#include <amqpcpp.h>
#include <fc/variant.hpp>
#include <fc/fwd_impl.hpp>

#include <eosio/retrying_amqp_connection/retrying_amqp_connection.hpp>

namespace eosio {

struct retrying_amqp_connection::impl : public AMQP::ConnectionHandler {
impl(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready,
connection_failed_callback_t failed, fc::logger logger = fc::logger::get()) :
_strand(io_context), _resolver(_strand), _sock(_strand), _timer(_strand), _address(address),
_ready_callback(ready), _failed_callback(failed), _logger(logger) {

FC_ASSERT(!_address.secure(), "Only amqp:// URIs are supported for AMQP addresses (${a})", ("a", _address));
start_connection();
}

void onReady(AMQP::Connection* connection) override {
fc_ilog(_logger, "AMQP connection to ${s} is fully operational", ("s", _address));

_ready_callback(connection);
_indicated_ready = true;
}

void onData(AMQP::Connection* connection, const char* data, size_t size) override {
if(!_sock.is_open())
return;
_state->outgoing_queue.emplace_back(data, data+size);
send_some();
}

void onError(AMQP::Connection* connection, const char* message) override {
fc_elog(_logger, "AMQP connection to ${s} suffered an error; will retry shortly: ${m}", ("s", _address)("m", message));
schedule_retry();
}

void onClosed(AMQP::Connection *connection) override {
fc_wlog(_logger, "AMQP connection to ${s} closed AMQP connection", ("s", _address));
schedule_retry();
}

void start_connection() {
_resolver.async_resolve(_address.hostname(), std::to_string(_address.port()), boost::asio::bind_executor(_strand, [this](const auto ec, const auto endpoints) {
if(ec) {
if(ec != boost::asio::error::operation_aborted) {
fc_wlog(_logger, "Failed resolving AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message()));
schedule_retry();
}
return;
}
//AMQP::Connection's dtor will attempt to send a last gasp message. Resetting state here is a little easier to prove
// as being safe as it requires pumping the event loop once vs placing the state reset directly in schedule_retry()
_state.emplace();
boost::asio::async_connect(_sock, endpoints, boost::asio::bind_executor(_strand, [this](const auto ec, const auto endpoint) {
if(ec) {
if(ec != boost::asio::error::operation_aborted) {
fc_wlog(_logger, "Failed connecting AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message()));
schedule_retry();
}
return;
}
fc_ilog(_logger, "TCP connection to AMQP server at ${s} is up", ("s", _address));
receive_some();
_state->amqp_connection.emplace(this, _address.login(), _address.vhost());
}));
}));
}

void schedule_retry() {
//in some cases, such as an async_read & async_write both outstanding at the same time during socket failure,
// schedule_retry() can be called multiple times in quick succession. nominally this causes an already closed _sock
// to be closed(), and cancels the pending 1 second timer when restarting the 1 second timer. In theory though, if thread
// timing is particularly slow, the one second timer may have already expired (and the callback can no longer be cancelled)
// which could potentially queue up two start_connection()s.
//Bail out early if a pending timer is already running and the callback hasn't been called.
if(_retry_scheduled)
return;

_sock.close();
_resolver.cancel();

//calling the failure callback will likely cause downstream users to take action such as closing an AMQP::Channel which
// will attempt to send data. Ensure that _sock is closed before then so onData() will drop those attempts
if(_indicated_ready)
_failed_callback();
_indicated_ready = false;

boost::system::error_code ec;
_timer.expires_from_now(std::chrono::seconds(1), ec);
if(ec)
return;
_retry_scheduled = true;
_timer.async_wait(boost::asio::bind_executor(_strand, [this](const auto ec) {
_retry_scheduled = false;
if(ec)
return;
start_connection();
}));
}

void send_some() {
if(_state->send_outstanding || _state->outgoing_queue.empty())
return;
_state->send_outstanding = true;
boost::asio::async_write(_sock, boost::asio::buffer(_state->outgoing_queue.front()), boost::asio::bind_executor(_strand, [this](const auto& ec, size_t wrote) {
if(ec) {
if(ec != boost::asio::error::operation_aborted) {
fc_wlog(_logger, "Failed writing to AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message()));
schedule_retry();
}
return;
}
_state->outgoing_queue.pop_front();
_state->send_outstanding = false;
send_some();
}));
}

void receive_some() {
_sock.async_read_some(boost::asio::buffer(_read_buff), boost::asio::bind_executor(_strand, [this](const auto& ec, size_t sz) {
if(ec) {
if(ec != boost::asio::error::operation_aborted) {
fc_wlog(_logger, "Failed reading from AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message()));
schedule_retry();
}
return;
}
_state->read_queue.insert(_state->read_queue.end(), _read_buff, _read_buff + sz);
auto used = _state->amqp_connection->parse(_state->read_queue.data(), _state->read_queue.size());
_state->read_queue.erase(_state->read_queue.begin(), _state->read_queue.begin()+used);

//parse() could have resulted in an error on an AMQP channel or on the AMQP connection (causing a onError() or
// onClosed() to be called). An error on an AMQP channel is outside the scope of retrying_amqp_connection, but an
// onError() or onClosed() would call schedule_retry() and thus _sock.close(). Check that the socket is still open before
// looping back around for another async_read
if(_sock.is_open())
receive_some();
}));
}

char _read_buff[64*1024];

boost::asio::io_context::strand _strand;

boost::asio::ip::tcp::resolver _resolver;
boost::asio::ip::tcp::socket _sock;
boost::asio::steady_timer _timer;

AMQP::Address _address;

connection_ready_callback_t _ready_callback;
connection_failed_callback_t _failed_callback;
bool _indicated_ready = false;
bool _retry_scheduled = false;

fc::logger _logger;

struct state {
state() {}

std::deque<std::vector<char>> outgoing_queue;
bool send_outstanding = false;

std::vector<char> read_queue;

std::optional<AMQP::Connection> amqp_connection;
};
std::optional<state> _state;
//be aware that AMQP::Connection's dtor will attempt to send a last gasp message on dtor. This means _state's
// destruction will cause onData() to be called when _state's amqp_connection dtor is fired. So be mindful of member
// dtor ordering here as _state & _sock will be accessed during dtor
};


struct single_channel_retrying_amqp_connection::impl {
using channel_ready_callback_t = single_channel_retrying_amqp_connection::channel_ready_callback_t;
using failed_callback_t = single_channel_retrying_amqp_connection::failed_callback_t;

impl(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready,
failed_callback_t failed, fc::logger logger) :
_connection(io_context, address, [this](AMQP::Connection* c){conn_ready(c);},[this](){conn_failed();}, logger),
_timer(_connection.strand()), _channel_ready(ready), _failed(failed)
{}

void conn_ready(AMQP::Connection* c) {
_amqp_connection = c;
bring_up_channel();
}

void start_retry() {
boost::system::error_code ec;
_timer.expires_from_now(std::chrono::seconds(1), ec);
if(ec)
return;
_timer.async_wait(boost::asio::bind_executor(_connection.strand(), [this](const auto ec) {
if(ec)
return;
bring_up_channel();
}));
}

void bring_up_channel() {
try {
_amqp_channel.emplace(_amqp_connection);
}
catch(...) {
wlog("AMQP channel could not start for AMQP connection ${c}; retrying", ("c", _connection.address()));
start_retry();
}
_amqp_channel->onError([this](const char* e) {
wlog("AMQP channel failure on AMQP connection ${c}; retrying : ${m}", ("c", _connection.address())("m", e));
_failed();
start_retry();
});
_amqp_channel->onReady([this]() {
_channel_ready(&*_amqp_channel);
});
}

void conn_failed() {
_amqp_connection = nullptr;
_amqp_channel.reset();
boost::system::error_code ec;
_timer.cancel(ec);
_failed();
}

retrying_amqp_connection _connection;
boost::asio::steady_timer _timer;
std::optional<AMQP::Channel> _amqp_channel;
AMQP::Connection* _amqp_connection = nullptr;

channel_ready_callback_t _channel_ready;
failed_callback_t _failed;
};

retrying_amqp_connection::retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready,
connection_failed_callback_t failed, fc::logger logger) :
my(io_context, address, ready, failed, logger) {}


const AMQP::Address& retrying_amqp_connection::address() const {
return my->_address;
}

boost::asio::io_context::strand& retrying_amqp_connection::strand() {
return my->_strand;
}

retrying_amqp_connection::~retrying_amqp_connection() = default;

single_channel_retrying_amqp_connection::single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready,
failed_callback_t failed, fc::logger logger) :
my(io_context, address, ready, failed, logger) {}

const AMQP::Address& single_channel_retrying_amqp_connection::address() const {
return my->_connection.address();
}

single_channel_retrying_amqp_connection::~single_channel_retrying_amqp_connection() = default;

}

namespace fc {
void to_variant(const AMQP::Address& a, fc::variant& v) {
std::string str(a.secure() ? "amqps://" : "amqp://");
str.append(a.login().user()).append(":********").append("@");
str.append(a.hostname().empty() ? "localhost" : a.hostname());
if(a.port() != 5672)
str.append(":").append(std::to_string(a.port()));
str.append("/");
if (a.vhost() != "/")
str.append(a.vhost());

v = str;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be: v = std::move(str); as fc::variant takes string by value.

}
}