Skip to content

Commit

Permalink
add stack reduce/restore
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiujia committed Dec 13, 2023
1 parent d59c234 commit c1bbc12
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 23 deletions.
2 changes: 1 addition & 1 deletion mco-gen/src/gen_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::stack::{Func, Stack, StackBox};

/// The default stack size for generators, in bytes.
// windows has a minimal size as 0x4a8!!!!
pub const DEFAULT_STACK_SIZE: usize = 0x1000;
pub const DEFAULT_STACK_SIZE: usize = 6 * 1024 * 1024;

/// the generator obj type, the functor passed to it must be Send
pub struct GeneratorObj<'a, A, T, const LOCAL: bool> {
Expand Down
7 changes: 3 additions & 4 deletions mco-gen/src/stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl Stack {
}

// dealloc the stack
pub(crate) fn drop_stack(&self) {
pub fn drop_stack(&self) {
if self.buf.len() == 0 {
return;
}
Expand Down Expand Up @@ -431,7 +431,6 @@ impl Stack {
}
}


pub fn stack_restore(&self, max: usize) -> Vec<u8> {
if self.size() < max {
let mut data = self.get_stack_data();
Expand Down Expand Up @@ -493,13 +492,13 @@ mod test {
#[test]
fn test_reduce() {
let s = Stack::new(4096);
println!("len={}",s.size());
println!("len={}", s.size());
let raw = s.get_stack_data();
let reduce = s.stack_reduce(4096);
drop(s);
let mut new = Stack::new(reduce.len());
new.write_stack_data(reduce);
let restore_data= new.stack_restore(4096);
let restore_data = new.stack_restore(4096);
drop(new);
let mut new_4096 = Stack::new(restore_data.len());
new_4096.write_stack_data(restore_data);
Expand Down
50 changes: 33 additions & 17 deletions src/coroutine_impl.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut};
Expand All @@ -14,7 +15,8 @@ use crate::local::CoroutineLocal;
use crate::park::Park;
use crate::scheduler::get_scheduler;
use crossbeam::atomic::AtomicCell;
use mco_gen::{Generator, Gn};
use once_cell::sync::Lazy;
use mco_gen::{DEFAULT_STACK_SIZE, Generator, Gn, Stack};

/// /////////////////////////////////////////////////////////////////////////////
/// Coroutine framework types
Expand Down Expand Up @@ -93,15 +95,28 @@ impl EventSource for Done {
pub struct CoroutineImpl {
pub worker_thread_id: Option<ThreadId>,
pub inner: Generator<'static, EventResult, EventSubscriber>,
pub reduce: Option<Vec<u8>>,
}

impl CoroutineImpl {
pub fn stack_reduce(&mut self) {

if self.reduce.is_none(){
let reduce_data = unsafe { &*self.gen.stack.get() }.stack_reduce(DEFAULT_STACK_SIZE);
if reduce_data.len() != 0 {
self.reduce = Some(reduce_data);
//alloc a small stack
// unsafe { &*self.gen.stack.get() }.drop_stack();
self.gen.stack = UnsafeCell::new(Stack::new(0));
}
}
}

pub fn stack_restore(&mut self) {

if let Some(v) = self.reduce.take() {
let mut s = Stack::new(DEFAULT_STACK_SIZE);
s.write_stack_data(v);
self.gen.stack = UnsafeCell::new(s);
}
}
}

Expand Down Expand Up @@ -276,9 +291,9 @@ impl Builder {
/// The join handle can be used to block on
/// termination of the child coroutine, including recovering its panics.
fn spawn_impl<F, T>(self, f: F) -> (CoroutineImpl, JoinHandle<T>)
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
static DONE: Done = Done {};

Expand Down Expand Up @@ -311,6 +326,7 @@ impl Builder {
let mut co = CoroutineImpl {
worker_thread_id: None,
inner: Gn::new_opt(stack_size, closure),
reduce: None,
};

let handle = Coroutine::new(self.name, stack_size);
Expand Down Expand Up @@ -362,9 +378,9 @@ impl Builder {
/// [`go!`]: ../macro.go.html
/// [`spawn`]: ./fn.spawn.html
pub fn spawn<F, T>(self, f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (co, handle) = self.spawn_impl(f);
let s = get_scheduler();
Expand All @@ -381,9 +397,9 @@ impl Builder {
/// Normally this is safe but for some cases you should
/// take care of the side effect
pub fn spawn_local<F, T>(self, f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
// we will still get optimizations in spawn_impl
let (co, handle) = self.spawn_impl(f);
Expand Down Expand Up @@ -444,9 +460,9 @@ impl Builder {
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
/// [`Builder`]: struct.Builder.html
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f)
}
Expand Down Expand Up @@ -534,8 +550,8 @@ pub(crate) fn run_coroutine(mut co: CoroutineImpl) {
match co.resume() {
Some(ev) => {
co.stack_reduce();
ev.subscribe(co)
},
ev.subscribe(co);
}
None => {
// panic happened here
let local = unsafe { &mut *get_co_local(&co) };
Expand Down
1 change: 1 addition & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ impl CoroutinePool {
inner: Gn::new_opt(config().get_stack_size(), move || {
unreachable!("dummy coroutine should never be called");
}),
reduce: None,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl Scheduler {

/// put the coroutine to global queue so that next time it can be scheduled
#[inline]
pub fn schedule_global(&self, co: CoroutineImpl) {
pub fn schedule_global(&self, mut co: CoroutineImpl) {
self.global_queue.push(co);
// signal one waiting thread if any
self.workers.wake_one(self);
Expand Down

0 comments on commit c1bbc12

Please sign in to comment.