diff --git a/base/partr.jl b/base/partr.jl index a4cfcb60fe520..f7dbe47e9ccc8 100644 --- a/base/partr.jl +++ b/base/partr.jl @@ -94,12 +94,16 @@ end function multiq_insert(task::Task, priority::UInt16) + ccall(:jl_tv_multiq_p_inc, Cvoid, ()) + + # tpid = task pool id tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task) heap_p = multiq_size(tpid) tp = tpid + 1 task.priority = priority + # TODO: task pushed to a randomly chosen thread rn = cong(heap_p, cong_unbias[tp]) tpheaps = heaps[tp] while !trylock(tpheaps[rn].lock) @@ -174,6 +178,7 @@ function multiq_deletemin() prio1 = heap.tasks[1].priority end @atomic :monotonic heap.priority = prio1 + ccall(:jl_tv_multiq_m_inc, Cvoid, ()) unlock(heap.lock) return task diff --git a/base/task.jl b/base/task.jl index 1a9bff051d7c7..7d0a80040f364 100644 --- a/base/task.jl +++ b/base/task.jl @@ -835,6 +835,8 @@ function schedule(t::Task, @nospecialize(arg); error=false) t.queue === nothing || Base.error("schedule: Task not runnable") setfield!(t, :result, arg) end + + # TODO: how do we ensure that the same task is not enqueued multiple times? enq_work(t) return t end @@ -868,6 +870,7 @@ immediately yields to `t` before calling the scheduler. 
function yield(t::Task, @nospecialize(x=nothing)) (t._state === task_state_runnable && t.queue === nothing) || error("yield: Task not runnable") t.result = x + ccall(:jl_tv_tasks_running_m_inc, Cvoid, ()) enq_work(current_task()) set_next_task(t) return try_yieldto(ensure_rescheduled) @@ -889,6 +892,7 @@ function yieldto(t::Task, @nospecialize(x=nothing)) elseif t._state === task_state_failed throw(t.result) end + ccall(:jl_tv_tasks_running_m_inc, Cvoid, ()) t.result = x set_next_task(t) return try_yieldto(identity) @@ -915,6 +919,7 @@ end # yield to a task, throwing an exception in it function throwto(t::Task, @nospecialize exc) + ccall(:jl_tv_tasks_running_m_inc, Cvoid, ()) t.result = exc t._isexception = true set_next_task(t) @@ -967,11 +972,17 @@ checktaskempty = Partr.multiq_check_empty end function wait() + ccall(:jl_tv_tasks_running_m_inc, Cvoid, ()) GC.safepoint() W = workqueue_for(Threads.threadid()) poptask(W) result = try_yieldto(ensure_rescheduled) + + # TODO: how does this call to process_events() interact with locks / conditions? + # First thing a task does after waking is to process events? + # Will there be contention on libuv lock? process_events() + # return when we come out of the queue return result end diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 6c8ea35cfa373..a87bde0bc0179 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -325,6 +325,7 @@ macro spawn(args...) let $(letargs...) 
local task = Task($thunk) task.sticky = false + # TODO: return value from jl_set_task_threadpoolid not checked ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid) if $(Expr(:islocal, var)) put!($var, task) diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index b17251d4a5af3..e371701f9e5fd 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -165,6 +165,10 @@ XX(jl_gc_counted_malloc) \ XX(jl_gc_counted_realloc_with_old_size) \ XX(jl_gc_diff_total_bytes) \ + XX(jl_tv_multiq_p_inc) \ + XX(jl_tv_multiq_m_inc) \ + XX(jl_tv_tasks_running_m_inc) \ + XX(jl_tv_getmetric) \ XX(jl_gc_enable) \ XX(jl_gc_enable_conservative_gc_support) \ XX(jl_gc_enable_finalizers) \ diff --git a/src/julia_internal.h b/src/julia_internal.h index b5fbf9416fcf0..ef7c389ef4992 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -21,6 +21,16 @@ #define sleep(x) Sleep(1000*x) #endif + +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_running_p; +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_running_m; +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_p; +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_m; +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_p; +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_m; +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_running_p; +extern JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_running_m; + #ifdef __cplusplus extern "C" { #endif diff --git a/src/partr.c b/src/partr.c index eeb0d0f456d97..2a59ecce0ebdf 100644 --- a/src/partr.c +++ b/src/partr.c @@ -30,6 +30,7 @@ static const int16_t sleeping = 1; // invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping. // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it. // information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue. + // ^^^ ??? 
TODO // information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons. // information: These observations require sequentially-consistent fences to be inserted between each of those operational phases. // [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where @@ -91,7 +92,7 @@ JL_DLLEXPORT uint32_t jl_rand_ptls(uint32_t max, uint32_t unbias) // (called only by the main thread) void jl_init_threadinginfra(void) { - /* initialize the synchronization trees pool */ + /* initialize the synchronization trees pool */ // TODO: this comment obsolete? sleep_threshold = DEFAULT_THREAD_SLEEP_THRESHOLD; char *cp = getenv(THREAD_SLEEP_THRESHOLD_NAME); if (cp) { @@ -154,7 +155,6 @@ int jl_running_under_rr(int recheck) #endif } - // sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1 static int sleep_check_after_threshold(uint64_t *start_cycles) { @@ -171,6 +171,10 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) *start_cycles = jl_hrtime(); return 0; } + + // TODO: jl_hrtime() is a wall clock timestamp. This OS thread is not guaranteed to + // run continuously - there might be a context switch, and this thread could resume + // well after sleep_threshold has elapsed? uint64_t elapsed_cycles = jl_hrtime() - (*start_cycles); if (elapsed_cycles >= sleep_threshold) { *start_cycles = 0; @@ -179,12 +183,15 @@ return 0; } - +// this doesn't guarantee that on return the thread is waking or awake.
+// there is a race condition here where the other thread goes to sleep just +// after this thread checks its state and sees !(jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) static int wake_thread(int16_t tid) { jl_ptls_t other = jl_all_tls_states[tid]; int8_t state = sleeping; + + // TODO: use of condition variable here doesn't adhere to required discipline? if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) { if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) { JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state); @@ -208,7 +215,7 @@ static void wake_libuv(void) /* ensure thread tid is awake if necessary */ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) { - jl_task_t *ct = jl_current_task; + jl_task_t *ct = jl_current_task; // #define jl_current_task (container_of(jl_get_pgcstack(), jl_task_t, gcstack)) int16_t self = jl_atomic_load_relaxed(&ct->tid); if (tid != self) jl_fence(); // [^store_buffering_1] @@ -240,6 +247,7 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) } // check if the other threads might be sleeping if (tid == -1) { +// TODO: every thread woken up when something added to multi-queue?? // something added to the multi-queue: notify all threads // in the future, we might want to instead wake some fraction of threads, // and let each of those wake additional threads if they find work @@ -266,7 +274,7 @@ static jl_task_t *get_next_task(jl_value_t *trypoptask, jl_value_t *q) jl_task_t *task = (jl_task_t*)jl_apply_generic(trypoptask, &q, 1); if (jl_typeis(task, jl_task_type)) { int self = jl_atomic_load_relaxed(&jl_current_task->tid); - jl_set_task_tid(task, self); + jl_set_task_tid(task, self); // TODO: return value not checked return task; } return NULL; @@ -287,6 +295,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping; } +// TODO: what is _threadedregion?
extern _Atomic(unsigned) _threadedregion; JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty) @@ -405,7 +414,9 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, int8_t gc_state = jl_gc_safe_enter(ptls); uv_mutex_lock(&ptls->sleep_lock); while (may_sleep(ptls)) { + jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_m, 1); uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock); + jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_p, 1); // TODO: help with gc work here, if applicable } assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping); diff --git a/src/partr.dot b/src/partr.dot new file mode 100644 index 0000000000000..7b48108671a01 --- /dev/null +++ b/src/partr.dot @@ -0,0 +1,94 @@ +digraph G { + node [shape=box] + //subgraph cluster0 { + // partr.c + wake_thread + wake_signal [shape=none] + sleep_check_state [shape=none] + sleep_lock [shape=none] + //wake_thread -> wake_signal [style=dotted] + //wake_thread -> sleep_check_state [style=dotted] + //wake_thread -> sleep_lock [style=dotted] + + wake_libuv -> jl_wake_libuv + //jl_wakeup_thread -> sleep_check_state [style=dotted] + jl_wakeup_thread -> uv_stop + jl_wakeup_thread -> wake_thread + jl_wakeup_thread -> wake_libuv + get_next_task -> trypoptask [label="jl_apply_generic"] + get_next_task -> jl_set_task_tid + check_empty -> checkempty [label="jl_apply_generic"] + //may_sleep -> sleep_check_state [style=dotted] + + jl_uv_n_waiters [shape=none] + jl_task_get_next -> get_next_task + jl_task_get_next -> check_empty + jl_task_get_next -> sleep_check_after_threshold + //jl_task_get_next -> sleep_check_state [style=dotted] + jl_task_get_next -> jl_global_event_loop + jl_task_get_next -> may_sleep + //jl_task_get_next -> jl_uv_n_waiters [style=dotted] + jl_task_get_next -> uv_run + //jl_task_get_next -> sleep_lock [style=dotted] + jl_task_get_next -> jl_process_events + jl_task_get_next -> multiq_check_empty + //} + + 
// task.c + //subgraph cluster1 { + jl_finish_task + //jl_set_next_task + //jl_get_next_task + ctx_switch + jl_switch -> ctx_switch + jl_switchto -> jl_set_next_task + jl_switchto -> jl_switch + jl_new_task + jl_task_wait + jl_task_wait -> wait [label="jl_apply"] + jl_schedule_task + jl_schedule_task -> schedule [label="jl_apply"] + start_task -> _start_task + _start_task -> jl_finish_task + jl_finish_task -> task_done_hook + //} + + // partr.jl + //subgraph cluster2 { + multiq_insert -> jl_get_task_threadpoolid + multiq_deletemin -> jl_threadpoolid + multiq_deletemin -> jl_set_task_tid + multiq_check_empty + //} + + // task.jl + //subgraph cluster3 { + workqueue_for + trypoptask + enq_work -> jl_set_task_tid + enq_work -> workqueue_for + enq_work -> multiq_insert + enq_work -> jl_wakeup_thread + trypoptask -> multiq_deletemin + poptask -> trypoptask + poptask -> jl_task_get_next + poptask -> set_next_task + wait -> workqueue_for + wait -> poptask + wait -> try_yieldto + wait -> process_events + task_done_hook -> wait + //} + + // threading.c + // subgraph cluster4 { + jl_start_threads -> uv_thread_create + + + // } + wait_condition [label="wait(::GenericCondition)"] + wait_condition -> wait + closewrite -> wait + uv_write -> wait + yield -> wait +} \ No newline at end of file diff --git a/src/safepoint.c b/src/safepoint.c index b2feccf74e068..ad67b6a466456 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -168,7 +168,11 @@ void jl_safepoint_wait_gc(void) // This is particularly important when run under rr. 
uv_mutex_lock(&safepoint_lock); if (jl_atomic_load_relaxed(&jl_gc_running)) + { + jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_m, 1); uv_cond_wait(&safepoint_cond, &safepoint_lock); + jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_p, 1); + } uv_mutex_unlock(&safepoint_lock); } } diff --git a/src/task.c b/src/task.c index a1adb704695a7..66826e7426096 100644 --- a/src/task.c +++ b/src/task.c @@ -292,6 +292,7 @@ static _Atomic(jl_function_t*) task_done_hook_func JL_GLOBALLY_ROOTED = NULL; void JL_NORETURN jl_finish_task(jl_task_t *t) { + jl_atomic_fetch_add_relaxed(&jl_tv_tasks_m, 1); jl_task_t *ct = jl_current_task; JL_PROBE_RT_FINISH_TASK(ct); JL_SIGATOMIC_BEGIN(); @@ -685,6 +686,7 @@ JL_DLLEXPORT void jl_switch(void) if (other_defer_signal && !defer_signal) jl_sigint_safepoint(ptls); + jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1); JL_PROBE_RT_RUN_TASK(ct); } @@ -954,6 +956,7 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion #ifdef _COMPILER_ASAN_ENABLED_ t->ctx.asan_fake_stack = NULL; #endif + jl_atomic_fetch_add_relaxed(&jl_tv_tasks_p, 1); return t; } @@ -977,6 +980,7 @@ JL_DLLEXPORT jl_value_t *jl_get_root_task(void) return (jl_value_t*)ct->ptls->root_task; } +// this function has no callers? 
JL_DLLEXPORT void jl_task_wait() { static jl_function_t *wait_func = NULL; @@ -1067,6 +1071,7 @@ CFI_NORETURN jl_atomic_store_release(&pt->tid, -1); #endif + jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_p, 1); ct->started = 1; JL_PROBE_RT_START_TASK(ct); if (jl_atomic_load_relaxed(&ct->_isexception)) { @@ -1092,6 +1097,7 @@ CFI_NORETURN skip_pop_exception:; } ct->result = res; + jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_m, 1); jl_gc_wb(ct, ct->result); jl_finish_task(ct); jl_gc_debug_critical_error(); diff --git a/src/threading.c b/src/threading.c index 2cebdb22fc0aa..ebf2de78367f4 100644 --- a/src/threading.c +++ b/src/threading.c @@ -40,6 +40,40 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_running_p = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_threads_running_m = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_p = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_m = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_p = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_multiq_m = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_running_p = 0; +JL_DLLEXPORT _Atomic(uint64_t) jl_tv_tasks_running_m = 0; + +JL_DLLEXPORT void jl_tv_multiq_p_inc(void) +{ jl_atomic_fetch_add_relaxed(&jl_tv_multiq_p, 1); } + +JL_DLLEXPORT void jl_tv_multiq_m_inc(void) +{ jl_atomic_fetch_add_relaxed(&jl_tv_multiq_m, 1); } + +JL_DLLEXPORT void jl_tv_tasks_running_m_inc(void) +{ jl_atomic_fetch_add_relaxed(&jl_tv_tasks_running_m, 1); } + +JL_DLLEXPORT int jl_tv_getmetric(int i) +{ + switch(i) + { + case 1: return jl_atomic_load_relaxed(&jl_tv_threads_running_p); + case 2: return jl_atomic_load_relaxed(&jl_tv_threads_running_m); + case 3: return jl_atomic_load_relaxed(&jl_tv_tasks_p); + case 4: return jl_atomic_load_relaxed(&jl_tv_tasks_m); + case 5: return jl_atomic_load_relaxed(&jl_tv_multiq_p); + case 
6: return jl_atomic_load_relaxed(&jl_tv_multiq_m); + case 7: return jl_atomic_load_relaxed(&jl_tv_tasks_running_p); + case 8: return jl_atomic_load_relaxed(&jl_tv_tasks_running_m); + default: return 0; + } +} + JL_DLLEXPORT void *jl_get_ptls_states(void) { // mostly deprecated: use current_task instead @@ -569,8 +603,11 @@ void jl_start_threads(void) jl_threadarg_t *t = (jl_threadarg_t *)malloc_s(sizeof(jl_threadarg_t)); // ownership will be passed to the thread t->tid = i; t->barrier = &thread_init_done; + // TODO: increment threads running metric uv_thread_create(&uvtid, jl_threadfun, t); if (exclusive) { + // TODO: log setting thread affinity + jl_printf(JL_STDERR, "Setting thread affinity for thread %d\n", i); mask[i] = 1; uv_thread_setaffinity(&uvtid, mask, NULL, cpumasksize); mask[i] = 0; @@ -625,7 +662,11 @@ void _jl_mutex_wait(jl_task_t *self, jl_mutex_t *lock, int safepoint) // when running under `rr`, use system mutexes rather than spin locking uv_mutex_lock(&tls_lock); if (jl_atomic_load_relaxed(&lock->owner)) + { + jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_m, 1); uv_cond_wait(&cond, &tls_lock); + jl_atomic_fetch_add_relaxed(&jl_tv_threads_running_p, 1); + } uv_mutex_unlock(&tls_lock); } jl_cpu_pause();