Skip to content

Commit

Permalink
Merge 2f14fc6 into 8bc2899
Browse files Browse the repository at this point in the history
  • Loading branch information
geofmureithi authored Jul 9, 2024
2 parents 8bc2899 + 2f14fc6 commit fa0457d
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 13 deletions.
1 change: 0 additions & 1 deletion benches/storages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use apalis::{
postgres::{PgPool, PostgresStorage},
sqlite::{SqlitePool, SqliteStorage},
};
use apalis_redis::Config;
use criterion::*;
use futures::Future;
use paste::paste;
Expand Down
16 changes: 5 additions & 11 deletions examples/redis-mq-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ use apalis::{

use apalis_core::{
codec::json::JsonCodec,
layers::{Ack, AckLayer},
layers::{Ack, AckLayer, AckResponse},
};
use email_service::{send_email, Email};
use futures::{channel::mpsc, SinkExt};
use rsmq_async::{Rsmq, RsmqConnection, RsmqError};
use tokio::time::sleep;
use tower::layer::util::Identity;
use tracing::{error, info};

struct RedisMq<T> {
Expand Down Expand Up @@ -71,13 +70,9 @@ impl<T: Send> Ack<T> for RedisMq<T> {

type Error = RsmqError;

async fn ack(
&mut self,
worker_id: &WorkerId,
data: &Self::Acknowledger,
) -> Result<(), Self::Error> {
println!("Attempting to ACK {}", data);
self.conn.delete_message("email", data).await?;
async fn ack(&mut self, ack: AckResponse<String>) -> Result<(), Self::Error> {
println!("Attempting to ACK {}", ack.acknowledger);
self.conn.delete_message("email", &ack.acknowledger).await?;
Ok(())
}
}
Expand Down Expand Up @@ -141,8 +136,7 @@ async fn main() -> Result<()> {
codec: RedisCodec::new(Box::new(JsonCodec)),
config: Config::default(),
};
// This can be in another part of the program
// produce_jobs(&mut mq).await?;
produce_jobs(&mut mq).await?;

let worker = WorkerBuilder::new("rango-tango")
.layer(TraceLayer::new())
Expand Down
3 changes: 2 additions & 1 deletion packages/apalis-core/src/mq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ pub trait MessageQueue<Message>: Backend<Request<Message>> {
type Error;

/// Enqueues a message to the queue.
fn enqueue(&mut self, message: Message) -> impl Future<Output = Result<(), Self::Error>> + Send;
fn enqueue(&mut self, message: Message)
-> impl Future<Output = Result<(), Self::Error>> + Send;

/// Attempts to dequeue a message from the queue.
/// Returns `None` if the queue is empty.
Expand Down

0 comments on commit fa0457d

Please sign in to comment.