Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make python GC always run on thread 1 #520

Closed
wants to merge 14 commits into from
8 changes: 7 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ jobs:
julia:
name: Test (${{ matrix.os }}, julia ${{ matrix.jlversion }})
runs-on: ${{ matrix.os }}
env:
JULIA_NUM_THREADS: ${{ matrix.jl_nthreads }}
strategy:
fail-fast: true
matrix:
arch: [x64] # x86 unsupported by MicroMamba
os: [ubuntu-latest, windows-latest, macos-latest]
jlversion: ['1','1.6']
jlversion: ['1', '1.6']
jl_nthreads: ['1', '2']
steps:
- uses: actions/checkout@v2
- name: Set up Julia ${{ matrix.jlversion }}
Expand Down Expand Up @@ -52,11 +55,14 @@ jobs:
python:
name: Test (${{ matrix.os }}, python ${{ matrix.pyversion }})
runs-on: ${{ matrix.os }}
env:
JULIA_NUM_THREADS: ${{ matrix.jl_nthreads }}
strategy:
fail-fast: true
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
pyversion: ["3.x", "3.8"]
jl_nthreads: ['1', '2']
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.pyversion }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Manifest.toml
Manifest-*.toml
.ipynb_checkpoints
__pycache__
*.egg-info
Expand Down
43 changes: 43 additions & 0 deletions gcbench.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using PythonCall, Test

# Poll (for at most 5 seconds) until `PythonCall.GC.QUEUE` reports empty.
# Returns `true` on success and throws an error if the queue is still
# non-empty when the wait ends.
function wait_for_queue_to_be_empty()
    queue_is_empty() = isempty(PythonCall.GC.QUEUE)
    status = timedwait(queue_is_empty, 5)
    if status !== :ok
        error("QUEUE still not empty")
    end
    return true
end

# https://github.com/JuliaCI/GCBenchmarks/blob/fc288c696381ebfdef8f002168addd0ec1b08e34/benches/serial/append/append.jl
# `@gctime ex` evaluates `ex` and prints three timing lines to `stdout`:
#   1. "Runtime": wall time, allocations and Julia GC time from `@timed`,
#   2. "Python GC time": the growth of `PythonCall.GC.SECONDS_SPENT_IN_GC`
#      across the evaluation plus the subsequent queue drain,
#   3. "Python GC time (waiting)": how long we waited for the queue to drain.
# The value of `ex` is returned. All durations are scaled from seconds to
# nanoseconds (`* 1e9`) before being handed to `Base.time_print`.
macro gctime(ex)
    quote
        local gc_secs_before = PythonCall.GC.SECONDS_SPENT_IN_GC[]
        local timing = @timed $(esc(ex))
        Base.time_print(
            stdout,
            timing.time * 1e9,
            timing.gcstats.allocd,
            timing.gcstats.total_time,
            Base.gc_alloc_count(timing.gcstats);
            msg="Runtime"
        )
        println(stdout)
        # Measure the drain before sampling the counter a second time, so the
        # "after" reading includes work done while we were waiting.
        local drain_secs = @elapsed wait_for_queue_to_be_empty()
        local gc_secs_after = PythonCall.GC.SECONDS_SPENT_IN_GC[]
        Base.time_print(stdout, (gc_secs_after - gc_secs_before) * 1e9; msg="Python GC time")
        println(stdout)
        Base.time_print(stdout, drain_secs * 1e9; msg="Python GC time (waiting)")
        println(stdout)
        timing.value
    end
end

# Build a Python list and append `iters` freshly created Python lists to it,
# each made from `rand(size)`. Returns the outer Python list.
function append_lots(iters=100 * 1024, size=1596)
    outer = pylist()
    for _ in 1:iters
        inner = pylist(rand(size))
        outer.append(inner)
    end
    return outer
end

