Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scheduler metrics #23

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions base/partr.jl
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,16 @@ end


function multiq_insert(task::Task, priority::UInt16)
ccall(:jl_tv_multiq_p_inc, Cvoid, ())

# tpid = task pool id
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task)
heap_p = multiq_size(tpid)
tp = tpid + 1

task.priority = priority

# TODO: task pushed to a randomly chosen thread
rn = cong(heap_p, cong_unbias[tp])
tpheaps = heaps[tp]
while !trylock(tpheaps[rn].lock)
Expand Down Expand Up @@ -174,6 +178,7 @@ function multiq_deletemin()
prio1 = heap.tasks[1].priority
end
@atomic :monotonic heap.priority = prio1
ccall(:jl_tv_multiq_m_inc, Cvoid, ())
unlock(heap.lock)

return task
Expand Down
11 changes: 11 additions & 0 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,8 @@ function schedule(t::Task, @nospecialize(arg); error=false)
t.queue === nothing || Base.error("schedule: Task not runnable")
setfield!(t, :result, arg)
end

# TODO: how do we ensure that the same task is not enqueued multiple times?
enq_work(t)
return t
end
Expand Down Expand Up @@ -868,6 +870,7 @@ immediately yields to `t` before calling the scheduler.
function yield(t::Task, @nospecialize(x=nothing))
(t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable")
t.result = x
ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
enq_work(current_task())
set_next_task(t)
return try_yieldto(ensure_rescheduled)
Expand All @@ -889,6 +892,7 @@ function yieldto(t::Task, @nospecialize(x=nothing))
elseif t._state === task_state_failed
throw(t.result)
end
ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
t.result = x
set_next_task(t)
return try_yieldto(identity)
Expand All @@ -915,6 +919,7 @@ end

# yield to a task, throwing an exception in it
function throwto(t::Task, @nospecialize exc)
ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
t.result = exc
t._isexception = true
set_next_task(t)
Expand Down Expand Up @@ -967,11 +972,17 @@ checktaskempty = Partr.multiq_check_empty
end

function wait()
ccall(:jl_tv_tasks_running_m_inc, Cvoid, ())
GC.safepoint()
W = workqueue_for(Threads.threadid())
poptask(W)
result = try_yieldto(ensure_rescheduled)

# TODO: how does this call to process_events() interact with locks / conditions?
# First thing a task does after waking is to process events?
# Will there be contention on libuv lock?
process_events()

# return when we come out of the queue
return result
end
Expand Down
1 change: 1 addition & 0 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ macro spawn(args...)
let $(letargs...)
local task = Task($thunk)
task.sticky = false
# TODO: return value from jl_set_task_threadpoolid not checked
ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid)
if $(Expr(:islocal, var))
put!($var, task)
Expand Down
4 changes: 4 additions & 0 deletions src/jl_exported_funcs.inc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@
XX(jl_gc_counted_malloc) \
XX(jl_gc_counted_realloc_with_old_size) \
XX(jl_gc_diff_total_bytes) \
XX(jl_tv_multiq_p_inc) \
XX(jl_tv_multiq_m_inc) \
XX(jl_tv_tasks_running_m_inc) \
XX(jl_tv_getmetric) \
XX(jl_gc_enable) \
XX(jl_gc_enable_conservative_gc_support) \
XX(jl_gc_enable_finalizers) \
Expand Down
10 changes: 10 additions & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
#define sleep(x) Sleep(1000*x)
#endif


extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_running_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_running_m;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_m;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_m;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_running_p;
extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_running_m;

