Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand ActorGroup API #621

Merged
merged 9 commits into from
Mar 31, 2024
Merged
4 changes: 2 additions & 2 deletions inbox/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,12 @@ impl<T> Sender<T> {

/// Returns `true` if senders send into the same channel.
pub fn same_channel(&self, other: &Sender<T>) -> bool {
self.channel == other.channel
ptr::addr_eq(self.channel.as_ptr(), other.channel.as_ptr())
}

/// Returns `true` if this sender sends to the `receiver`.
pub fn sends_to(&self, receiver: &Receiver<T>) -> bool {
self.channel == receiver.channel
ptr::addr_eq(self.channel.as_ptr(), receiver.channel.as_ptr())
}

/// Returns the id of this sender.
Expand Down
16 changes: 14 additions & 2 deletions remote/src/net_relay/routers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::future::Future;
use std::future::{ready, Ready};
use std::net::SocketAddr;

use heph::actor_ref::{ActorGroup, ActorRef, Delivery, SendError, SendValue};
use heph::actor_ref::{ActorGroup, ActorRef, SendError, SendValue};

use crate::net_relay::Route;

Expand Down Expand Up @@ -72,6 +72,15 @@ pub struct RelayGroup<M> {
delivery: Delivery,
}

/// The kind of delivery to use.
#[derive(Copy, Clone, Debug)]
pub enum Delivery {
/// Delivery a copy of the message to all actors in the group.
ToAll,
/// Delivery the message to one of the actors.
ToOne,
}

impl<M> RelayGroup<M> {
/// Relay all remote messages to the `actor_group`.
pub const fn to(actor_group: ActorGroup<M>, delivery: Delivery) -> RelayGroup<M> {
Expand Down Expand Up @@ -100,7 +109,10 @@ where
where Self: 'a;

fn route<'a>(&'a mut self, msg: M, _: SocketAddr) -> Self::Route<'a> {
_ = self.actor_group.try_send(msg, self.delivery);
_ = match self.delivery {
Delivery::ToAll => self.actor_group.try_send_to_all(msg),
Delivery::ToOne => self.actor_group.try_send_to_one(msg),
};
ready(Ok(()))
}
}
Expand Down
4 changes: 2 additions & 2 deletions rt/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::{Duration, Instant};
use std::{fmt, io, process};

use a10::signals::{ReceiveSignals, Signals};
use heph::actor_ref::{ActorGroup, Delivery};
use heph::actor_ref::ActorGroup;
use log::{debug, error, info, trace};

