Skip to content

Commit

Permalink
Updated all libraries to the last version including the compiler and …
Browse files Browse the repository at this point in the history
…changed the default logging system with a more advanced one
  • Loading branch information
Emulator000 committed Nov 27, 2020
1 parent e1e047c commit 12ec113
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 444 deletions.
595 changes: 249 additions & 346 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rabbitmq-consumer"
version = "1.0.3"
version = "1.1.0"
authors = ["Dario Cancelliere <dario.cancelliere@facile.it>"]
edition = "2018"

Expand All @@ -12,16 +12,17 @@
lto = true

[dependencies]
async-std = "^1.6"
tokio = { version = "^0.2", features = ["full"] }
async-std = "^1.7"
tokio = { version = "^0.3", features = ["full"] }
futures = "^0.3"
lapin = "^1.2"
lapin = "^1.6"
toml = "^0.5"
serde = "^1.0"
serde_derive = "^1.0"
diesel = { version = "^1.4", features = ["mysql", "chrono", "r2d2"] }
chrono = { version = "^0.4", features = ["serde"] }
env_logger = "^0.7"
base64 = "^0.9"
log = "^0.4"
env_logger = "^0.8"
base64 = "^0.13"
clap = "^2.33"
crystalsoft-utils = "^0.1"
5 changes: 2 additions & 3 deletions src/client/consumer/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use lapin::options::{BasicQosOptions, QueueDeclareOptions};
use lapin::{types::FieldTable, Channel as LapinChannel, Connection, Error as LapinError, Queue};

use crate::config::queue::config::QueueConfig;
use crate::logger;

