Skip to content

Commit

Permalink
started to backport concurrent sweeping
Browse files Browse the repository at this point in the history
  • Loading branch information
d-netto committed Aug 25, 2023
1 parent 196c8e1 commit 6326343
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 57 deletions.
3 changes: 2 additions & 1 deletion base/options.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ struct JLOptions
cpu_target::Ptr{UInt8}
nthreadpools::Int16
nthreads::Int16
ngcthreads::Int16
nmarkthreads::Int16
nsweepthreads::Int8
nthreads_per_pool::Ptr{Int16}
nprocs::Int32
machine_file::Ptr{UInt8}
Expand Down
1 change: 1 addition & 0 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ end
Threads.ngcthreads() -> Int
Returns the number of GC threads currently configured.
This includes both mark threads and concurrent sweep threads.
"""
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1

Expand Down
10 changes: 9 additions & 1 deletion src/gc-pages.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT
#endif
jl_gc_pagemeta_t *meta = NULL;

// try to get page from `pool_lazily_freed`
meta = pop_lf_page_metadata_back(&global_page_pool_lazily_freed);
if (meta != NULL) {
gc_alloc_map_set(meta->data, GC_PAGE_ALLOCATED);
// page is already mapped
return meta;
}

// try to get page from `pool_clean`
meta = pop_lf_page_metadata_back(&global_page_pool_clean);
if (meta != NULL) {
Expand All @@ -139,7 +147,7 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT
}

uv_mutex_lock(&gc_perm_lock);
// another thread may have allocated a large block while we're waiting...
// another thread may have allocated a large block while we were waiting...
meta = pop_lf_page_metadata_back(&global_page_pool_clean);
if (meta != NULL) {
uv_mutex_unlock(&gc_perm_lock);
Expand Down
45 changes: 34 additions & 11 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@
extern "C" {
#endif

// Number of GC threads that may run parallel marking
int jl_n_markthreads;
// Number of GC threads that may run concurrent sweeping (0 or 1)
int jl_n_sweepthreads;
// Number of threads currently running the GC mark-loop
_Atomic(int) gc_n_threads_marking;
// `tid` of mutator thread that triggered GC
_Atomic(int) gc_master_tid;
// `tid` of first GC thread
int gc_first_tid;
// To indicate whether concurrent sweeping should run
uv_sem_t gc_sweep_assists_needed;

// Mutex/cond used to synchronize sleep/wakeup of GC threads
uv_mutex_t gc_threads_lock;
uv_cond_t gc_threads_cond;

// Number of threads currently running the GC mark-loop
_Atomic(int) gc_n_threads_marking;

// Linked list of callback functions

typedef void (*jl_gc_cb_func_t)(void);
Expand Down Expand Up @@ -1548,8 +1553,18 @@ static jl_taggedvalue_t **gc_sweep_page(jl_gc_pool_t *p, jl_gc_pagemeta_t **allo
push_page_metadata_back(lazily_freed, pg);
}
else {
#ifdef _P64 // only enable concurrent sweeping on 64bit
if (jl_n_sweepthreads == 0) {
jl_gc_free_page(pg);
push_lf_page_metadata_back(&global_page_pool_freed, pg);
}
else {
push_lf_page_metadata_back(&global_page_pool_lazily_freed, pg);
}
#else
jl_gc_free_page(pg);
push_lf_page_metadata_back(&global_page_pool_freed, pg);
#endif
}
gc_time_count_page(freedall, pg_skpd);
gc_num.freed += (nfree - old_nfree) * osize;
Expand Down Expand Up @@ -1672,6 +1687,13 @@ static void gc_sweep_pool(int sweep_full)
}
}

#ifdef _P64 // only enable concurrent sweeping on 64bit
// wake thread up to sweep concurrently
if (jl_n_sweepthreads > 0) {
uv_sem_post(&gc_sweep_assists_needed);
}
#endif

gc_time_pool_end(sweep_full);
}

Expand Down Expand Up @@ -2718,8 +2740,8 @@ void gc_mark_and_steal(jl_ptls_t ptls)
// of work for the mark loop
steal : {
// Try to steal chunk from random GC thread
for (int i = 0; i < 4 * jl_n_gcthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads;
for (int i = 0; i < 4 * jl_n_markthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
c = gc_chunkqueue_steal_from(mq2);
if (c.cid != GC_empty_chunk) {
Expand All @@ -2728,7 +2750,7 @@ void gc_mark_and_steal(jl_ptls_t ptls)
}
}
// Sequentially walk GC threads to try to steal chunk
for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) {
for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
c = gc_chunkqueue_steal_from(mq2);
if (c.cid != GC_empty_chunk) {
Expand All @@ -2745,15 +2767,15 @@ void gc_mark_and_steal(jl_ptls_t ptls)
}
}
// Try to steal pointer from random GC thread
for (int i = 0; i < 4 * jl_n_gcthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads;
for (int i = 0; i < 4 * jl_n_markthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
new_obj = gc_ptr_queue_steal_from(mq2);
if (new_obj != NULL)
goto mark;
}
// Sequentially walk GC threads to try to steal pointer
for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) {
for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
new_obj = gc_ptr_queue_steal_from(mq2);
if (new_obj != NULL)
Expand Down Expand Up @@ -2795,7 +2817,7 @@ void gc_mark_loop_parallel(jl_ptls_t ptls, int master)

void gc_mark_loop(jl_ptls_t ptls)
{
if (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled) {
if (jl_n_markthreads == 0 || gc_heap_snapshot_enabled) {
gc_mark_loop_serial(ptls);
}
else {
Expand Down Expand Up @@ -3114,7 +3136,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection)
jl_gc_markqueue_t *mq2 = mq;
jl_ptls_t ptls_gc_thread = NULL;
if (!single_threaded) {
ptls_gc_thread = gc_all_tls_states[gc_first_tid + t_i % jl_n_gcthreads];
ptls_gc_thread = gc_all_tls_states[gc_first_tid + t_i % jl_n_markthreads];
mq2 = &ptls_gc_thread->mark_queue;
}
if (ptls2 != NULL) {
Expand Down Expand Up @@ -3548,6 +3570,7 @@ void jl_gc_init(void)
uv_mutex_init(&gc_perm_lock);
uv_mutex_init(&gc_threads_lock);
uv_cond_init(&gc_threads_cond);
uv_sem_init(&gc_sweep_assists_needed, 0);

jl_gc_init_page();
jl_gc_debug_init();
Expand Down
2 changes: 2 additions & 0 deletions src/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ typedef struct {
_Atomic(jl_gc_pagemeta_t *) page_metadata_back;
} jl_gc_global_page_pool_t;

extern jl_gc_global_page_pool_t global_page_pool_lazily_freed;
extern jl_gc_global_page_pool_t global_page_pool_clean;
extern jl_gc_global_page_pool_t global_page_pool_freed;

Expand Down Expand Up @@ -434,6 +435,7 @@ STATIC_INLINE void gc_big_object_link(bigval_t *hdr, bigval_t **list) JL_NOTSAFE
*list = hdr;
}

extern uv_sem_t gc_sweep_assists_needed;
extern _Atomic(int) gc_n_threads_marking;
void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq);
void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT;
Expand Down
2 changes: 2 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_
if (jl_base_module == NULL) {
// nthreads > 1 requires code in Base
jl_atomic_store_relaxed(&jl_n_threads, 1);
jl_n_markthreads = 0;
jl_n_sweepthreads = 0;
jl_n_gcthreads = 0;
}
jl_start_threads();
Expand Down
29 changes: 20 additions & 9 deletions src/jloptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ JL_DLLEXPORT void jl_init_options(void)
NULL, // cpu_target ("native", "core2", etc...)
0, // nthreadpools
0, // nthreads
0, // ngcthreads
0, // nmarkthreads
0, // nsweepthreads
NULL, // nthreads_per_pool
0, // nprocs
NULL, // machine_file
Expand Down Expand Up @@ -130,7 +131,8 @@ static const char opts[] =
" interface if supported (Linux and Windows) or to the number of CPU\n"
" threads if not supported (MacOS) or if process affinity is not\n"
" configured, and sets M to 1.\n"
" --gcthreads=N Use N threads for GC, set to half of the number of compute threads if unspecified.\n"
" --gcthreads=M[,N] Use M threads for the mark phase of GC and N (0 or 1) threads for the concurrent sweeping phase of GC.\n"
" M is set to half of the number of compute threads and N is set to 0 if unspecified.\n"
" -p, --procs {N|auto} Integer value N launches N additional local worker processes\n"
" \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n"
" --machine-file <file> Run processes on hosts listed in <file>\n\n"
Expand Down Expand Up @@ -822,6 +824,22 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
if (jl_options.heap_size_hint == 0)
jl_errorf("julia: invalid argument to --heap-size-hint without memory size specified");

break;
case opt_gc_threads:
errno = 0;
long nmarkthreads = strtol(optarg, &endptr, 10);
if (errno != 0 || optarg == endptr || nmarkthreads < 1 || nmarkthreads >= INT16_MAX) {
jl_errorf("julia: --gcthreads=<n>[,<m>]; n must be an integer >= 1");
}
jl_options.nmarkthreads = (int16_t)nmarkthreads;
if (*endptr == ',') {
errno = 0;
char *endptri;
long nsweepthreads = strtol(&endptr[1], &endptri, 10);
if (errno != 0 || endptri == &endptr[1] || *endptri != 0 || nsweepthreads < 0 || nsweepthreads > 1)
jl_errorf("julia: --gcthreads=<n>,<m>; n must be 0 or 1");
jl_options.nsweepthreads = (int8_t)nsweepthreads;
}
break;
case opt_permalloc_pkgimg:
if (!strcmp(optarg,"yes"))
Expand All @@ -831,13 +849,6 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
else
jl_errorf("julia: invalid argument to --permalloc-pkgimg={yes|no} (%s)", optarg);
break;
case opt_gc_threads:
errno = 0;
long ngcthreads = strtol(optarg, &endptr, 10);
if (errno != 0 || optarg == endptr || *endptr != 0 || ngcthreads < 1 || ngcthreads >= INT16_MAX)
jl_errorf("julia: --gcthreads=<n>; n must be an integer >= 1");
jl_options.ngcthreads = (int16_t)ngcthreads;
break;
default:
jl_errorf("julia: unhandled option -- %c\n"
"This is a bug, please report it.", c);
Expand Down
3 changes: 2 additions & 1 deletion src/jloptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ typedef struct {
const char *cpu_target;
int8_t nthreadpools;
int16_t nthreads;
int16_t ngcthreads;
int16_t nmarkthreads;
int8_t nsweepthreads;
const int16_t *nthreads_per_pool;
int32_t nprocs;
const char *machine_file;
Expand Down
2 changes: 2 additions & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,8 @@ JL_DLLEXPORT jl_value_t *jl_get_libllvm(void) JL_NOTSAFEPOINT;
extern JL_DLLIMPORT int jl_n_threadpools;
extern JL_DLLIMPORT _Atomic(int) jl_n_threads;
extern JL_DLLIMPORT int jl_n_gcthreads;
extern int jl_n_markthreads;
extern int jl_n_sweepthreads;
extern JL_DLLIMPORT int *jl_n_threads_per_pool;

// environment entries
Expand Down
34 changes: 31 additions & 3 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ extern uv_cond_t gc_threads_cond;
extern _Atomic(int) gc_n_threads_marking;
extern void gc_mark_loop_parallel(jl_ptls_t ptls, int master);

static int may_mark(void) JL_NOTSAFEPOINT
static inline int may_mark(void) JL_NOTSAFEPOINT
{
return (jl_atomic_load(&gc_n_threads_marking) > 0);
}

// gc thread function
void jl_gc_threadfun(void *arg)
// gc thread mark function
void jl_gc_mark_threadfun(void *arg)
{
jl_threadarg_t *targ = (jl_threadarg_t*)arg;

Expand All @@ -143,6 +143,34 @@ void jl_gc_threadfun(void *arg)
}
}

// gc thread sweep function
void jl_gc_sweep_threadfun(void *arg)
{
jl_threadarg_t *targ = (jl_threadarg_t*)arg;

// initialize this thread (set tid and create heap)
jl_ptls_t ptls = jl_init_threadtls(targ->tid);

// wait for all threads
jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0);
uv_barrier_wait(targ->barrier);

// free the thread argument here
free(targ);

while (1) {
uv_sem_wait(&gc_sweep_assists_needed);
while (1) {
jl_gc_pagemeta_t *pg = pop_lf_page_metadata_back(&global_page_pool_lazily_freed);
if (pg == NULL) {
break;
}
jl_gc_free_page(pg);
push_lf_page_metadata_back(&global_page_pool_freed, pg);
}
}
}

// thread function: used by all mutator threads except the main thread
void jl_threadfun(void *arg)
{
Expand Down
Loading

0 comments on commit 6326343

Please sign in to comment.