# Benchmark driver. Logging is switched off for both Julia's GC and
# PythonCall's GC, so only the timing lines below are printed.
GC.enable_logging(false)
PythonCall.GC.enable_logging(false)
# "Total" covers the allocation benchmark plus the follow-up full collection.
@time "Total" begin
# Runs the workload and prints runtime / Python GC timings (see `@gctime`).
@gctime append_lots()
# Time one explicit full Julia collection and the wait for the PythonCall
# queue to be empty afterwards.
@time "Next full GC" begin
GC.gc(true)
wait_for_queue_to_be_empty()
end
end
29 changes: 11 additions & 18 deletions pysrc/juliacall/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ def args_from_config():
CONFIG['opt_handle_signals'] = choice('handle_signals', ['yes', 'no'])[0]
CONFIG['opt_startup_file'] = choice('startup_file', ['yes', 'no'])[0]

if CONFIG['opt_handle_signals'] is None:
    # The user made no explicit choice, so pick a default: signal handling
    # must be on whenever Julia may run with more than one thread.
    cli_threads = CONFIG['opt_threads']
    # NOTE(review): 'auto' is presumably an accepted value of the threads
    # option -- confirm against the option parser. The comparison is harmless
    # if it is not.
    multi = cli_threads == 'auto' or (isinstance(cli_threads, int) and cli_threads > 1)
    # JULIA_NUM_THREADS is not always a plain integer: it may be "auto", and
    # Julia >= 1.9 also accepts "N,M" / "auto,M" (default and interactive
    # pools). Parse each comma-separated part instead of calling int() on the
    # whole string, which raised ValueError for those values and for "".
    env_total = 0
    for part in os.environ.get('JULIA_NUM_THREADS', '').split(','):
        part = part.strip()
        if part == 'auto':
            multi = True
            continue
        try:
            env_total += int(part)
        except ValueError:
            # Empty or malformed entry: leave it to Julia to diagnose rather
            # than crashing here while merely choosing a default.
            pass
    if env_total > 1:
        multi = True
    if multi:
        # We must handle signals if Julia has more than 1 thread.
        # Should we issue a warning here that we are turning on signal-handling?
        CONFIG['opt_handle_signals'] = "yes"

# Stop if we already initialised
if CONFIG['inited']:
return
Expand Down Expand Up @@ -240,24 +251,6 @@ def jlstr(x):

CONFIG['inited'] = True

if CONFIG['opt_handle_signals'] is None:
if Base.Threads.nthreads() > 1:
# a warning to help multithreaded users
# TODO: should we set PYTHON_JULIACALL_HANDLE_SIGNALS=yes whenever PYTHON_JULIACALL_THREADS != 1?
warnings.warn(
"Julia was started with multiple threads "
"but multithreading support is experimental in JuliaCall. "
"It is recommended to restart Python with the environment variable "
"PYTHON_JULIACALL_HANDLE_SIGNALS=yes "
"set, otherwise you may experience segfaults or other crashes. "
"Note however that this interferes with Python's own signal handling, "
"so for example Ctrl-C will not raise KeyboardInterrupt. "
"See https://juliapy.github.io/PythonCall.jl/stable/faq/#Is-PythonCall/JuliaCall-thread-safe? "
"for further information. "
"You can suppress this warning by setting "
"PYTHON_JULIACALL_HANDLE_SIGNALS=no."
)

# Next, automatically load the juliacall extension if we are in IPython or Jupyter
CONFIG['autoload_ipython_extension'] = choice('autoload_ipython_extension', ['yes', 'no'])[0]
if CONFIG['autoload_ipython_extension'] in {'yes', None}:
Expand Down
29 changes: 29 additions & 0 deletions pytest/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,32 @@ def test_issue_433():
"""
)
assert out == 25

def test_julia_gc():
    """Finalize many Julia-owned Python objects from Julia threads without crashing.

    A batch of Python objects is created with nothing keeping them alive, each
    is finalized inside a `Threads.@threads` loop, and Julia's GC is then run.
    The test passes when no segfault occurs and the PythonCall GC queue is
    observed empty within the 5 second wait.

    The background task can also be checked by eye: with logging enabled it
    prints a line such as "Python GC (100 items): 0.000000 seconds."

    Debugging note: if you get segfaults, run the tests with
    `PYTHON_JULIACALL_HANDLE_SIGNALS=yes python3 -X faulthandler -m pytest -p no:faulthandler -s --nbval --cov=pysrc ./pytest/`
    to recover a bit more information from the segfault.
    """
    from juliacall import Main as jl

    # NOTE: the Julia source below is passed to `seval` verbatim; keep it as is.
    julia_source = """
