Skip to content

Commit

Permalink
Better handling of Lapin connection issues and error delivered to the…
Browse files Browse the repository at this point in the history
… waiter correctly
  • Loading branch information
Emulator000 committed Oct 13, 2020
1 parent 9236cc0 commit 7ce4b9b
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rabbitmq-consumer"
version = "1.0.2"
version = "1.0.3"
authors = ["Dario Cancelliere <dario.cancelliere@facile.it>"]
edition = "2018"

Expand Down
6 changes: 6 additions & 0 deletions src/client/consumer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ impl Connection {
}

pub async fn get_connection(&mut self) -> Result<Arc<LapinConnection>, ConnectionError> {
if let Ok(ref lapin) = self.lapin {
if lapin.status().errored() {
self.lapin = Err(ConnectionError::NotConnected);
}
}

if self.lapin.is_err() {
logger::log(&format!(
"Connecting to RabbitMQ at {}:{}...",
Expand Down
8 changes: 1 addition & 7 deletions src/client/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,7 @@ impl Consumer {

res
}
Err(e) => {
for hook in &self.hooks {
hook.write().await.on_error(&format!("{:?}", e));
}

Err(ConsumerError::ConnectionError(e))
}
Err(e) => Err(ConsumerError::ConnectionError(e)),
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/client/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod waiter;
use async_std::sync::{Arc, RwLock};

use crate::client::consumer::{Consumer, ConsumerError, ConsumerResult};
use crate::client::executor::events::EventsHandler;
use crate::client::executor::events::{Events, EventsHandler};
use crate::client::executor::waiter::Waiter;
use crate::config::Config;

Expand Down Expand Up @@ -41,7 +41,9 @@ impl Executor {
Ok(ConsumerResult::GenericOk) => ExecutorResult::Exit,
Ok(ConsumerResult::Killed) => ExecutorResult::Killed,
Err(e) => {
let waiter = self.waiter.read().await;
let mut waiter = self.waiter.write().await;
waiter.on_error(&format!("{:?}", e));

if waiter.is_to_close() {
return ExecutorResult::Error(e);
}
Expand Down

0 comments on commit 7ce4b9b

Please sign in to comment.