Skip to content

Commit

Permalink
use thread stack
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiujia committed Dec 13, 2023
1 parent c1bbc12 commit 1725aaf
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 30 deletions.
22 changes: 4 additions & 18 deletions mco-gen/src/gen_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ use crate::rt::{Context, ContextStack, Error};
use crate::scope::Scope;
use crate::stack::{Func, Stack, StackBox};

/// The default stack size for generators, in bytes.
// windows has a minimal size as 0x4a8!!!!
pub const DEFAULT_STACK_SIZE: usize = 6 * 1024 * 1024;

/// the generator obj type, the functor passed to it must be Send
pub struct GeneratorObj<'a, A, T, const LOCAL: bool> {
pub gen: StackBox<GeneratorImpl<'a, A, T>>,
Expand Down Expand Up @@ -183,23 +179,23 @@ pub struct Gn<A = ()> {

impl<A> Gn<A> {
/// create a scoped generator with default stack size
pub fn new_scoped<'a, T, F>(f: F) -> Generator<'a, A, T>
pub fn new_scoped<'a, T, F>(size: usize,f: F) -> Generator<'a, A, T>
where
for<'scope> F: FnOnce(Scope<'scope, 'a, A, T>) -> T + Send + 'a,
T: Send + 'a,
A: Send + 'a,
{
Self::new_scoped_opt(DEFAULT_STACK_SIZE, f)
Self::new_scoped_opt(size, f)
}

/// create a scoped local generator with default stack size
pub fn new_scoped_local<'a, T, F>(f: F) -> LocalGenerator<'a, A, T>
pub fn new_scoped_local<'a, T, F>(size: usize,f: F) -> LocalGenerator<'a, A, T>
where
F: FnOnce(Scope<A, T>) -> T + 'a,
T: 'a,
A: 'a,
{
Self::new_scoped_opt_local(DEFAULT_STACK_SIZE, f)
Self::new_scoped_opt_local(size, f)
}

/// create a scoped generator with specified stack size
Expand Down Expand Up @@ -228,16 +224,6 @@ impl<A> Gn<A> {
}

impl<A: Any> Gn<A> {
/// create a new generator with default stack size
#[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))]
#[deprecated(since = "0.6.18", note = "please use `scope` version instead")]
pub fn new<'a, T: Any, F>(f: F) -> Generator<'a, A, T>
where
F: FnOnce() -> T + Send + 'a,
{
Self::new_opt(DEFAULT_STACK_SIZE, f)
}

/// create a new generator with specified stack size
// the `may` library use this API so we can't deprecated it yet.
pub fn new_opt<'a, T: Any, F>(size: usize, f: F) -> Generator<'a, A, T>
Expand Down
2 changes: 1 addition & 1 deletion mco-gen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod scope;
mod stack;
mod yield_;

pub use crate::gen_impl::{Generator, Gn, LocalGenerator, DEFAULT_STACK_SIZE};
pub use crate::gen_impl::{Generator, Gn, LocalGenerator};
pub use crate::rt::{get_local_data, is_generator, Error};
pub use crate::scope::Scope;
pub use crate::yield_::{
Expand Down
2 changes: 1 addition & 1 deletion mco-gen/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ mod test {
// test signal mask
for _ in 0..2 {
let result = catch_unwind(|| {
let mut g = Gn::new_scoped(move |_s: Scope<(), ()>| {
let mut g = Gn::new_scoped(4096,move |_s: Scope<(), ()>| {
let guard = super::guard::current();

// make sure the compiler does not apply any optimization on it
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};

// default stack size, in usize
// windows has a minimal size as 0x4a8!!!!
const DEFAULT_STACK_SIZE: usize = 6 * 1024 * 1024;
pub const DEFAULT_STACK_SIZE: usize = 6 * 1024 * 1024;

static WORKERS: AtomicUsize = AtomicUsize::new(0);
static STACK_SIZE: AtomicUsize = AtomicUsize::new(DEFAULT_STACK_SIZE);
Expand Down
16 changes: 8 additions & 8 deletions src/coroutine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::thread::ThreadId;
use std::time::Duration;

use crate::cancel::Cancel;
use crate::config::config;
use crate::config::{config, DEFAULT_STACK_SIZE};
use crate::err;
use crate::join::{make_join_handle, Join, JoinHandle};
use crate::local::get_co_local_data;
Expand All @@ -16,7 +16,7 @@ use crate::park::Park;
use crate::scheduler::get_scheduler;
use crossbeam::atomic::AtomicCell;
use once_cell::sync::Lazy;
use mco_gen::{DEFAULT_STACK_SIZE, Generator, Gn, Stack};
use mco_gen::{Generator, Gn, Stack};

/// /////////////////////////////////////////////////////////////////////////////
/// Coroutine framework types
Expand Down Expand Up @@ -100,7 +100,7 @@ pub struct CoroutineImpl {

impl CoroutineImpl {
pub fn stack_reduce(&mut self) {
if self.reduce.is_none(){
if self.reduce.is_none() {
let reduce_data = unsafe { &*self.gen.stack.get() }.stack_reduce(DEFAULT_STACK_SIZE);
if reduce_data.len() != 0 {
self.reduce = Some(reduce_data);
Expand All @@ -111,11 +111,10 @@ impl CoroutineImpl {
}
}

pub fn stack_restore(&mut self) {
pub fn stack_restore(&mut self, mut stack: Stack) {
if let Some(v) = self.reduce.take() {
let mut s = Stack::new(DEFAULT_STACK_SIZE);
s.write_stack_data(v);
self.gen.stack = UnsafeCell::new(s);
stack.write_stack_data(v);
self.gen.stack = UnsafeCell::new(stack);
}
}
}
Expand Down Expand Up @@ -546,7 +545,8 @@ pub fn park_timeout(dur: Duration) {
/// run the coroutine
#[inline]
pub(crate) fn run_coroutine(mut co: CoroutineImpl) {
co.stack_restore();
let s = get_scheduler();
co.stack_restore(s.get_stack(std::thread::current().id()));
match co.resume() {
Some(ev) => {
co.stack_reduce();
Expand Down
21 changes: 20 additions & 1 deletion src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::{Arc, Once};
use std::thread;
use std::time::Duration;

use crate::config::config;
use crate::config::{config, DEFAULT_STACK_SIZE};
use crate::coroutine_impl::{run_coroutine, CoroutineImpl};
use crate::io::{EventLoop, Selector};
use crate::std::sync::AtomicOption;
Expand All @@ -16,6 +16,7 @@ use crossbeam::utils::Backoff;
#[cfg(nightly)]
use std::intrinsics::likely;
use std::thread::ThreadId;
use mco_gen::{Stack};

#[cfg(not(nightly))]
#[inline]
Expand Down Expand Up @@ -126,6 +127,7 @@ fn init_scheduler() {
println!("init worker {:?}", std::thread::current().id());
let s = unsafe { &*SCHED };
s.worker_ids.insert(std::thread::current().id(), id);
s.stacks.insert(std::thread::current().id(), Stack::new(DEFAULT_STACK_SIZE));
s.event_loop.run(id as usize).unwrap_or_else(|e| {
panic!("event_loop failed running, err={}", e);
});
Expand Down Expand Up @@ -189,6 +191,7 @@ pub struct Scheduler {
// stealers: Vec<Vec<(usize, deque::Stealer<CoroutineImpl>)>>,
workers_len: usize,
pub(crate) worker_ids: dark_std::sync::SyncHashMap<ThreadId, usize>,
pub(crate) stacks: dark_std::sync::SyncHashMap<ThreadId, Stack>,
}

impl Scheduler {
Expand Down Expand Up @@ -218,6 +221,7 @@ impl Scheduler {
let v = dark_std::sync::SyncHashMap::new();
v
},
stacks: dark_std::sync::SyncHashMap::new(),
})
}

Expand Down Expand Up @@ -342,4 +346,19 @@ impl Scheduler {
pub fn get_selector(&self) -> &Selector {
self.event_loop.get_selector()
}

#[inline]
pub fn get_stack(&self,key:std::thread::ThreadId) -> Stack{
match self.stacks.get(&key){
None => {
let v=Stack::new(DEFAULT_STACK_SIZE);
let r= v.shadow_clone();
self.stacks.insert(key,Stack::new(DEFAULT_STACK_SIZE));
r
}
Some(v) => {
v.shadow_clone()
}
}
}
}

0 comments on commit 1725aaf

Please sign in to comment.