Skip to content

Commit

Permalink
Temp
Browse files Browse the repository at this point in the history
Signed-off-by: Klimenty Tsoutsman <klim@tsoutsman.com>
  • Loading branch information
tsoutsman committed Dec 19, 2023
1 parent a430baf commit facd0e5
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 117 deletions.
79 changes: 60 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion kernel/async_channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "A bounded, multi-producer, multi-consumer asynchronous channel"
edition = "2021"

[dependencies]
async_wait_queue = { path = "../async_wait_queue" }
async-lock = { version = "3.2.0", default-features = false }
dreadnought = { path = "../dreadnought" }
futures = { version = "0.3.28", default-features = false }
mpmc = "0.1.6"
Expand Down
156 changes: 75 additions & 81 deletions kernel/async_channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,91 @@
//! A bounded, multi-producer, multi-consumer asynchronous channel.
//! A bounded, multi-producer, single-consumer asynchronous channel.
//!
//! See [`Channel`] for more details.
#![no_std]

extern crate alloc;

use alloc::sync::Arc;
use core::{
pin::Pin,
task::{Context, Poll},
};

use async_wait_queue::WaitQueue;
use futures::stream::{FusedStream, Stream};
use async_semaphore::Semaphore;
use futures::{
stream::{FusedStream, Stream},
task::AtomicWaker,
};
use mpmc::Queue;
use sync::DeadlockPrevention;
use sync_spin::Spin;

/// A bounded, multi-producer, multi-consumer asynchronous channel.
#[derive(Clone)]
pub struct Sender<T, P = Spin>
where
T: Send,
P: DeadlockPrevention,
{
inner: Arc<Channel<T, P>>,
}

pub struct Receiver<T, P = Spin>
where
T: Send,
P: DeadlockPrevention,
{
inner: Arc<Channel<T, P>>,
}

impl<T, P> !Sync for Receiver<T, P> {}

pub fn channel<T, P>(capacity: usize) -> (Sender<T, P>, Receiver<T, P>)
where
T: Send,
P: DeadlockPrevention,
{
let inner = Arc::new(Channel::new(capacity));
(
Sender {
inner: inner.clone(),
},
Receiver { inner },
)
}

/// A bounded, multi-producer, single-consumer asynchronous channel.
///
/// The channel can also be used outside of an asynchronous runtime with the
/// [`blocking_send`], and [`blocking_recv`] methods.
///
/// [`blocking_send`]: Self::blocking_send
/// [`blocking_recv`]: Self::blocking_recv
#[derive(Clone)]
pub struct Channel<T, P = Spin>
struct Channel<T, P = Spin>
where
T: Send,
P: DeadlockPrevention,
{
inner: Queue<T>,
senders: WaitQueue<P>,
receivers: WaitQueue<P>,
senders: Semaphore<P>,
receiver: AtomicWaker,
}

impl<T, P> Channel<T, P>
impl<T, P> Sender<T, P>
where
T: Send,
P: DeadlockPrevention,
{
/// Creates a new channel.
///
/// The provided capacity dictates how many messages can be stored in the
/// queue before the sender blocks.
///
/// # Examples
///
/// ```
/// use async_channel::Channel;
///
/// let channel = Channel::new(2);
///
/// assert!(channel.try_send(1).is_ok());
/// assert!(channel.try_send(2).is_ok());
/// // The channel is full.
/// assert!(channel.try_send(3).is_err());
///
/// assert_eq!(channel.try_recv(), Some(1));
/// assert_eq!(channel.try_recv(), Some(2));
/// assert!(channel.try_recv().is_none());
/// ```
pub fn new(capacity: usize) -> Self {
Self {
inner: Queue::with_capacity(capacity),
senders: WaitQueue::new(),
receivers: WaitQueue::new(),
}
}

/// Sends `value`.
///
/// # Cancel safety
///
/// This method is cancel safe, in that if it is dropped prior to
/// completion, `value` is guaranteed to have not been set. However, in that
/// case `value` will be dropped.
pub async fn send(&self, value: T) {
let mut temp = Some(value);

self.senders
.wait_until(|| match self.inner.push(temp.take().unwrap()) {
Ok(()) => {
self.receivers.notify_one();
Some(())
}
Err(value) => {
temp = Some(value);
None
}
})
.await
pub async fn send(&self, mut value: T) {
todo!();
}

/// Tries to send `value`.
Expand All @@ -98,61 +95,58 @@ where
/// Returns an error containing `value` if the channel was full.
pub fn try_send(&self, value: T) -> Result<(), T> {
self.inner.push(value)?;
self.receivers.notify_one();
self.inner.receiver.wake();
Ok(())
}

/// Blocks the current thread until `value` is sent.
pub fn blocking_send(&self, value: T) {
dreadnought::block_on(self.send(value))
}
}

/// Receives the next value.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub async fn recv(&self) -> T {
let value = self.receivers.wait_until(|| self.inner.pop()).await;
self.senders.notify_one();
value
impl<T, P> Receiver<T, P>
where
T: Send,
P: DeadlockPrevention,
{
async fn recv(&self) -> T {
todo!();
}

/// Tries to receive the next value.
pub fn try_recv(&self) -> Option<T> {
let value = self.inner.pop()?;
self.senders.notify_one();
Some(value)
fn try_recv(&self) -> Option<T> {
self.inner.inner.pop()
}

/// Blocks the current thread until a value is received.
pub fn blocking_recv(&self) -> T {
fn blocking_recv(&self) -> T {
dreadnought::block_on(self.recv())
}
}

impl<T, P> Stream for Channel<T, P>
impl<T, P> Stream for Receiver<T, P>
where
T: Send,
P: DeadlockPrevention,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self
.receivers
.poll_wait_until(ctx, &mut || self.inner.pop())
{
Poll::Ready(value) => {
self.senders.notify_one();
Poll::Ready(Some(value))
}
Poll::Pending => Poll::Pending,
macro_rules! try_recv {
() => {
if let Some(value) = self.try_recv() {
return Poll::Ready(Some(value));
}
};
}

try_recv!();
self.inner.receiver.register(ctx.waker());
// TODO: Use push_if_fail instead?
try_recv!();
Poll::Pending
}
}

impl<T, P> FusedStream for Channel<T, P>
impl<T, P> FusedStream for Receiver<T, P>
where
T: Send,
P: DeadlockPrevention,
Expand Down
Loading

0 comments on commit facd0e5

Please sign in to comment.