Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Supervisors to be stopped #48

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 51 additions & 35 deletions examples/supervisor_clear_interval.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,20 @@
use std::time::{Duration, Instant};
use std::time::Duration;
use xactor::{message, Actor, Context, Handler};

#[derive(Debug)]
pub struct PingTimer {
last_ping: Instant,
}

impl Default for PingTimer {
fn default() -> Self {
PingTimer {
last_ping: Instant::now(),
}
}
}
pub struct PingTimer;

#[async_trait::async_trait]
impl Actor for PingTimer {
async fn started(&mut self, ctx: &mut Context<Self>) -> xactor::Result<()> {
println!("PingTimer:: started()");
ctx.send_interval(Ping, Duration::from_millis(1000));
println!("PingTimer :: started()");
ctx.send_interval(Ping, Duration::from_millis(300));
Ok(())
}

/// Called after an actor is stopped.
async fn stopped(&mut self, _: &mut Context<Self>) {
println!("PingTimer:: stopped()");
println!("PingTimer :: stopped()");
}
}

Expand All @@ -34,22 +24,29 @@ struct Ping;

#[async_trait::async_trait]
impl Handler<Ping> for PingTimer {
async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Ping) {
let now = Instant::now();
let delta = (now - self.last_ping).as_millis();
self.last_ping = now;
println!("PingTimer:: Ping {} {:?}", ctx.actor_id(), delta);
async fn handle(&mut self, _: &mut Context<Self>, _msg: Ping) {
println!("PingTimer :: Ping");
}
}
#[message]
struct Halt;
struct Restart;

#[async_trait::async_trait]
impl Handler<Halt> for PingTimer {
async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Halt) {
println!("PingTimer:: received Halt");
impl Handler<Restart> for PingTimer {
async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Restart) {
println!("PingTimer :: received restart");
ctx.stop(None);
println!("PingTimer:: stopped");
}
}

#[message]
struct Shutdown;

#[async_trait::async_trait]
impl Handler<Shutdown> for PingTimer {
async fn handle(&mut self, ctx: &mut Context<Self>, _msg: Shutdown) {
println!("PingTimer :: received Shutdown");
ctx.stop_supervisor(None);
}
}

Expand All @@ -59,29 +56,48 @@ struct Panic;
#[async_trait::async_trait]
impl Handler<Panic> for PingTimer {
async fn handle(&mut self, _: &mut Context<Self>, _msg: Panic) {
println!("PingTimer:: received Panic");
panic!("intentional panic");
println!("PingTimer :: received Panic");
panic!("intentional panic: this should not occur");
}
}

#[xactor::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let service_supervisor = xactor::Supervisor::start(PingTimer::default).await?;
let service_supervisor = xactor::Supervisor::start(|| PingTimer).await?;
let service_addr = service_supervisor.clone();
let service_addr2 = service_supervisor.clone();

let supervisor_task = xactor::spawn(async {
service_supervisor.wait_for_stop().await;
});

let send_halt = async {
xactor::sleep(Duration::from_millis(5_200)).await;
println!(" main :: sending Halt");
service_addr.send(Halt).unwrap();
let stop_actor = async {
xactor::sleep(Duration::from_millis(2_000)).await;
println!(" main :: sending Restart");
service_addr.send(Restart).unwrap();
};

let stop_supervisor = async move {
xactor::sleep(Duration::from_millis(3_000)).await;
println!(" main :: sending Shutdown");
service_addr2.send(Shutdown).unwrap();
};

let send_panic = async {
xactor::sleep(Duration::from_millis(5_000)).await;
println!(" main :: sending Panic after stop");
if let Err(error) = service_addr.send(Panic) {
println!(" ok :: cannot send after halting, this is very much expected");
println!(" Failing with \"{}\"", error);
}
};

let _ = futures::join!(supervisor_task, send_halt);
// run this to see that the interval is not properly stopped if the ctx is stopped
// futures::join!(supervisor_task, send_panic); // there is no panic recovery
futures::join!(
supervisor_task,
stop_actor,
stop_supervisor,
send_panic, // there is no panic recovery
);

Ok(())
}
80 changes: 4 additions & 76 deletions src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use crate::addr::ActorEvent;
use crate::runtime::spawn;
use crate::{Addr, Context};
use crate::error::Result;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::{FutureExt, StreamExt};
use crate::{Addr, Context};

