Skip to content

Commit

Permalink
feat(rust): remove bump allocator from ockam_executor
Browse files Browse the repository at this point in the history
  • Loading branch information
antoinevg committed Oct 1, 2021
1 parent 03db987 commit 2988b57
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 121 deletions.
15 changes: 1 addition & 14 deletions implementations/rust/ockam/ockam_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,16 @@ default = ["std"]

# Feature (enabled by default): "std" enables functionality expected to
# be available on a standard platform.
std = ["futures/std", "ockam_core/std", "async-std/default"]
std = ["futures/std", "ockam_core/std"]

# Feature: "alloc" enables support for heap allocation on "no_std"
# platforms, requires nightly.
alloc = ["futures/alloc", "ockam_core/alloc", "ockam_core/no_std"]

# Feature: "cortexm" provides compiler intrinsics and runtime support for
# ARM cortex-m processors.
cortexm = ["alloc-cortex-m", "cortex-m", "cortex-m-rt", "cortex-m-semihosting", "panic-semihosting"]

[dependencies]
crossbeam-queue = { version = "0.3.2", default_features = false, features = ["alloc"] }
futures = { version = "0.3.15", default-features = false, features = [ "async-await" ] }
heapless = { version = "0.7", features = [ "mpmc_large" ] }
ockam_core = { path = "../ockam_core", version = "0.32.0", default_features = false }
pin-project-lite = "0.2"
pin-utils = "0.1.0"

alloc-cortex-m = { version = "0.4.1", optional = true }
cortex-m = { version = "0.7.2", optional = true }
cortex-m-rt = { version = "0.6.14", optional = true }
cortex-m-semihosting = { version = "0.3.7", optional = true }
panic-semihosting = { version = "0.5.6", optional = true }

[dev-dependencies]
async-std = { version = "1.9.0", default_features = false }
53 changes: 0 additions & 53 deletions implementations/rust/ockam/ockam_executor/src/alloc_bump.rs

This file was deleted.

