Skip to content

Commit

Permalink
started to backport concurrent page sweeping
Browse files Browse the repository at this point in the history
  • Loading branch information
d-netto committed Aug 7, 2023
1 parent 7e94c00 commit 1abc6c9
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 43 deletions.
3 changes: 2 additions & 1 deletion base/options.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ struct JLOptions
cpu_target::Ptr{UInt8}
nthreadpools::Int16
nthreads::Int16
ngcthreads::Int16
nmarkthreads::Int16
nsweepthreads::Int8
nthreads_per_pool::Ptr{Int16}
nprocs::Int32
machine_file::Ptr{UInt8}
Expand Down
1 change: 1 addition & 0 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ end
Threads.ngcthreads() -> Int
Returns the number of GC threads currently configured.
This includes both mark threads and concurrent sweep threads.
"""
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1

Expand Down
10 changes: 9 additions & 1 deletion src/gc-pages.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT
#endif
jl_gc_pagemeta_t *meta = NULL;

// try to get page from `pool_lazily_freed`
meta = pop_lf_page_metadata_back(&global_page_pool_lazily_freed);
if (meta != NULL) {
gc_alloc_map_set(meta->data, 1);
// page is already mapped
return meta;
}

// try to get page from `pool_clean`
meta = pop_lf_page_metadata_back(&global_page_pool_clean);
if (meta != NULL) {
Expand All @@ -133,7 +141,7 @@ NOINLINE jl_gc_pagemeta_t *jl_gc_alloc_page(void) JL_NOTSAFEPOINT
}

uv_mutex_lock(&gc_perm_lock);
// another thread may have allocated a large block while we're waiting...
// another thread may have allocated a large block while we were waiting...
meta = pop_lf_page_metadata_back(&global_page_pool_clean);
if (meta != NULL) {
uv_mutex_unlock(&gc_perm_lock);
Expand Down
40 changes: 33 additions & 7 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@
extern "C" {
#endif

// Number of GC threads that may run parallel marking
int jl_n_markthreads;
// Number of GC threads that may run concurrent sweeping (0 or 1)
int jl_n_sweepthreads;
// Number of threads currently running the GC mark-loop
_Atomic(int) gc_n_threads_marking;
// `tid` of mutator thread that triggered GC
_Atomic(int) gc_master_tid;
// `tid` of first GC thread
int gc_first_tid;
// To indicate whether concurrent sweeping should run
uv_sem_t gc_sweep_assists_needed;

// Mutex/cond used to synchronize sleep/wakeup of GC threads
uv_mutex_t gc_threads_lock;
Expand Down Expand Up @@ -1548,8 +1556,18 @@ static jl_taggedvalue_t **gc_sweep_page(jl_gc_pool_t *p, jl_gc_pagemeta_t **allo
push_page_metadata_back(lazily_freed, pg);
}
else {
#ifdef _P64 // only enable concurrent sweeping on 64bit
if (jl_n_sweepthreads == 0) {
jl_gc_free_page(pg);
push_lf_page_metadata_back(&global_page_pool_freed, pg);
}
else {
push_lf_page_metadata_back(&global_page_pool_lazily_freed, pg);
}
#else
jl_gc_free_page(pg);
push_lf_page_metadata_back(&global_page_pool_freed, pg);
#endif
}
gc_time_count_page(freedall, pg_skpd);
gc_num.freed += (nfree - old_nfree) * osize;
Expand Down Expand Up @@ -1672,6 +1690,13 @@ static void gc_sweep_pool(int sweep_full)
}
}

#ifdef _P64 // only enable concurrent sweeping on 64bit
// wake thread up to sweep concurrently
if (jl_n_sweepthreads > 0) {
uv_sem_post(&gc_sweep_assists_needed);
}
#endif

gc_time_pool_end(sweep_full);
}

Expand Down Expand Up @@ -2718,8 +2743,8 @@ void gc_mark_and_steal(jl_ptls_t ptls)
// of work for the mark loop
steal : {
// Try to steal chunk from random GC thread
for (int i = 0; i < 4 * jl_n_gcthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads;
for (int i = 0; i < 4 * jl_n_markthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
c = gc_chunkqueue_steal_from(mq2);
if (c.cid != GC_empty_chunk) {
Expand All @@ -2728,7 +2753,7 @@ void gc_mark_and_steal(jl_ptls_t ptls)
}
}
// Sequentially walk GC threads to try to steal chunk
for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) {
for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
c = gc_chunkqueue_steal_from(mq2);
if (c.cid != GC_empty_chunk) {
Expand All @@ -2745,15 +2770,15 @@ void gc_mark_and_steal(jl_ptls_t ptls)
}
}
// Try to steal pointer from random GC thread
for (int i = 0; i < 4 * jl_n_gcthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_gcthreads;
for (int i = 0; i < 4 * jl_n_markthreads; i++) {
uint32_t v = gc_first_tid + cong(UINT64_MAX, UINT64_MAX, &ptls->rngseed) % jl_n_markthreads;
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[v]->mark_queue;
new_obj = gc_ptr_queue_steal_from(mq2);
if (new_obj != NULL)
goto mark;
}
// Sequentially walk GC threads to try to steal pointer
for (int i = gc_first_tid; i < gc_first_tid + jl_n_gcthreads; i++) {
for (int i = gc_first_tid; i < gc_first_tid + jl_n_markthreads; i++) {
jl_gc_markqueue_t *mq2 = &gc_all_tls_states[i]->mark_queue;
new_obj = gc_ptr_queue_steal_from(mq2);
if (new_obj != NULL)
Expand Down Expand Up @@ -2795,7 +2820,7 @@ void gc_mark_loop_parallel(jl_ptls_t ptls, int master)

void gc_mark_loop(jl_ptls_t ptls)
{
if (jl_n_gcthreads == 0 || gc_heap_snapshot_enabled) {
if (jl_n_markthreads == 0 || gc_heap_snapshot_enabled) {
gc_mark_loop_serial(ptls);
}
else {
Expand Down Expand Up @@ -3548,6 +3573,7 @@ void jl_gc_init(void)
uv_mutex_init(&gc_perm_lock);
uv_mutex_init(&gc_threads_lock);
uv_cond_init(&gc_threads_cond);
uv_sem_init(&gc_sweep_assists_needed, 0);

jl_gc_init_page();
jl_gc_debug_init();
Expand Down
2 changes: 2 additions & 0 deletions src/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ typedef struct {
_Atomic(jl_gc_pagemeta_t *) page_metadata_back;
} jl_gc_global_page_pool_t;

extern jl_gc_global_page_pool_t global_page_pool_lazily_freed;
extern jl_gc_global_page_pool_t global_page_pool_clean;
extern jl_gc_global_page_pool_t global_page_pool_freed;

Expand Down Expand Up @@ -434,6 +435,7 @@ STATIC_INLINE void gc_big_object_link(bigval_t *hdr, bigval_t **list) JL_NOTSAFE
*list = hdr;
}

extern uv_sem_t gc_sweep_assists_needed;
extern _Atomic(int) gc_n_threads_marking;
void gc_mark_queue_all_roots(jl_ptls_t ptls, jl_gc_markqueue_t *mq);
void gc_mark_finlist_(jl_gc_markqueue_t *mq, jl_value_t **fl_begin, jl_value_t **fl_end) JL_NOTSAFEPOINT;
Expand Down
29 changes: 20 additions & 9 deletions src/jloptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ JL_DLLEXPORT void jl_init_options(void)
NULL, // cpu_target ("native", "core2", etc...)
0, // nthreadpools
0, // nthreads
0, // ngcthreads
0, // nmarkthreads
0, // nsweepthreads
NULL, // nthreads_per_pool
0, // nprocs
NULL, // machine_file
Expand Down Expand Up @@ -130,7 +131,8 @@ static const char opts[] =
" interface if supported (Linux and Windows) or to the number of CPU\n"
" threads if not supported (MacOS) or if process affinity is not\n"
" configured, and sets M to 1.\n"
" --gcthreads=N Use N threads for GC, set to half of the number of compute threads if unspecified.\n"
" --gcthreads=N[,M] Use N threads for the mark phase of GC and M (0 or 1) threads for the concurrent sweeping phase of GC.\n"
" N is set to half of the number of compute threads and M is set to 0 if unspecified.\n"
" -p, --procs {N|auto} Integer value N launches N additional local worker processes\n"
" \"auto\" launches as many workers as the number of local CPU threads (logical cores)\n"
" --machine-file <file> Run processes on hosts listed in <file>\n\n"
Expand Down Expand Up @@ -822,6 +824,22 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
if (jl_options.heap_size_hint == 0)
jl_errorf("julia: invalid argument to --heap-size-hint without memory size specified");

break;
case opt_gc_threads:
errno = 0;
long nmarkthreads = strtol(optarg, &endptr, 10);
if (errno != 0 || optarg == endptr || nmarkthreads < 1 || nmarkthreads >= INT16_MAX) {
jl_errorf("julia: --gcthreads=<n>[,<m>]; n must be an integer >= 1");
}
jl_options.nmarkthreads = (int16_t)nmarkthreads;
if (*endptr == ',') {
errno = 0;
char *endptri;
long nsweepthreads = strtol(&endptr[1], &endptri, 10);
if (errno != 0 || endptri == &endptr[1] || *endptri != 0 || nsweepthreads < 0 || nsweepthreads > 1)
jl_errorf("julia: --gcthreads=<n>,<m>; m must be 0 or 1");
jl_options.nsweepthreads = (int8_t)nsweepthreads;
}
break;
case opt_permalloc_pkgimg:
if (!strcmp(optarg,"yes"))
Expand All @@ -831,13 +849,6 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp)
else
jl_errorf("julia: invalid argument to --permalloc-pkgimg={yes|no} (%s)", optarg);
break;
case opt_gc_threads:
errno = 0;
long ngcthreads = strtol(optarg, &endptr, 10);
if (errno != 0 || optarg == endptr || *endptr != 0 || ngcthreads < 1 || ngcthreads >= INT16_MAX)
jl_errorf("julia: --gcthreads=<n>; n must be an integer >= 1");
jl_options.ngcthreads = (int16_t)ngcthreads;
break;
default:
jl_errorf("julia: unhandled option -- %c\n"
"This is a bug, please report it.", c);
Expand Down
3 changes: 2 additions & 1 deletion src/jloptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ typedef struct {
const char *cpu_target;
int8_t nthreadpools;
int16_t nthreads;
int16_t ngcthreads;
int16_t nmarkthreads;
int8_t nsweepthreads;
const int16_t *nthreads_per_pool;
int32_t nprocs;
const char *machine_file;
Expand Down
34 changes: 31 additions & 3 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ extern uv_cond_t gc_threads_cond;
extern _Atomic(int) gc_n_threads_marking;
extern void gc_mark_loop_parallel(jl_ptls_t ptls, int master);

static int may_mark(void) JL_NOTSAFEPOINT
static inline int may_mark(void) JL_NOTSAFEPOINT
{
return (jl_atomic_load(&gc_n_threads_marking) > 0);
}

// gc thread function
void jl_gc_threadfun(void *arg)
// gc thread mark function
void jl_gc_mark_threadfun(void *arg)
{
jl_threadarg_t *targ = (jl_threadarg_t*)arg;

Expand All @@ -143,6 +143,34 @@ void jl_gc_threadfun(void *arg)
}
}

// gc thread sweep function
void jl_gc_sweep_threadfun(void *arg)
{
jl_threadarg_t *targ = (jl_threadarg_t*)arg;

// initialize this thread (set tid and create heap)
jl_ptls_t ptls = jl_init_threadtls(targ->tid);

// wait for all threads
jl_gc_state_set(ptls, JL_GC_STATE_WAITING, 0);
uv_barrier_wait(targ->barrier);

// free the thread argument here
free(targ);

while (1) {
uv_sem_wait(&gc_sweep_assists_needed);
while (1) {
jl_gc_pagemeta_t *pg = pop_lf_page_metadata_back(&global_page_pool_lazily_freed);
if (pg == NULL) {
break;
}
jl_gc_free_page(pg);
push_lf_page_metadata_back(&global_page_pool_freed, pg);
}
}
}

// thread function: used by all mutator threads except the main thread
void jl_threadfun(void *arg)
{
Expand Down
46 changes: 33 additions & 13 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,8 @@ static void jl_check_tls(void)
JL_DLLEXPORT const int jl_tls_elf_support = 0;
#endif

extern int jl_n_markthreads;
extern int jl_n_sweepthreads;
extern int gc_first_tid;

// interface to Julia; sets up to make the runtime thread-safe
Expand Down Expand Up @@ -616,22 +618,37 @@ void jl_init_threading(void)
}
}

int16_t ngcthreads = jl_options.ngcthreads - 1;
if (ngcthreads == -1 &&
(cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified

ngcthreads = (uint64_t)strtol(cp, NULL, 10) - 1;
}
if (ngcthreads == -1) {
// if `--gcthreads` was not specified, set the number of GC threads
// to half of compute threads
if (nthreads <= 1) {
ngcthreads = 0;
jl_n_markthreads = jl_options.nmarkthreads - 1;
jl_n_sweepthreads = jl_options.nsweepthreads;
if (jl_n_markthreads == -1) { // --gcthreads not specified
if ((cp = getenv(NUM_GC_THREADS_NAME))) { // ENV[NUM_GC_THREADS_NAME] specified
errno = 0;
jl_n_markthreads = (uint64_t)strtol(cp, &endptr, 10) - 1;
if (errno != 0 || endptr == cp || nthreads <= 0)
jl_n_markthreads = 0;
cp = endptr;
if (*cp == ',') {
cp++;
errno = 0;
jl_n_sweepthreads = strtol(cp, &endptri, 10);
if (errno != 0 || endptri == cp || jl_n_sweepthreads < 0) {
jl_n_sweepthreads = 0;
}
}
}
else {
ngcthreads = (nthreads / 2) - 1;
// if `--gcthreads` or ENV[NUM_GCTHREADS_NAME] was not specified,
// set the number of mark threads to half of compute threads
// and number of sweep threads to 0
if (nthreads <= 1) {
jl_n_markthreads = 0;
}
else {
jl_n_markthreads = (nthreads / 2) - 1;
}
}
}
int16_t ngcthreads = jl_n_markthreads + jl_n_sweepthreads;

jl_all_tls_states_size = nthreads + nthreadsi + ngcthreads;
jl_n_threads_per_pool = (int*)malloc_s(2 * sizeof(int));
Expand Down Expand Up @@ -697,8 +714,11 @@ void jl_start_threads(void)
mask[i] = 0;
}
}
else if (i == nthreads - 1 && jl_n_sweepthreads == 1) {
uv_thread_create(&uvtid, jl_gc_sweep_threadfun, t);
}
else {
uv_thread_create(&uvtid, jl_gc_threadfun, t);
uv_thread_create(&uvtid, jl_gc_mark_threadfun, t);
}
uv_thread_detach(&uvtid);
}
Expand Down
3 changes: 2 additions & 1 deletion src/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ jl_ptls_t jl_init_threadtls(int16_t tid);

// provided by a threading infrastructure
void jl_init_threadinginfra(void);
void jl_gc_threadfun(void *arg);
void jl_gc_mark_threadfun(void *arg);
void jl_gc_sweep_threadfun(void *arg);
void jl_threadfun(void *arg);

#ifdef __cplusplus
Expand Down
13 changes: 11 additions & 2 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,14 @@ function get_threads_spec(opts)
end
end

function get_gcthreads_spec(opts)
if opts.nmarkthreads > 0 || opts.nsweepthreads > 0
`--gcthreads=$(opts.nmarkthreads),$(opts.nsweepthreads)`
else
``
end
end

# Starts workers specified by (-n|--procs) and --machine-file command line options
function process_opts(opts)
# startup worker.
Expand All @@ -1345,8 +1353,9 @@ function process_opts(opts)
end

# Propagate --threads to workers
threads = opts.nthreads > 0 ? `--threads=$(opts.nthreads)` : ``
gcthreads = opts.ngcthreads > 0 ? `--gcthreads=$(opts.ngcthreads)` : ``
threads = get_threads_spec(opts)
# Propagate --gcthreads to workers
gcthreads = get_gcthreads_spec(opts)

exeflags = `$threads $gcthreads`

Expand Down
Loading

0 comments on commit 1abc6c9

Please sign in to comment.