Skip to content

Commit

Permalink
Fix: load layer from poller (#354)
Browse files Browse the repository at this point in the history
* fix: backend layers were not loaded

* fix: handle clone
  • Loading branch information
geofmureithi authored Jul 9, 2024
1 parent 2fc018d commit 8bc2899
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 7 deletions.
7 changes: 4 additions & 3 deletions examples/redis/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{
};

use anyhow::Result;
use apalis::prelude::*;
use apalis::{layers::limit::RateLimitLayer, redis::RedisStorage};
use apalis::{layers::TimeoutLayer, prelude::*};

use email_service::{send_email, Email};
use tracing::{error, info};
Expand Down Expand Up @@ -46,9 +46,10 @@ async fn main() -> Result<()> {
produce_jobs(storage.clone()).await?;

let worker = WorkerBuilder::new("rango-tango")
.chain(|svc| svc.timeout(Duration::from_millis(500)))
.data(Count::default())
.chain(|svc| svc.map_err(|e| Error::Failed(e)))
.layer(RateLimitLayer::new(5, Duration::from_secs(1)))
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.data(Count::default())
.with_storage(storage)
.build_fn(send_email);

Expand Down
2 changes: 1 addition & 1 deletion packages/apalis-core/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<A: fmt::Display> AckResponse<A> {
}

/// A generic stream that emits (worker_id, task_id)
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AckStream<A>(pub Sender<AckResponse<A>>);

impl<J, A: Send + Clone + 'static> Ack<J> for AckStream<A> {
Expand Down
16 changes: 15 additions & 1 deletion packages/apalis-core/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use futures::{future::BoxFuture, Future, FutureExt};
use tower::Service;
use tower::{Layer, Service};
mod shutdown;

use crate::{
Expand Down Expand Up @@ -91,6 +91,13 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
S::Response: 'static,
S::Error: Send + Sync + 'static + Into<BoxDynError>,
<P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
P::Layer: Layer<S>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Service<Request<J>>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
{
self.workers.push(worker.with_monitor(&self));

Expand Down Expand Up @@ -121,6 +128,13 @@ impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
S::Response: 'static,
S::Error: Send + Sync + 'static + Into<BoxDynError>,
<P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
P::Layer: Layer<S>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Service<Request<J>>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
{
let workers = worker.with_monitor_instances(count, &self);
self.workers.extend(workers);
Expand Down
38 changes: 36 additions & 2 deletions packages/apalis-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context as TaskCtx, Poll, Waker};
use thiserror::Error;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower::{Layer, Service, ServiceBuilder, ServiceExt};

mod buffer;
mod stream;
Expand Down Expand Up @@ -230,12 +230,21 @@ impl<S, P> Worker<Ready<S, P>> {
S::Error: Send + Sync + 'static + Into<BoxDynError>,
<P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
E: Executor + Clone + Send + 'static + Sync,
P::Layer: Layer<S>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Service<Request<J>>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
{
let notifier = Notify::new();
let service = self.state.service;
let backend = self.state.backend;
let poller = backend.poll(self.id.clone());
let polling = poller.heartbeat.shared();
let default_layer = poller.layer;
let service = default_layer.layer(service);
let worker_stream = WorkerStream::new(poller.stream, notifier.clone())
.into_future()
.shared();
Expand All @@ -261,13 +270,22 @@ impl<S, P> Worker<Ready<S, P>> {
S::Error: Send + Sync + 'static + Into<BoxDynError>,
<P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
E: Executor + Clone + Send + 'static + Sync,
P::Layer: Layer<S>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Service<Request<J>>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
{
let notifier = Notify::new();
let service = self.state.service;
let backend = self.state.backend;
let executor = monitor.executor().clone();
let context = monitor.context().clone();
let poller = backend.poll(self.id.clone());
let default_layer = poller.layer;
let service = default_layer.layer(service);
let polling = poller.heartbeat.shared();
let worker_stream = WorkerStream::new(poller.stream, notifier.clone())
.into_future()
Expand Down Expand Up @@ -298,14 +316,23 @@ impl<S, P> Worker<Ready<S, P>> {
S::Error: Send + Sync + 'static + Into<BoxDynError>,
<P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
E: Executor + Clone + Send + 'static + Sync,
P::Layer: Layer<S>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Service<Request<J>>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
{
let notifier = Notify::new();
let service = self.state.service;
let (service, poll_worker) = Buffer::pair(service, instances);
let backend = self.state.backend;
let executor = monitor.executor().clone();
let context = monitor.context().clone();
let poller = backend.poll(self.id.clone());
let default_layer = poller.layer;
let service = default_layer.layer(service);
let (service, poll_worker) = Buffer::pair(service, instances);
let polling = poller.heartbeat.shared();
let worker_stream = WorkerStream::new(poller.stream, notifier.clone())
.into_future()
Expand Down Expand Up @@ -345,6 +372,13 @@ impl<S, P> Worker<Ready<S, P>> {
S::Error: Send + Sync + 'static + Into<BoxDynError>,
<P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
E: Executor + Clone + Send + 'static + Sync,
P::Layer: Layer<S>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Service<Request<J>>,
<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service: Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Future:
Send,
<<<P as Backend<Request<J>>>::Layer as Layer<S>>::Service as Service<Request<J>>>::Error:
Send + std::error::Error + Sync,
{
let worker_id = self.id.clone();
let notifier = Notify::new();
Expand Down

0 comments on commit 8bc2899

Please sign in to comment.