Skip to content

Commit

Permalink
Merge pull request #8 from facile-it/new-version
Browse files Browse the repository at this point in the history
New version v1.2.3
  • Loading branch information
Emulator000 authored May 27, 2021
2 parents 169b96c + ce6e3f5 commit 39bd579
Show file tree
Hide file tree
Showing 18 changed files with 337 additions and 350 deletions.
347 changes: 158 additions & 189 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 22 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
[package]
name = "rabbitmq-consumer"
version = "1.2.2"
authors = ["Dario Cancelliere <dario.cancelliere@facile.it>"]
edition = "2018"
name = "rabbitmq-consumer"
version = "1.2.3"
authors = ["Dario Cancelliere <emulator000@gmail.com>"]
edition = "2018"

[lib]
name = "rabbitmq_consumer_lib"
path = "src/lib.rs"
name = "rabbitmq_consumer_lib"
path = "src/lib.rs"

[profile.release]
lto = true
lto = true

[dependencies]
async-std = "^1.9"
tokio = { version = "^1.3", features = ["full"] }
tokio-stream = "^0.1"
futures = "^0.3"
lapin = "^1.6"
toml = "^0.5"
serde = "^1.0"
serde_derive = "^1.0"
diesel = { version = "^1.4", features = ["mysql", "chrono", "r2d2"] }
chrono = { version = "^0.4", features = ["serde"] }
log = "^0.4"
env_logger = "^0.8"
base64 = "^0.13"
clap = "^2.33"
crystalsoft-utils = "^0.1"
async-std = "^1.9"
tokio = { version = "^1.6", features = ["full"] }
tokio-stream = "^0.1"
futures = "^0.3"
lapin = "^1.7"
toml = "^0.5"
serde = "^1.0"
serde_derive = "^1.0"
diesel = { version = "^1.4", features = ["mysql", "chrono", "r2d2"] }
chrono = { version = "^0.4", features = ["serde"] }
log = "^0.4"
env_logger = "^0.8"
base64 = "^0.13"
clap = "^2.33"
crystalsoft-utils = "^0.1"
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

A configurable RabbitMQ consumer made in Rust, useful for a stable and reliable CLI commands processor.

## 1.2.0 warning
## Version 1.2.0 and 1.2.3 warning

In order to use this version correctly and only if you use the MySQL configuration for queues, you have to update your `queues` table schema adding a new `nack_code` integer field:
In order to use this version correctly and only if you use the MySQL configuration for queues, you have to update your `queues` table schema adding a new `nack_code` and `prefetch_count` integer columns:

```sql
ALTER TABLE queues ADD nack_code INT(11) DEFAULT 2 NULL;
ALTER TABLE queues ADD prefetch_count INT(11) DEFAULT 1 NULL;
```

## Installation
Expand Down
4 changes: 2 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::env;

