Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic working multi-driver demo using async bbq #3

Merged
merged 6 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 187 additions & 12 deletions source/kernel/src/bbq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,38 @@
//! can expect a "single executor" async operation. At some point, this
//! may inform later design around user-to-kernel bbqueue communication.

use core::mem::MaybeUninit;
use core::{mem::MaybeUninit, ops::{Deref, DerefMut}};

use mnemos_alloc::{containers::HeapArc, heap::AHeap};
use abi::bbqueue_ipc::{BBBuffer, Producer, Consumer};
use tracing::{error, info};
use tracing::{error, info, trace};
use maitake::wait::WaitCell;

struct BBQWaitCells {
commit_waitcell: WaitCell,
release_waitcell: WaitCell,
}

struct BBQStorage {
_ring_a: BBBuffer,
_ring_b: BBBuffer,
a_wait: BBQWaitCells,
b_wait: BBQWaitCells,
}

#[derive(Clone, Copy)]
enum Side {
ASide,
BSide,
}

pub struct BBQBidiHandle {
producer: Producer<'static>,
consumer: Consumer<'static>,
side: Side,

// SAFETY: `producer` and `consumer` are only valid for the lifetime of `_storage`
_storage: HeapArc<BBQStorage>,
// SAFETY: all above items are ONLY valid for the lifetime of `storage`
storage: HeapArc<BBQStorage>,
}

pub async fn new_bidi_channel(
Expand All @@ -44,7 +59,12 @@ pub async fn new_bidi_channel(
ring_b.initialize(sto_b_ptr.as_ptr().cast(), capacity_b_tx);
}

let storage = alloc.allocate_arc(BBQStorage { _ring_a: ring_a, _ring_b: ring_b }).await;
let storage = alloc.allocate_arc(BBQStorage {
_ring_a: ring_a,
_ring_b: ring_b,
a_wait: BBQWaitCells { commit_waitcell: WaitCell::new(), release_waitcell: WaitCell::new() },
b_wait: BBQWaitCells { commit_waitcell: WaitCell::new(), release_waitcell: WaitCell::new() },
}).await;

let a_bbbuffer = &storage._ring_a as *const BBBuffer as *mut BBBuffer;
let b_bbbuffer = &storage._ring_b as *const BBBuffer as *mut BBBuffer;
Expand All @@ -57,7 +77,8 @@ pub async fn new_bidi_channel(
BBQBidiHandle {
producer: a_prod,
consumer: b_cons,
_storage: storage.clone(),
side: Side::ASide,
storage: storage.clone(),
}
};

Expand All @@ -69,7 +90,8 @@ pub async fn new_bidi_channel(
BBQBidiHandle {
producer: b_prod,
consumer: a_cons,
_storage: storage.clone(),
side: Side::BSide,
storage: storage.clone(),
}
};

Expand All @@ -84,14 +106,167 @@ impl Drop for BBQStorage {
}
}

impl BBQBidiHandle {
use abi::bbqueue_ipc::{
GrantR as InnerGrantR,
GrantW as InnerGrantW,
};

pub struct GrantW {
grant: InnerGrantW<'static>,
storage: HeapArc<BBQStorage>,
side: Side,
}

impl Deref for GrantW {
type Target = [u8];

#[inline(always)]
pub fn producer(&self) -> &Producer<'static> {
&self.producer
fn deref(&self) -> &Self::Target {
self.grant.deref()
}
}

impl DerefMut for GrantW {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
self.grant.deref_mut()
}
}

impl GrantW {
pub fn commit(self, used: usize) {
self.grant.commit(used);
// If we freed up any space, notify the waker on the reader side
if used != 0 {
match self.side {
Side::ASide => &self.storage.a_wait,
Side::BSide => &self.storage.b_wait,
}.commit_waitcell.notify();
}
}
}

pub struct GrantR {
grant: InnerGrantR<'static>,
storage: HeapArc<BBQStorage>,
side: Side,
}

impl Deref for GrantR {
type Target = [u8];

#[inline(always)]
pub fn consumer(&self) -> &Consumer<'static> {
&self.consumer
fn deref(&self) -> &Self::Target {
self.grant.deref()
}
}

impl DerefMut for GrantR {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
self.grant.deref_mut()
}
}

impl GrantR {
pub fn release(self, used: usize) {
self.grant.release(used);
// If we freed up any space, notify the waker on the reader side
if used != 0 {
match self.side {
Side::ASide => &self.storage.a_wait,
Side::BSide => &self.storage.b_wait,
}.release_waitcell.notify();
}
}
}

