Skip to content

Commit

Permalink
refactor(rust): Rename Pipe to Connector (pola-rs#17655)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Jul 16, 2024
1 parent 6203f50 commit 08b6f1d
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ use atomic_waker::AtomicWaker;
use pin_project_lite::pin_project;

/// Single-producer, single-consumer capacity-one channel.
pub fn pipe<T>() -> (Sender<T>, Receiver<T>) {
let pipe = Arc::new(Pipe::default());
(Sender { pipe: pipe.clone() }, Receiver { pipe })
pub fn connector<T>() -> (Sender<T>, Receiver<T>) {
let connector = Arc::new(Connector::default());
(
Sender {
connector: connector.clone(),
},
Receiver { connector },
)
}

/*
Expand All @@ -28,14 +33,14 @@ const CLOSED_BIT: u8 = 0b10;
const WAITING_BIT: u8 = 0b100;

#[repr(align(64))]
struct Pipe<T> {
struct Connector<T> {
send_waker: AtomicWaker,
recv_waker: AtomicWaker,
value: UnsafeCell<MaybeUninit<T>>,
state: AtomicU8,
}

impl<T> Default for Pipe<T> {
impl<T> Default for Connector<T> {
fn default() -> Self {
Self {
send_waker: AtomicWaker::new(),
Expand All @@ -46,7 +51,7 @@ impl<T> Default for Pipe<T> {
}
}

impl<T> Drop for Pipe<T> {
impl<T> Drop for Connector<T> {
fn drop(&mut self) {
if self.state.load(Ordering::Acquire) & FULL_BIT == FULL_BIT {
unsafe {
Expand All @@ -68,7 +73,7 @@ pub enum RecvError {

// SAFETY: all the send methods may only be called from a single sender at a
// time, and similarly for all the recv methods from a single receiver.
impl<T> Pipe<T> {
impl<T> Connector<T> {
unsafe fn poll_send(&self, value: &mut Option<T>, waker: &Waker) -> Poll<Result<(), T>> {
if let Some(v) = value.take() {
let mut state = self.state.load(Ordering::Relaxed);
Expand Down Expand Up @@ -173,7 +178,7 @@ impl<T> Pipe<T> {

/// # Safety
/// After calling close as a sender/receiver, you may not access
/// this pipe anymore as that end.
/// this connector anymore as that end.
unsafe fn close(&self) {
self.state.fetch_or(CLOSED_BIT, Ordering::Relaxed);
self.send_waker.wake();
Expand All @@ -182,32 +187,32 @@ impl<T> Pipe<T> {
}

pub struct Sender<T> {
pipe: Arc<Pipe<T>>,
connector: Arc<Connector<T>>,
}

unsafe impl<T: Send> Send for Sender<T> {}

impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe { self.pipe.close() }
unsafe { self.connector.close() }
}
}

pub struct Receiver<T> {
pipe: Arc<Pipe<T>>,
connector: Arc<Connector<T>>,
}

unsafe impl<T: Send> Send for Receiver<T> {}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe { self.pipe.close() }
unsafe { self.connector.close() }
}
}

pin_project! {
pub struct SendFuture<'a, T> {
pipe: &'a Pipe<T>,
connector: &'a Connector<T>,
value: Option<T>,
}
}
Expand All @@ -216,18 +221,18 @@ unsafe impl<'a, T: Send> Send for SendFuture<'a, T> {}

impl<T: Send> Sender<T> {
/// Returns a future that when awaited will send the value to the [`Receiver`].
/// Returns Err(value) if the pipe is closed.
/// Returns Err(value) if the connector is closed.
#[must_use]
pub fn send(&mut self, value: T) -> SendFuture<'_, T> {
SendFuture {
pipe: &self.pipe,
connector: &self.connector,
value: Some(value),
}
}

#[allow(unused)]
pub fn try_send(&mut self, value: T) -> Result<(), SendError<T>> {
unsafe { self.pipe.try_send(value) }
unsafe { self.connector.try_send(value) }
}
}

Expand All @@ -237,15 +242,15 @@ impl<T> std::future::Future for SendFuture<'_, T> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(
self.value.is_some(),
"re-poll after Poll::Ready in pipe SendFuture"
"re-poll after Poll::Ready in connector SendFuture"
);
unsafe { self.pipe.poll_send(self.project().value, cx.waker()) }
unsafe { self.connector.poll_send(self.project().value, cx.waker()) }
}
}

pin_project! {
pub struct RecvFuture<'a, T> {
pipe: &'a Pipe<T>,
connector: &'a Connector<T>,
done: bool,
}
}
Expand All @@ -259,22 +264,25 @@ impl<T: Send> Receiver<T> {
#[must_use]
pub fn recv(&mut self) -> RecvFuture<'_, T> {
RecvFuture {
pipe: &self.pipe,
connector: &self.connector,
done: false,
}
}

#[allow(unused)]
pub fn try_recv(&mut self) -> Result<T, RecvError> {
unsafe { self.pipe.try_recv() }
unsafe { self.connector.try_recv() }
}
}

impl<T> std::future::Future for RecvFuture<'_, T> {
type Output = Result<T, ()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done, "re-poll after Poll::Ready in pipe SendFuture");
unsafe { self.pipe.poll_recv(cx.waker()) }
assert!(
!self.done,
"re-poll after Poll::Ready in connector SendFuture"
);
unsafe { self.connector.poll_recv(cx.waker()) }
}
}
2 changes: 1 addition & 1 deletion crates/polars-stream/src/async_primitives/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod connector;
pub mod distributor_channel;
pub mod pipe;
pub mod task_parker;
pub mod wait_group;
4 changes: 2 additions & 2 deletions crates/polars-stream/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_utils::aliases::PlHashSet;
use slotmap::{SecondaryMap, SparseSecondaryMap};

use crate::async_executor;
use crate::async_primitives::pipe::{pipe, Receiver, Sender};
use crate::async_primitives::connector::{connector, Receiver, Sender};
use crate::graph::{Graph, GraphNodeKey, LogicalPipeKey, PortState};
use crate::morsel::Morsel;

Expand Down Expand Up @@ -108,7 +108,7 @@ fn run_subgraph(
// The first step is to create N physical pipes for every logical pipe in the graph.
for pipe_key in pipes.iter().copied() {
let (senders, receivers): (Vec<Sender<Morsel>>, Vec<Receiver<Morsel>>) =
(0..num_pipelines).map(|_| pipe()).unzip();
(0..num_pipelines).map(|_| connector()).unzip();

physical_senders.insert(pipe_key, senders);
physical_receivers.insert(pipe_key, receivers);
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-stream/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod compute_node_prelude {

pub use super::ComputeNode;
pub use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
pub use crate::async_primitives::pipe::{Receiver, Sender};
pub use crate::async_primitives::connector::{Receiver, Sender};
pub use crate::graph::PortState;
pub use crate::morsel::{Morsel, MorselSeq};
}
Expand Down

0 comments on commit 08b6f1d

Please sign in to comment.