2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_executor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<T: core::fmt::Debug> Receiver<T> {
}
None => {
self.0.wake_receiver.register(&context.waker());
if self.0.is_sender_closed.load(Ordering::Relaxed) {
if self.0.is_sender_closed.load(Ordering::Acquire) {
Poll::Ready(None)
} else {
Poll::Pending
Expand Down
61 changes: 16 additions & 45 deletions implementations/rust/ockam/ockam_executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,18 @@ use ockam_core::compat::vec::Vec;

use pin_utils::pin_mut;

use crate::alloc_bump::Alloc;

/// Reserved memory for the bump allocator
const HEAP_SIZE: usize = 1024 * 128;

static mut ALLOCATOR: UnsafeCell<MaybeUninit<Alloc>> = UnsafeCell::new(MaybeUninit::uninit());

/// abort
#[cfg(target_arch = "arm")]
pub use cortex_m::asm::udf as abort;

/// abort
#[cfg(not(target_arch = "arm"))]
pub fn abort() -> ! {
loop {
panic!();
}
}

/// Returns current executor.
/// WARNING: this is not thread-safe
pub fn current() -> &'static Executor<'static> {
static INIT: AtomicBool = AtomicBool::new(false);
static mut EXECUTOR: UnsafeCell<MaybeUninit<Executor>> = UnsafeCell::new(MaybeUninit::uninit());
static mut MEMORY: [u8; HEAP_SIZE] = [0; HEAP_SIZE];

if INIT.load(Ordering::Relaxed) {
unsafe { &*(EXECUTOR.get() as *const Executor) }
} else {
unsafe {
let executorp = EXECUTOR.get() as *mut Executor;
executorp.write(Executor::new());
let allocatorp = ALLOCATOR.get() as *mut Alloc;
allocatorp.write(Alloc::new(&mut MEMORY));
atomic::compiler_fence(Ordering::Release);
INIT.store(true, Ordering::Relaxed);
&*executorp
Expand All @@ -57,22 +35,16 @@ pub fn current() -> &'static Executor<'static> {

/// Executor
pub struct Executor<'a> {
tasks: UnsafeCell<Vec<&'a Task>>,

tasks: UnsafeCell<Vec<Box<Task>>>,
task_queue: Arc<SegQueue<TaskId>>,
task_cache: Arc<Mutex<BTreeMap<TaskId, &'a Task>>>,

marker: core::marker::PhantomData<&'a ()>,
}

impl<'a> Executor<'a> {
pub fn new() -> Self {
Self {
tasks: UnsafeCell::new(Vec::new()),

task_queue: Arc::new(SegQueue::new()),
task_cache: Arc::new(Mutex::new(BTreeMap::new())),

marker: core::marker::PhantomData,
}
}
Expand All @@ -93,9 +65,13 @@ impl<'a> Executor<'a> {
}
}

let len = unsafe { (*self.tasks.get()).len() };
let tasks = unsafe {
let tasksp = self.tasks.get() as *mut Vec<Box<Task>>;
&mut (*tasksp)
};
let len = tasks.len();
for i in 0..len {
let task = unsafe { (*self.tasks.get()).get_unchecked(i) };
let task = unsafe { tasks.get_unchecked(i) };
if task.ready.load(Ordering::Acquire) {
task.ready.store(false, Ordering::Release);
let waker = unsafe {
Expand All @@ -109,23 +85,15 @@ impl<'a> Executor<'a> {
}
}
}

self.sleep_if_idle();
};
result
}

/// spawn
pub fn spawn(&self, future: impl Future + 'static) {
let task: &'static mut Task = Task::new(future);
self.task_queue.push(task.id);

let mut guard = self.task_cache.lock().unwrap();
if guard.insert(task.id, task).is_some() {
panic!("task with same ID already in tasks");
}

unsafe { (*self.tasks.get()).push(task) };
let task_id = Task::allocate(self, future);
self.task_queue.push(task_id);
}

fn sleep_if_idle(&self) {
Expand All @@ -151,19 +119,22 @@ where
}

impl Task {
fn new(future: impl Future + 'static) -> &'static mut Self {
fn allocate(executor: &Executor, future: impl Future + 'static) -> TaskId {
let task_id = TaskId::new();
let task = Node {
id: TaskId::new(),
id: task_id.clone(),
ready: AtomicBool::new(true),
future: UnsafeCell::new(async {
// task terminating
future.await;
}),
};

unsafe {
let allocator = ALLOCATOR.get() as *mut Alloc;
(*allocator).alloc_init(task)
(*executor.tasks.get()).push(Box::new(task));
}

task_id
}
}

Expand Down
4 changes: 1 addition & 3 deletions implementations/rust/ockam/ockam_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! The ockam_node crate re-exports types defined in this crate when the 'std'
//! feature is not enabled.
#![deny(
#![allow(
//missing_docs,
//trivial_casts,
trivial_numeric_casts,
Expand All @@ -28,8 +28,6 @@ extern crate core;
#[macro_use]
extern crate alloc;

mod alloc_bump;

pub mod channel;
pub mod executor;
pub mod oneshot;
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_executor/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl<T> Future for Receiver<T> {
}
None => {
self.0.wake_receiver.register(&context.waker());
if self.0.is_sender_closed.load(Ordering::Relaxed) {
if self.0.is_sender_closed.load(Ordering::Acquire) {
panic!("called after complete");
} else {
Poll::Pending
Expand Down
5 changes: 3 additions & 2 deletions implementations/rust/ockam/ockam_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ std = ["ockam_core/std", "tokio/full", "tracing-subscriber"]

# Feature: "no_std" enables functionality required for platforms
# without the standard library.
no_std = ["ockam_core/no_std", "ockam_executor/cortexm", "ockam_node_no_std/no_std", "heapless"]
no_std = ["ockam_core/no_std", "ockam_node_no_std/no_std", "heapless", "cortex-m-semihosting"]

# Feature: "alloc" enables support for heap allocation on "no_std"
# platforms, requires nightly.
Expand All @@ -36,7 +36,8 @@ tracing-subscriber = { version = "0.2", features = ["fmt", "env-filter"], option
heapless = { version = "0.7", features = [ "mpmc_large" ], optional = true }
ockam_node_no_std = { path = "../ockam_node_no_std", version = "0.7.0" , default-features = false, optional = true }
ockam_executor = { path = "../ockam_executor", version = "0.1.0", default-features = false, optional = true }
cortex-m-semihosting = { version = "0.3.7" }
# TODO replace cortex-m-semihosting with 'log'
cortex-m-semihosting = { version = "0.3.7", optional = true }

[dev-dependencies]
async-trait = { version = "0.1" }
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::relay::{run_mailbox, RelayMessage, ShutdownHandle, ShutdownListener};
use crate::tokio::runtime::Runtime;
use crate::tokio::sync::mpsc;
#[cfg(feature = "std")]
use crate::{error::Error, Context};
#[cfg(not(feature = "std"))]
use crate::Context;
use ockam_core::{Processor, Result};

pub struct ProcessorRelay<P>
Expand All @@ -26,7 +29,7 @@ where
}

async fn run(self) {
let (rx_shutdown, tx_ack) = self.shutdown_listener.consume();
let (_rx_shutdown, tx_ack) = self.shutdown_listener.consume();
let mut ctx = self.ctx;
let mut processor = self.processor;

Expand Down Expand Up @@ -64,7 +67,7 @@ where
let stop_reason;
#[cfg(feature = "std")]
tokio::select! {
res = rx_shutdown => {
res = _rx_shutdown => {
match res {
Ok(_) => stop_reason = StopReason::Shutdown,
Err(_) => stop_reason = StopReason::RxError(Error::ShutdownRxError.into()),
Expand Down

0 comments on commit 2988b57

Please sign in to comment.