Skip to content

Commit

Permalink
Wall-time/all tasks profiler (#55889)
Browse files Browse the repository at this point in the history
One limitation of sampling CPU/thread profiles, as is currently done in
Julia, is that they primarily capture samples from CPU-intensive tasks.

If many tasks are performing IO or contending for concurrency primitives
like semaphores, these tasks won’t appear in the profile, as they aren't
scheduled on OS threads sampled by the profiler.

A wall-time profiler, like the one implemented in this PR, samples tasks
regardless of OS thread scheduling. This enables profiling of IO-heavy
tasks and detecting areas of heavy contention in the system.

Co-developed with @nickrobinson251.
  • Loading branch information
d-netto authored Oct 25, 2024
1 parent f1a90e0 commit f6a38e0
Show file tree
Hide file tree
Showing 19 changed files with 796 additions and 231 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,6 @@ External dependencies
Tooling Improvements
--------------------

- A wall-time profiler is now available for users who need a sampling profiler that captures tasks regardless of their scheduling or running state. This type of profiler enables profiling of I/O-heavy tasks and helps detect areas of heavy contention in the system ([#55889]).

<!--- generated by NEWS-update.jl: -->
Binary file added doc/src/manual/img/cpu-profile.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/src/manual/img/task-sampling-failure.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
214 changes: 214 additions & 0 deletions doc/src/manual/profile.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,220 @@ Of course, you can decrease the delay as well as increase it; however, the overh
grows once the delay becomes similar to the amount of time needed to take a backtrace (~30 microseconds
on the author's laptop).

## Wall-time Profiler

### Introduction & Problem Motivation

The profiler described in the previous section is a sampling CPU profiler. At a high level, the profiler periodically stops all Julia compute threads to collect their backtraces and estimates the time spent in each function based on the number of backtrace samples that include a frame from that function. However, note that only tasks currently running on system threads just before the profiler stops them will have their backtraces collected.

While this profiler is typically well-suited for workloads where the majority of tasks are compute-bound, it is less helpful for systems where most tasks are IO-heavy or for diagnosing contention on synchronization primitives in your code.

Let's consider this simple workload:

```Julia
using Base.Threads
using Profile
using PProf
ch = Channel(1)
const N_SPAWNED_TASKS = (1 << 10)
const WAIT_TIME_NS = 10_000_000
function spawn_a_bunch_of_tasks_waiting_on_channel()
for i in 1:N_SPAWNED_TASKS
Threads.@spawn begin
take!(ch)
end
end
end
function busywait()
t0 = time_ns()
while true
if time_ns() - t0 > WAIT_TIME_NS
break
end
end
end
function main()
spawn_a_bunch_of_tasks_waiting_on_channel()
for i in 1:N_SPAWNED_TASKS
put!(ch, i)
busywait()
end
end
Profile.@profile main()
```

Our goal is to detect whether there is contention on the `ch` channel—i.e., whether the number of waiters is excessive given the rate at which work items are being produced in the channel.

If we run this, we obtain the following [PProf](https://github.com/JuliaPerf/PProf.jl) flame graph:

![CPU Profile](./img/cpu-profile.png)()

This profile provides no information to help determine where contention occurs in the system’s synchronization primitives. Waiters on a channel will be blocked and descheduled, meaning no system thread will be running the tasks assigned to those waiters, and as a result, they won't be sampled by the profiler.

### Wall-time Profiler

Instead of sampling threads—and thus only sampling tasks that are running—a wall-time task profiler samples tasks independently of their scheduling state. For example, tasks that are sleeping on a synchronization primitive at the time the profiler is running will be sampled with the same probability as tasks that were actively running when the profiler attempted to capture backtraces.

This approach allows us to construct a profile where backtraces from tasks blocked on the `ch` channel, as in the example above, are actually represented.

Let's run the same example, but now with a wall-time profiler:


```Julia
using Base.Threads
using Profile
using PProf
ch = Channel(1)
const N_SPAWNED_TASKS = (1 << 10)
const WAIT_TIME_NS = 10_000_000
function spawn_a_bunch_of_tasks_waiting_on_channel()
for i in 1:N_SPAWNED_TASKS
Threads.@spawn begin
take!(ch)
end
end
end
function busywait()
t0 = time_ns()
while true
if time_ns() - t0 > WAIT_TIME_NS
break
end
end
end
function main()
spawn_a_bunch_of_tasks_waiting_on_channel()
for i in 1:N_SPAWNED_TASKS
put!(ch, i)
busywait()
end
end
Profile.@profile_walltime main()
```

We obtain the following flame graph:

![Wall-time Profile Channel](./img/wall-time-profiler-channel-example.png)()

We see that a large number of samples come from channel-related `take!` functions, which allows us to determine that there is indeed an excessive number of waiters in `ch`.

### A Compute-Bound Workload

Despite the wall-time profiler sampling all live tasks in the system and not just the currently running ones, it can still be helpful for identifying performance hotspots, even if your code is compute-bound. Let’s consider a simple example:

```Julia
using Base.Threads
using Profile
using PProf
ch = Channel(1)
const MAX_ITERS = (1 << 22)
const N_TASKS = (1 << 12)
function spawn_a_task_waiting_on_channel()
Threads.@spawn begin
take!(ch)
end
end
function sum_of_sqrt()
sum_of_sqrt = 0.0
for i in 1:MAX_ITERS
sum_of_sqrt += sqrt(i)
end
return sum_of_sqrt
end
function spawn_a_bunch_of_compute_heavy_tasks()
Threads.@sync begin
for i in 1:N_TASKS
Threads.@spawn begin
sum_of_sqrt()
end
end
end
end
function main()
spawn_a_task_waiting_on_channel()
spawn_a_bunch_of_compute_heavy_tasks()
end
Profile.@profile_walltime main()
```

After collecting a wall-time profile, we get the following flame graph:

![Wall-time Profile Compute-Bound](./img/wall-time-profiler-compute-bound-example.png)()

Notice how many of the samples contain `sum_of_sqrt`, which is the expensive compute function in our example.

### Identifying Task Sampling Failures in your Profile

In the current implementation, the wall-time profiler attempts to sample from tasks that have been alive since the last garbage collection, along with those created afterward. However, if most tasks are extremely short-lived, you may end up sampling tasks that have already completed, resulting in missed backtrace captures.

If you encounter samples containing `failed_to_sample_task_fun` or `failed_to_stop_thread_fun`, this likely indicates a high volume of short-lived tasks, which prevented their backtraces from being collected.

Let's consider this simple example:

```Julia
using Base.Threads
using Profile
using PProf
const N_SPAWNED_TASKS = (1 << 16)
const WAIT_TIME_NS = 100_000
function spawn_a_bunch_of_short_lived_tasks()
for i in 1:N_SPAWNED_TASKS
Threads.@spawn begin
# Do nothing
end
end
end
function busywait()
t0 = time_ns()
while true
if time_ns() - t0 > WAIT_TIME_NS
break
end
end
end
function main()
GC.enable(false)
spawn_a_bunch_of_short_lived_tasks()
for i in 1:N_SPAWNED_TASKS
busywait()
end
GC.enable(true)
end
Profile.@profile_walltime main()
```

Notice that the tasks spawned in `spawn_a_bunch_of_short_lived_tasks` are extremely short-lived. Since these tasks constitute the majority in the system, we will likely miss capturing a backtrace for most sampled tasks.

After collecting a wall-time profile, we obtain the following flame graph:

![Task Sampling Failure](./img/task-sampling-failure.png)()

The large number of samples from `failed_to_stop_thread_fun` confirms that we have a significant number of short-lived tasks in the system.

## Memory allocation analysis

One of the most common techniques to improve performance is to reduce memory allocation. Julia
Expand Down
33 changes: 33 additions & 0 deletions src/gc-stacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,39 @@ void sweep_stack_pool_loop(void) JL_NOTSAFEPOINT
jl_atomic_fetch_add(&gc_n_threads_sweeping_stacks, -1);
}

// Builds a list of the live tasks. Racy: `live_tasks` can expand at any time.
arraylist_t *jl_get_all_tasks_arraylist(void) JL_NOTSAFEPOINT
{
arraylist_t *tasks = (arraylist_t*)malloc_s(sizeof(arraylist_t));
arraylist_new(tasks, 0);
size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
for (size_t i = 0; i < nthreads; i++) {
// skip GC threads...
if (gc_is_collector_thread(i)) {
continue;
}
jl_ptls_t ptls2 = allstates[i];
if (ptls2 == NULL) {
continue;
}
jl_task_t *t = ptls2->root_task;
if (t->ctx.stkbuf != NULL) {
arraylist_push(tasks, t);
}
small_arraylist_t *live_tasks = &ptls2->gc_tls_common.heap.live_tasks;
size_t n = mtarraylist_length(live_tasks);
for (size_t i = 0; i < n; i++) {
jl_task_t *t = (jl_task_t*)mtarraylist_get(live_tasks, i);
assert(t != NULL);
if (t->ctx.stkbuf != NULL) {
arraylist_push(tasks, t);
}
}
}
return tasks;
}

JL_DLLEXPORT jl_array_t *jl_live_tasks(void)
{
size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
Expand Down
24 changes: 22 additions & 2 deletions src/gc-stock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,22 @@ void gc_sweep_wait_for_all_stacks(void) JL_NOTSAFEPOINT
}
}

void sweep_stack_pools(jl_ptls_t ptls) JL_NOTSAFEPOINT
void sweep_mtarraylist_buffers(void) JL_NOTSAFEPOINT
{
for (int i = 0; i < gc_n_threads; i++) {
jl_ptls_t ptls = gc_all_tls_states[i];
if (ptls == NULL) {
continue;
}
small_arraylist_t *buffers = &ptls->lazily_freed_mtarraylist_buffers;
void *buf;
while ((buf = small_arraylist_pop(buffers)) != NULL) {
free(buf);
}
}
}

void sweep_stack_pools_and_mtarraylist_buffers(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
// initialize ptls index for parallel sweeping of stack pools
assert(gc_n_threads);
Expand All @@ -1035,9 +1050,12 @@ void sweep_stack_pools(jl_ptls_t ptls) JL_NOTSAFEPOINT
else
jl_atomic_store_relaxed(&gc_stack_free_idx, stack_free_idx + 1);
jl_atomic_store_release(&gc_ptls_sweep_idx, gc_n_threads - 1); // idx == gc_n_threads = release stacks to the OS so it's serial
uv_mutex_lock(&live_tasks_lock);
gc_sweep_wake_all_stacks(ptls);
sweep_stack_pool_loop();
gc_sweep_wait_for_all_stacks();
sweep_mtarraylist_buffers();
uv_mutex_unlock(&live_tasks_lock);
}

static void gc_pool_sync_nfree(jl_gc_pagemeta_t *pg, jl_taggedvalue_t *last) JL_NOTSAFEPOINT
Expand Down Expand Up @@ -3084,7 +3102,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
current_sweep_full = sweep_full;
sweep_weak_refs();
uint64_t stack_pool_time = jl_hrtime();
sweep_stack_pools(ptls);
sweep_stack_pools_and_mtarraylist_buffers(ptls);
stack_pool_time = jl_hrtime() - stack_pool_time;
gc_num.total_stack_pool_sweep_time += stack_pool_time;
gc_num.stack_pool_sweep_time = stack_pool_time;
Expand Down Expand Up @@ -3453,6 +3471,8 @@ void jl_init_thread_heap(jl_ptls_t ptls)
jl_atomic_store_relaxed(&q->bottom, 0);
jl_atomic_store_relaxed(&q->array, wsa2);
arraylist_new(&mq->reclaim_set, 32);
// Initialize `lazily_freed_mtarraylist_buffers`
small_arraylist_new(&ptls->lazily_freed_mtarraylist_buffers, 0);

memset(&ptls->gc_tls_common.gc_num, 0, sizeof(ptls->gc_tls_common.gc_num));
jl_atomic_store_relaxed(&ptls->gc_tls_common.gc_num.allocd, -(int64_t)gc_num.interval);
Expand Down
4 changes: 4 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,10 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel)

// initialize symbol-table lock
uv_mutex_init(&symtab_lock);
// initialize the live tasks lock
uv_mutex_init(&live_tasks_lock);
// initialize the profiler buffer lock
uv_mutex_init(&bt_data_prof_lock);

// initialize backtraces
jl_init_profile_lock();
Expand Down
29 changes: 29 additions & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,35 @@ JL_DLLEXPORT void jl_unlock_profile_wr(void) JL_NOTSAFEPOINT JL_NOTSAFEPOINT_LEA
int jl_lock_stackwalk(void) JL_NOTSAFEPOINT JL_NOTSAFEPOINT_ENTER;
void jl_unlock_stackwalk(int lockret) JL_NOTSAFEPOINT JL_NOTSAFEPOINT_LEAVE;

arraylist_t *jl_get_all_tasks_arraylist(void) JL_NOTSAFEPOINT;
typedef struct {
size_t bt_size;
int tid;
} jl_record_backtrace_result_t;
JL_DLLEXPORT jl_record_backtrace_result_t jl_record_backtrace(jl_task_t *t, struct _jl_bt_element_t *bt_data,
size_t max_bt_size, int all_tasks_profiler) JL_NOTSAFEPOINT;
extern volatile struct _jl_bt_element_t *profile_bt_data_prof;
extern volatile size_t profile_bt_size_max;
extern volatile size_t profile_bt_size_cur;
extern volatile int profile_running;
extern volatile int profile_all_tasks;
// Ensures that we can safely read the `live_tasks`field of every TLS when profiling.
// We want to avoid the case that a GC gets interleaved with `jl_profile_task` and shrinks
// the `live_tasks` array while we are reading it or frees tasks that are being profiled.
// Because of that, this lock must be held in `jl_profile_task` and `sweep_stack_pools_and_mtarraylist_buffers`.
extern uv_mutex_t live_tasks_lock;
// Ensures that we can safely write to `profile_bt_data_prof` and `profile_bt_size_cur`.
// We want to avoid the case that:
// - We start to profile a task very close to the profiling time window end.
// - The profiling time window ends and we start to read the profile data in a compute thread.
// - We write to the profile in a profiler thread while the compute thread is reading it.
// Locking discipline: `bt_data_prof_lock` must be held inside the scope of `live_tasks_lock`.
extern uv_mutex_t bt_data_prof_lock;
#define PROFILE_STATE_THREAD_NOT_SLEEPING (1)
#define PROFILE_STATE_THREAD_SLEEPING (2)
#define PROFILE_STATE_WALL_TIME_PROFILING (3)
void jl_profile_task(void);

// number of cycles since power-on
static inline uint64_t cycleclock(void) JL_NOTSAFEPOINT
{
Expand Down
1 change: 1 addition & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ typedef struct _jl_tls_states_t {
int finalizers_inhibited;
jl_gc_tls_states_t gc_tls; // this is very large, and the offset of the first member is baked into codegen
jl_gc_tls_states_common_t gc_tls_common; // common tls for both GCs
small_arraylist_t lazily_freed_mtarraylist_buffers;
volatile sig_atomic_t defer_signal;
_Atomic(struct _jl_task_t*) current_task;
struct _jl_task_t *next_task;
Expand Down
2 changes: 1 addition & 1 deletion src/mtarraylist.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static void mtarraylist_resizeto(small_mtarraylist_t *a, size_t len, size_t newl
a->max = nm;
if (olditems != (void*)&a->_space[0]) {
jl_task_t *ct = jl_current_task;
jl_gc_add_quiescent(ct->ptls, (void**)olditems, free);
small_arraylist_push(&ct->ptls->lazily_freed_mtarraylist_buffers, olditems);
}
}
}
Expand Down
Loading

0 comments on commit f6a38e0

Please sign in to comment.