Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10615 from EOSIO/amqp-consume-dev-boxed
Browse files Browse the repository at this point in the history
Start consuming again when connection reestablished to AMQP 📦
  • Loading branch information
heifner authored Aug 12, 2021
2 parents 805c9f3 + 472c937 commit ff6516d
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions libraries/amqp/include/eosio/amqp/amqp_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,8 @@ class amqp_handler {

void start_consume(bool recover=true) {
boost::asio::post( thread_pool_.get_executor(), [&]() {
if( channel_ && on_consume_) {
if (recover) {
channel_->recover(AMQP::requeue)
.onSuccess([&](){dlog("successfully started channel recovery");})
.onError([&](const char* message){wlog("channel recovery failed ${e}", ("e", message));});
}

auto &consumer = channel_->consume(name_);
consumer.onSuccess([&](const std::string &consumer_tag) {
dlog("consume started: ${tag}", ("tag", consumer_tag));
});
consumer.onError([&](const char *message) {
wlog("consume failed: ${e}", ("e", message));
on_error(message);
});
static_assert(std::is_same_v<on_consume_t, AMQP::MessageCallback>,
"AMQP::MessageCallback interface changed");
consumer.onReceived(on_consume_);
}
consuming_ = true;
init_consume(recover);
} );
}
private:
Expand Down Expand Up @@ -194,12 +177,14 @@ class amqp_handler {

// called from amqp thread
void channel_ready(AMQP::Channel* c) {
dlog( "AMQP Channel ready: ${id}", ("id", c ? c->id() : 0) );
channel_ = c;
init();
}

// called from amqp thread
void channel_failed() {
wlog( "AMQP connection failed." );
channel_ = nullptr;
}

Expand All @@ -218,8 +203,9 @@ class amqp_handler {

auto& exchange = channel_->declareExchange( exchange_name_, type, AMQP::durable);
exchange.onSuccess( [this]() {
dlog( "AMQP declare exchange Successfully!\n Exchange ${e}", ("e", exchange_name_) );
first_connect_.set();
dlog( "AMQP declare exchange Successfully! Exchange ${e}", ("e", exchange_name_) );
init_consume(true);
first_connect_.set();
} );
exchange.onError([this](const char* error_message) {
on_error( std::string("AMQP Queue error: ") + error_message );
Expand All @@ -228,9 +214,10 @@ class amqp_handler {
} else {
auto& queue = channel_->declareQueue( name_, AMQP::durable );
queue.onSuccess( [&]( const std::string& name, uint32_t messagecount, uint32_t consumercount ) {
dlog( "AMQP Connected Successfully!\n Queue ${q} - Messages: ${mc} - Consumers: ${cc}",
dlog( "AMQP Connected Successfully! Queue ${q} - Messages: ${mc} - Consumers: ${cc}",
("q", name)( "mc", messagecount )( "cc", consumercount ) );
first_connect_.set();
init_consume(true);
first_connect_.set();
} );
queue.onError( [&]( const char* error_message ) {
on_error( error_message );
Expand All @@ -239,6 +226,29 @@ class amqp_handler {
}
}

// called from amqp thread
void init_consume(bool recover) {
if( channel_ && on_consume_ && consuming_ ) {
if (recover) {
channel_->recover(AMQP::requeue)
.onSuccess([&](){dlog("successfully started channel recovery");})
.onError([&](const char* message){wlog("channel recovery failed ${e}", ("e", message));});
}

auto &consumer = channel_->consume(name_);
consumer.onSuccess([&](const std::string &consumer_tag) {
dlog("consume started: ${tag}", ("tag", consumer_tag));
});
consumer.onError([&](const char *message) {
wlog("consume failed: ${e}", ("e", message));
on_error(message);
});
static_assert(std::is_same_v<on_consume_t, AMQP::MessageCallback>,
"AMQP::MessageCallback interface changed");
consumer.onReceived(on_consume_);
}
}

// called from amqp thread
void on_error( const std::string& message ) {
if( on_error_ ) on_error_( message );
Expand Down Expand Up @@ -281,6 +291,7 @@ class amqp_handler {
std::string exchange_type_;
on_error_t on_error_;
on_consume_t on_consume_;
bool consuming_ = false;
};

} // namespace eosio

0 comments on commit ff6516d

Please sign in to comment.