From 07e26a42e113c16191cd79829959bd277bb2df6d Mon Sep 17 00:00:00 2001 From: Diogo Netto <61364108+d-netto@users.noreply.github.com> Date: Tue, 27 Jun 2023 22:25:51 -0300 Subject: [PATCH] started to backport concurrent sweeping --- base/options.jl | 3 +- base/threadingconstructs.jl | 1 + src/gc-pages.c | 10 ++++- src/gc.c | 45 ++++++++++++++----- src/gc.h | 2 + src/init.c | 2 + src/jloptions.c | 29 ++++++++---- src/jloptions.h | 3 +- src/julia.h | 2 + src/partr.c | 34 ++++++++++++-- src/threading.c | 73 +++++++++++++++++++++---------- src/threading.h | 3 +- stdlib/Distributed/src/cluster.jl | 13 +++++- test/cmdlineargs.jl | 7 +++ test/gc.jl | 12 ++--- 15 files changed, 182 insertions(+), 57 deletions(-) diff --git a/base/options.jl b/base/options.jl index fb043672dc19a..a94936391fa8d 100644 --- a/base/options.jl +++ b/base/options.jl @@ -11,7 +11,8 @@ struct JLOptions cpu_target::Ptr{UInt8} nthreadpools::Int16 nthreads::Int16 - ngcthreads::Int16 + nmarkthreads::Int16 + nsweepthreads::Int8 nthreads_per_pool::Ptr{Int16} nprocs::Int32 machine_file::Ptr{UInt8} diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index a951bcd920b04..6832f4428d3ff 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -134,6 +134,7 @@ end Threads.ngcthreads() -> Int Returns the number of GC threads currently configured. +This includes both mark threads and concurrent sweep threads. """ ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1 diff --git a/src/gc-pages.c b/src/gc-pages.c index 06284d1afb8c8..72e18cbb0bd62 100644 --- a/src/gc-pages.c +++ b/src/gc-pages.c @@ -128,6 +128,14 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT #endif jl_gc_pagemeta_t *meta = NULL; + // try to get page from `pool_lazily_freed` + meta = pop_lf_page_metadata_back(&global_page_pool_lazily_freed); + if (meta != NULL) { + gc_alloc_map_set(meta->data, GC_PAGE_ALLOCATED); + // page is already mapped + return meta; + } + // try to get page from `pool_clean` meta = pop_lf_page_metadata_back(&global_page_pool_clean); if (meta != NULL) { @@ -143,7 +151,7 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT } uv_mutex_lock(&gc_perm_lock); - // another thread may have allocated a large block while we're waiting... + // another thread may have allocated a large block while we were waiting... meta = pop_lf_page_metadata_back(&global_page_pool_clean); if (meta != NULL) { uv_mutex_unlock(&gc_perm_lock); diff --git a/src/gc.c b/src/gc.c index 764fec260e0fe..c71d1d35a0bb7 100644 --- a/src/gc.c +++ b/src/gc.c @@ -11,18 +11,23 @@ extern "C" { #endif +// Number of GC threads that may run parallel marking +int jl_n_markthreads; +// Number of GC threads that may run concurrent sweeping (0 or 1) +int jl_n_sweepthreads; +// Number of threads currently running the GC mark-loop +_Atomic(int) gc_n_threads_marking; // `tid` of mutator thread that triggered GC _Atomic(int) gc_master_tid; // `tid` of first GC thread int gc_first_tid; +// To indicate whether concurrent sweeping should run +uv_sem_t gc_sweep_assists_needed; // Mutex/cond used to synchronize sleep/wakeup of GC threads uv_mutex_t gc_threads_lock; uv_cond_t gc_threads_cond; -// Number of threads currently running the GC mark-loop -_Atomic(int) gc_n_threads_marking; - // Linked list of callback functions typedef void (*jl_gc_cb_func_t)(void); @@ -1638,8 +1643,18 @@ static jl_taggedvalue_t **gc_sweep_page(jl_gc_pool_t *p, jl_gc_pagemeta_t **allo push_page_metadata_back(lazily_freed, pg); } else { + #ifdef _P64 // only enable concurrent sweeping on 64bit + if (jl_n_sweepthreads == 0) { + jl_gc_free_page(pg); + push_lf_page_metadata_back(&global_page_pool_freed, pg); + } + else { + push_lf_page_metadata_back(&global_page_pool_lazily_freed, pg); + } + #else jl_gc_free_page(pg); push_lf_page_metadata_back(&global_page_pool_freed, pg); + #endif } gc_time_count_page(freedall, pg_skpd); gc_num.freed += (nfree - old_nfree) * osize; @@ -1762,6 +1777,13 @@ static void gc_sweep_pool(int sweep_full) } } +#ifdef _P64 // only enable concurrent sweeping on 64bit + // wake thread up to sweep concurrently + if (jl_n_sweepthreads > 0) { + uv_sem_post(&gc_sweep_assists_needed); + } +#endif + gc_time_pool_end(sweep_full); } @@ -2808,8 +2830,8 @@ void gc_mark_and_steal(jl_ptls_t ptls) // of work for the mark loop steal : { // Try to steal chunk from random GC thread - for (int i = 0; i < 4 * jl_n_gcthreads; i++) { - uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads; + for (int i = 0; i < 4 * jl_n_markthreads; i++) { + uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads; jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; c = gc_chunkqueue_steal_from(mq2); if (c.cid != GC_empty_chunk) { @@ -2818,7 +2840,7 @@ void gc_mark_and_steal(jl_ptls_t ptls) } } // Sequentially walk GC threads to try to steal chunk - for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) { + for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; c = gc_chunkqueue_steal_from(mq2); if (c.cid != GC_empty_chunk) { @@ -2835,15 +2857,15 @@ void gc_mark_and_steal(jl_ptls_t ptls) } } // Try to steal pointer from random GC thread - for (int i = 0; i < 4 * jl_n_gcthreads; i++) { - uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads; + for (int i = 0; i < 4 * jl_n_markthreads; i++) { + uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads; jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; new_obj = gc_ptr_queue_steal_from(mq2); if (new_obj != NULL) goto mark; } // Sequentially walk GC threads to try to steal pointer - for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) { + for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; new_obj = gc_ptr_queue_steal_from(mq2); if (new_obj != NULL) @@ -2885,7 +2907,7 @@ void gc_mark_loop_parallel(jl_ptls_t ptls, int master) void gc_mark_loop(jl_ptls_t ptls) { - if (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled) { + if (jl_n_markthreads == 0 || gc_heap_snapshot_enabled) { gc_mark_loop_serial(ptls); } else { @@ -3202,7 +3224,7 @@ static int _jl_gc_collect(jl_ptls_t ptls, jl_gc_collection_t collection) jl_gc_markqueue_t *mq2 = mq; jl_ptls_t ptls_gc_thread = NULL; if (!single_threaded) { - ptls_gc_thread = gc_all_tls_states[gc_first_tid + t_i % jl_n_gcthreads]; + ptls_gc_thread = gc_all_tls_states[gc_first_tid + t_i % jl_n_markthreads]; mq2 = &ptls_gc_thread->mark_queue; } if (ptls2 != NULL) { @@ -3636,6 +3658,7 @@ void jl_gc_init(void) uv_mutex_init(&gc_perm_lock); uv_mutex_init(&gc_threads_lock); uv_cond_init(&gc_threads_cond); + uv_sem_init(&gc_sweep_assists_needed, 0); jl_gc_init_page(); jl_gc_debug_init(); diff --git a/src/gc.h b/src/gc.h index e66e88a2d80e6..8a78d77660d93 100644 --- a/src/gc.h +++ b/src/gc.h @@ -187,6 +187,7 @@ typedef struct { _Atomic(jl_gc_pagemeta_t *) page_metadata_back; } jl_gc_global_page_pool_t; +extern jl_gc_global_page_pool_t global_page_pool_lazily_freed; extern jl_gc_global_page_pool_t global_page_pool_clean; extern jl_gc_global_page_pool_t global_page_pool_freed; @@ -438,6 +439,7 @@ STATIC_INLINE void gc_big_object_link(bigval_t *hdr, bigval_t **list) JL_NOTSAFE *list = hdr; } +extern uv_sem_t gc_sweep_assists_needed; extern _Atomic(int) gc_n_threads_marking; void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq); void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT; diff --git a/src/init.c b/src/init.c index 0ab6d59e6ec6a..07e97ff39b7ff 100644 --- a/src/init.c +++ b/src/init.c @@ -840,6 +840,8 @@ static NOINLINE void _finish_julia_init(JL_IMAGE_SEARCH rel, jl_ptls_t ptls, jl_ if (jl_base_module == NULL) { // nthreads > 1 requires code in Base jl_atomic_store_relaxed(&jl_n_threads, 1); + jl_n_markthreads = 0; + jl_n_sweepthreads = 0; jl_n_gcthreads = 0; } jl_start_threads(); diff --git a/src/jloptions.c b/src/jloptions.c index f325452e19e41..1a5654f29e2c2 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -40,7 +40,8 @@ JL_DLLEXPORT void jl_init_options(void) NULL, // cpu_target ("native", "core2", etc...) 0, // nthreadpools 0, // nthreads - 0, // ngcthreads + 0, // nmarkthreads + 0, // nsweepthreads NULL, // nthreads_per_pool 0, // nprocs NULL, // machine_file @@ -130,7 +131,8 @@ static const char opts[] = " interface if supported (Linux and Windows) or to the number of CPU\n" " threads if not supported (MacOS) or if process affinity is not\n" " configured, and sets M to 1.\n" - " --gcthreads=N Use N threads for GC, set to half of the number of compute threads if unspecified.\n" + " --gcthreads=M[,N] Use M threads for the mark phase of GC and N (0 or 1) threads for the concurrent sweeping phase of GC.\n" + " M is set to half of the number of compute threads and N is set to 0 if unspecified.\n" " -p, --procs {N|auto} Integer value N launches N additional local worker processes\n" " \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n" " --machine-file Run processes on hosts listed in \n\n" @@ -822,6 +824,22 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) if (jl_options.heap_size_hint == 0) jl_errorf("julia: invalid argument to --heap-size-hint without memory size specified"); + break; + case opt_gc_threads: + errno = 0; + long nmarkthreads = strtol(optarg, &endptr, 10); + if (errno != 0 || optarg == endptr || nmarkthreads < 1 || nmarkthreads >= INT16_MAX) { + jl_errorf("julia: --gcthreads=[,]; n must be an integer >= 1"); + } + jl_options.nmarkthreads = (int16_t)nmarkthreads; + if (*endptr == ',') { + errno = 0; + char *endptri; + long nsweepthreads = strtol(&endptr[1], &endptri, 10); + if (errno != 0 || endptri == &endptr[1] || *endptri != 0 || nsweepthreads < 0 || nsweepthreads > 1) + jl_errorf("julia: --gcthreads=,; n must be 0 or 1"); + jl_options.nsweepthreads = (int8_t)nsweepthreads; + } break; case opt_permalloc_pkgimg: if (!strcmp(optarg,"yes")) @@ -831,13 +849,6 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) else jl_errorf("julia: invalid argument to --permalloc-pkgimg={yes|no} (%s)", optarg); break; - case opt_gc_threads: - errno = 0; - long ngcthreads = strtol(optarg, &endptr, 10); - if (errno != 0 || optarg == endptr || *endptr != 0 || ngcthreads < 1 || ngcthreads >= INT16_MAX) - jl_errorf("julia: --gcthreads=; n must be an integer >= 1"); - jl_options.ngcthreads = (int16_t)ngcthreads; - break; default: jl_errorf("julia: unhandled option -- %c\n" "This is a bug, please report it.", c); diff --git a/src/jloptions.h b/src/jloptions.h index 93f6d321f38d6..8649c405112d7 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -15,7 +15,8 @@ typedef struct { const char *cpu_target; int8_t nthreadpools; int16_t nthreads; - int16_t ngcthreads; + int16_t nmarkthreads; + int8_t nsweepthreads; const int16_t *nthreads_per_pool; int32_t nprocs; const char *machine_file; diff --git a/src/julia.h b/src/julia.h index caebdf450ed75..c717ca5bcb0a8 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1685,6 +1685,8 @@ JL_DLLEXPORT jl_value_t *jl_get_libllvm(void) JL_NOTSAFEPOINT; extern JL_DLLIMPORT int jl_n_threadpools; extern JL_DLLIMPORT _Atomic(int) jl_n_threads; extern JL_DLLIMPORT int jl_n_gcthreads; +extern int jl_n_markthreads; +extern int jl_n_sweepthreads; extern JL_DLLIMPORT int *jl_n_threads_per_pool; // environment entries diff --git a/src/partr.c b/src/partr.c index 0194cd6a4b31e..d83d094caa767 100644 --- a/src/partr.c +++ b/src/partr.c @@ -113,13 +113,13 @@ extern uv_cond_t gc_threads_cond; extern _Atomic(int) gc_n_threads_marking; extern void gc_mark_loop_parallel(jl_ptls_t ptls, int master); -static int may_mark(void) JL_NOTSAFEPOINT +static inline int may_mark(void) JL_NOTSAFEPOINT { return (jl_atomic_load(&gc_n_threads_marking) > 0); } -// gc thread function -void jl_gc_threadfun(void *arg) +// gc thread mark function +void jl_gc_mark_threadfun(void *arg) { jl_threadarg_t *targ = (jl_threadarg_t*)arg; @@ -143,6 +143,34 @@ void jl_gc_threadfun(void *arg) } } +// gc thread sweep function +void jl_gc_sweep_threadfun(void *arg) +{ + jl_threadarg_t *targ = (jl_threadarg_t*)arg; + + // initialize this thread (set tid and create heap) + jl_ptls_t ptls = jl_init_threadtls(targ->tid); + + // wait for all threads + jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0); + uv_barrier_wait(targ->barrier); + + // free the thread argument here + free(targ); + + while (1) { + uv_sem_wait(&gc_sweep_assists_needed); + while (1) { + jl_gc_pagemeta_t *pg = pop_lf_page_metadata_back(&global_page_pool_lazily_freed); + if (pg == NULL) { + break; + } + jl_gc_free_page(pg); + push_lf_page_metadata_back(&global_page_pool_freed, pg); + } + } +} + // thread function: used by all mutator threads except the main thread void jl_threadfun(void *arg) { diff --git a/src/threading.c b/src/threading.c index 162141dce8aeb..d9788d77d67bd 100644 --- a/src/threading.c +++ b/src/threading.c @@ -562,6 +562,8 @@ static void jl_check_tls(void) JL_DLLEXPORT const int jl_tls_elf_support = 0; #endif +extern int jl_n_markthreads; +extern int jl_n_sweepthreads; extern int gc_first_tid; // interface to Julia; sets up to make the runtime thread-safe @@ -616,32 +618,54 @@ void jl_init_threading(void) } } - int16_t ngcthreads = jl_options.ngcthreads - 1; - if (ngcthreads == -1 && - (cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified - - ngcthreads = (uint64_t)strtol(cp, NULL, 10) - 1; - } - if (ngcthreads == -1) { - #if 0 - // if `--gcthreads` was not specified, set the number of GC threads - // to half of compute threads - if (nthreads <= 1) { - ngcthreads = 0; - } - else { - ngcthreads = (nthreads / 2) - 1; - } - #endif - // if `--gcthreads` was not specified, set the number of GC threads - // to the number of compute threads - if (nthreads <= 1) { - ngcthreads = 0; + jl_n_markthreads = jl_options.nmarkthreads - 1; + jl_n_sweepthreads = jl_options.nsweepthreads; + if (jl_n_markthreads == -1) { // --gcthreads not specified + if ((cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified + errno = 0; + jl_n_markthreads = (uint64_t)strtol(cp, &endptr, 10) - 1; + if (errno != 0 || endptr == cp || nthreads <= 0) + jl_n_markthreads = 0; + cp = endptr; + if (*cp == ',') { + cp++; + errno = 0; + jl_n_sweepthreads = strtol(cp, &endptri, 10); + if (errno != 0 || endptri == cp || jl_n_sweepthreads < 0) { + jl_n_sweepthreads = 0; + } + } } else { - ngcthreads = nthreads - 1; + #if 0 + // if `--gcthreads` or ENV[NUM_GCTHREADS_NAME] was not specified, + // set the number of mark threads to half of compute threads + // and number of sweep threads to 0 + if (nthreads <= 1) { + jl_n_markthreads = 0; + } + else { + jl_n_markthreads = (nthreads / 2) - 1; + } + #endif + // if `--gcthreads` was not specified, set the number of GC threads + // to the number of compute threads + if (nthreads <= 1) { + jl_n_markthreads = 0; + } + else { + jl_n_markthreads = nthreads -1; + } } } +#if 1 + // enable concurrent sweeping if parallel marking is enabled + // on multithreaded code + if (jl_n_markthreads > 0 && nthreads > 1) { + jl_n_sweepthreads = 1; + } +#endif + int16_t ngcthreads = jl_n_markthreads + jl_n_sweepthreads; jl_all_tls_states_size = nthreads + nthreadsi + ngcthreads; jl_n_threads_per_pool = (int*)malloc_s(2 * sizeof(int)); @@ -707,8 +731,11 @@ void jl_start_threads(void) mask[i] = 0; } } + else if (i == nthreads - 1 && jl_n_sweepthreads == 1) { + uv_thread_create(&uvtid, jl_gc_sweep_threadfun, t); + } else { - uv_thread_create(&uvtid, jl_gc_threadfun, t); + uv_thread_create(&uvtid, jl_gc_mark_threadfun, t); } uv_thread_detach(&uvtid); } diff --git a/src/threading.h b/src/threading.h index 1cf2d4a9d3711..52c06830a1c65 100644 --- a/src/threading.h +++ b/src/threading.h @@ -25,7 +25,8 @@ jl_ptls_t jl_init_threadtls(int16_t tid); // provided by a threading infrastructure void jl_init_threadinginfra(void); -void jl_gc_threadfun(void *arg); +void jl_gc_mark_threadfun(void *arg); +void jl_gc_sweep_threadfun(void *arg); void jl_threadfun(void *arg); #ifdef __cplusplus diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index d8039ba160d66..d8cc052967d50 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -1331,6 +1331,14 @@ function get_threads_spec(opts) end end +function get_gcthreads_spec(opts) + if opts.nmarkthreads > 0 || opts.nsweepthreads > 0 + `--gcthreads=$(opts.nmarkthreads),$(opts.nsweepthreads)` + else + `` + end +end + # Starts workers specified by (-n|--procs) and --machine-file command line options function process_opts(opts) # startup worker. @@ -1345,8 +1353,9 @@ function process_opts(opts) end # Propagate --threads to workers - threads = opts.nthreads > 0 ? `--threads=$(opts.nthreads)` : `` - gcthreads = opts.ngcthreads > 0 ? `--gcthreads=$(opts.ngcthreads)` : `` + threads = get_threads_spec(opts) + # Propagate --gcthreads to workers + gcthreads = get_gcthreads_spec(opts) exeflags = `$threads $gcthreads` diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index f73a7854fd2f1..b0fd30028b2ce 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -281,12 +281,19 @@ let exename = `$(Base.julia_cmd()) --startup-file=no --color=no` withenv("JULIA_NUM_GC_THREADS" => nt) do @test read(`$exename --gcthreads=2 -e $code`, String) == "2" end + withenv("JULIA_NUM_GC_THREADS" => nt) do + @test read(`$exename --gcthreads=2,1 -e $code`, String) == "3" + end end withenv("JULIA_NUM_GC_THREADS" => 2) do @test read(`$exename -e $code`, String) == "2" end + withenv("JULIA_NUM_GC_THREADS" => "2,1") do + @test read(`$exename -e $code`, String) == "3" + end + # --machine-file # this does not check that machine file works, # only that the filename gets correctly passed to the option struct diff --git a/test/gc.jl b/test/gc.jl index 9cc9d753dfc09..54df172e9f3f8 100644 --- a/test/gc.jl +++ b/test/gc.jl @@ -4,11 +4,13 @@ using Test function run_gctest(file) let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no $file` - for test_nthreads in (1, 2, 4) - new_env = copy(ENV) - new_env["JULIA_NUM_THREADS"] = string(test_nthreads) - new_env["JULIA_NUM_GC_THREADS"] = string(test_nthreads) - @time run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) + @testset for test_nthreads in (1, 2, 4) + @testset for concurrent_sweep in (0, 1) + new_env = copy(ENV) + new_env["JULIA_NUM_THREADS"] = string(test_nthreads) + new_env["JULIA_NUM_GC_THREADS"] = "$(test_nthreads),$(concurrent_sweep)" + @test success(run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))) + end end end end