diff --git a/base/options.jl b/base/options.jl index fb043672dc19ac..a94936391fa8da 100644 --- a/base/options.jl +++ b/base/options.jl @@ -11,7 +11,8 @@ struct JLOptions cpu_target::Ptr{UInt8} nthreadpools::Int16 nthreads::Int16 - ngcthreads::Int16 + nmarkthreads::Int16 + nsweepthreads::Int8 nthreads_per_pool::Ptr{Int16} nprocs::Int32 machine_file::Ptr{UInt8} diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index a951bcd920b04e..6832f4428d3ff5 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -134,6 +134,7 @@ end Threads.ngcthreads() -> Int Returns the number of GC threads currently configured. +This includes both mark threads and concurrent sweep threads. """ ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1 diff --git a/src/gc-pages.c b/src/gc-pages.c index 6358c9ca6f9f72..39d841d5f5c02f 100644 --- a/src/gc-pages.c +++ b/src/gc-pages.c @@ -118,6 +118,14 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT #endif jl_gc_pagemeta_t *meta = NULL; + // try to get page from `pool_lazily_freed` + meta = pop_lf_page_metadata_back(&global_page_pool_lazily_freed); + if (meta != NULL) { + gc_alloc_map_set(meta->data, 1); + // page is already mapped + return meta; + } + // try to get page from `pool_clean` meta = pop_lf_page_metadata_back(&global_page_pool_clean); if (meta != NULL) { @@ -133,7 +141,7 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT } uv_mutex_lock(&gc_perm_lock); - // another thread may have allocated a large block while we're waiting... + // another thread may have allocated a large block while we were waiting... meta = pop_lf_page_metadata_back(&global_page_pool_clean); if (meta != NULL) { uv_mutex_unlock(&gc_perm_lock); diff --git a/src/gc.c b/src/gc.c index 3e0140d6930c79..d8cec601745d47 100644 --- a/src/gc.c +++ b/src/gc.c @@ -11,10 +11,18 @@ extern "C" { #endif +// Number of GC threads that may run parallel marking +int jl_n_markthreads; +// Number of GC threads that may run concurrent sweeping (0 or 1) +int jl_n_sweepthreads; +// Number of threads currently running the GC mark-loop +_Atomic(int) gc_n_threads_marking; // `tid` of mutator thread that triggered GC _Atomic(int) gc_master_tid; // `tid` of first GC thread int gc_first_tid; +// To indicate whether concurrent sweeping should run +uv_sem_t gc_sweep_assists_needed; // Mutex/cond used to synchronize sleep/wakeup of GC threads uv_mutex_t gc_threads_lock; @@ -1548,8 +1556,18 @@ static jl_taggedvalue_t **gc_sweep_page(jl_gc_pool_t *p, jl_gc_pagemeta_t **allo push_page_metadata_back(lazily_freed, pg); } else { + #ifdef _P64 // only enable concurrent sweeping on 64bit + if (jl_n_sweepthreads == 0) { + jl_gc_free_page(pg); + push_lf_page_metadata_back(&global_page_pool_freed, pg); + } + else { + push_lf_page_metadata_back(&global_page_pool_lazily_freed, pg); + } + #else jl_gc_free_page(pg); push_lf_page_metadata_back(&global_page_pool_freed, pg); + #endif } gc_time_count_page(freedall, pg_skpd); gc_num.freed += (nfree - old_nfree) * osize; @@ -1672,6 +1690,13 @@ static void gc_sweep_pool(int sweep_full) } } +#ifdef _P64 // only enable concurrent sweeping on 64bit + // wake thread up to sweep concurrently + if (jl_n_sweepthreads > 0) { + uv_sem_post(&gc_sweep_assists_needed); + } +#endif + gc_time_pool_end(sweep_full); } @@ -2718,8 +2743,8 @@ void gc_mark_and_steal(jl_ptls_t ptls) // of work for the mark loop steal : { // Try to steal chunk from random GC thread - for (int i = 0; i < 4 * jl_n_gcthreads; i++) { - uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads; + for (int i = 0; i < 4 * jl_n_markthreads; i++) { + uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads; jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; c = gc_chunkqueue_steal_from(mq2); if (c.cid != GC_empty_chunk) { @@ -2728,7 +2753,7 @@ void gc_mark_and_steal(jl_ptls_t ptls) } } // Sequentially walk GC threads to try to steal chunk - for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) { + for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; c = gc_chunkqueue_steal_from(mq2); if (c.cid != GC_empty_chunk) { @@ -2745,15 +2770,15 @@ void gc_mark_and_steal(jl_ptls_t ptls) } } // Try to steal pointer from random GC thread - for (int i = 0; i < 4 * jl_n_gcthreads; i++) { - uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads; + for (int i = 0; i < 4 * jl_n_markthreads; i++) { + uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads; jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue; new_obj = gc_ptr_queue_steal_from(mq2); if (new_obj != NULL) goto mark; } // Sequentially walk GC threads to try to steal pointer - for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) { + for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) { jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue; new_obj = gc_ptr_queue_steal_from(mq2); if (new_obj != NULL) @@ -2795,7 +2820,7 @@ void gc_mark_loop_parallel(jl_ptls_t ptls, int master) void gc_mark_loop(jl_ptls_t ptls) { - if (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled) { + if (jl_n_markthreads == 0 || gc_heap_snapshot_enabled) { gc_mark_loop_serial(ptls); } else { @@ -3548,6 +3573,7 @@ void jl_gc_init(void) uv_mutex_init(&gc_perm_lock); uv_mutex_init(&gc_threads_lock); uv_cond_init(&gc_threads_cond); + uv_sem_init(&gc_sweep_assists_needed, 0); jl_gc_init_page(); jl_gc_debug_init(); diff --git a/src/gc.h b/src/gc.h index 3d62614a38da19..be10ed603dfdfe 100644 --- a/src/gc.h +++ b/src/gc.h @@ -183,6 +183,7 @@ typedef struct { _Atomic(jl_gc_pagemeta_t *) page_metadata_back; } jl_gc_global_page_pool_t; +extern jl_gc_global_page_pool_t global_page_pool_lazily_freed; extern jl_gc_global_page_pool_t global_page_pool_clean; extern jl_gc_global_page_pool_t global_page_pool_freed; @@ -434,6 +435,7 @@ STATIC_INLINE void gc_big_object_link(bigval_t *hdr, bigval_t **list) JL_NOTSAFE *list = hdr; } +extern uv_sem_t gc_sweep_assists_needed; extern _Atomic(int) gc_n_threads_marking; void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq); void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT; diff --git a/src/jloptions.c b/src/jloptions.c index f325452e19e41f..b66234d9e86fa9 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -40,7 +40,8 @@ JL_DLLEXPORT void jl_init_options(void) NULL, // cpu_target ("native", "core2", etc...) 0, // nthreadpools 0, // nthreads - 0, // ngcthreads + 0, // nmarkthreads + 0, // nsweepthreads NULL, // nthreads_per_pool 0, // nprocs NULL, // machine_file @@ -130,7 +131,8 @@ static const char opts[] = " interface if supported (Linux and Windows) or to the number of CPU\n" " threads if not supported (MacOS) or if process affinity is not\n" " configured, and sets M to 1.\n" - " --gcthreads=N Use N threads for GC, set to half of the number of compute threads if unspecified.\n" + " --gcthreads=N[,M] Use N threads for the mark phase of GC and M (0 or 1) threads for the concurrent sweeping phase of GC.\n" + " N is set to half of the number of compute threads and M is set to 0 if unspecified.\n" " -p, --procs {N|auto} Integer value N launches N additional local worker processes\n" " \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n" " --machine-file Run processes on hosts listed in \n\n" @@ -822,6 +824,22 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) if (jl_options.heap_size_hint == 0) jl_errorf("julia: invalid argument to --heap-size-hint without memory size specified"); + break; + case opt_gc_threads: + errno = 0; + long nmarkthreads = strtol(optarg, &endptr, 10); + if (errno != 0 || optarg == endptr || nmarkthreads < 1 || nmarkthreads >= INT16_MAX) { + jl_errorf("julia: --gcthreads=[,]; n must be an integer >= 1"); + } + jl_options.nmarkthreads = (int16_t)nmarkthreads; + if (*endptr == ',') { + errno = 0; + char *endptri; + long nsweepthreads = strtol(&endptr[1], &endptri, 10); + if (errno != 0 || endptri == &endptr[1] || *endptri != 0 || nsweepthreads < 0 || nsweepthreads > 1) + jl_errorf("julia: --gcthreads=,; m must be 0 or 1"); + jl_options.nsweepthreads = (int8_t)nsweepthreads; + } break; case opt_permalloc_pkgimg: if (!strcmp(optarg,"yes")) @@ -831,13 +849,6 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) else jl_errorf("julia: invalid argument to --permalloc-pkgimg={yes|no} (%s)", optarg); break; - case opt_gc_threads: - errno = 0; - long ngcthreads = strtol(optarg, &endptr, 10); - if (errno != 0 || optarg == endptr || *endptr != 0 || ngcthreads < 1 || ngcthreads >= INT16_MAX) - jl_errorf("julia: --gcthreads=; n must be an integer >= 1"); - jl_options.ngcthreads = (int16_t)ngcthreads; - break; default: jl_errorf("julia: unhandled option -- %c\n" "This is a bug, please report it.", c); diff --git a/src/jloptions.h b/src/jloptions.h index 93f6d321f38d6c..8649c405112d71 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -15,7 +15,8 @@ typedef struct { const char *cpu_target; int8_t nthreadpools; int16_t nthreads; - int16_t ngcthreads; + int16_t nmarkthreads; + int8_t nsweepthreads; const int16_t *nthreads_per_pool; int32_t nprocs; const char *machine_file; diff --git a/src/partr.c b/src/partr.c index 0194cd6a4b31e2..d83d094caa767d 100644 --- a/src/partr.c +++ b/src/partr.c @@ -113,13 +113,13 @@ extern uv_cond_t gc_threads_cond; extern _Atomic(int) gc_n_threads_marking; extern void gc_mark_loop_parallel(jl_ptls_t ptls, int master); -static int may_mark(void) JL_NOTSAFEPOINT +static inline int may_mark(void) JL_NOTSAFEPOINT { return (jl_atomic_load(&gc_n_threads_marking) > 0); } -// gc thread function -void jl_gc_threadfun(void *arg) +// gc thread mark function +void jl_gc_mark_threadfun(void *arg) { jl_threadarg_t *targ = (jl_threadarg_t*)arg; @@ -143,6 +143,34 @@ void jl_gc_threadfun(void *arg) } } +// gc thread sweep function +void jl_gc_sweep_threadfun(void *arg) +{ + jl_threadarg_t *targ = (jl_threadarg_t*)arg; + + // initialize this thread (set tid and create heap) + jl_ptls_t ptls = jl_init_threadtls(targ->tid); + + // wait for all threads + jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0); + uv_barrier_wait(targ->barrier); + + // free the thread argument here + free(targ); + + while (1) { + uv_sem_wait(&gc_sweep_assists_needed); + while (1) { + jl_gc_pagemeta_t *pg = pop_lf_page_metadata_back(&global_page_pool_lazily_freed); + if (pg == NULL) { + break; + } + jl_gc_free_page(pg); + push_lf_page_metadata_back(&global_page_pool_freed, pg); + } + } +} + // thread function: used by all mutator threads except the main thread void jl_threadfun(void *arg) { diff --git a/src/threading.c b/src/threading.c index f780859fb600de..cac81edc8b4ff8 100644 --- a/src/threading.c +++ b/src/threading.c @@ -562,6 +562,8 @@ static void jl_check_tls(void) JL_DLLEXPORT const int jl_tls_elf_support = 0; #endif +extern int jl_n_markthreads; +extern int jl_n_sweepthreads; extern int gc_first_tid; // interface to Julia; sets up to make the runtime thread-safe @@ -616,22 +618,37 @@ void jl_init_threading(void) } } - int16_t ngcthreads = jl_options.ngcthreads - 1; - if (ngcthreads == -1 && - (cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified - - ngcthreads = (uint64_t)strtol(cp, NULL, 10) - 1; - } - if (ngcthreads == -1) { - // if `--gcthreads` was not specified, set the number of GC threads - // to half of compute threads - if (nthreads <= 1) { - ngcthreads = 0; + jl_n_markthreads = jl_options.nmarkthreads - 1; + jl_n_sweepthreads = jl_options.nsweepthreads; + if (jl_n_markthreads == -1) { // --gcthreads not specified + if ((cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified + errno = 0; + jl_n_markthreads = (uint64_t)strtol(cp, &endptr, 10) - 1; + if (errno != 0 || endptr == cp || nthreads <= 0) + jl_n_markthreads = 0; + cp = endptr; + if (*cp == ',') { + cp++; + errno = 0; + jl_n_sweepthreads = strtol(cp, &endptri, 10); + if (errno != 0 || endptri == cp || jl_n_sweepthreads < 0) { + jl_n_sweepthreads = 0; + } + } } else { - ngcthreads = (nthreads / 2) - 1; + // if `--gcthreads` or ENV[NUM_GCTHREADS_NAME] was not specified, + // set the number of mark threads to half of compute threads + // and number of sweep threads to 0 + if (nthreads <= 1) { + jl_n_markthreads = 0; + } + else { + jl_n_markthreads = (nthreads / 2) - 1; + } } } + int16_t ngcthreads = jl_n_markthreads + jl_n_sweepthreads; jl_all_tls_states_size = nthreads + nthreadsi + ngcthreads; jl_n_threads_per_pool = (int*)malloc_s(2 * sizeof(int)); @@ -697,8 +714,11 @@ void jl_start_threads(void) mask[i] = 0; } } + else if (i == nthreads - 1 && jl_n_sweepthreads == 1) { + uv_thread_create(&uvtid, jl_gc_sweep_threadfun, t); + } else { - uv_thread_create(&uvtid, jl_gc_threadfun, t); + uv_thread_create(&uvtid, jl_gc_mark_threadfun, t); } uv_thread_detach(&uvtid); } diff --git a/src/threading.h b/src/threading.h index 1cf2d4a9d37112..52c06830a1c655 100644 --- a/src/threading.h +++ b/src/threading.h @@ -25,7 +25,8 @@ jl_ptls_t jl_init_threadtls(int16_t tid); // provided by a threading infrastructure void jl_init_threadinginfra(void); -void jl_gc_threadfun(void *arg); +void jl_gc_mark_threadfun(void *arg); +void jl_gc_sweep_threadfun(void *arg); void jl_threadfun(void *arg); #ifdef __cplusplus diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index d8039ba160d66b..d8cc052967d50e 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -1331,6 +1331,14 @@ function get_threads_spec(opts) end end +function get_gcthreads_spec(opts) + if opts.nmarkthreads > 0 || opts.nsweepthreads > 0 + `--gcthreads=$(opts.nmarkthreads),$(opts.nsweepthreads)` + else + `` + end +end + # Starts workers specified by (-n|--procs) and --machine-file command line options function process_opts(opts) # startup worker. @@ -1345,8 +1353,9 @@ function process_opts(opts) end # Propagate --threads to workers - threads = opts.nthreads > 0 ? `--threads=$(opts.nthreads)` : `` - gcthreads = opts.ngcthreads > 0 ? `--gcthreads=$(opts.ngcthreads)` : `` + threads = get_threads_spec(opts) + # Propagate --gcthreads to workers + gcthreads = get_gcthreads_spec(opts) exeflags = `$threads $gcthreads` diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index f73a7854fd2f11..b0fd30028b2ce8 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -281,12 +281,19 @@ let exename = `$(Base.julia_cmd()) --startup-file=no --color=no` withenv("JULIA_NUM_GC_THREADS" => nt) do @test read(`$exename --gcthreads=2 -e $code`, String) == "2" end + withenv("JULIA_NUM_GC_THREADS" => nt) do + @test read(`$exename --gcthreads=2,1 -e $code`, String) == "3" + end end withenv("JULIA_NUM_GC_THREADS" => 2) do @test read(`$exename -e $code`, String) == "2" end + withenv("JULIA_NUM_GC_THREADS" => "2,1") do + @test read(`$exename -e $code`, String) == "3" + end + # --machine-file # this does not check that machine file works, # only that the filename gets correctly passed to the option struct diff --git a/test/gc.jl b/test/gc.jl index 9cc9d753dfc093..54df172e9f3f84 100644 --- a/test/gc.jl +++ b/test/gc.jl @@ -4,11 +4,13 @@ using Test function run_gctest(file) let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no $file` - for test_nthreads in (1, 2, 4) - new_env = copy(ENV) - new_env["JULIA_NUM_THREADS"] = string(test_nthreads) - new_env["JULIA_NUM_GC_THREADS"] = string(test_nthreads) - @time run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr)) + @testset for test_nthreads in (1, 2, 4) + @testset for concurrent_sweep in (0, 1) + new_env = copy(ENV) + new_env["JULIA_NUM_THREADS"] = string(test_nthreads) + new_env["JULIA_NUM_GC_THREADS"] = "$(test_nthreads),$(concurrent_sweep)" + @test success(run(pipeline(setenv(cmd, new_env), stdout = stdout, stderr = stderr))) + end end end end