Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

synchronize FileWatching #31981

Merged
merged 1 commit into from
May 10, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 114 additions & 58 deletions stdlib/FileWatching/src/FileWatching.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ FDEvent(flags::Integer) = FDEvent((flags & UV_READABLE) != 0,
mutable struct FileMonitor
handle::Ptr{Cvoid}
file::String
notify::Condition
notify::Base.ThreadSynchronizer
events::Int32
active::Bool
FileMonitor(file::AbstractString) = FileMonitor(String(file))
function FileMonitor(file::String)
handle = Libc.malloc(_sizeof_uv_fs_event)
this = new(handle, file, Condition(), 0, false)
this = new(handle, file, Base.ThreadSynchronizer(), 0, false)
associate_julia_struct(handle, this)
err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle)
if err != 0
Expand Down Expand Up @@ -118,14 +118,14 @@ mutable struct PollingFileWatcher
handle::Ptr{Cvoid}
file::String
interval::UInt32
notify::Condition
notify::Base.ThreadSynchronizer
active::Bool
curr_error::Int32
curr_stat::StatStruct
PollingFileWatcher(file::AbstractString, interval::Float64=5.007) = PollingFileWatcher(String(file), interval)
function PollingFileWatcher(file::String, interval::Float64=5.007) # same default as nodejs
handle = Libc.malloc(_sizeof_uv_fs_poll)
this = new(handle, file, round(UInt32, interval * 1000), Condition(), false, 0, StatStruct())
this = new(handle, file, round(UInt32, interval * 1000), Base.ThreadSynchronizer(), false, 0, StatStruct())
associate_julia_struct(handle, this)
err = ccall(:uv_fs_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle)
if err != 0
Expand All @@ -141,7 +141,7 @@ mutable struct _FDWatcher
handle::Ptr{Cvoid}
fdnum::Int # this is NOT the file descriptor
refcount::Tuple{Int, Int}
notify::Condition
notify::Base.ThreadSynchronizer
events::Int32
active::Tuple{Bool, Bool}

Expand Down Expand Up @@ -171,7 +171,7 @@ mutable struct _FDWatcher
handle,
fdnum,
(Int(readable), Int(writable)),
Condition(),
Base.ThreadSynchronizer(),
0,
(false, false))
associate_julia_struct(handle, this)
Expand All @@ -187,19 +187,24 @@ mutable struct _FDWatcher
end