using PythonCall, Test
PythonCall.GC.enable_logging()
let
pyobjs = map(pylist, 1:100)
Threads.@threads for obj in pyobjs
finalize(obj)
end
end
GC.gc()
ret = timedwait(5) do
isempty(PythonCall.GC.QUEUE)
end
@test ret === :ok
"""
    jl.seval(julia_source)
187 changes: 153 additions & 34 deletions src/GC/GC.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,77 +9,196 @@ module GC

using ..C: C

const ENABLED = Ref(true)
const QUEUE = C.PyPtr[]
# `ENABLED`: whether or not python GC is enabled, or paused to process later
const ENABLED = Threads.Atomic{Bool}(true)
# this event allows us to `wait` in a task until GC is re-enabled
# we have both this and `ENABLED` since there is no `isready(::Event)`
# for us to do a non-blocking check. Instead we must keep the event being triggered
# in-sync with `ENABLED[]`.
# We therefore modify both in `enable()` and `disable()` and nowhere else.
const ENABLED_EVENT = Threads.Event()

# this is the queue to process pointers for GC (`C.Py_DecRef`)
const QUEUE = Channel{C.PyPtr}(Inf)

# this is the task which performs GC from thread 1
const GC_TASK = Ref{Task}()

# This we use in testing to know when our GC is running
const GC_FINISHED = Threads.Condition()

# This is used for basic profiling
const SECONDS_SPENT_IN_GC = Threads.Atomic{Float64}()

const LOGGING_ENABLED = Ref{Bool}(false)

"""
PythonCall.GC.disable()
PythonCall.GC.enable_logging(enable=true)