#ifdef __cplusplus
extern "C" {
#endif
Expand Down
21 changes: 16 additions & 5 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static const int16_t sleeping = 1;
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
// information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
// ^^^ ??? TODO
// information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons.
// information: These observations require sequentially-consistent fences to be inserted between each of those operational phases.
// [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where
Expand Down Expand Up @@ -91,7 +92,7 @@ JL_DLLEXPORT uint32_t jl_rand_ptls(uint32_t max, uint32_t unbias)
// (called only by the main thread)
void jl_init_threadinginfra(void)
{
/* initialize the synchronization trees pool */
/* initialize the synchronization trees pool */ // TODO: this comment obsolete?
sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD;
char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME);
if (cp) {
Expand Down Expand Up @@ -154,7 +155,6 @@ int jl_running_under_rr(int recheck)
#endif
}


// sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1
static int sleep_check_after_threshold(uint64_t *start_cycles)
{
Expand All @@ -171,6 +171,10 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
*start_cycles = jl_hrtime();
return 0;
}

// TODO: jl_hrtime() is a wall clock timestamp. This OS thread is not guaranteed to
// run continuously- there might be a context switch, and this thread could resume
// well after sleep_threshold has elapsed?
uint64_t elapsed_cycles = jl_hrtime() - (*start_cycles);
if (elapsed_cycles >= sleep_threshold) {
*start_cycles = 0;
Expand All @@ -179,12 +183,15 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
return 0;
}


// this doesn't guarantee that on return the thread is waking or awake.
// there is a race condition here where the other thread goes to sleep just
// after this thread checks its state and sees !(jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping)
static int wake_thread(int16_t tid)
{
jl_ptls_t other = jl_all_tls_states[tid];
int8_t state = sleeping;

// TODO: use of condition variable here doesn't adhere to required discipline?
if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state);
Expand All @@ -208,7 +215,7 @@ static void wake_libuv(void)
/* ensure thread tid is awake if necessary */
JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
{
jl_task_t *ct = jl_current_task;
jl_task_t *ct = jl_current_task; // #define jl_current_task (container_of(jl_get_pgcstack(), jl_task_t, gcstack)
int16_t self = jl_atomic_load_relaxed(&ct->tid);
if (tid != self)
jl_fence(); // [^store_buffering_1]
Expand Down Expand Up @@ -240,6 +247,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
}
// check if the other threads might be sleeping
if (tid == -1) {
// TODO: every thread woken up when something added to multi-queue??
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
Expand All @@ -266,7 +274,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q)
jl_task_t *task = (jl_task_t*)jl_apply_generic(trypoptask, &q, 1);
if (jl_typeis(task, jl_task_type)) {
int self = jl_atomic_load_relaxed(&jl_current_task->tid);
jl_set_task_tid(task, self);
jl_set_task_tid(task, self); // TODO: return value not checked
return task;
}
return NULL;
Expand All @@ -287,6 +295,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
}