impl BBQBidiHandle {
// async fn send_grant(buf_len: usize) -> GrantW
// async fn read_grant() -> GrantR
pub async fn send_grant_max(&self, max: usize) -> GrantW {
loop {
match self.producer.grant_max_remaining(max) {
Ok(wgr) => {
trace!(
size = wgr.len(),
max = max,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could just be

Suggested change
max = max,
max,

"Got bbqueue max write grant",
);
return GrantW {
grant: wgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!("awaiting bbqueue max write grant");
// Uh oh! Couldn't get a send grant. We need to wait for the OTHER reader
// to release some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait,
Side::BSide => &self.storage.a_wait,
}.release_waitcell.wait().await.unwrap();

trace!("awoke for bbqueue max write grant");
Comment on lines +194 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, take it or leave it: maybe worth sticking

Suggested change
"Got bbqueue max write grant",
);
return GrantW {
grant: wgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!("awaiting bbqueue max write grant");
// Uh oh! Couldn't get a send grant. We need to wait for the OTHER reader
// to release some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait,
Side::BSide => &self.storage.a_wait,
}.release_waitcell.wait().await.unwrap();
trace!("awoke for bbqueue max write grant");
side = ?self.side,
"Got bbqueue max write grant",
);
return GrantW {
grant: wgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!(side = ?self.side, max, "awaiting bbqueue max write grant");
// Uh oh! Couldn't get a send grant. We need to wait for the OTHER reader
// to release some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait,
Side::BSide => &self.storage.a_wait,
}.release_waitcell.wait().await.unwrap();
trace!(side = ?self.side, max, "awoke for bbqueue max write grant");

},
}
}
}

pub async fn send_grant_exact(&self, size: usize) -> GrantW {
loop {
match self.producer.grant_exact(size) {
Ok(wgr) => {
trace!(
size = size,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit: this could just be

Suggested change
size = size,
size,

"Got bbqueue exact write grant",
);
return GrantW {
grant: wgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!("awaiting bbqueue exact write grant");
// Uh oh! Couldn't get a send grant. We need to wait for the OTHER reader
// to release some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait,
Side::BSide => &self.storage.a_wait,
}.release_waitcell.wait().await.unwrap();
trace!("awoke for bbqueue exact write grant");
},
Comment on lines +221 to +240
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
trace!(
size = size,
"Got bbqueue exact write grant",
);
return GrantW {
grant: wgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!("awaiting bbqueue exact write grant");
// Uh oh! Couldn't get a send grant. We need to wait for the OTHER reader
// to release some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait,
Side::BSide => &self.storage.a_wait,
}.release_waitcell.wait().await.unwrap();
trace!("awoke for bbqueue exact write grant");
},
trace!(
size = size,
side = ?self.side,
"Got bbqueue exact write grant",
);
return GrantW {
grant: wgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!(side = ?self.side, size, "awaiting bbqueue exact write grant");
// Uh oh! Couldn't get a send grant. We need to wait for the OTHER reader
// to release some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait,
Side::BSide => &self.storage.a_wait,
}.release_waitcell.wait().await.unwrap();
trace!(side = ?self.side, size, "awoke for bbqueue exact write grant");
},

}
}
}

pub async fn read_grant(&self) -> GrantR {
loop {
match self.consumer.read() {
Ok(rgr) => {
trace!(
size = rgr.len(),
"Got bbqueue read grant",
);
return GrantR {
grant: rgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!("awaiting bbqueue read grant");
// Uh oh! Couldn't get a read grant. We need to wait for the OTHER writer
// to commit some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait.commit_waitcell,
Side::BSide => &self.storage.a_wait.commit_waitcell,
}.wait().await.unwrap();
trace!("awoke for bbqueue read grant");
Comment on lines +250 to +267
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
size = rgr.len(),
"Got bbqueue read grant",
);
return GrantR {
grant: rgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!("awaiting bbqueue read grant");
// Uh oh! Couldn't get a read grant. We need to wait for the OTHER writer
// to commit some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait.commit_waitcell,
Side::BSide => &self.storage.a_wait.commit_waitcell,
}.wait().await.unwrap();
trace!("awoke for bbqueue read grant");
size = rgr.len(),
side = ?self.side,
"Got bbqueue read grant",
);
return GrantR {
grant: rgr,
side: self.side,
storage: self.storage.clone(),
}
},
Err(_) => {
trace!(side = ?self.side, "awaiting bbqueue read grant");
// Uh oh! Couldn't get a read grant. We need to wait for the OTHER writer
// to commit some bytes first.
match self.side {
Side::ASide => &self.storage.b_wait.commit_waitcell,
Side::BSide => &self.storage.a_wait.commit_waitcell,
}.wait().await.unwrap();
trace!(side = ?self.side, "awoke for bbqueue read grant");

},
}
}
}
}
99 changes: 80 additions & 19 deletions source/melpomene/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use std::{
future::Future,
ops::Deref,
sync::atomic::{AtomicBool, Ordering},
task::Poll,
thread::{sleep, spawn, yield_now},
time::{Duration, Instant},
};

