diff --git a/Project.toml b/Project.toml index 523343b..4189b77 100644 --- a/Project.toml +++ b/Project.toml @@ -5,6 +5,7 @@ version = "1.5.1" [deps] ArgTools = "0dad84c5-d112-42e6-8d28-ef12dabb789f" +FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" LibCURL = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21" NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908" diff --git a/src/Curl/Curl.jl b/src/Curl/Curl.jl index e4f5171..3858689 100644 --- a/src/Curl/Curl.jl +++ b/src/Curl/Curl.jl @@ -33,8 +33,9 @@ using LibCURL: curl_off_t const CURLE_PEER_FAILED_VERIFICATION = 60 const CURLSSLOPT_REVOKE_BEST_EFFORT = 1 << 3 +using FileWatching using NetworkOptions -using Base: preserve_handle, unpreserve_handle +using Base: OS_HANDLE, preserve_handle, unpreserve_handle include("utils.jl") diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index 8cec787..c56b94a 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -52,7 +52,7 @@ function remove_handle(multi::Multi, easy::Easy) multi.timer = Timer(multi.grace/1000) @async begin wait(multi.timer) - done!(multi) + isopen(multi.timer) && done!(multi) end end unpreserve_handle(multi) @@ -65,7 +65,7 @@ function set_defaults(multi::Multi) # currently no defaults end -# libuv callbacks +# multi-socket handle state updates struct CURLMsg msg :: CURLMSG @@ -73,7 +73,6 @@ struct CURLMsg code :: CURLcode end -# should already be locked function check_multi_info(multi::Multi) while true p = curl_multi_info_read(multi.handle, Ref{Cint}()) @@ -96,25 +95,6 @@ function check_multi_info(multi::Multi) end end -function event_callback( - uv_poll_p :: Ptr{Cvoid}, - status :: Cint, - events :: Cint, -)::Cvoid - ## TODO: use a member access API - multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_poll_p)) - multi = unsafe_pointer_to_objref(multi_p)::Multi - sock_p = uv_poll_p + Base._sizeof_uv_poll - sock = unsafe_load(convert(Ptr{curl_socket_t}, sock_p)) - flags = 0 - events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN) - events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT) - lock(multi.lock) do - @check curl_multi_socket_action(multi.handle, sock, flags) - check_multi_info(multi) - end -end - # curl callbacks function do_multi(multi::Multi) @@ -151,38 +131,42 @@ function socket_callback( sock :: curl_socket_t, action :: Cint, multi_p :: Ptr{Cvoid}, - uv_poll_p :: Ptr{Cvoid}, + watcher_p :: Ptr{Cvoid}, )::Cint + if action ∉ (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE) + @async @error("socket_callback: unexpected action", action) + return -1 + end multi = unsafe_pointer_to_objref(multi_p)::Multi + if watcher_p != C_NULL + old_watcher = unsafe_pointer_to_objref(watcher_p)::FDWatcher + @check curl_multi_assign(multi.handle, sock, C_NULL) + unpreserve_handle(old_watcher) + end if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT) - if uv_poll_p == C_NULL - uv_poll_p = uv_poll_alloc() - uv_poll_init(uv_poll_p, sock) - ## TODO: use a member access API - unsafe_store!(convert(Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p) - sock_p = uv_poll_p + Base._sizeof_uv_poll - unsafe_store!(convert(Ptr{curl_socket_t}, sock_p), sock) - lock(multi.lock) do - @check curl_multi_assign(multi.handle, sock, uv_poll_p) + readable = action in (CURL_POLL_IN, CURL_POLL_INOUT) + writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT) + watcher = FDWatcher(OS_HANDLE(sock), readable, writable) + preserve_handle(watcher) + watcher_p = pointer_from_objref(watcher) + @check curl_multi_assign(multi.handle, sock, watcher_p) + task = @async while true + events = try wait(watcher) + catch err + err isa EOFError && break + rethrow() end - end - events = 0 - action != CURL_POLL_IN && (events |= UV_WRITABLE) - action != CURL_POLL_OUT && (events |= UV_READABLE) - event_cb = @cfunction(event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint)) - uv_poll_start(uv_poll_p, events, event_cb) - elseif action == CURL_POLL_REMOVE - if uv_poll_p != C_NULL - uv_poll_stop(uv_poll_p) - uv_close(uv_poll_p, cglobal(:jl_free)) + flags = CURL_CSELECT_IN * isreadable(events) + + CURL_CSELECT_OUT * iswritable(events) + + CURL_CSELECT_ERR * events.disconnect lock(multi.lock) do - @check curl_multi_assign(multi.handle, sock, C_NULL) + @check curl_multi_socket_action(multi.handle, sock, flags) + check_multi_info(multi) end end - else - @async @error("socket_callback: unexpected action", action) - return -1 + @isdefined(errormonitor) && errormonitor(task) end + @isdefined(old_watcher) && close(old_watcher) return 0 end diff --git a/src/Curl/utils.jl b/src/Curl/utils.jl index e8d19e6..d77c4ab 100644 --- a/src/Curl/utils.jl +++ b/src/Curl/utils.jl @@ -1,17 +1,17 @@ -# basic C stuff - if !@isdefined(contains) contains(haystack, needle) = occursin(needle, haystack) export contains end +# basic C stuff + puts(s::Union{String,SubString{String}}) = ccall(:puts, Cint, (Ptr{Cchar},), s) jl_malloc(n::Integer) = ccall(:jl_malloc, Ptr{Cvoid}, (Csize_t,), n) -# check if a function or C call failed +# check if a call failed -function check(ex::Expr, lock::Bool) +macro check(ex::Expr) ex.head == :call || error("@check: not a call: $ex") arg1 = ex.args[1] :: Symbol @@ -24,55 +24,13 @@ function check(ex::Expr, lock::Bool) f = arg1 end prefix = "$f: " - ex = esc(ex) - if lock - ex = quote - Base.iolock_begin() - value = $ex - Base.iolock_end() - value - end - end quote - r = $ex + r = $(esc(ex)) iszero(r) || @async @error($prefix * string(r)) r end end -macro check(ex::Expr) check(ex, false) end -macro check_iolock(ex::Expr) check(ex, true) end - -# some libuv wrappers - -const UV_READABLE = 1 -const UV_WRITABLE = 2 - -function uv_poll_alloc() - # allocate memory for: uv_poll_t struct + extra for curl_socket_t - jl_malloc(Base._sizeof_uv_poll + sizeof(curl_socket_t)) -end - -function uv_poll_init(p::Ptr{Cvoid}, sock::curl_socket_t) - @check_iolock ccall(:uv_poll_init, Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, curl_socket_t), Base.eventloop(), p, sock) -end - -function uv_poll_start(p::Ptr{Cvoid}, events::Integer, cb::Ptr{Cvoid}) - @check_iolock ccall(:uv_poll_start, Cint, - (Ptr{Cvoid}, Cint, Ptr{Cvoid}), p, events, cb) -end - -function uv_poll_stop(p::Ptr{Cvoid}) - @check_iolock ccall(:uv_poll_stop, Cint, (Ptr{Cvoid},), p) -end - -function uv_close(p::Ptr{Cvoid}, cb::Ptr{Cvoid}) - Base.iolock_begin() - ccall(:uv_close, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), p, cb) - Base.iolock_end() -end - # additional libcurl methods function curl_multi_socket_action(multi_handle, s, ev_bitmask)