diff --git a/NEWS.md b/NEWS.md
index e7cf33f0b1c46..4d034bc63bd84 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -23,6 +23,8 @@ Multi-threading changes
 * The `Condition` type now has a thread-safe replacement, accessed as
   `Threads.Condition`. With that addition, task scheduling primitives such as
   `ReentrantLock` are now thread-safe ([#30061]).
+* It is possible to schedule and switch Tasks during `@threads` loops, and perform limited I/O ([#31438]).
+
 Language changes
 ----------------
 * Empty entries in `JULIA_DEPOT_PATH` are now expanded to default depot entries ([#31009]).
diff --git a/base/libuv.jl b/base/libuv.jl
index a24434f8078fc..760d97e91eed6 100644
--- a/base/libuv.jl
+++ b/base/libuv.jl
@@ -48,18 +48,23 @@ disassociate_julia_struct(handle::Ptr{Cvoid}) =
 # A dict of all libuv handles that are being waited on somewhere in the system
 # and should thus not be garbage collected
 const uvhandles = IdDict()
+const preserve_handle_lock = Threads.SpinLock()
 function preserve_handle(x)
+    lock(preserve_handle_lock)
     v = get(uvhandles, x, 0)::Int
     uvhandles[x] = v + 1
+    unlock(preserve_handle_lock)
     nothing
 end
 function unpreserve_handle(x)
+    lock(preserve_handle_lock)
     v = uvhandles[x]::Int
     if v == 1
         pop!(uvhandles, x)
     else
         uvhandles[x] = v - 1
     end
+    unlock(preserve_handle_lock)
     nothing
 end
 
diff --git a/base/stream.jl b/base/stream.jl
index 8d90bd8be9ead..e97d984fcfbe8 100644
--- a/base/stream.jl
+++ b/base/stream.jl
@@ -120,7 +120,7 @@ mutable struct PipeEndpoint <: LibuvStream
     buffer::IOBuffer
     readnotify::Condition
     connectnotify::Condition
-    closenotify::Condition
+    closenotify::ThreadSynchronizer
     sendbuf::Union{IOBuffer, Nothing}
     lock::ReentrantLock
     throttle::Int
@@ -130,7 +130,7 @@
                 PipeBuffer(),
                 Condition(),
                 Condition(),
-                Condition(),
+                ThreadSynchronizer(),
                 nothing,
                 ReentrantLock(),
                 DEFAULT_READ_BUFFER_SZ)
@@ -165,7 +165,7 @@ mutable struct TTY <: LibuvStream
     status::Int
     buffer::IOBuffer
     readnotify::Condition
-    closenotify::Condition
+    closenotify::ThreadSynchronizer
    sendbuf::Union{IOBuffer, Nothing}
     lock::ReentrantLock
     throttle::Int
@@ -176,7 +176,7 @@
                 status,
                 PipeBuffer(),
                 Condition(),
-                Condition(),
+                ThreadSynchronizer(),
                 nothing,
                 ReentrantLock(),
                 DEFAULT_READ_BUFFER_SZ)
@@ -380,8 +380,13 @@ function wait_readnb(x::LibuvStream, nb::Int)
 end
 
 function wait_close(x::Union{LibuvStream, LibuvServer})
-    if isopen(x)
-        stream_wait(x, x.closenotify)
+    lock(x.closenotify)
+    try
+        if isopen(x)
+            stream_wait(x, x.closenotify)
+        end
+    finally
+        unlock(x.closenotify)
     end
     nothing
 end
@@ -389,16 +394,24 @@ end
 function close(stream::Union{LibuvStream, LibuvServer})
     if stream.status == StatusInit
         ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
-    elseif isopen(stream)
-        if stream.status != StatusClosing
-            ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
-            stream.status = StatusClosing
-        end
-        if uv_handle_data(stream) != C_NULL
-            stream_wait(stream, stream.closenotify)
+        return nothing
+    end
+    lock(stream.closenotify)
+    try
+        if isopen(stream)
+            should_wait = uv_handle_data(stream) != C_NULL
+            if stream.status != StatusClosing
+                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
+                stream.status = StatusClosing
+            end
+            if should_wait
+                stream_wait(stream, stream.closenotify)
+            end
         end
+    finally
+        unlock(stream.closenotify)
     end
-    nothing
+    return nothing
 end
 
 function uvfinalize(uv::Union{LibuvStream, LibuvServer})
@@ -547,7 +560,12 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
                 if isa(stream, TTY)
                     stream.status = StatusEOF # libuv called uv_stop_reading already
                     notify(stream.readnotify)
-                    notify(stream.closenotify)
+                    lock(stream.closenotify)
+                    try
+                        notify(stream.closenotify)
+                    finally
+                        unlock(stream.closenotify)
+                    end
                 elseif stream.status != StatusClosing
                     # begin shutdown of the stream
                     ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
@@ -589,10 +607,15 @@ function reseteof(x::TTY)
 end
 
 function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
-    uv.handle = C_NULL
-    uv.status = StatusClosed
-    # notify any listeners that exist on this libuv stream type
-    notify(uv.closenotify)
+    lock(uv.closenotify)
+    try
+        uv.handle = C_NULL
+        uv.status = StatusClosed
+        # notify any listeners that exist on this libuv stream type
+        notify(uv.closenotify)
+    finally
+        unlock(uv.closenotify)
+    end
     isdefined(uv, :readnotify) && notify(uv.readnotify)
     isdefined(uv, :connectnotify) && notify(uv.connectnotify)
     nothing
@@ -842,14 +865,13 @@ end
 
 uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p)))
 function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
-    uvw = uv_write_async(s, p, n)
     ct = current_task()
+    uvw = uv_write_async(s, p, n, ct)
     preserve_handle(ct)
     try
         # wait for the last chunk to complete (or error)
         # assume that any errors would be sticky,
         # (so we don't need to monitor the error status of the intermediate writes)
-        uv_req_set_data(uvw, ct)
         wait()
     finally
         if uv_req_data(uvw) != C_NULL
@@ -867,11 +889,11 @@ end
 
 # helper function for uv_write that returns the uv_write_t struct for the write
 # rather than waiting on it
-function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
+function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt, reqdata)
     check_open(s)
     while true
         uvw = Libc.malloc(_sizeof_uv_write)
-        uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
+        uv_req_set_data(uvw, reqdata)
         nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
         # TODO: use writev, when that is added to uv-win
         err = ccall(:jl_uv_write,
diff --git a/src/jl_uv.c b/src/jl_uv.c
index b3dd674cdd400..5aa5c3ddf0962 100644
--- a/src/jl_uv.c
+++ b/src/jl_uv.c
@@ -57,6 +57,20 @@ void jl_init_uv(void)
     JL_MUTEX_INIT(&jl_uv_mutex); // a file-scope initializer can be used instead
 }
 
+int jl_uv_n_waiters = 0;
+
+void JL_UV_LOCK(void)
+{
+    if (jl_mutex_trylock(&jl_uv_mutex)) {
+    }
+    else {
+        jl_atomic_fetch_add(&jl_uv_n_waiters, 1);
+        jl_wake_libuv();
+        JL_LOCK(&jl_uv_mutex);
+        jl_atomic_fetch_add(&jl_uv_n_waiters, -1);
+    }
+}
+
 void jl_uv_call_close_callback(jl_value_t *val)
 {
     jl_value_t *args[2];
@@ -190,18 +204,6 @@ JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
     return 0;
 }
 
-JL_DLLEXPORT void jl_run_event_loop(uv_loop_t *loop)
-{
-    jl_ptls_t ptls = jl_get_ptls_states();
-    if (loop) {
-        jl_gc_safepoint_(ptls);
-        JL_UV_LOCK();
-        loop->stop_flag = 0;
-        uv_run(loop,UV_RUN_DEFAULT);
-        JL_UV_UNLOCK();
-    }
-}
-
 JL_DLLEXPORT int jl_process_events(uv_loop_t *loop)
 {
     jl_ptls_t ptls = jl_get_ptls_states();
diff --git a/src/julia_internal.h b/src/julia_internal.h
index d864bc4e5908b..9024bfcf23c5a 100644
--- a/src/julia_internal.h
+++ b/src/julia_internal.h
@@ -115,7 +115,8 @@ static inline void jl_assume_(int cond)
 
 static uv_loop_t *const unused_uv_loop_arg = (uv_loop_t *)0xBAD10;
 extern jl_mutex_t jl_uv_mutex;
-#define JL_UV_LOCK() JL_LOCK(&jl_uv_mutex)
+extern int jl_uv_n_waiters;
+void JL_UV_LOCK(void);
 #define JL_UV_UNLOCK() JL_UNLOCK(&jl_uv_mutex)
 
 #ifdef __cplusplus
diff --git a/src/partr.c b/src/partr.c
index 6520c936da8f4..81b91ea7abbfc 100644
--- a/src/partr.c
+++ b/src/partr.c
@@ -226,8 +226,9 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
         uv_cond_broadcast(&sleep_alarm); // TODO: make this uv_cond_signal / just wake up correct thread
         uv_mutex_unlock(&sleep_lock);
     }
-    /* stop the event loop too, if on thread 1 and alerting thread 1 */
-    if (ptls->tid == 0 && (tid == 0 || tid == -1))
+    if (_threadedregion && jl_uv_mutex.owner != jl_thread_self())
+        jl_wake_libuv();
+    else
         uv_stop(jl_global_event_loop());
 }
 
@@ -260,8 +261,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
         if (task)
             return task;
 
-        if (ptls->tid == 0) {
-            if (!_threadedregion) {
+        if (!_threadedregion) {
+            if (ptls->tid == 0) {
                 if (jl_run_once(jl_global_event_loop()) == 0) {
                     task = get_next_task(getsticky);
                     if (task)
@@ -274,12 +275,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
                 }
             }
            else {
-                jl_process_events(jl_global_event_loop());
-            }
-        }
-        else {
-            int sleepnow = 0;
-            if (!_threadedregion) {
+                int sleepnow = 0;
                 uv_mutex_lock(&sleep_lock);
                 if (!_threadedregion) {
                     sleepnow = 1;
@@ -287,16 +283,29 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
                 else {
                     uv_mutex_unlock(&sleep_lock);
                 }
+                if (sleepnow) {
+                    int8_t gc_state = jl_gc_safe_enter(ptls);
+                    uv_cond_wait(&sleep_alarm, &sleep_lock);
+                    uv_mutex_unlock(&sleep_lock);
+                    jl_gc_safe_leave(ptls, gc_state);
+                }
+            }
+        }
+        else {
+            if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
+                task = get_next_task(getsticky);
+                if (task) {
+                    JL_UV_UNLOCK();
+                    return task;
+                }
+                uv_loop_t *loop = jl_global_event_loop();
+                loop->stop_flag = 0;
+                uv_run(loop, UV_RUN_ONCE);
+                JL_UV_UNLOCK();
             }
             else {
                 jl_cpu_pause();
            }
-            if (sleepnow) {
-                int8_t gc_state = jl_gc_safe_enter(ptls);
-                uv_cond_wait(&sleep_alarm, &sleep_lock);
-                uv_mutex_unlock(&sleep_lock);
-                jl_gc_safe_leave(ptls, gc_state);
-            }
         }
     }
 }
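
---

For reviewers, a minimal sketch (not part of the patch) of the user-visible behavior the NEWS entry describes: with these changes, tasks can be scheduled and switched, and limited I/O performed, from inside an `@threads` loop. Only public API is used (`Threads.@threads`, `nthreads`, `threadid`, `yield`, `println`); the function name `demo` is illustrative.

```julia
# Minimal sketch, assuming a build with this patch and JULIA_NUM_THREADS > 1.
using Base.Threads

function demo()
    @threads for i in 1:nthreads()
        # Limited I/O from a worker thread: uv_write is now serialized
        # through jl_uv_mutex instead of assuming it runs on thread 1.
        println("iteration ", i, " on thread ", threadid())
        # A task switch inside the threaded region: any thread can now
        # drive the libuv event loop (or spin) while it waits for work.
        yield()
    end
end

demo()
```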