Skip to content

Commit

Permalink
Applied Cargo clippy and Cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Emulator000 committed Oct 5, 2020
1 parent a6b22e4 commit c592ab2
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rabbitmq-consumer"
version = "1.0.0"
version = "1.0.1"
authors = ["Dario Cancelliere <dario.cancelliere@facile.it>"]
edition = "2018"

Expand Down
6 changes: 3 additions & 3 deletions src/client/consumer/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl Channel {
let channel = connection
.create_channel()
.await
.map_err(|e| ChannelError::LapinError(e))?;
.map_err(ChannelError::LapinError)?;
channel
.basic_qos(
1,
Expand All @@ -31,7 +31,7 @@ impl Channel {
},
)
.await
.map_err(|e| ChannelError::LapinError(e))?;
.map_err(ChannelError::LapinError)?;

logger::log(format!(
"[{}] Created channel with id: {}",
Expand All @@ -50,7 +50,7 @@ impl Channel {
FieldTable::default(),
)
.await
.map_err(|e| ChannelError::LapinError(e))?;
.map_err(ChannelError::LapinError)?;

Ok((channel, queue))
}
Expand Down
23 changes: 11 additions & 12 deletions src/client/consumer/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ impl Message {
.get_command_timeout(queue_config.id),
)
.map(|_| CommandResult::Timeout);
let output = message_command
.command
.output()
.map(|output| CommandResult::Output(output));
let output = message_command.command.output().map(CommandResult::Output);

let (res, _, _) = select_all(vec![timeout.boxed(), output.boxed()]).await;
match res {
Expand All @@ -151,7 +148,7 @@ impl Message {
delivery.delivery_tag,
BasicAckOptions { multiple: false },
)
.map_err(|e| MessageError::LapinError(e))
.map_err(MessageError::LapinError)
.await?;
}
_ => match output.status.code().unwrap_or(NEGATIVE_ACKNOWLEDGEMENT) {
Expand All @@ -166,7 +163,7 @@ impl Message {
delivery.delivery_tag,
BasicAckOptions { multiple: false },
)
.map_err(|e| MessageError::LapinError(e))
.map_err(MessageError::LapinError)
.await?;

self.queue.write().await.set_queue_wait(
Expand All @@ -192,7 +189,7 @@ impl Message {
delivery.delivery_tag,
BasicRejectOptions { requeue: true },
)
.map_err(|e| MessageError::LapinError(e))
.map_err(MessageError::LapinError)
.await?;

let ms = self
Expand Down Expand Up @@ -231,7 +228,7 @@ impl Message {
delivery.delivery_tag,
BasicRejectOptions { requeue: false },
)
.map_err(|e| MessageError::LapinError(e))
.map_err(MessageError::LapinError)
.await?;
}
},
Expand All @@ -251,7 +248,7 @@ impl Message {
delivery.delivery_tag,
BasicRejectOptions { requeue: false },
)
.map_err(|e| MessageError::LapinError(e))
.map_err(MessageError::LapinError)
.await?;
}
}
Expand All @@ -271,7 +268,7 @@ impl Message {

channel
.basic_reject(delivery.delivery_tag, BasicRejectOptions { requeue: true })
.map_err(|e| MessageError::LapinError(e))
.map_err(MessageError::LapinError)
.await?;

Ok(MessageResult::GenericOk)
Expand All @@ -280,7 +277,7 @@ impl Message {
}

async fn wait_db(&self, index: i32, queue_config: &QueueConfig) {
while let Some(_) = async {
while async {
let is_enabled = self.queue.write().await.is_enabled(queue_config.id);
if is_enabled {
let waiting = self
Expand All @@ -307,13 +304,15 @@ impl Message {
RetryMode::Forced,
);

Some(utils::wait(DEFAULT_WAIT_PART).await)
utils::wait(DEFAULT_WAIT_PART).await;
Some(())
}
} else {
None
}
}
.await
.is_some()
{}
}
}
22 changes: 12 additions & 10 deletions src/client/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,12 @@ impl Consumer {
.on_connect(&self.config.rabbit.host, self.config.rabbit.port);
}