use crate::lifecycle::LifeCycle;

/// Represents a message that can be handled by the actor.
pub trait Message: 'static + Send {
Expand Down Expand Up @@ -109,75 +106,6 @@ pub trait Actor: Sized + Send + 'static {
/// }
/// ```
async fn start(self) -> Result<Addr<Self>> {
ActorManager::new().start_actor(self).await
}
}

pub(crate) struct ActorManager<A: Actor> {
ctx: Context<A>,
tx: std::sync::Arc<UnboundedSender<ActorEvent<A>>>,
rx: UnboundedReceiver<ActorEvent<A>>,
tx_exit: oneshot::Sender<()>,
}

impl<A: Actor> ActorManager<A> {
pub(crate) fn new() -> Self {
let (tx_exit, rx_exit) = oneshot::channel();
let rx_exit = rx_exit.shared();
let (ctx, rx, tx) = Context::new(Some(rx_exit));
Self {
ctx,
rx,
tx,
tx_exit,
}
}

pub(crate) fn address(&self) -> Addr<A> {
self.ctx.address()
}

pub(crate) async fn start_actor(self, mut actor: A) -> Result<Addr<A>> {
let Self {
mut ctx,
mut rx,
tx,
tx_exit,
} = self;

let rx_exit = ctx.rx_exit.clone();
let actor_id = ctx.actor_id();

// Call started
actor.started(&mut ctx).await?;

spawn({
async move {
while let Some(event) = rx.next().await {
match event {
ActorEvent::Exec(f) => f(&mut actor, &mut ctx).await,
ActorEvent::Stop(_err) => break,
ActorEvent::RemoveStream(id) => {
if ctx.streams.contains(id) {
ctx.streams.remove(id);
}
}
}
}

actor.stopped(&mut ctx).await;

ctx.abort_streams();
ctx.abort_intervals();

tx_exit.send(()).ok();
}
});

Ok(Addr {
actor_id,
tx,
rx_exit,
})
LifeCycle::new().start_actor(self).await
}
}
9 changes: 9 additions & 0 deletions src/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub(crate) type ExecFn<A> =
pub(crate) enum ActorEvent<A> {
Exec(ExecFn<A>),
Stop(Option<Error>),
StopSupervisor(Option<Error>),
RemoveStream(usize),
}

Expand Down Expand Up @@ -71,6 +72,14 @@ impl<A: Actor> Addr<A> {
Ok(())
}

/// Stop the supervisor.
///
/// this is ignored by normal actors
pub fn stop_supervisor(&mut self, err: Option<Error>) -> Result<()> {
mpsc::UnboundedSender::clone(&*self.tx).start_send(ActorEvent::StopSupervisor(err))?;
Ok(())
}

/// Send a message `msg` to the actor and wait for the return value.
pub async fn call<T: Message>(&self, msg: T) -> Result<T::Result>
where
Expand Down
11 changes: 11 additions & 0 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ impl<A> Context<A> {
}
}

/// Stop the supervisor.
///
/// this is ignored by normal actors
pub fn stop_supervisor(&self, err: Option<Error>) {
if let Some(tx) = self.tx.upgrade() {
mpsc::UnboundedSender::clone(&*tx)
.start_send(ActorEvent::StopSupervisor(err))
.ok();
}
}

pub fn abort_intervals(&mut self) {
for handle in self.intervals.drain() {
handle.abort()
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mod addr;
mod broker;
mod caller;
mod context;
mod lifecycle;
mod runtime;
mod service;
mod supervisor;
Expand Down
Loading