Skip to content

Commit

Permalink
Slightly improve queue usage. Its not perfect yet but it catches some…
Browse files Browse the repository at this point in the history
… edge cases now
  • Loading branch information
MTRNord committed Sep 9, 2023
1 parent c27879b commit 24ed0f7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 26 deletions.
49 changes: 35 additions & 14 deletions crates/erooster_smtp/src/servers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use erooster_core::{
};
use futures::{Sink, SinkExt};
use std::sync::Arc;
use tracing::instrument;
use yaque::Receiver;
use tracing::{error, instrument, warn};
use yaque::{recovery::recover, Receiver, Sender};

use self::sending::EmailPayload;

Expand Down Expand Up @@ -66,23 +66,44 @@ pub async fn start(
});

// Start listening for tasks
let mut receiver = Receiver::open(config.task_folder.clone())?;
let mut receiver = Receiver::open(config.task_folder.clone());
if let Err(e) = receiver {
warn!("Unable to open receiver: {:?}. Trying to recover.", e);
recover(&config.task_folder)?;
receiver = Receiver::open(config.task_folder.clone());
}

loop {
let data = receiver.recv().await;
match receiver {
Ok(mut receiver) => loop {
let data = receiver.recv().await;

match data {
Ok(data) => {
let email_bytes = &*data;
let email_json = serde_json::from_slice::<EmailPayload>(email_bytes)?;
match data {
Ok(data) => {
let email_bytes = &*data;
let email_json = serde_json::from_slice::<EmailPayload>(email_bytes)?;

if let Err(e) = send_email_job(data, email_json).await {
tracing::error!("Error while sending email: {:?}", e);
if let Err(e) = send_email_job(&email_json).await {
tracing::error!(
"Error while sending email: {:?}. Adding it to the queue again",
e
);
// FIXME: This can race the lock leading to an error. We should
// probably handle this better.
let mut sender = Sender::open(config.task_folder.clone())?;
let json_bytes = serde_json::to_vec(&email_json)?;
sender.send(json_bytes).await?;
}
// Mark the job as complete
data.commit()?;
}
Err(e) => {
tracing::error!("Error while receiving data from receiver: {:?}", e);
}
}
Err(e) => {
tracing::error!("Error while receiving data from receiver: {:?}", e);
}
},
Err(e) => {
error!("Unable to open receiver: {:?}. Giving up.", e);
Ok(())
}
}
}
20 changes: 8 additions & 12 deletions crates/erooster_smtp/src/servers/sending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use tokio_util::codec::Framed;
use tracing::{debug, error, instrument, warn};
use trust_dns_resolver::TokioAsyncResolver;
use uuid::Uuid;
use yaque::queue::RecvGuard;

#[derive(Debug, Serialize, Deserialize)]
pub struct EmailPayload {
Expand Down Expand Up @@ -272,10 +271,9 @@ where

// Note this is a hack to get max retries. Please fix this
#[allow(clippy::too_many_lines)]
#[instrument(skip(guard, email))]
#[instrument(skip(email))]
pub async fn send_email_job(
guard: RecvGuard<'_, Vec<u8>>,
email: EmailPayload,
email: &EmailPayload,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
debug!("[{}] Starting to send email job: {}", email.id, email.id);
// Decode a JSON payload
Expand Down Expand Up @@ -355,17 +353,17 @@ pub async fn send_email_job(
}
let Some(address) = address else { continue };

match get_secure_connection(address, &email, target, &tls_domain).await {
match get_secure_connection(address, email, target, &tls_domain).await {
Ok(secure_con) => {
if let Err(e) = send_email(secure_con, &email, to, true).await {
if let Err(e) = send_email(secure_con, email, to, true).await {
warn!(
"[{}] Error sending email via tls on port 465 to {}: {}",
email.id, target, e
);
// TODO try starttls first
match get_unsecure_connection(address, &email, target).await {
match get_unsecure_connection(address, email, target).await {
Ok(unsecure_con) => {
if let Err(e) = send_email(unsecure_con, &email, to, false).await {
if let Err(e) = send_email(unsecure_con, email, to, false).await {
return Err(From::from(format!(
"[{}] Error sending email via tcp on port 25 to {}: {}",
email.id, target, e
Expand All @@ -387,8 +385,8 @@ pub async fn send_email_job(
email.id, target, e
);
// TODO try starttls first
let unsecure_con = get_unsecure_connection(address, &email, target).await?;
if let Err(e) = send_email(unsecure_con, &email, to, false).await {
let unsecure_con = get_unsecure_connection(address, email, target).await?;
if let Err(e) = send_email(unsecure_con, email, to, false).await {
return Err(From::from(format!(
"[{}] Error sending email via tcp on port 25 to {}: {}",
email.id, target, e
Expand All @@ -398,8 +396,6 @@ pub async fn send_email_job(
}
}
debug!("[{}] Finished sending email job: {}", email.id, email.id);
// Mark the job as complete
guard.commit()?;

Ok(())
}
Expand Down

0 comments on commit 24ed0f7

Please sign in to comment.