diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index a3462b48..221fced8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -14,12 +14,15 @@ jobs: julia: name: Test (${{ matrix.os }}, julia ${{ matrix.jlversion }}) runs-on: ${{ matrix.os }} + env: + JULIA_NUM_THREADS: ${{ matrix.jl_nthreads }} strategy: fail-fast: true matrix: arch: [x64] # x86 unsupported by MicroMamba os: [ubuntu-latest, windows-latest, macos-latest] - jlversion: ['1','1.6'] + jlversion: ['1', '1.6'] + jl_nthreads: ['1', '2'] steps: - uses: actions/checkout@v2 - name: Set up Julia ${{ matrix.jlversion }} @@ -52,11 +55,14 @@ jobs: python: name: Test (${{ matrix.os }}, python ${{ matrix.pyversion }}) runs-on: ${{ matrix.os }} + env: + JULIA_NUM_THREADS: ${{ matrix.jl_nthreads }} strategy: fail-fast: true matrix: os: [ubuntu-latest, windows-latest, macos-latest] pyversion: ["3.x", "3.8"] + jl_nthreads: ['1', '2'] steps: - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.pyversion }} diff --git a/.gitignore b/.gitignore index 399503ad..d5e8c5c1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ Manifest.toml +Manifest-*.toml .ipynb_checkpoints __pycache__ *.egg-info diff --git a/gcbench.jl b/gcbench.jl new file mode 100644 index 00000000..c3189fd9 --- /dev/null +++ b/gcbench.jl @@ -0,0 +1,43 @@ +using PythonCall, Test + +function wait_for_queue_to_be_empty() + ret = timedwait(5) do + isempty(PythonCall.GC.QUEUE) + end + ret === :ok || error("QUEUE still not empty") +end + +# https://github.com/JuliaCI/GCBenchmarks/blob/fc288c696381ebfdef8f002168addd0ec1b08e34/benches/serial/append/append.jl +macro gctime(ex) + quote + local prior = PythonCall.GC.SECONDS_SPENT_IN_GC[] + local ret = @timed $(esc(ex)) + Base.time_print(stdout, ret.time * 1e9, ret.gcstats.allocd, ret.gcstats.total_time, Base.gc_alloc_count(ret.gcstats); msg="Runtime") + println(stdout) + local waiting = @elapsed wait_for_queue_to_be_empty() + local after = PythonCall.GC.SECONDS_SPENT_IN_GC[] + Base.time_print(stdout, (after - prior) * 1e9; msg="Python GC time") + println(stdout) + Base.time_print(stdout, waiting * 1e9; msg="Python GC time (waiting)") + println(stdout) + ret.value + end +end + +function append_lots(iters=100 * 1024, size=1596) + v = pylist() + for i = 1:iters + v.append(pylist(rand(size))) + end + return v +end + +GC.enable_logging(false) +PythonCall.GC.enable_logging(false) +@time "Total" begin + @gctime append_lots() + @time "Next full GC" begin + GC.gc(true) + wait_for_queue_to_be_empty() + end +end diff --git a/pysrc/juliacall/__init__.py b/pysrc/juliacall/__init__.py index 509dc415..9c1ff185 100644 --- a/pysrc/juliacall/__init__.py +++ b/pysrc/juliacall/__init__.py @@ -147,6 +147,17 @@ def args_from_config(): CONFIG['opt_handle_signals'] = choice('handle_signals', ['yes', 'no'])[0] CONFIG['opt_startup_file'] = choice('startup_file', ['yes', 'no'])[0] + if CONFIG['opt_handle_signals'] is None: + cli_threads = CONFIG['opt_threads'] + multi = isinstance(cli_threads, int) and cli_threads > 1 + env_threads = os.environ.get('JULIA_NUM_THREADS', '0') + if env_threads == 'auto' or int(env_threads) > 1: + multi=True + if multi: + # We must handle signals if Julia has more than 1 thread. + # Should we issue a warning here that we are turning on signal-handling? + CONFIG['opt_handle_signals'] = "yes" + # Stop if we already initialised if CONFIG['inited']: return @@ -240,24 +251,6 @@ def jlstr(x): CONFIG['inited'] = True - if CONFIG['opt_handle_signals'] is None: - if Base.Threads.nthreads() > 1: - # a warning to help multithreaded users - # TODO: should we set PYTHON_JULIACALL_HANDLE_SIGNALS=yes whenever PYTHON_JULIACALL_THREADS != 1? - warnings.warn( - "Julia was started with multiple threads " - "but multithreading support is experimental in JuliaCall. " - "It is recommended to restart Python with the environment variable " - "PYTHON_JULIACALL_HANDLE_SIGNALS=yes " - "set, otherwise you may experience segfaults or other crashes. " - "Note however that this interferes with Python's own signal handling, " - "so for example Ctrl-C will not raise KeyboardInterrupt. " - "See https://juliapy.github.io/PythonCall.jl/stable/faq/#Is-PythonCall/JuliaCall-thread-safe? " - "for further information. " - "You can suppress this warning by setting " - "PYTHON_JULIACALL_HANDLE_SIGNALS=no." - ) - # Next, automatically load the juliacall extension if we are in IPython or Jupyter CONFIG['autoload_ipython_extension'] = choice('autoload_ipython_extension', ['yes', 'no'])[0] if CONFIG['autoload_ipython_extension'] in {'yes', None}: diff --git a/pytest/test_all.py b/pytest/test_all.py index c6cff009..70a18808 100644 --- a/pytest/test_all.py +++ b/pytest/test_all.py @@ -75,3 +75,32 @@ def test_issue_433(): """ ) assert out == 25 + +def test_julia_gc(): + from juliacall import Main as jl + # We make a bunch of python objects with no reference to them, + # then call GC to try to finalize them. + # We want to make sure we don't segfault. + # Here we can (manually) verify that the background task is running successfully, + # by seeing the printout "Python GC (100 items): 0.000000 seconds." + # We also programmatically check things are working by verifying the queue is empty. + # Debugging note: if you get segfaults, then run the tests with + # `PYTHON_JULIACALL_HANDLE_SIGNALS=yes python3 -X faulthandler -m pytest -p no:faulthandler -s --nbval --cov=pysrc ./pytest/` + # in order to recover a bit more information from the segfault. + jl.seval( + """ + using PythonCall, Test + PythonCall.GC.enable_logging() + let + pyobjs = map(pylist, 1:100) + Threads.@threads for obj in pyobjs + finalize(obj) + end + end + GC.gc() + ret = timedwait(5) do + isempty(PythonCall.GC.QUEUE) + end + @test ret === :ok + """ + ) diff --git a/src/GC/GC.jl b/src/GC/GC.jl index 0d1fa9a8..718ba22c 100644 --- a/src/GC/GC.jl +++ b/src/GC/GC.jl @@ -9,77 +9,196 @@ module GC using ..C: C -const ENABLED = Ref(true) -const QUEUE = C.PyPtr[] +# `ENABLED`: whether or not python GC is enabled, or paused to process later +const ENABLED = Threads.Atomic{Bool}(true) +# this event allows us to `wait` in a task until GC is re-enabled +# we have both this and `ENABLED` since there is no `isready(::Event)` +# for us to do a non-blocking check. Instead we must keep the event being triggered +# in-sync with `ENABLED[]`. +# We therefore modify both in `enable()` and `disable()` and nowhere else. +const ENABLED_EVENT = Threads.Event() + +# this is the queue to process pointers for GC (`C.Py_DecRef`) +const QUEUE = Channel{C.PyPtr}(Inf) + +# this is the task which performs GC from thread 1 +const GC_TASK = Ref{Task}() + +# This we use in testing to know when our GC is running +const GC_FINISHED = Threads.Condition() + +# This is used for basic profiling +const SECONDS_SPENT_IN_GC = Threads.Atomic{Float64}() + +const LOGGING_ENABLED = Ref{Bool}(false) """ - PythonCall.GC.disable() + PythonCall.GC.enable_logging(enable=true) + +Enables printed logging (similar to Julia's `GC.enable_logging`). +""" +function enable_logging(enable=true) + LOGGING_ENABLED[] = enable + return nothing +end -Disable the PythonCall garbage collector. +""" + PythonCall.GC.disable() -This means that whenever a Python object owned by Julia is finalized, it is not immediately -freed but is instead added to a queue of objects to free later when `enable()` is called. +Disable the PythonCall garbage collector. This should generally not be required. -Like most PythonCall functions, you must only call this from the main thread. """ function disable() ENABLED[] = false + reset(ENABLED_EVENT) return end """ PythonCall.GC.enable() -Re-enable the PythonCall garbage collector. +Re-enable the PythonCall garbage collector. This should generally not be required. -This frees any Python objects which were finalized while the GC was disabled, and allows -objects finalized in the future to be freed immediately. - -Like most PythonCall functions, you must only call this from the main thread. """ function enable() ENABLED[] = true - if !isempty(QUEUE) - C.with_gil(false) do - for ptr in QUEUE - if ptr != C.PyNULL - C.Py_DecRef(ptr) + notify(ENABLED_EVENT) + return +end + +# This is called within a finalizer so we must not task switch +# (so no printing nor blocking on Julia-side locks) +function enqueue_wrapper(f, g) + t = @elapsed begin + if C.CTX.is_initialized + # Eager path: if we are already on thread 1, + # we eagerly decrement + handled = false + if ENABLED[] && Threads.threadid() == 1 + # temporarily disable thread migration to be sure + # we call `C.Py_DecRef` from thread 1 + old_sticky = current_task().sticky + if !old_sticky + current_task().sticky = true + end + if Threads.threadid() == 1 + f() + handled = true + end + if !old_sticky + current_task().sticky = old_sticky end end + if !handled + g() + end end end - empty!(QUEUE) + Threads.atomic_add!(SECONDS_SPENT_IN_GC, t) return end function enqueue(ptr::C.PyPtr) - if ptr != C.PyNULL && C.CTX.is_initialized - if ENABLED[] - C.with_gil(false) do + # if we are on thread 1: + f = () -> begin + C.with_gil(false) do + if ptr != C.PyNULL C.Py_DecRef(ptr) end - else - push!(QUEUE, ptr) end end - return + # otherwise: + g = () -> begin + if ptr != C.PyNULL + put!(QUEUE, ptr) + end + end + enqueue_wrapper(f, g) end function enqueue_all(ptrs) - if C.CTX.is_initialized - if ENABLED[] - C.with_gil(false) do - for ptr in ptrs - if ptr != C.PyNULL - C.Py_DecRef(ptr) - end + # if we are on thread 1: + f = () -> begin + C.with_gil(false) do + for ptr in ptrs + if ptr != C.PyNULL + C.Py_DecRef(ptr) end end - else - append!(QUEUE, ptrs) end end - return + # otherwise: + g = () -> begin + for ptr in ptrs + if ptr != C.PyNULL + put!(QUEUE, ptr) + end + end + end + enqueue_wrapper(f, g) +end + +# must only be called from thread 1 by the task in `GC_TASK[]` +function unsafe_process_queue!() + n = 0 + if !isempty(QUEUE) + t = @elapsed C.with_gil(false) do + while !isempty(QUEUE) && ENABLED[] + # This should never block, since there should + # only be one consumer + # (we would like to not block while holding the GIL) + ptr = take!(QUEUE) + if ptr != C.PyNULL + C.Py_DecRef(ptr) + n += 1 + end + end + end + if LOGGING_ENABLED[] + Base.time_print(stdout, t; msg="Python GC ($n items)") + println(stdout) + end + else + t = 0.0 + end + return t +end + +function gc_loop() + while true + if ENABLED[] && !isempty(QUEUE) + t = unsafe_process_queue!() + Threads.atomic_add!(SECONDS_SPENT_IN_GC, t) + # just for testing purposes + Base.@lock GC_FINISHED notify(GC_FINISHED) + end + # wait until there is both something to process + # and GC is `enabled` + wait(QUEUE) + wait(ENABLED_EVENT) + end +end + +function launch_gc_task() + if isassigned(GC_TASK) && Base.istaskstarted(GC_TASK[]) && !Base.istaskdone(GC_TASK[]) + throw(ConcurrencyViolationError("PythonCall GC task already running!")) + end + task = Task(gc_loop) + task.sticky = VERSION >= v"1.7" # disallow task migration which was introduced in 1.7 + # ensure the task runs from thread 1 + ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, 0) + schedule(task) + if isdefined(Base, :errormonitor) + Base.errormonitor(task) + end + GC_TASK[] = task + task +end + +function __init__() + launch_gc_task() + enable() # start enabled + nothing end end # module GC diff --git a/test/GC.jl b/test/GC.jl index 46409041..9655925c 100644 --- a/test/GC.jl +++ b/test/GC.jl @@ -1 +1,53 @@ -# TODO +@testset "201: GC segfaults" begin + # https://github.com/JuliaPy/PythonCall.jl/issues/201 + # This should not segfault! + cmd = Base.julia_cmd() + path = joinpath(@__DIR__, "finalize_test_script.jl") + p = run(`$cmd -t2 --project $path`) + @test p.exitcode == 0 +end + +@testset "GC.enable() and GC.disable()" begin + PythonCall.GC.disable() + try + # Put some stuff in the GC queue + let + pyobjs = map(pylist, 1:100) + foreach(PythonCall.Core.py_finalizer, pyobjs) + end + # Since GC is disabled, we should have built up entries in the queue + # (there is no race since we push to the queue eagerly) + @test !isempty(PythonCall.GC.QUEUE) + # now, setup a task so we can know once GC has triggered. + # We start waiting *before* enabling GC, so we know we'll catch + # the notification that GC finishes + # We also don't enable GC until we get confirmation via `is_waiting[]`. + # At this point we have acquired the lock for `PythonCall.GC.GC_FINISHED`. + # We won't relinquish it until the next line where we `wait`, + # at which point the GC can trigger it. Therefore we should be certain + # that the ordering is correct and we can't miss the event. + is_waiting = Threads.Atomic{Bool}(false) + rdy = Threads.@spawn begin + Base.@lock PythonCall.GC.GC_FINISHED begin + is_waiting[] = true + wait(PythonCall.GC.GC_FINISHED) + end + end + # Wait until the task starts + ret = timedwait(5) do + istaskstarted(rdy) && is_waiting[] + end + @test ret == :ok + # Now, re-enable GC + PythonCall.GC.enable() + # Wait for GC to run + wait(rdy) + # There should be nothing left in the queue, since we fully process it. + @test isempty(PythonCall.GC.QUEUE) + finally + # Make sure we re-enable GC regardless of whether our tests pass + PythonCall.GC.enable() + end +end + +@test_throws ConcurrencyViolationError("PythonCall GC task already running!") PythonCall.GC.launch_gc_task() diff --git a/test/finalize_test_script.jl b/test/finalize_test_script.jl new file mode 100644 index 00000000..ecacad9e --- /dev/null +++ b/test/finalize_test_script.jl @@ -0,0 +1,9 @@ +using PythonCall + +# This would consistently segfault pre-GC-thread-safety +let + pyobjs = map(pylist, 1:100) + Threads.@threads for obj in pyobjs + finalize(obj) + end +end