let mut sigint =
signal(SignalKind::interrupt()).map_err(|e| ConsumerError::IoError(e))?;
let mut sigint = signal(SignalKind::interrupt()).map_err(ConsumerError::IoError)?;
let sigint = sigint.recv().map(|_| Ok(ConsumerResult::Killed));
let mut sigquit =
signal(SignalKind::quit()).map_err(|e| ConsumerError::IoError(e))?;
let mut sigquit = signal(SignalKind::quit()).map_err(ConsumerError::IoError)?;
let sigquit = sigquit.recv().map(|_| Ok(ConsumerResult::Killed));
let mut sigterm =
signal(SignalKind::terminate()).map_err(|e| ConsumerError::IoError(e))?;
signal(SignalKind::terminate()).map_err(ConsumerError::IoError)?;
let sigterm = sigterm.recv().map(|_| Ok(ConsumerResult::Killed));

let mut futures = Vec::new();
Expand Down Expand Up @@ -188,17 +186,18 @@ impl Consumer {
self.message
.handle_message(index, &queue_config, &channel, delivery)
.await
.map_err(|e| ConsumerError::MessageError(e))?;
.map_err(ConsumerError::MessageError)?;
}

if !is_enabled {
if !is_changed {
if let Err(_) = channel
if channel
.basic_cancel(
&consumer_name,
BasicCancelOptions { nowait: false },
)
.await
.is_err()
{
logger::log(&format!(
"[{}] Error canceling the consumer #{}, returning...",
Expand All @@ -209,9 +208,10 @@ impl Consumer {
}
}

if let Err(_) = channel
if channel
.basic_recover(BasicRecoverOptions { requeue: true })
.await
.is_err()
{
logger::log(
&format!(
Expand Down Expand Up @@ -262,14 +262,16 @@ impl Consumer {
}

async fn check_consumer(&self, queue_config: &QueueConfig) {
while let Some(_) = async {
while async {
if !self.queue.write().await.is_enabled(queue_config.id) {
Some(utils::wait(CONSUMER_WAIT).await)
utils::wait(CONSUMER_WAIT).await;
Some(())
} else {
None
}
}
.await
.is_some()
{}
}
}
Expand Down
12 changes: 3 additions & 9 deletions src/client/executor/waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ impl Waiter {

pub fn is_to_close(&self) -> bool {
if self.connections > 0 {
if self.waiting_times < self.connections {
false
} else {
true
}
self.waiting_times >= self.connections
} else {
false
}
Expand All @@ -37,10 +33,8 @@ impl Events for Waiter {
}

fn on_error(&mut self, _error: &str) {
if self.connections > 0 {
if self.waiting_times < self.connections {
self.waiting_times += 1;
}
if self.connections > 0 && self.waiting_times < self.connections {
self.waiting_times += 1;
}

if self.waiting < (u64::max_value() / 2) {
Expand Down
10 changes: 6 additions & 4 deletions src/config/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ impl Database {
));

let manager = ConnectionManager::<MysqlConnection>::new(database_url);
Pool::builder().build(manager).expect(&format!(
"Error connecting to host {} with db name {}.",
config.host, config.db_name
))
Pool::builder().build(manager).unwrap_or_else(|e| {
panic!(
"Error {:?} connecting to host {} with db name {}.",
e, config.host, config.db_name
)
})
}

pub fn reconnect(&mut self) {
Expand Down
12 changes: 6 additions & 6 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use std::env;
use std::fs::File;
use std::path::Path;

use toml;

use serde::Deserialize;

use crate::config::queue::config::QueueConfig;
Expand Down Expand Up @@ -72,10 +70,12 @@ impl Config {
configuration.replace(&format!("\"${}\"", key), &format!("\"{}\"", value));
}

toml::from_str(&configuration).expect(&format!(
"Couldn't load the configuration file \"{}\".",
config
))
toml::from_str(&configuration).unwrap_or_else(|e| {
panic!(
"Couldn't load the configuration file \"{}\": {:?}",
config, e
)
})
}
}
}
Expand Down

0 comments on commit c592ab2

Please sign in to comment.