diff --git a/src/gc.c b/src/gc.c index 5010530622fd5c..446a3cb1998e29 100644 --- a/src/gc.c +++ b/src/gc.c @@ -1652,9 +1652,11 @@ void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_ if (parallel_sweep_worthwhile && !page_profile_enabled) { jl_atomic_store(&gc_allocd_scratch, new_gc_allocd_scratch); uv_mutex_lock(&gc_threads_lock); - for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { + int first = gc_first_parallel_collector_thread_id(); + int last = gc_last_parallel_collector_thread_id(); + for (int i = first; i <= last; i++) { jl_ptls_t ptls2 = gc_all_tls_states[i]; - assert(ptls2 != NULL); // should be a GC thread + gc_check_ptls_of_parallel_collector_thread(ptls2); jl_atomic_fetch_add(&ptls2->gc_sweeps_requested, 1); } uv_cond_broadcast(&gc_threads_cond); @@ -1666,9 +1668,11 @@ void gc_sweep_wake_all(jl_ptls_t ptls, jl_gc_padded_page_stack_t *new_gc_allocd_ // collecting a page profile. // wait for all to leave in order to ensure that a straggler doesn't // try to enter sweeping after we set `gc_allocd_scratch` below. - for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { + int first = gc_first_parallel_collector_thread_id(); + int last = gc_last_parallel_collector_thread_id(); + for (int i = first; i <= last; i++) { jl_ptls_t ptls2 = gc_all_tls_states[i]; - assert(ptls2 != NULL); // should be a GC thread + gc_check_ptls_of_parallel_collector_thread(ptls2); while (jl_atomic_load_acquire(&ptls2->gc_sweeps_requested) != 0) { jl_cpu_pause(); } @@ -3009,10 +3013,14 @@ void gc_mark_and_steal(jl_ptls_t ptls) // since we know chunks will likely expand into a lot // of work for the mark loop steal : { + int first = gc_first_parallel_collector_thread_id(); + int last = gc_last_parallel_collector_thread_id(); // Try to steal chunk from random GC thread for (int i = 0; i < 4 * jl_n_markthreads; i++) { - uint32_t v = gc_first_tid + cong(jl_n_markthreads, &ptls->rngseed); - jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; + int v = gc_random_parallel_collector_thread_id(ptls); + jl_ptls_t ptls2 = gc_all_tls_states[v]; + gc_check_ptls_of_parallel_collector_thread(ptls2); + jl_gc_markqueue_t *mq2 = &ptls2->mark_queue; c = gc_chunkqueue_steal_from(mq2); if (c.cid != GC_empty_chunk) { gc_mark_chunk(ptls, mq, &c); @@ -3020,8 +3028,10 @@ void gc_mark_and_steal(jl_ptls_t ptls) } } // Sequentially walk GC threads to try to steal chunk - for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { - jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; + for (int i = first; i <= last; i++) { + jl_ptls_t ptls2 = gc_all_tls_states[i]; + gc_check_ptls_of_parallel_collector_thread(ptls2); + jl_gc_markqueue_t *mq2 = &ptls2->mark_queue; c = gc_chunkqueue_steal_from(mq2); if (c.cid != GC_empty_chunk) { gc_mark_chunk(ptls, mq, &c); @@ -3036,15 +3046,19 @@ void gc_mark_and_steal(jl_ptls_t ptls) } // Try to steal pointer from random GC thread for (int i = 0; i < 4 * jl_n_markthreads; i++) { - uint32_t v = gc_first_tid + cong(jl_n_markthreads, &ptls->rngseed); - jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; + int v = gc_random_parallel_collector_thread_id(ptls); + jl_ptls_t ptls2 = gc_all_tls_states[v]; + gc_check_ptls_of_parallel_collector_thread(ptls2); + jl_gc_markqueue_t *mq2 = &ptls2->mark_queue; new_obj = gc_ptr_queue_steal_from(mq2); if (new_obj != NULL) goto mark; } // Sequentially walk GC threads to try to steal pointer - for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { - jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; + for (int i = first; i <= last; i++) { + jl_ptls_t ptls2 = gc_all_tls_states[i]; + gc_check_ptls_of_parallel_collector_thread(ptls2); + jl_gc_markqueue_t *mq2 = &ptls2->mark_queue; new_obj = gc_ptr_queue_steal_from(mq2); if (new_obj != NULL) goto mark; @@ -3103,12 +3117,13 @@ int gc_should_mark(void) } int tid = jl_atomic_load_relaxed(&gc_master_tid); assert(tid != -1); + assert(gc_all_tls_states != NULL); size_t work = gc_count_work_in_queue(gc_all_tls_states[tid]); - for (tid = gc_first_tid; tid < gc_first_tid + jl_n_markthreads; tid++) { - jl_ptls_t ptls2 = gc_all_tls_states[tid]; - if (ptls2 == NULL) { - continue; - } + int first = gc_first_parallel_collector_thread_id(); + int last = gc_last_parallel_collector_thread_id(); + for (int i = first; i <= last; i++) { + jl_ptls_t ptls2 = gc_all_tls_states[i]; + gc_check_ptls_of_parallel_collector_thread(ptls2); work += gc_count_work_in_queue(ptls2); } // if there is a lot of work left, enter the mark loop @@ -3522,7 +3537,8 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) jl_ptls_t ptls_dest = ptls; jl_gc_markqueue_t *mq_dest = mq; if (!single_threaded_mark) { - ptls_dest = gc_all_tls_states[gc_first_tid + t_i % jl_n_markthreads]; + int dest_tid = gc_ith_parallel_collector_thread_id(t_i % jl_n_markthreads); + ptls_dest = gc_all_tls_states[dest_tid]; mq_dest = &ptls_dest->mark_queue; } if (ptls2 != NULL) { @@ -3787,8 +3803,9 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) ptls2->heap.remset->len = 0; } // free empty GC state for threads that have exited - if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL && - (ptls->tid < gc_first_tid || ptls2->tid >= gc_first_tid + jl_n_gcthreads)) { + if (jl_atomic_load_relaxed(&ptls2->current_task) == NULL) { + if (gc_is_parallel_collector_thread(t_i)) + continue; jl_thread_heap_t *heap = &ptls2->heap; if (heap->weak_refs.len == 0) small_arraylist_free(&heap->weak_refs); diff --git a/src/gc.h b/src/gc.h index 7d78545966ecc6..600c2ec0bdf747 100644 --- a/src/gc.h +++ b/src/gc.h @@ -449,6 +449,54 @@ extern int gc_n_threads; extern jl_ptls_t* gc_all_tls_states; extern gc_heapstatus_t gc_heap_stats; +STATIC_INLINE int gc_first_parallel_collector_thread_id(void) JL_NOTSAFEPOINT +{ + if (jl_n_markthreads == 0) { + return 0; + } + return gc_first_tid; +} + +STATIC_INLINE int gc_last_parallel_collector_thread_id(void) JL_NOTSAFEPOINT +{ + if (jl_n_markthreads == 0) { + return -1; + } + return gc_first_tid + jl_n_markthreads - 1; +} + +STATIC_INLINE int gc_ith_parallel_collector_thread_id(int i) JL_NOTSAFEPOINT +{ + assert(i >= 0 && i < jl_n_markthreads); + return gc_first_tid + i; +} + +STATIC_INLINE int gc_is_parallel_collector_thread(int tid) JL_NOTSAFEPOINT +{ + return tid >= gc_first_tid && tid <= gc_last_parallel_collector_thread_id(); +} + +STATIC_INLINE int gc_random_parallel_collector_thread_id(jl_ptls_t ptls) JL_NOTSAFEPOINT +{ + assert(jl_n_markthreads > 0); + int v = gc_first_tid + (int)cong(jl_n_markthreads - 1, &ptls->rngseed); + assert(v >= gc_first_tid && v <= gc_last_parallel_collector_thread_id()); + return v; +} + +STATIC_INLINE int gc_parallel_collector_threads_enabled(void) JL_NOTSAFEPOINT +{ + return jl_n_markthreads > 0; +} + +STATIC_INLINE void gc_check_ptls_of_parallel_collector_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT +{ + (void)ptls; + assert(gc_parallel_collector_threads_enabled()); + assert(ptls != NULL); + assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD); +} + STATIC_INLINE bigval_t *bigval_header(jl_taggedvalue_t *o) JL_NOTSAFEPOINT { return container_of(o, bigval_t, header); diff --git a/src/julia_threads.h b/src/julia_threads.h index 0064493f1d75c1..41aa4fe6c092e0 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -209,6 +209,10 @@ typedef struct _jl_tls_states_t { #define JL_GC_STATE_SAFE 2 // gc_state = 2 means the thread is running unmanaged code that can be // execute at the same time with the GC. +#define JL_GC_PARALLEL_COLLECTOR_THREAD 3 + // gc_state = 3 means the thread is a parallel collector thread (i.e. never runs Julia code) +#define JL_GC_CONCURRENT_COLLECTOR_THREAD 4 + // gc_state = 4 means the thread is a concurrent collector thread (background sweeper thread that never runs Julia code) _Atomic(int8_t) gc_state; // read from foreign threads // execution of certain certain impure // statements is prohibited from certain @@ -340,6 +344,8 @@ void jl_sigint_safepoint(jl_ptls_t tls); STATIC_INLINE int8_t jl_gc_state_set(jl_ptls_t ptls, int8_t state, int8_t old_state) { + assert(old_state != JL_GC_PARALLEL_COLLECTOR_THREAD); + assert(old_state != JL_GC_CONCURRENT_COLLECTOR_THREAD); jl_atomic_store_release(&ptls->gc_state, state); if (state == JL_GC_STATE_UNSAFE || old_state == JL_GC_STATE_UNSAFE) jl_gc_safepoint_(ptls); diff --git a/src/scheduler.c b/src/scheduler.c index 18f42049fdfce6..18adecb937f559 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -136,7 +136,7 @@ void jl_parallel_gc_threadfun(void *arg) JL_GC_PROMISE_ROOTED(ct); (void)jl_atomic_fetch_add_relaxed(&nrunning, -1); // wait for all threads - jl_gc_state_set(ptls, JL_GC_STATE_WAITING, JL_GC_STATE_UNSAFE); + jl_gc_state_set(ptls, JL_GC_PARALLEL_COLLECTOR_THREAD, JL_GC_STATE_UNSAFE); uv_barrier_wait(targ->barrier); // free the thread argument here @@ -148,8 +148,10 @@ void jl_parallel_gc_threadfun(void *arg) uv_cond_wait(&gc_threads_cond, &gc_threads_lock); } uv_mutex_unlock(&gc_threads_lock); + assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD); gc_mark_loop_parallel(ptls, 0); - if (may_sweep(ptls)) { // not an else! + if (may_sweep(ptls)) { + assert(jl_atomic_load_relaxed(&ptls->gc_state) == JL_GC_PARALLEL_COLLECTOR_THREAD); gc_sweep_pool_parallel(ptls); jl_atomic_fetch_add(&ptls->gc_sweeps_requested, -1); } @@ -170,7 +172,7 @@ void jl_concurrent_gc_threadfun(void *arg) JL_GC_PROMISE_ROOTED(ct); (void)jl_atomic_fetch_add_relaxed(&nrunning, -1); // wait for all threads - jl_gc_state_set(ptls, JL_GC_STATE_WAITING, JL_GC_STATE_UNSAFE); + jl_gc_state_set(ptls, JL_GC_CONCURRENT_COLLECTOR_THREAD, JL_GC_STATE_UNSAFE); uv_barrier_wait(targ->barrier); // free the thread argument here