#[derive(Debug)]
pub enum ChannelError {
Expand Down Expand Up @@ -33,11 +32,11 @@ impl Channel {
.await
.map_err(ChannelError::LapinError)?;

logger::log(format!(
info!(
"[{}] Created channel with id: {}",
queue.queue_name,
channel.id()
));
);

let queue = channel
.queue_declare(
Expand Down
5 changes: 2 additions & 3 deletions src/client/consumer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use lapin::{
};

use crate::config::RabbitConfig;
use crate::logger;

#[derive(Debug, Clone)]
pub enum ConnectionError {
Expand Down Expand Up @@ -35,10 +34,10 @@ impl Connection {
}

if self.lapin.is_err() {
logger::log(&format!(
info!(
"Connecting to RabbitMQ at {}:{}...",
self.config.host, self.config.port
));
);

match LapinConnection::connect_uri(
AMQPUri {
Expand Down
47 changes: 19 additions & 28 deletions src/client/consumer/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use base64::encode as base64_encode;
use crate::client::consumer::DEFAULT_WAIT_PART;
use crate::config::queue::config::QueueConfig;
use crate::config::queue::{Queue, RetryMode, RetryType};
use crate::logger;
use crate::utils;

#[derive(Debug)]
Expand Down Expand Up @@ -100,10 +99,10 @@ impl Message {
}
};

logger::log(&format!(
info!(
"[{}] Executing command \"{}\" on consumer #{}",
queue_config.queue_name, message_command.human, index
));
);

self.process_message(index, queue_config, msg, message_command, channel, delivery)
.await
Expand Down Expand Up @@ -134,13 +133,11 @@ impl Message {
match output {
Ok(output) => match retry_type {
RetryType::Ignored => {
logger::log(
&format!(
"[{}] Command \"{}\" executed on consumer #{} and result ignored, message removed.",
queue_config.queue_name,
message_command.human,
index
)
info!(
"[{}] Command \"{}\" executed on consumer #{} and result ignored, message removed.",
queue_config.queue_name,
message_command.human,
index
);

channel
Expand All @@ -153,10 +150,10 @@ impl Message {
}
_ => match output.status.code().unwrap_or(NEGATIVE_ACKNOWLEDGEMENT) {
ACKNOWLEDGEMENT => {
logger::log(&format!(
info!(
"[{}] Command \"{}\" succeeded on consumer #{}, message removed.",
queue_config.queue_name, message_command.human, index
));
);

channel
.basic_ack(
Expand All @@ -174,15 +171,13 @@ impl Message {
);
}
NEGATIVE_ACKNOWLEDGEMENT_AND_RE_QUEUE => {
logger::log(
&format!(
info!(
"[{}] Command \"{}\" failed on consumer #{}, message rejected and requeued. Output:\n{:#?}",
queue_config.queue_name,
message_command.human,
index,
output
)
);
);

channel
.basic_reject(
Expand All @@ -198,10 +193,10 @@ impl Message {
.await
.get_queue_wait(queue_config.id, index);

logger::log(&format!(
info!(
"[{}] Waiting {} milliseconds for consumer #{}...",
queue_config.queue_name, ms, index
));
);

self.wait_db(index, queue_config).await;

Expand All @@ -213,15 +208,13 @@ impl Message {
);
}
_ => {
logger::log(
&format!(
info!(
"[{}] Command \"{}\" failed on consumer #{}, message rejected. Output:\n{:#?}",
queue_config.queue_name,
message_command.human,
index,
output
)
);
);

channel
.basic_reject(
Expand All @@ -234,14 +227,14 @@ impl Message {
},
},
Err(e) => {
logger::log(&format!(
info!(
"[{}] Error {:?} executing the command \"{}\" on consumer #{}, message \"{}\" rejected...",
queue_config.queue_name,
e,
message_command.human,
index,
msg
));
);

channel
.basic_reject(
Expand All @@ -256,15 +249,13 @@ impl Message {
Ok(MessageResult::GenericOk)
}
CommandResult::Timeout => {
logger::log(
&format!(
info!(
"[{}] Timeout occurred executing the command \"{}\" on consumer #{}, message \"{}\" rejected and requeued...",
queue_config.queue_name,
message_command.human,
index,
msg
)
);
);

channel
.basic_reject(delivery.delivery_tag, BasicRejectOptions { requeue: true })
Expand Down
40 changes: 16 additions & 24 deletions src/client/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::config::file::File;
use crate::config::queue::config::QueueConfig;
use crate::config::queue::Queue;
use crate::config::Config;
use crate::logger;
use crate::utils;

const CONSUMER_WAIT: u64 = 60000;
Expand Down Expand Up @@ -94,7 +93,7 @@ impl Consumer {
futures.push(sigquit.boxed());
futures.push(sigterm.boxed());

logger::log("Managing queues...");
info!("Managing queues...");

let queues = self.queue.write().await.get_queues();
if queues.is_empty() {
Expand All @@ -112,7 +111,7 @@ impl Consumer {
.await
{
Ok((c, q)) => {
logger::log(format!("[{}] Queue created", queue.queue_name));
info!("[{}] Queue created", queue.queue_name);

self.consume(index, queue.clone(), c, q).boxed()
}
Expand Down Expand Up @@ -140,10 +139,10 @@ impl Consumer {
let consumer_name = format!("{}_consumer_{}", queue_config.consumer_name, index);

if !self.queue.write().await.is_enabled(queue_config.id) {
logger::log(format!(
info!(
"[{}] Consumer #{} with \"{}\" not enabled, waiting...",
queue_config.queue_name, index, consumer_name
));
);
}

self.check_consumer(&queue_config).await;
Expand All @@ -161,10 +160,10 @@ impl Consumer {

match consumer {
Ok(mut consumer) => {
logger::log(format!(
info!(
"[{}] Consumer #{} declared \"{}\"",
queue_config.queue_name, index, consumer_name
));
);

while let Some(delivery) = consumer.next().await {
match delivery {
Expand Down Expand Up @@ -193,10 +192,10 @@ impl Consumer {
.await
.is_err()
{
logger::log(&format!(
info!(
"[{}] Error canceling the consumer #{}, returning...",
queue_config.queue_name, index
));
);
} else {
utils::wait(DEFAULT_WAIT_PART).await;
}
Expand All @@ -207,47 +206,40 @@ impl Consumer {
.await
.is_err()
{
logger::log(
&format!(
info!(
"[{}] Error recovering message for consumer #{}, message is not ackable...",
queue_config.queue_name,
index
)
);
);

return Ok(ConsumerResult::GenericOk);
}

logger::log(
&format!(
info!(
"[{}] Consumer #{} not active, messages recovered and consumer canceled...",
queue_config.queue_name,
index
)
);
);

return Ok(ConsumerResult::ConsumerChanged);
} else if is_changed {
logger::log(&format!(
info!(
"[{}] Consumers count changed, messages recovered...",
queue_config.queue_name
));
);

return Ok(ConsumerResult::CountChanged);
}
}
Err(e) => {
logger::log(format!(
"[{}] Error getting messages.",
queue_config.queue_name
));
info!("[{}] Error getting messages.", queue_config.queue_name);

return Err(ConsumerError::LapinError(e));
}
}
}

logger::log("Messages has been processed.");
info!("Messages has been processed.");

Ok(ConsumerResult::GenericOk)
}
Expand Down
15 changes: 5 additions & 10 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::Duration;
use crate::client::consumer::ConsumerError;
use crate::client::executor::{Executor, ExecutorResult};
use crate::config::Config;
use crate::logger;

pub enum ClientResult {
Ok,
Expand All @@ -28,28 +27,24 @@ impl Client {
pub async fn run(&mut self) -> ClientResult {
loop {
match self.executor.execute().await {
ExecutorResult::Restart => logger::log("Consumer count changed, restarting..."),
ExecutorResult::Restart => info!("Consumer count changed, restarting..."),
ExecutorResult::Wait(error, waiting) => {
logger::log(&format!(
"Error ({:?}), waiting {} seconds...",
error,
waiting / 1000
));
info!("Error ({:?}), waiting {} seconds...", error, waiting / 1000);

thread::sleep(Duration::from_millis(waiting));
}
ExecutorResult::Exit => {
logger::log("Process finished, exiting...");
info!("Process finished, exiting...");

break;
}
ExecutorResult::Killed => {
logger::log("Process killed, exiting...");
info!("Process killed, exiting...");

break;
}
ExecutorResult::Error(e) => {
logger::log(&format!("Error ({:?}), exiting...", e));
info!("Error ({:?}), exiting...", e);

return ClientResult::ConsumerError(e);
}
Expand Down
5 changes: 2 additions & 3 deletions src/config/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::config::database::schema::queues;
use crate::config::queue::config::QueueConfig;
use crate::config::queue::model::QueueModel;
use crate::config::DatabaseConfig;
use crate::logger;

pub struct Database {
pub pool: Pool<ConnectionManager<MysqlConnection>>,
Expand Down Expand Up @@ -36,11 +35,11 @@ impl Database {
config.db_name
);

logger::log(&format!(
info!(
"Connecting to MySQL at {}:{}...",
config.host,
config.port.unwrap_or(Self::DEFAULT_PORT)
));
);

let manager = ConnectionManager::<MysqlConnection>::new(database_url);
Pool::builder().build(manager).unwrap_or_else(|e| {
Expand Down
Loading

0 comments on commit 12ec113

Please sign in to comment.