From 1725aaf8ab47a72717529bd78872f67538f8e6e9 Mon Sep 17 00:00:00 2001 From: "zhuxiujia@qq.com" Date: Wed, 13 Dec 2023 12:58:38 +0800 Subject: [PATCH] use thread stack --- mco-gen/src/gen_impl.rs | 22 ++++------------------ mco-gen/src/lib.rs | 2 +- mco-gen/src/rt.rs | 2 +- src/config.rs | 2 +- src/coroutine_impl.rs | 16 ++++++++-------- src/scheduler.rs | 21 ++++++++++++++++++++- 6 files changed, 35 insertions(+), 30 deletions(-) diff --git a/mco-gen/src/gen_impl.rs b/mco-gen/src/gen_impl.rs index 79156da..a2377ff 100644 --- a/mco-gen/src/gen_impl.rs +++ b/mco-gen/src/gen_impl.rs @@ -17,10 +17,6 @@ use crate::rt::{Context, ContextStack, Error}; use crate::scope::Scope; use crate::stack::{Func, Stack, StackBox}; -/// The default stack size for generators, in bytes. -// windows has a minimal size as 0x4a8!!!! -pub const DEFAULT_STACK_SIZE: usize = 6 * 1024 * 1024; - /// the generator obj type, the functor passed to it must be Send pub struct GeneratorObj<'a, A, T, const LOCAL: bool> { pub gen: StackBox>, @@ -183,23 +179,23 @@ pub struct Gn { impl Gn { /// create a scoped generator with default stack size - pub fn new_scoped<'a, T, F>(f: F) -> Generator<'a, A, T> + pub fn new_scoped<'a, T, F>(size: usize,f: F) -> Generator<'a, A, T> where for<'scope> F: FnOnce(Scope<'scope, 'a, A, T>) -> T + Send + 'a, T: Send + 'a, A: Send + 'a, { - Self::new_scoped_opt(DEFAULT_STACK_SIZE, f) + Self::new_scoped_opt(size, f) } /// create a scoped local generator with default stack size - pub fn new_scoped_local<'a, T, F>(f: F) -> LocalGenerator<'a, A, T> + pub fn new_scoped_local<'a, T, F>(size: usize,f: F) -> LocalGenerator<'a, A, T> where F: FnOnce(Scope) -> T + 'a, T: 'a, A: 'a, { - Self::new_scoped_opt_local(DEFAULT_STACK_SIZE, f) + Self::new_scoped_opt_local(size, f) } /// create a scoped generator with specified stack size @@ -228,16 +224,6 @@ impl Gn { } impl Gn { - /// create a new generator with default stack size - #[cfg_attr(feature = "cargo-clippy", allow(clippy::new_ret_no_self))] - #[deprecated(since = "0.6.18", note = "please use `scope` version instead")] - pub fn new<'a, T: Any, F>(f: F) -> Generator<'a, A, T> - where - F: FnOnce() -> T + Send + 'a, - { - Self::new_opt(DEFAULT_STACK_SIZE, f) - } - /// create a new generator with specified stack size // the `may` library use this API so we can't deprecated it yet. pub fn new_opt<'a, T: Any, F>(size: usize, f: F) -> Generator<'a, A, T> diff --git a/mco-gen/src/lib.rs b/mco-gen/src/lib.rs index baca642..5e87d24 100644 --- a/mco-gen/src/lib.rs +++ b/mco-gen/src/lib.rs @@ -19,7 +19,7 @@ mod scope; mod stack; mod yield_; -pub use crate::gen_impl::{Generator, Gn, LocalGenerator, DEFAULT_STACK_SIZE}; +pub use crate::gen_impl::{Generator, Gn, LocalGenerator}; pub use crate::rt::{get_local_data, is_generator, Error}; pub use crate::scope::Scope; pub use crate::yield_::{ diff --git a/mco-gen/src/rt.rs b/mco-gen/src/rt.rs index ca4bf46..fa39e77 100644 --- a/mco-gen/src/rt.rs +++ b/mco-gen/src/rt.rs @@ -329,7 +329,7 @@ mod test { // test signal mask for _ in 0..2 { let result = catch_unwind(|| { - let mut g = Gn::new_scoped(move |_s: Scope<(), ()>| { + let mut g = Gn::new_scoped(4096,move |_s: Scope<(), ()>| { let guard = super::guard::current(); // make sure the compiler does not apply any optimization on it diff --git a/src/config.rs b/src/config.rs index 0ddb4f1..5fd2337 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; // default stack size, in usize // windows has a minimal size as 0x4a8!!!! -const DEFAULT_STACK_SIZE: usize = 6 * 1024 * 1024; +pub const DEFAULT_STACK_SIZE: usize = 6 * 1024 * 1024; static WORKERS: AtomicUsize = AtomicUsize::new(0); static STACK_SIZE: AtomicUsize = AtomicUsize::new(DEFAULT_STACK_SIZE); diff --git a/src/coroutine_impl.rs b/src/coroutine_impl.rs index 4a0f5d2..3f4c496 100644 --- a/src/coroutine_impl.rs +++ b/src/coroutine_impl.rs @@ -7,7 +7,7 @@ use std::thread::ThreadId; use std::time::Duration; use crate::cancel::Cancel; -use crate::config::config; +use crate::config::{config, DEFAULT_STACK_SIZE}; use crate::err; use crate::join::{make_join_handle, Join, JoinHandle}; use crate::local::get_co_local_data; @@ -16,7 +16,7 @@ use crate::park::Park; use crate::scheduler::get_scheduler; use crossbeam::atomic::AtomicCell; use once_cell::sync::Lazy; -use mco_gen::{DEFAULT_STACK_SIZE, Generator, Gn, Stack}; +use mco_gen::{Generator, Gn, Stack}; /// ///////////////////////////////////////////////////////////////////////////// /// Coroutine framework types @@ -100,7 +100,7 @@ pub struct CoroutineImpl { impl CoroutineImpl { pub fn stack_reduce(&mut self) { - if self.reduce.is_none(){ + if self.reduce.is_none() { let reduce_data = unsafe { &*self.gen.stack.get() }.stack_reduce(DEFAULT_STACK_SIZE); if reduce_data.len() != 0 { self.reduce = Some(reduce_data); @@ -111,11 +111,10 @@ impl CoroutineImpl { } } - pub fn stack_restore(&mut self) { + pub fn stack_restore(&mut self, mut stack: Stack) { if let Some(v) = self.reduce.take() { - let mut s = Stack::new(DEFAULT_STACK_SIZE); - s.write_stack_data(v); - self.gen.stack = UnsafeCell::new(s); + stack.write_stack_data(v); + self.gen.stack = UnsafeCell::new(stack); } } } @@ -546,7 +545,8 @@ pub fn park_timeout(dur: Duration) { /// run the coroutine #[inline] pub(crate) fn run_coroutine(mut co: CoroutineImpl) { - co.stack_restore(); + let s = get_scheduler(); + co.stack_restore(s.get_stack(std::thread::current().id())); match co.resume() { Some(ev) => { co.stack_reduce(); diff --git a/src/scheduler.rs b/src/scheduler.rs index f87f412..9c9e0e5 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, Once}; use std::thread; use std::time::Duration; -use crate::config::config; +use crate::config::{config, DEFAULT_STACK_SIZE}; use crate::coroutine_impl::{run_coroutine, CoroutineImpl}; use crate::io::{EventLoop, Selector}; use crate::std::sync::AtomicOption; @@ -16,6 +16,7 @@ use crossbeam::utils::Backoff; #[cfg(nightly)] use std::intrinsics::likely; use std::thread::ThreadId; +use mco_gen::{Stack}; #[cfg(not(nightly))] #[inline] @@ -126,6 +127,7 @@ fn init_scheduler() { println!("init worker {:?}", std::thread::current().id()); let s = unsafe { &*SCHED }; s.worker_ids.insert(std::thread::current().id(), id); + s.stacks.insert(std::thread::current().id(), Stack::new(DEFAULT_STACK_SIZE)); s.event_loop.run(id as usize).unwrap_or_else(|e| { panic!("event_loop failed running, err={}", e); }); @@ -189,6 +191,7 @@ pub struct Scheduler { // stealers: Vec)>>, workers_len: usize, pub(crate) worker_ids: dark_std::sync::SyncHashMap, + pub(crate) stacks: dark_std::sync::SyncHashMap, } impl Scheduler { @@ -218,6 +221,7 @@ impl Scheduler { let v = dark_std::sync::SyncHashMap::new(); v }, + stacks: dark_std::sync::SyncHashMap::new(), }) } @@ -342,4 +346,19 @@ impl Scheduler { pub fn get_selector(&self) -> &Selector { self.event_loop.get_selector() } + + #[inline] + pub fn get_stack(&self,key:std::thread::ThreadId) -> Stack{ + match self.stacks.get(&key){ + None => { + let v=Stack::new(DEFAULT_STACK_SIZE); + let r= v.shadow_clone(); + self.stacks.insert(key,Stack::new(DEFAULT_STACK_SIZE)); + r + } + Some(v) => { + v.shadow_clone() + } + } + } }