Skip to content

Commit

Permalink
edit timer
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiujia committed Dec 11, 2023
1 parent f2c8c91 commit e7f7b68
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ once_cell = "1.18"
parking_lot = "0.11"
time = { version = "0.3", features = ["formatting", "local-offset", "parsing", "serde"] }
serde = "1.0"
dark-std = "0.2"

[target.'cfg(unix)'.dependencies]
nix = { version = "0.27", features = ["event"] }
Expand Down
2 changes: 1 addition & 1 deletion mco-gen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//!

#![cfg_attr(nightly, feature(thread_local))]
#![cfg_attr(test, deny(warnings))]
//#![cfg_attr(test, deny(warnings))]
#![deny(missing_docs)]
#![allow(deprecated)]

Expand Down
43 changes: 38 additions & 5 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::io;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::time::Duration;
Expand All @@ -16,6 +17,7 @@ use crossbeam::utils::Backoff;

#[cfg(nightly)]
use std::intrinsics::likely;
use std::thread::ThreadId;

#[cfg(not(nightly))]
#[inline]
Expand Down Expand Up @@ -95,8 +97,10 @@ fn init_scheduler() {
}
filter_cancel_panic();


// timer thread
thread::spawn(move || {
println!("init timer worker {:?}", std::thread::current().id());
let s = unsafe { &*SCHED };
// timer function
let timer_event_handler = |co: Arc<AtomicOption<CoroutineImpl>>| {
Expand All @@ -105,17 +109,31 @@ fn init_scheduler() {
// set the timeout result for the coroutine
set_co_para(&mut c, io::Error::new(io::ErrorKind::TimedOut, "timeout"));
// s.schedule_global(c);
run_coroutine(c);
// run_coroutine(c);
for (t, b) in s.sleeps.iter() {
if b.load(Ordering::Relaxed) {
let id = s.worker_ids2.get(&t);
if let Some(id) = id {
s.local_queues[*id].push(c);
s.get_selector().wakeup(*id);
break;
}
}
}
}
};

s.timer_thread.run(&timer_event_handler);
});

println!("init workers {}", workers);
// io event loop thread
for id in 0..workers {
thread::spawn(move || {
println!("init worker {:?}", std::thread::current().id());
let s = unsafe { &*SCHED };
s.sleeps.insert(std::thread::current().id(), AtomicBool::new(false));
s.worker_ids.insert(id, std::thread::current().id());
s.worker_ids2.insert(std::thread::current().id(), id);
s.event_loop.run(id as usize).unwrap_or_else(|e| {
panic!("event_loop failed running, err={}", e);
});
Expand Down Expand Up @@ -179,6 +197,9 @@ pub struct Scheduler {
timer_thread: TimerThread,
stealers: Vec<Vec<(usize, deque::Stealer<CoroutineImpl>)>>,
workers_len: usize,
pub(crate) sleeps: dark_std::sync::SyncHashMap<ThreadId, AtomicBool>,
pub(crate) worker_ids: dark_std::sync::SyncHashMap<usize, ThreadId>,
pub(crate) worker_ids2: dark_std::sync::SyncHashMap<ThreadId, usize>,
}

impl Scheduler {
Expand All @@ -205,6 +226,18 @@ impl Scheduler {
workers: ParkStatus::new(workers as u64),
stealers,
workers_len: workers,
sleeps: {
let v = dark_std::sync::SyncHashMap::new();
v
},
worker_ids: {
let v = dark_std::sync::SyncHashMap::new();
v
},
worker_ids2: {
let v = dark_std::sync::SyncHashMap::new();
v
},
})
}

Expand Down Expand Up @@ -250,9 +283,9 @@ impl Scheduler {
#[inline]
pub fn schedule(&self, co: CoroutineImpl) {
#[cfg(nightly)]
let id = WORKER_ID.load(Ordering::Relaxed);
let id = WORKER_ID.load(Ordering::Relaxed);
#[cfg(not(nightly))]
let id = WORKER_ID.with(|id| id.load(Ordering::Relaxed));
let id = WORKER_ID.with(|id| id.load(Ordering::Relaxed));

if id == !1 {
self.schedule_global(co);
Expand Down
8 changes: 7 additions & 1 deletion src/sleep.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::std::sync::AtomicOption;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::thread;
use std::time::Duration;
use parking_lot::Mutex;

use crate::coroutine_impl::{co_cancel_data, is_coroutine, CoroutineImpl, EventSource};
use crate::scheduler::get_scheduler;
use crate::std::lazy::sync::Lazy;
use crate::yield_now::{get_co_para, yield_with};

struct Sleep {
Expand Down Expand Up @@ -33,7 +36,10 @@ pub fn sleep(dur: Duration) {
if !is_coroutine() {
return thread::sleep(dur);
}

let s= get_scheduler().sleeps.get(&thread::current().id());
if let Some(s)=s{
s.store(true,Ordering::Relaxed);
};
let sleeper = Sleep { dur };
yield_with(&sleeper);
// consume the timeout error
Expand Down

0 comments on commit e7f7b68

Please sign in to comment.