From c592ab2c734b0a52ef2d01a9dfde136a01cb5579 Mon Sep 17 00:00:00 2001 From: Emulator000 Date: Mon, 5 Oct 2020 01:10:08 +0000 Subject: [PATCH] Applied Cargo clippy and Cargo fmt --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/client/consumer/channel.rs | 6 +++--- src/client/consumer/message.rs | 23 +++++++++++------------ src/client/consumer/mod.rs | 22 ++++++++++++---------- src/client/executor/waiter.rs | 12 +++--------- src/config/database/mod.rs | 10 ++++++---- src/config/mod.rs | 12 ++++++------ 8 files changed, 43 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 483a64d..e6d0bc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1033,7 +1033,7 @@ dependencies = [ [[package]] name = "rabbitmq-consumer" -version = "1.0.0" +version = "1.0.1" dependencies = [ "async-std 1.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "base64 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 8820ea6..86938a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rabbitmq-consumer" - version = "1.0.0" + version = "1.0.1" authors = ["Dario Cancelliere "] edition = "2018" diff --git a/src/client/consumer/channel.rs b/src/client/consumer/channel.rs index ff5fc19..1faed96 100644 --- a/src/client/consumer/channel.rs +++ b/src/client/consumer/channel.rs @@ -22,7 +22,7 @@ impl Channel { let channel = connection .create_channel() .await - .map_err(|e| ChannelError::LapinError(e))?; + .map_err(ChannelError::LapinError)?; channel .basic_qos( 1, @@ -31,7 +31,7 @@ impl Channel { }, ) .await - .map_err(|e| ChannelError::LapinError(e))?; + .map_err(ChannelError::LapinError)?; logger::log(format!( "[{}] Created channel with id: {}", @@ -50,7 +50,7 @@ impl Channel { FieldTable::default(), ) .await - .map_err(|e| ChannelError::LapinError(e))?; + .map_err(ChannelError::LapinError)?; Ok((channel, queue)) } diff --git a/src/client/consumer/message.rs b/src/client/consumer/message.rs index 216b5b9..8744386 100644 --- a/src/client/consumer/message.rs +++ b/src/client/consumer/message.rs @@ -125,10 +125,7 @@ impl Message { .get_command_timeout(queue_config.id), ) .map(|_| CommandResult::Timeout); - let output = message_command - .command - .output() - .map(|output| CommandResult::Output(output)); + let output = message_command.command.output().map(CommandResult::Output); let (res, _, _) = select_all(vec![timeout.boxed(), output.boxed()]).await; match res { @@ -151,7 +148,7 @@ impl Message { delivery.delivery_tag, BasicAckOptions { multiple: false }, ) - .map_err(|e| MessageError::LapinError(e)) + .map_err(MessageError::LapinError) .await?; } _ => match output.status.code().unwrap_or(NEGATIVE_ACKNOWLEDGEMENT) { @@ -166,7 +163,7 @@ impl Message { delivery.delivery_tag, BasicAckOptions { multiple: false }, ) - .map_err(|e| MessageError::LapinError(e)) + .map_err(MessageError::LapinError) .await?; self.queue.write().await.set_queue_wait( @@ -192,7 +189,7 @@ impl Message { delivery.delivery_tag, BasicRejectOptions { requeue: true }, ) - .map_err(|e| MessageError::LapinError(e)) + .map_err(MessageError::LapinError) .await?; let ms = self @@ -231,7 +228,7 @@ impl Message { delivery.delivery_tag, BasicRejectOptions { requeue: false }, ) - .map_err(|e| MessageError::LapinError(e)) + .map_err(MessageError::LapinError) .await?; } }, @@ -251,7 +248,7 @@ impl Message { delivery.delivery_tag, BasicRejectOptions { requeue: false }, ) - .map_err(|e| MessageError::LapinError(e)) + .map_err(MessageError::LapinError) .await?; } } @@ -271,7 +268,7 @@ impl Message { channel .basic_reject(delivery.delivery_tag, BasicRejectOptions { requeue: true }) - .map_err(|e| MessageError::LapinError(e)) + .map_err(MessageError::LapinError) .await?; Ok(MessageResult::GenericOk) @@ -280,7 +277,7 @@ impl Message { } async fn wait_db(&self, index: i32, queue_config: &QueueConfig) { - while let Some(_) = async { + while async { let is_enabled = self.queue.write().await.is_enabled(queue_config.id); if is_enabled { let waiting = self @@ -307,13 +304,15 @@ impl Message { RetryMode::Forced, ); - Some(utils::wait(DEFAULT_WAIT_PART).await) + utils::wait(DEFAULT_WAIT_PART).await; + Some(()) } } else { None } } .await + .is_some() {} } } diff --git a/src/client/consumer/mod.rs b/src/client/consumer/mod.rs index 6ff989f..b88400f 100644 --- a/src/client/consumer/mod.rs +++ b/src/client/consumer/mod.rs @@ -81,14 +81,12 @@ impl Consumer { .on_connect(&self.config.rabbit.host, self.config.rabbit.port); } - let mut sigint = - signal(SignalKind::interrupt()).map_err(|e| ConsumerError::IoError(e))?; + let mut sigint = signal(SignalKind::interrupt()).map_err(ConsumerError::IoError)?; let sigint = sigint.recv().map(|_| Ok(ConsumerResult::Killed)); - let mut sigquit = - signal(SignalKind::quit()).map_err(|e| ConsumerError::IoError(e))?; + let mut sigquit = signal(SignalKind::quit()).map_err(ConsumerError::IoError)?; let sigquit = sigquit.recv().map(|_| Ok(ConsumerResult::Killed)); let mut sigterm = - signal(SignalKind::terminate()).map_err(|e| ConsumerError::IoError(e))?; + signal(SignalKind::terminate()).map_err(ConsumerError::IoError)?; let sigterm = sigterm.recv().map(|_| Ok(ConsumerResult::Killed)); let mut futures = Vec::new(); @@ -188,17 +186,18 @@ impl Consumer { self.message .handle_message(index, &queue_config, &channel, delivery) .await - .map_err(|e| ConsumerError::MessageError(e))?; + .map_err(ConsumerError::MessageError)?; } if !is_enabled { if !is_changed { - if let Err(_) = channel + if channel .basic_cancel( &consumer_name, BasicCancelOptions { nowait: false }, ) .await + .is_err() { logger::log(&format!( "[{}] Error canceling the consumer #{}, returning...", @@ -209,9 +208,10 @@ impl Consumer { } } - if let Err(_) = channel + if channel .basic_recover(BasicRecoverOptions { requeue: true }) .await + .is_err() { logger::log( &format!( @@ -262,14 +262,16 @@ impl Consumer { } async fn check_consumer(&self, queue_config: &QueueConfig) { - while let Some(_) = async { + while async { if !self.queue.write().await.is_enabled(queue_config.id) { - Some(utils::wait(CONSUMER_WAIT).await) + utils::wait(CONSUMER_WAIT).await; + Some(()) } else { None } } .await + .is_some() {} } } diff --git a/src/client/executor/waiter.rs b/src/client/executor/waiter.rs index 0d3a00a..3adf9bc 100644 --- a/src/client/executor/waiter.rs +++ b/src/client/executor/waiter.rs @@ -19,11 +19,7 @@ impl Waiter { pub fn is_to_close(&self) -> bool { if self.connections > 0 { - if self.waiting_times < self.connections { - false - } else { - true - } + self.waiting_times >= self.connections } else { false } @@ -37,10 +33,8 @@ impl Events for Waiter { } fn on_error(&mut self, _error: &str) { - if self.connections > 0 { - if self.waiting_times < self.connections { - self.waiting_times += 1; - } + if self.connections > 0 && self.waiting_times < self.connections { + self.waiting_times += 1; } if self.waiting < (u64::max_value() / 2) { diff --git a/src/config/database/mod.rs b/src/config/database/mod.rs index 6279e42..fb40697 100644 --- a/src/config/database/mod.rs +++ b/src/config/database/mod.rs @@ -43,10 +43,12 @@ impl Database { )); let manager = ConnectionManager::::new(database_url); - Pool::builder().build(manager).expect(&format!( - "Error connecting to host {} with db name {}.", - config.host, config.db_name - )) + Pool::builder().build(manager).unwrap_or_else(|e| { + panic!( + "Error {:?} connecting to host {} with db name {}.", + e, config.host, config.db_name + ) + }) } pub fn reconnect(&mut self) { diff --git a/src/config/mod.rs b/src/config/mod.rs index 6332625..f876812 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -7,8 +7,6 @@ use std::env; use std::fs::File; use std::path::Path; -use toml; - use serde::Deserialize; use crate::config::queue::config::QueueConfig; @@ -72,10 +70,12 @@ impl Config { configuration.replace(&format!("\"${}\"", key), &format!("\"{}\"", value)); } - toml::from_str(&configuration).expect(&format!( - "Couldn't load the configuration file \"{}\".", - config - )) + toml::from_str(&configuration).unwrap_or_else(|e| { + panic!( + "Couldn't load the configuration file \"{}\": {:?}", + config, e + ) + }) } } }