use crate::setup::{host_id, host_info, Uuid};
Expand Down Expand Up @@ -256,7 +256,7 @@ impl Coordinator {

trace!(signal:? = signal; "relaying process signal to actors");
self.signal_refs.remove_disconnected();
_ = self.signal_refs.try_send(signal, Delivery::ToAll);
_ = self.signal_refs.try_send_to_all(signal);
}
Poll::Ready(Some(Err(err))) => {
return Err(rt::Error::coordinator(Error::SignalHandling(err)))
Expand Down
4 changes: 2 additions & 2 deletions rt/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::panic::{self, AssertUnwindSafe};
use std::rc::Rc;
use std::sync::Arc;

use heph::actor_ref::{ActorGroup, Delivery, SendError};
use heph::actor_ref::{ActorGroup, SendError};
use log::{info, trace};

use crate::scheduler::Scheduler;
Expand Down Expand Up @@ -86,7 +86,7 @@ impl RuntimeInternals {

let mut receivers = self.signal_receivers.borrow_mut();
receivers.remove_disconnected();
match receivers.try_send(signal, Delivery::ToAll) {
match receivers.try_send_to_all(signal) {
Err(SendError) if signal.should_stop() => {
self.set_err(worker::Error::ProcessInterrupted);
}
Expand Down
32 changes: 15 additions & 17 deletions rt/tests/functional/actor_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::pin::Pin;
use std::task::Poll;

use heph::actor::{self, actor_fn};
use heph::actor_ref::{ActorGroup, Delivery};
use heph::actor_ref::ActorGroup;
use heph_rt::test::{init_local_actor, poll_actor, poll_future};
use heph_rt::ThreadLocal;

Expand All @@ -31,8 +31,6 @@ fn is_send_sync() {
#[test]
fn empty() {
let group = ActorGroup::<()>::empty();
assert!(group.try_send((), Delivery::ToAll).is_err());
assert!(group.try_send((), Delivery::ToOne).is_err());
assert_eq!(group.len(), 0);
assert!(group.is_empty());
}
Expand Down Expand Up @@ -62,7 +60,7 @@ fn new() {
assert_eq!(group.len(), 3);
assert!(!group.is_empty());

assert!(group.try_send(123usize, Delivery::ToAll).is_ok());
assert!(group.try_send_to_all(123usize).is_ok());
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand All @@ -77,7 +75,7 @@ fn from_actor_ref() {
let group = ActorGroup::from(actor_ref);
assert_eq!(group.len(), 1);

assert!(group.try_send((), Delivery::ToAll).is_ok());
assert!(group.try_send_to_all(()).is_ok());
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}

Expand All @@ -95,7 +93,7 @@ fn from_iter() {
.collect();
assert_eq!(group.len(), 3);

assert!(group.try_send((), Delivery::ToAll).is_ok());
assert!(group.try_send_to_all(()).is_ok());
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand All @@ -110,7 +108,7 @@ fn add_to_empty_group() {
let mut actor = Box::pin(actor);
group.add(actor_ref);

group.try_send(1_usize, Delivery::ToAll).unwrap();
group.try_send_to_all(1_usize).unwrap();
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}

Expand All @@ -133,9 +131,9 @@ fn add_actor_to_group() {
actors.push(Box::pin(actor));
group.add(actor_ref);

group.try_send(123_usize, Delivery::ToAll).unwrap();
group.try_send(456_usize, Delivery::ToAll).unwrap();
group.try_send(789_usize, Delivery::ToAll).unwrap();
group.try_send_to_all(123_usize).unwrap();
group.try_send_to_all(456_usize).unwrap();
group.try_send_to_all(789_usize).unwrap();
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand All @@ -161,7 +159,7 @@ fn add_unique() {

assert_eq!(group.len(), N);

group.try_send(123usize, Delivery::ToAll).unwrap();
group.try_send_to_all(123usize).unwrap();
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand All @@ -179,7 +177,7 @@ fn extend_empty_actor_group() {
actor_ref
}));

group.try_send(123usize, Delivery::ToAll).unwrap();
group.try_send_to_all(123usize).unwrap();
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand All @@ -204,7 +202,7 @@ fn extend_actor_group() {
actor_ref
}));

group.try_send(123usize, Delivery::ToAll).unwrap();
group.try_send_to_all(123usize).unwrap();
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand Down Expand Up @@ -282,7 +280,7 @@ fn make_unique() {
group.make_unique();
assert_eq!(group.len(), N);

group.try_send(123usize, Delivery::ToAll).unwrap();
group.try_send_to_all(123usize).unwrap();
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand All @@ -309,7 +307,7 @@ fn make_unique_one() {
group.make_unique();
assert_eq!(group.len(), 1);

group.try_send(123usize, Delivery::ToAll).unwrap();
group.try_send_to_all(123usize).unwrap();
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}

Expand All @@ -324,7 +322,7 @@ fn send_delivery_to_all() {
group.add(actor_ref);
}

assert!(group.try_send(123usize, Delivery::ToAll).is_ok());
assert!(group.try_send_to_all(123usize).is_ok());
for mut actor in actors {
assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand All @@ -344,7 +342,7 @@ fn send_delivery_to_one() {

// NOTE: sending order is not guaranteed so this test is too strict.
for mut actor in actors {
assert!(group.try_send(123usize, Delivery::ToOne).is_ok());
assert!(group.try_send_to_one(123usize).is_ok());

assert_eq!(poll_actor(Pin::as_mut(&mut actor)), Poll::Ready(Ok(())));
}
Expand Down
93 changes: 45 additions & 48 deletions src/actor_ref/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,25 +742,11 @@ impl<'r, M> fmt::Debug for Join<'r, M> {
/// of an internal vector.
pub struct ActorGroup<M> {
actor_refs: Vec<ActorRef<M>>,
/// Index of the actor reference to send the [single delivery] message to.
/// Using relaxed ordering on this field is fine because we make no
/// guarantees about to which actor the message will be delivered. E.g. we
/// could always send to the first actor in the group and still fulfill the
/// contract.
///
/// [single delivery]: Delivery::ToOne
/// Index of the actor reference to send the next single delivery message
/// to, used as a simple round-robin load balancing.
send_next: AtomicUsize,
}

/// The kind of delivery to use in [`ActorGroup::try_send`].
#[derive(Copy, Clone, Debug)]
pub enum Delivery {
/// Delivery a copy of the message to all actors in the group.
ToAll,
/// Delivery the message to one of the actors.
ToOne,
}

impl<M> ActorGroup<M> {
/// Creates an empty `ActorGroup`.
pub const fn empty() -> ActorGroup<M> {
Expand Down Expand Up @@ -840,50 +826,61 @@ impl<M> ActorGroup<M> {
}
}

/// Attempts to send a message to all the actors in the group.
///
/// This can either send the message to a single actor, by using
/// [`Delivery::ToOne`], or to all actors in the group by using
/// [`Delivery::ToAll`].
/// Attempts to send a message to one of the actors in the group.
pub fn try_send_to_one<Msg>(&self, msg: Msg) -> Result<(), SendError>
where
Msg: Into<M>,
{
if self.actor_refs.is_empty() {
return Err(SendError);
}

// SAFETY: this needs to sync with all other accesses to `send_next`.
// NOTE: this wraps around on overflow.
let idx = self.send_next.fetch_add(1, Ordering::AcqRel) % self.actor_refs.len();
self.actor_refs[idx].try_send(msg)
}

/// Send a message to one of the actors in the group.
pub fn send_to_one<'r, Msg>(&'r self, msg: Msg) -> SendValue<'r, M>
where
Msg: Into<M>,
{
if self.actor_refs.is_empty() {
return SendValue {
kind: SendValueKind::Mapped(MappedSendValue::SendErr),
};
}

// SAFETY: this needs to sync with all other accesses to `send_next`.
// NOTE: this wraps around on overflow.
let idx = self.send_next.fetch_add(1, Ordering::AcqRel) % self.actor_refs.len();
self.actor_refs[idx].send(msg)
}

/// Attempts to send a message to all of the actors in the group.
///
/// When deliverying to all actors this will first `clone` the message and
/// then [`try_send`]ing it to each actor in the group. Note that this means
/// it will `clone` before calling [`Into::into`] on the message. If the
/// call to [`Into::into`] is expansive, or `M` is cheaper to clone than
/// `Msg` it might be worthwhile to call `msg.into()` before calling this
/// method.
/// This will first `clone` the message and then attempts to send it to each
/// actor in the group. Note that this means it will `clone` before calling
/// [`Into::into`] on the message. If the call to [`Into::into`] is
/// expansive, or `M` is cheaper to clone than `Msg` it might be worthwhile
/// to call `msg.into()` before calling this method.
///
/// This only returns an error if the group is empty, otherwise this will
/// always return `Ok(())`.
///
/// See [Sending messages] for more details.
///
/// [`try_send`]: ActorRef::try_send
/// [Sending messages]: index.html#sending-messages
pub fn try_send<Msg>(&self, msg: Msg, delivery: Delivery) -> Result<(), SendError>
#[allow(clippy::needless_pass_by_value)] // Want to make the other send functions.
pub fn try_send_to_all<Msg>(&self, msg: Msg) -> Result<(), SendError>
where
Msg: Into<M> + Clone,
{
if self.actor_refs.is_empty() {
return Err(SendError);
}

match delivery {
Delivery::ToAll => {
for actor_ref in &self.actor_refs {
_ = actor_ref.try_send(msg.clone());
}
Ok(())
}
Delivery::ToOne => {
// Safety: this needs to sync itself.
// NOTE: this wraps around on overflow.
let idx = self.send_next.fetch_add(1, Ordering::AcqRel) % self.actor_refs.len();
let actor_ref = &self.actor_refs[idx];
// TODO: try to send it to another actor on send failure?
actor_ref.try_send(msg)
}
for actor_ref in &self.actor_refs {
_ = actor_ref.try_send(msg.clone());
}
Ok(())
}

/// Wait for all actors in this group to finish running.
Expand Down
24 changes: 23 additions & 1 deletion tests/functional.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
//! Functional tests.

#![feature(never_type)]
#![feature(never_type, noop_waker)]

mod util {
use std::future::Future;
use std::mem::size_of;
use std::pin::{pin, Pin};
use std::task::{self, Poll};

pub fn assert_send<T: Send>() {}

Expand All @@ -13,6 +16,25 @@ mod util {
pub fn assert_size<T>(expected: usize) {
assert_eq!(size_of::<T>(), expected);
}

pub fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
let mut fut = pin!(fut);
let mut ctx = task::Context::from_waker(task::Waker::noop());
loop {
match fut.as_mut().poll(&mut ctx) {
Poll::Ready(output) => return output,
Poll::Pending => {}
}
}
}

pub fn poll_once<Fut: Future>(fut: Pin<&mut Fut>) {
let mut ctx = task::Context::from_waker(task::Waker::noop());
match fut.poll(&mut ctx) {
Poll::Ready(_) => panic!("unexpected output"),
Poll::Pending => {}
}
}
}

#[path = "functional"] // rustfmt can't find the files.
Expand Down
Loading
Loading