Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic support for printing to TTYs from multiple threads #31438

Merged
merged 1 commit into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Multi-threading changes
* The `Condition` type now has a thread-safe replacement, accessed as `Threads.Condition`.
With that addition, task scheduling primitives such as `ReentrantLock` are now thread-safe ([#30061]).

* It is possible to schedule and switch Tasks during `@threads` loops, and perform limited I/O ([#31438]).

Language changes
----------------
* Empty entries in `JULIA_DEPOT_PATH` are now expanded to default depot entries ([#31009]).
Expand Down
5 changes: 5 additions & 0 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,23 @@ disassociate_julia_struct(handle::Ptr{Cvoid}) =
# A dict of all libuv handles that are being waited on somewhere in the system
# and should thus not be garbage collected
const uvhandles = IdDict()
const preserve_handle_lock = Threads.SpinLock()
function preserve_handle(x)
lock(preserve_handle_lock)
v = get(uvhandles, x, 0)::Int
uvhandles[x] = v + 1
unlock(preserve_handle_lock)
nothing
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = uvhandles[x]::Int
if v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
end
unlock(preserve_handle_lock)
nothing
end

Expand Down
68 changes: 45 additions & 23 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ mutable struct PipeEndpoint <: LibuvStream
buffer::IOBuffer
readnotify::Condition
connectnotify::Condition
closenotify::Condition
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
Expand All @@ -130,7 +130,7 @@ mutable struct PipeEndpoint <: LibuvStream
PipeBuffer(),
Condition(),
Condition(),
Condition(),
ThreadSynchronizer(),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -165,7 +165,7 @@ mutable struct TTY <: LibuvStream
status::Int
buffer::IOBuffer
readnotify::Condition
closenotify::Condition
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
Expand All @@ -176,7 +176,7 @@ mutable struct TTY <: LibuvStream
status,
PipeBuffer(),
Condition(),
Condition(),
ThreadSynchronizer(),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -380,25 +380,38 @@ function wait_readnb(x::LibuvStream, nb::Int)
end

function wait_close(x::Union{LibuvStream, LibuvServer})
if isopen(x)
stream_wait(x, x.closenotify)
lock(x.closenotify)
try
if isopen(x)
stream_wait(x, x.closenotify)
end
finally
unlock(x.closenotify)
end
nothing
end

function close(stream::Union{LibuvStream, LibuvServer})
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
elseif isopen(stream)
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
if uv_handle_data(stream) != C_NULL
stream_wait(stream, stream.closenotify)
return nothing
end
lock(stream.closenotify)
try
if isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
if should_wait
stream_wait(stream, stream.closenotify)
end
end
finally
unlock(stream.closenotify)
end
nothing
return nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
Expand Down Expand Up @@ -547,7 +560,12 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.readnotify)
notify(stream.closenotify)
lock(stream.closenotify)
try
notify(stream.closenotify)
finally
unlock(stream.closenotify)
end
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
Expand Down Expand Up @@ -589,10 +607,15 @@ function reseteof(x::TTY)
end

function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
lock(uv.closenotify)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
finally
unlock(uv.closenotify)
end
isdefined(uv, :readnotify) && notify(uv.readnotify)
isdefined(uv, :connectnotify) && notify(uv.connectnotify)
nothing
Expand Down Expand Up @@ -842,14 +865,13 @@ end
uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p)))

function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
uvw = uv_write_async(s, p, n)
ct = current_task()
uvw = uv_write_async(s, p, n, ct)
preserve_handle(ct)
try
# wait for the last chunk to complete (or error)
# assume that any errors would be sticky,
# (so we don't need to monitor the error status of the intermediate writes)
uv_req_set_data(uvw, ct)
wait()
finally
if uv_req_data(uvw) != C_NULL
Expand All @@ -867,11 +889,11 @@ end

# helper function for uv_write that returns the uv_write_t struct for the write
# rather than waiting on it
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt, reqdata)
check_open(s)
while true
uvw = Libc.malloc(_sizeof_uv_write)
uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
uv_req_set_data(uvw, reqdata)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please don't delete comments that say precisely why your change is wrong

nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
# TODO: use writev, when that is added to uv-win
err = ccall(:jl_uv_write,
Expand Down
26 changes: 14 additions & 12 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ void jl_init_uv(void)
JL_MUTEX_INIT(&jl_uv_mutex); // a file-scope initializer can be used instead
}

int jl_uv_n_waiters = 0;

void JL_UV_LOCK(void)
{
if (jl_mutex_trylock(&jl_uv_mutex)) {
}
else {
jl_atomic_fetch_add(&jl_uv_n_waiters, 1);
jl_wake_libuv();
JL_LOCK(&jl_uv_mutex);
jl_atomic_fetch_add(&jl_uv_n_waiters, -1);
}
}

void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
Expand Down Expand Up @@ -190,18 +204,6 @@ JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
return 0;
}

JL_DLLEXPORT void jl_run_event_loop(uv_loop_t *loop)
{
jl_ptls_t ptls = jl_get_ptls_states();
if (loop) {
jl_gc_safepoint_(ptls);
JL_UV_LOCK();
loop->stop_flag = 0;
uv_run(loop,UV_RUN_DEFAULT);
JL_UV_UNLOCK();
}
}

JL_DLLEXPORT int jl_process_events(uv_loop_t *loop)
{
jl_ptls_t ptls = jl_get_ptls_states();
Expand Down
3 changes: 2 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ static inline void jl_assume_(int cond)
static uv_loop_t *const unused_uv_loop_arg = (uv_loop_t *)0xBAD10;

extern jl_mutex_t jl_uv_mutex;
#define JL_UV_LOCK() JL_LOCK(&jl_uv_mutex)
extern int jl_uv_n_waiters;
void JL_UV_LOCK(void);
#define JL_UV_UNLOCK() JL_UNLOCK(&jl_uv_mutex)

#ifdef __cplusplus
Expand Down
41 changes: 25 additions & 16 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
uv_cond_broadcast(&sleep_alarm); // TODO: make this uv_cond_signal / just wake up correct thread
uv_mutex_unlock(&sleep_lock);
}
/* stop the event loop too, if on thread 1 and alerting thread 1 */
if (ptls->tid == 0 && (tid == 0 || tid == -1))
if (_threadedregion && jl_uv_mutex.owner != jl_thread_self())
jl_wake_libuv();
else
uv_stop(jl_global_event_loop());
}

Expand Down Expand Up @@ -260,8 +261,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
if (task)
return task;

if (ptls->tid == 0) {
if (!_threadedregion) {
if (!_threadedregion) {
if (ptls->tid == 0) {
if (jl_run_once(jl_global_event_loop()) == 0) {
task = get_next_task(getsticky);
if (task)
Expand All @@ -274,29 +275,37 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
}
}
else {
jl_process_events(jl_global_event_loop());
}
}
else {
int sleepnow = 0;
if (!_threadedregion) {
int sleepnow = 0;
uv_mutex_lock(&sleep_lock);
if (!_threadedregion) {
sleepnow = 1;
}
else {
uv_mutex_unlock(&sleep_lock);
}
if (sleepnow) {
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_cond_wait(&sleep_alarm, &sleep_lock);
uv_mutex_unlock(&sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
}
}
}
else {
if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
task = get_next_task(getsticky);
if (task) {
JL_UV_UNLOCK();
return task;
}
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
uv_run(loop, UV_RUN_ONCE);
JL_UV_UNLOCK();
}
else {
jl_cpu_pause();
}
if (sleepnow) {
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_cond_wait(&sleep_alarm, &sleep_lock);
uv_mutex_unlock(&sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
}
}
}
}
Expand Down