function uvfinalize(t::_FDWatcher)
if t.handle != C_NULL
disassociate_julia_struct(t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
t.handle = C_NULL
end
t.refcount = (0, 0)
t.active = (false, false)
@static if Sys.isunix()
if FDWatchers[t.fdnum] == t
FDWatchers[t.fdnum] = nothing
lock(t.notify)
try
if t.handle != C_NULL
disassociate_julia_struct(t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
t.handle = C_NULL
end
t.refcount = (0, 0)
t.active = (false, false)
@static if Sys.isunix()
if FDWatchers[t.fdnum] == t
FDWatchers[t.fdnum] = nothing
end
end
notify(t.notify, FDEvent())
finally
unlock(t.notify)
end
notify(t.notify, FDEvent())
nothing
end
end
Expand All @@ -219,7 +224,7 @@ mutable struct _FDWatcher
handle,
0,
(Int(readable), Int(writable)),
Condition(),
Base.ThreadSynchronizer(),
0,
(false, false))
associate_julia_struct(handle, this)
Expand Down Expand Up @@ -289,16 +294,26 @@ function _uv_hook_close(uv::_FDWatcher)
end

function _uv_hook_close(uv::PollingFileWatcher)
uv.handle = C_NULL
uv.active = false
notify(uv.notify, StatStruct())
lock(uv.notify)
try
uv.handle = C_NULL
uv.active = false
notify(uv.notify, StatStruct())
finally
unlock(uv.notify)
end
nothing
end

function _uv_hook_close(uv::FileMonitor)
uv.handle = C_NULL
uv.active = false
notify(uv.notify, FileEvent())
lock(uv.notify)
try
uv.handle = C_NULL
uv.active = false
notify(uv.notify, FileEvent())
finally
unlock(uv.notify)
end
nothing
end

Expand All @@ -311,11 +326,16 @@ end

function uv_fseventscb_file(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32)
t = @handle_as handle FileMonitor
if status != 0
notify_error(t.notify, _UVError("FileMonitor", status))
else
t.events |= events
notify(t.notify, FileEvent(events))
lock(t.notify)
try
if status != 0
notify_error(t.notify, _UVError("FileMonitor", status))
else
t.events |= events
notify(t.notify, FileEvent(events))
end
finally
unlock(t.notify)
end
nothing
end
Expand All @@ -333,19 +353,24 @@ end

function uv_pollcb(handle::Ptr{Cvoid}, status::Int32, events::Int32)
t = @handle_as handle _FDWatcher
if status != 0
notify_error(t.notify, _UVError("FDWatcher", status))
else
t.events |= events
if t.active[1] || t.active[2]
if isempty(t.notify)
# if we keep hearing about events when nobody appears to be listening,
# stop the poll to save cycles
t.active = (false, false)
ccall(:uv_poll_stop, Int32, (Ptr{Cvoid},), t.handle)
lock(t.notify)
try
if status != 0
notify_error(t.notify, _UVError("FDWatcher", status))
else
t.events |= events
if t.active[1] || t.active[2]
if isempty(t.notify)
# if we keep hearing about events when nobody appears to be listening,
# stop the poll to save cycles
t.active = (false, false)
ccall(:uv_poll_stop, Int32, (Ptr{Cvoid},), t.handle)
end
end
notify(t.notify, FDEvent(events))
end
notify(t.notify, FDEvent(events))
finally
unlock(t.notify)
end
nothing
end
Expand All @@ -359,7 +384,12 @@ function uv_fspollcb(handle::Ptr{Cvoid}, status::Int32, prev::Ptr, curr::Ptr)
end
if status == 0 || status != old_status
prev_stat = StatStruct(convert(Ptr{UInt8}, prev))
notify(t.notify, prev_stat)
lock(t.notify)
try
notify(t.notify, prev_stat)
finally
unlock(t.notify)
end
end
nothing
end
Expand Down Expand Up @@ -400,10 +430,15 @@ function start_watching(t::PollingFileWatcher)
end

function stop_watching(t::PollingFileWatcher)
if t.active && isempty(t.notify)
t.active = false
uv_error("PollingFileWatcher (stop)",
ccall(:uv_fs_poll_stop, Int32, (Ptr{Cvoid},), t.handle))
lock(t.notify)
try
if t.active && isempty(t.notify)
t.active = false
uv_error("PollingFileWatcher (stop)",
ccall(:uv_fs_poll_stop, Int32, (Ptr{Cvoid},), t.handle))
end
finally
unlock(t.notify)
end
nothing
end
Expand All @@ -420,10 +455,15 @@ function start_watching(t::FileMonitor)
end

function stop_watching(t::FileMonitor)
if t.active && isempty(t.notify)
t.active = false
uv_error("FileMonitor (stop)",
ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle))
lock(t.notify)
try
if t.active && isempty(t.notify)
t.active = false
uv_error("FileMonitor (stop)",
ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle))
end
finally
unlock(t.notify)
end
nothing
end
Expand Down Expand Up @@ -456,8 +496,13 @@ function wait(fdw::_FDWatcher; readable=true, writable=true)
if fdw.refcount == (0, 0) # !open
events = EOFError()
else
start_watching(fdw) # make sure the poll is active
events = stream_wait(fdw, fdw.notify)::FDEvent
lock(fdw.notify)
try
start_watching(fdw) # make sure the poll is active
events = stream_wait(fdw, fdw.notify)::FDEvent
finally
unlock(fdw.notify)
end
end
end
end
Expand All @@ -483,8 +528,14 @@ if Sys.iswindows()
end

function wait(pfw::PollingFileWatcher)
start_watching(pfw)
prevstat = stream_wait(pfw, pfw.notify)::StatStruct
lock(pfw.notify)
local prevstat
try
start_watching(pfw)
prevstat = stream_wait(pfw, pfw.notify)::StatStruct
finally
unlock(pfw.notify)
end
stop_watching(pfw)
if pfw.handle == C_NULL
return prevstat, EOFError()
Expand All @@ -496,10 +547,16 @@ function wait(pfw::PollingFileWatcher)
end

function wait(m::FileMonitor)
start_watching(m)
events = stream_wait(m, m.notify)::FileEvent
events |= FileEvent(m.events)
m.events = 0
lock(m.notify)
local events
try
start_watching(m)
events = stream_wait(m, m.notify)::FileEvent
events |= FileEvent(m.events)
m.events = 0
finally
unlock(m.notify)
end
stop_watching(m)
return events
end
Expand Down Expand Up @@ -608,7 +665,6 @@ This behavior of this function varies slightly across platforms. See
"""
watch_folder(s::AbstractString, timeout_s::Real=-1) = watch_folder(String(s), timeout_s)
function watch_folder(s::String, timeout_s::Real=-1)
wt = Condition()
fm = get!(watched_folders, s) do
return FolderMonitor(s)
end
Expand Down