use abi::{bbqueue_ipc::BBBuffer, syscall::DriverKind};
use abi::bbqueue_ipc::BBBuffer;
use melpomene::sim_tracing::setup_tracing;
use mnemos_kernel::{DriverHandle, KChannel, Kernel, KernelSettings};
use mnemos_kernel::{
bbq::{new_bidi_channel, BBQBidiHandle},
KChannel, Kernel, KernelSettings,
};

use tracing::Instrument;

const HEAP_SIZE: usize = 192 * 1024;
static KERNEL_LOCK: AtomicBool = AtomicBool::new(true);
Expand Down Expand Up @@ -69,27 +75,82 @@ fn kernel_entry() {
{
let mut guard = k.heap().lock().unwrap();

// First let's make a dummy driver just to make sure some stuff happens
let dummy_chan = KChannel::new(&mut guard, 16);

k.register_driver(DriverHandle {
kind: DriverKind::Todo,
queue: dummy_chan.clone(),
})
.map_err(drop)
.unwrap();
let send_channel: KChannel<BBQBidiHandle>;

let dummy_fut = async move {
let _ = dummy_chan;
{
// First let's make a dummy driver just to make sure some stuff happens
let dummy_chan = KChannel::new(&mut guard, 16);
send_channel = dummy_chan.clone();

loop {
let dummy_fut = async move {
Sleepy::new(Duration::from_secs(1)).await;
tracing::warn!("Dummy tick...");
let (a_ring, b_ring) = new_bidi_channel(k.heap(), 32, 16).await;
dummy_chan
.enqueue_async(b_ring)
.await
.map_err(drop)
.unwrap();

let mut i = 0u32;
loop {
tracing::info!("Driver A: Writing...");
let mut wgr = a_ring.send_grant_exact(8).await;
wgr.iter_mut().for_each(|b| *b = (i % 255) as u8);
i = i.wrapping_add(1);
let len = wgr.len();
wgr.commit(len);

tracing::info!("Driver A: Sleeping...");
Sleepy::new(Duration::from_secs(1)).await;

tracing::warn!("Driver A: Reading...");
let rgr = a_ring.read_grant().await;
tracing::warn!(
buf = ?rgr.deref(),
"Driver A: Got data"
);
let len = rgr.len();
rgr.release(len);
}
}
};
let dummy_task = k.new_task(dummy_fut);
let boxed_dummy = guard.alloc_box(dummy_task).map_err(drop).unwrap();
k.spawn_allocated(boxed_dummy);
.instrument(tracing::info_span!("Driver A"));

let dummy_task = k.new_task(dummy_fut);
let boxed_dummy = guard.alloc_box(dummy_task).map_err(drop).unwrap();
k.spawn_allocated(boxed_dummy);
}

{
let dummy_fut = async move {
let b_ring = send_channel.dequeue_async().await.unwrap();

let mut i = u32::MAX;
loop {
tracing::info!("Driver B: Writing...");
let mut wgr = b_ring.send_grant_exact(4).await;
wgr.iter_mut().for_each(|b| *b = (i % 255) as u8);
i = i.wrapping_sub(1);
let len = wgr.len();
wgr.commit(len);

tracing::info!("Driver B: Sleeping...");
Sleepy::new(Duration::from_millis(500)).await;

tracing::warn!("Driver B: Reading...");
let rgr = b_ring.read_grant().await;
tracing::warn!(
buf = ?rgr.deref(),
"Driver B: Got data"
);
let len = rgr.len();
rgr.release(len);
}
}
.instrument(tracing::info_span!("Driver B"));
let dummy_task = k.new_task(dummy_fut);
let boxed_dummy = guard.alloc_box(dummy_task).map_err(drop).unwrap();
k.spawn_allocated(boxed_dummy);
}
}

//////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions source/melpomene/src/sim_drivers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions source/mstd/src/serial/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use abi::syscall::{UserRequestBody, serial::{SerialRequest, SerialError, SerialResponse}, KernelResponseBody};
use crate::executor::mailbox::MAILBOX;

#[allow(dead_code)]
pub struct SerialPort {
port: u16,
}
Expand Down