fn main() {
if "release" == env::var("PROFILE").unwrap_or("".into()) {
if "release" == env::var("PROFILE").unwrap_or_else(|_| "".into()) {
println!(r"cargo:rustc-link-lib=static=mysqlclient");
println!(r"cargo:rustc-link-lib=static-nobundle=stdc++");
println!(r"cargo:rustc-link-lib=static:-bundle=stdc++");
println!(r"cargo:rustc-link-lib=static=z");
println!(r"cargo:rustc-link-lib=static=ssl");
println!(r"cargo:rustc-link-lib=static=crypto");
Expand Down
22 changes: 8 additions & 14 deletions src/client/consumer/channel.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use async_std::sync::Arc;

use log::info;

use lapin::options::{BasicQosOptions, QueueDeclareOptions};
use lapin::{types::FieldTable, Channel as LapinChannel, Connection, Error as LapinError, Queue};

use crate::config::queue::config::QueueConfig;

#[derive(Debug)]
pub enum ChannelError {
LapinError(LapinError),
}
type ChannelResult = Result<(LapinChannel, Queue), LapinError>;

pub struct Channel {}

Expand All @@ -17,20 +16,16 @@ impl Channel {
connection: Arc<Connection>,
queue: QueueConfig,
prefix: S,
) -> Result<(LapinChannel, Queue), ChannelError> {
let channel = connection
.create_channel()
.await
.map_err(ChannelError::LapinError)?;
) -> ChannelResult {
let channel = connection.create_channel().await?;
channel
.basic_qos(
1,
queue.prefetch_count.unwrap_or(1) as u16,
BasicQosOptions {
..Default::default()
},
)
.await
.map_err(ChannelError::LapinError)?;
.await?;

info!(
"[{}] Created channel with id: {}",
Expand All @@ -48,8 +43,7 @@ impl Channel {
},
FieldTable::default(),
)
.await
.map_err(ChannelError::LapinError)?;
.await?;

Ok((channel, queue))
}
Expand Down
8 changes: 6 additions & 2 deletions src/client/consumer/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use async_std::sync::Arc;

use log::info;

use lapin::{
uri::AMQPAuthority, uri::AMQPUri, uri::AMQPUserInfo, Connection as LapinConnection,
ConnectionProperties, Error,
Expand All @@ -13,9 +15,11 @@ pub enum ConnectionError {
LapinError(Error),
}

type ConnectionResult = Result<Arc<LapinConnection>, ConnectionError>;

pub struct Connection {
config: RabbitConfig,
lapin: Result<Arc<LapinConnection>, ConnectionError>,
lapin: ConnectionResult,
}

impl Connection {
Expand All @@ -26,7 +30,7 @@ impl Connection {
}
}

pub async fn get_connection(&mut self) -> Result<Arc<LapinConnection>, ConnectionError> {
pub async fn get_connection(&mut self) -> ConnectionResult {
if let Ok(ref lapin) = self.lapin {
if lapin.status().errored() {
self.lapin = Err(ConnectionError::NotConnected);
Expand Down
104 changes: 52 additions & 52 deletions src/client/consumer/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::str;

use async_std::sync::{Arc, RwLock};

use log::{error, info};

use tokio::process::Command;

use futures::future::select_all;
Expand All @@ -26,9 +28,7 @@ pub enum MessageError {
LapinError(LapinError),
}

pub enum MessageResult {
GenericOk,
}
type MessageResult<T> = Result<T, MessageError>;

pub enum CommandResult {
Timeout,
Expand Down Expand Up @@ -59,7 +59,7 @@ impl Message {
queue_config: &QueueConfig,
channel: &Channel,
delivery: Delivery,
) -> Result<MessageResult, MessageError> {
) -> MessageResult<()> {
let msg = {
match str::from_utf8(&delivery.data) {
Ok(msg) => {
Expand Down Expand Up @@ -116,7 +116,7 @@ impl Message {
mut message_command: MessageCommand,
channel: &Channel,
delivery: Delivery,
) -> Result<MessageResult, MessageError> {
) -> MessageResult<()> {
let timeout = utils::wait(
self.queue
.write()
Expand All @@ -133,20 +133,20 @@ impl Message {
match output {
Ok(output) => match retry_type {
RetryType::Ignored => {
info!(
"[{}] Command \"{}\" executed on consumer #{} and result ignored, message removed.",
queue_config.queue_name,
message_command.human,
index
);

channel
.basic_ack(
delivery.delivery_tag,
BasicAckOptions { multiple: false },
)
.map_err(MessageError::LapinError)
.await?;

info!(
"[{}] Command \"{}\" executed on consumer #{} and result ignored, message removed.",
queue_config.queue_name,
message_command.human,
index
);
}
_ => {
let exit_code =
Expand All @@ -166,11 +166,6 @@ impl Message {

match exit_code {
ACKNOWLEDGEMENT => {
info!(
"[{}] Command \"{}\" succeeded on consumer #{}, message removed.",
queue_config.queue_name, message_command.human, index
);

channel
.basic_ack(
delivery.delivery_tag,
Expand All @@ -179,6 +174,11 @@ impl Message {
.map_err(MessageError::LapinError)
.await?;

info!(
"[{}] Command \"{}\" succeeded on consumer #{}, message removed.",
queue_config.queue_name, message_command.human, index
);

self.queue.write().await.set_queue_wait(
queue_config.id,
queue_config.retry_wait,
Expand All @@ -187,14 +187,6 @@ impl Message {
);
}
NEGATIVE_ACKNOWLEDGEMENT_AND_RE_QUEUE => {
info!(
"[{}] Command \"{}\" failed on consumer #{}, message rejected and requeued. Output:\n{:#?}",
queue_config.queue_name,
message_command.human,
index,
output
);

channel
.basic_reject(
delivery.delivery_tag,
Expand All @@ -203,6 +195,14 @@ impl Message {
.map_err(MessageError::LapinError)
.await?;

info!(
"[{}] Command \"{}\" failed on consumer #{}, message rejected and requeued. Output:\n{:#?}",
queue_config.queue_name,
message_command.human,
index,
output
);

let ms = self
.queue
.write()
Expand All @@ -224,62 +224,62 @@ impl Message {
);
}
_ => {
info!(
"[{}] Command \"{}\" failed on consumer #{}, message rejected. Output:\n{:#?}",
queue_config.queue_name,
message_command.human,
index,
output
);

channel
.basic_reject(
delivery.delivery_tag,
BasicRejectOptions { requeue: false },
)
.map_err(MessageError::LapinError)
.await?;

info!(
"[{}] Command \"{}\" failed on consumer #{}, message rejected. Output:\n{:#?}",
queue_config.queue_name,
message_command.human,
index,
output
);
}
}
}
},
Err(e) => {
info!(
"[{}] Error {:?} executing the command \"{}\" on consumer #{}, message \"{}\" rejected...",
queue_config.queue_name,
e,
message_command.human,
index,
msg
);

channel
.basic_reject(
delivery.delivery_tag,
BasicRejectOptions { requeue: false },
)
.map_err(MessageError::LapinError)
.await?;

error!(
"[{}] Error {:?} executing the command \"{}\" on consumer #{}, message \"{}\" rejected...",
queue_config.queue_name,
e,
message_command.human,
index,
msg
);
}
}

Ok(MessageResult::GenericOk)
Ok(())
}
CommandResult::Timeout => {
info!(
"[{}] Timeout occurred executing the command \"{}\" on consumer #{}, message \"{}\" rejected and requeued...",
queue_config.queue_name,
message_command.human,
index,
msg
);

channel
.basic_reject(delivery.delivery_tag, BasicRejectOptions { requeue: true })
.map_err(MessageError::LapinError)
.await?;

Ok(MessageResult::GenericOk)
info!(
"[{}] Timeout occurred executing the command \"{}\" on consumer #{}, message \"{}\" rejected and requeued...",
queue_config.queue_name,
message_command.human,
index,
msg
);

Ok(())
}
}
}
Expand Down
Loading

0 comments on commit 39bd579

Please sign in to comment.