Enables printed logging (similar to Julia's `GC.enable_logging`).
"""
function enable_logging(enable=true)
    # Store the flag read by the GC task when deciding whether to print.
    setindex!(LOGGING_ENABLED, enable)
    return nothing
end

Disable the PythonCall garbage collector.
"""
PythonCall.GC.disable()

This means that whenever a Python object owned by Julia is finalized, it is not immediately
freed but is instead added to a queue of objects to free later when `enable()` is called.
Disable the PythonCall garbage collector. This should generally not be required.

Like most PythonCall functions, you must only call this from the main thread.
"""
function disable()
    # Clear the flag first, then reset the event, so the two stay in sync
    # (they are only ever modified together in `enable()` and `disable()`).
    setindex!(ENABLED, false)
    reset(ENABLED_EVENT)
    return nothing
end

"""
PythonCall.GC.enable()

Re-enable the PythonCall garbage collector.
Re-enable the PythonCall garbage collector. This should generally not be required.

This frees any Python objects which were finalized while the GC was disabled, and allows
objects finalized in the future to be freed immediately.

Like most PythonCall functions, you must only call this from the main thread.
"""
function enable()
ENABLED[] = true
if !isempty(QUEUE)
C.with_gil(false) do
for ptr in QUEUE
if ptr != C.PyNULL
C.Py_DecRef(ptr)
notify(ENABLED_EVENT)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could wait on the `GC_FINISHED` condition here, to preserve the semantics that this function call only returns after all queued GC work has been processed.

return
end

# This is called within a finalizer so we must not task switch
# (so no printing nor blocking on Julia-side locks)
function enqueue_wrapper(f, g)
t = @elapsed begin
if C.CTX.is_initialized
# Eager path: if we are already on thread 1,
# we eagerly decrement
handled = false
if ENABLED[] && Threads.threadid() == 1
# temporarily disable thread migration to be sure
# we call `C.Py_DecRef` from thread 1
old_sticky = current_task().sticky
if !old_sticky
current_task().sticky = true
end
if Threads.threadid() == 1
f()
handled = true
end
if !old_sticky
current_task().sticky = old_sticky
end
end
if !handled
g()
end
end
end
empty!(QUEUE)
Threads.atomic_add!(SECONDS_SPENT_IN_GC, t)
return
end

function enqueue(ptr::C.PyPtr)
if ptr != C.PyNULL && C.CTX.is_initialized
if ENABLED[]
C.with_gil(false) do
# if we are on thread 1:
f = () -> begin
C.with_gil(false) do
if ptr != C.PyNULL
C.Py_DecRef(ptr)
end
else
push!(QUEUE, ptr)
end
end
return
# otherwise:
g = () -> begin
if ptr != C.PyNULL
put!(QUEUE, ptr)
end
end
enqueue_wrapper(f, g)
end

function enqueue_all(ptrs)
if C.CTX.is_initialized
if ENABLED[]
C.with_gil(false) do
for ptr in ptrs
if ptr != C.PyNULL
C.Py_DecRef(ptr)
end
# if we are on thread 1:
f = () -> begin
C.with_gil(false) do
for ptr in ptrs
if ptr != C.PyNULL
C.Py_DecRef(ptr)
end
end
else
append!(QUEUE, ptrs)
end
end
return
# otherwise:
g = () -> begin
for ptr in ptrs
if ptr != C.PyNULL
put!(QUEUE, ptr)
end
end
end
enqueue_wrapper(f, g)
end

# must only be called from thread 1 by the task in `GC_TASK[]`
function unsafe_process_queue!()
    # Drain `QUEUE`, calling `C.Py_DecRef` on every non-null pointer, and
    # return the elapsed time in seconds (`0.0` when the queue was already
    # empty). Draining stops early if `ENABLED[]` becomes false.
    isempty(QUEUE) && return 0.0
    # A `Ref` rather than a plain local: a variable that is reassigned inside
    # a closure gets boxed, which makes the counter type-unstable.
    n = Ref(0)
    t = @elapsed C.with_gil(false) do
        while !isempty(QUEUE) && ENABLED[]
            # This should never block, since there should
            # only be one consumer
            # (we would like to not block while holding the GIL)
            ptr = take!(QUEUE)
            if ptr != C.PyNULL
                C.Py_DecRef(ptr)
                n[] += 1
            end
        end
    end
    if LOGGING_ENABLED[]
        # `@elapsed` yields seconds but `Base.time_print` takes nanoseconds;
        # without the conversion the log always read "0.000000 seconds".
        Base.time_print(stdout, t * 1e9; msg="Python GC ($(n[]) items)")
        println(stdout)
    end
    return t
end

function gc_loop()
    # Body of the background GC task: loops forever, draining the queue
    # whenever GC is enabled and there is something queued.
    while true
        work_pending = ENABLED[] && !isempty(QUEUE)
        if work_pending
            elapsed = unsafe_process_queue!()
            Threads.atomic_add!(SECONDS_SPENT_IN_GC, elapsed)
            # just for testing purposes
            lock(GC_FINISHED)
            try
                notify(GC_FINISHED)
            finally
                unlock(GC_FINISHED)
            end
        end
        # Block until there is both something to process
        # and GC is enabled; the order of these two waits is kept as is.
        wait(QUEUE)
        wait(ENABLED_EVENT)
    end
end

function launch_gc_task()
if isassigned(GC_TASK) && Base.istaskstarted(GC_TASK[]) && !Base.istaskdone(GC_TASK[])
throw(ConcurrencyViolationError("PythonCall GC task already running!"))
end
task = Task(gc_loop)
task.sticky = VERSION >= v"1.7" # disallow task migration which was introduced in 1.7
# ensure the task runs from thread 1
ccall(:jl_set_task_tid, Cvoid, (Any, Cint), task, 0)
schedule(task)
Comment on lines +186 to +190
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if isdefined(Base, :errormonitor)
Base.errormonitor(task)
end
GC_TASK[] = task
task
end

function __init__()
    # Start the background GC task, then mark GC as enabled.
    launch_gc_task()
    enable()
    return nothing
end

end # module GC
Loading
Loading