// TODO: what is _threadedregion?
extern _Atomic(unsigned) _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
Expand Down Expand Up @@ -405,7 +414,9 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_m, 1);
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_p, 1);
// TODO: help with gc work here, if applicable
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
Expand Down
94 changes: 94 additions & 0 deletions src/partr.dot
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
digraph G {
node [shape=box]
//subgraph cluster0 {
// partr.c
wake_thread
wake_signal [shape=none]
sleep_check_state [shape=none]
sleep_lock [shape=none]
//wake_thread -> wake_signal [style=dotted]
//wake_thread -> sleep_check_state [style=dotted]
//wake_thread -> sleep_lock [style=dotted]

wake_libuv -> jl_wake_libuv
//jl_wakeup_thread -> sleep_check_state [style=dotted]
jl_wakeup_thread -> uv_stop
jl_wakeup_thread -> wake_thread
jl_wakeup_thread -> wake_libuv
get_next_task -> trypoptask [label="jl_apply_generic"]
get_next_task -> jl_set_task_tid
check_empty -> checkempty [label="jl_apply_generic"]
//may_sleep -> sleep_check_state [style=dotted]

jl_uv_n_waiters [shape=none]
jl_task_get_next -> get_next_task
jl_task_get_next -> check_empty
jl_task_get_next -> sleep_check_after_threshold
//jl_task_get_next -> sleep_check_state [style=dotted]
jl_task_get_next -> jl_global_event_loop
jl_task_get_next -> may_sleep
//jl_task_get_next -> jl_uv_n_waiters [style=dotted]
jl_task_get_next -> uv_run
//jl_task_get_next -> sleep_lock [style=dotted]
jl_task_get_next -> jl_process_events
jl_task_get_next -> multiq_check_empty
//}

// task.c
//subgraph cluster1 {
jl_finish_task
//jl_set_next_task
//jl_get_next_task
ctx_switch
jl_switch -> ctx_switch
jl_switchto -> jl_set_next_task
jl_switchto -> jl_switch
jl_new_task
jl_task_wait
jl_task_wait -> wait [label="jl_apply"]
jl_schedule_task
jl_schedule_task -> schedule [label="jl_apply"]
start_task -> _start_task
_start_task -> jl_finish_task
jl_finish_task -> task_done_hook
//}

// partr.jl
//subgraph cluster2 {
multiq_insert -> jl_get_task_threadpoolid
multiq_deletemin -> jl_threadpoolid
multiq_deletemin -> jl_set_task_tid
multiq_check_empty
//}

// task.jl
//subgraph cluster3 {
workqueue_for
trypoptask
enq_work -> jl_set_task_tid
enq_work -> workqueue_for
enq_work -> multiq_insert
enq_work -> jl_wakeup_thread
trypoptask -> multiq_deletemin
poptask -> trypoptask
poptask -> jl_task_get_next
poptask -> set_next_task
wait -> workqueue_for
wait -> poptask
wait -> try_yieldto
wait -> process_events
task_done_hook -> wait
//}

// threading.c
// subgraph cluster4 {
jl_start_threads -> uv_thread_create


// }
wait_condition [label="wait(::GenericCondition)"]
wait_condition -> wait
closewrite -> wait
uv_write -> wait
yield -> wait
}
4 changes: 4 additions & 0 deletions src/safepoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ void jl_safepoint_wait_gc(void)
// This is particularly important when run under rr.
uv_mutex_lock(&safepoint_lock);
if (jl_atomic_load_relaxed(&jl_gc_running))
{
jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_m, 1);
uv_cond_wait(&safepoint_cond, &safepoint_lock);
jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_p, 1);
}
uv_mutex_unlock(&safepoint_lock);
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ static _Atomic(jl_function_t*) task_done_hook_func JL_GLOBALLY_ROOTED = NULL;

void JL_NORETURN jl_finish_task(jl_task_t *t)
{
jl_atomic_fetch_add_relaxed(&jl_tv_tasks_m, 1);
jl_task_t *ct = jl_current_task;
JL_PROBE_RT_FINISH_TASK(ct);
JL_SIGATOMIC_BEGIN();
Expand Down Expand Up @@ -685,6 +686,7 @@ JL_DLLEXPORT void jl_switch(void)
if (other_defer_signal && !defer_signal)
jl_sigint_safepoint(ptls);

jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1);
JL_PROBE_RT_RUN_TASK(ct);
}

Expand Down Expand Up @@ -954,6 +956,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion
#ifdef _COMPILER_ASAN_ENABLED_
t->ctx.asan_fake_stack = NULL;
#endif
jl_atomic_fetch_add_relaxed(&jl_tv_tasks_p, 1);
return t;
}

Expand All @@ -977,6 +980,7 @@ JL_DLLEXPORT jl_value_t *jl_get_root_task(void)
return (jl_value_t*)ct->ptls->root_task;
}

// this function has no callers?
JL_DLLEXPORT void jl_task_wait()
{
static jl_function_t *wait_func = NULL;
Expand Down Expand Up @@ -1067,6 +1071,7 @@ CFI_NORETURN
jl_atomic_store_release(&pt->tid, -1);
#endif

jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1);
ct->started = 1;
JL_PROBE_RT_START_TASK(ct);
if (jl_atomic_load_relaxed(&ct->_isexception)) {
Expand All @@ -1092,6 +1097,7 @@ CFI_NORETURN
skip_pop_exception:;
}
ct->result = res;
jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_m, 1);
jl_gc_wb(ct, ct->result);
jl_finish_task(ct);
jl_gc_debug_critical_error();
Expand Down
Loading