diff --git a/Project.toml b/Project.toml index 4189b77..523343b 100644 --- a/Project.toml +++ b/Project.toml @@ -5,7 +5,6 @@ version = "1.5.1" [deps] ArgTools = "0dad84c5-d112-42e6-8d28-ef12dabb789f" -FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" LibCURL = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21" NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908" diff --git a/src/Curl/Curl.jl b/src/Curl/Curl.jl index fcfea37..e4f5171 100644 --- a/src/Curl/Curl.jl +++ b/src/Curl/Curl.jl @@ -26,38 +26,15 @@ export remove_handle using LibCURL -using LibCURL: curl_off_t, libcurl +using LibCURL: curl_off_t # not exported: https://github.com/JuliaWeb/LibCURL.jl/issues/87 # constants that LibCURL should have but doesn't const CURLE_PEER_FAILED_VERIFICATION = 60 const CURLSSLOPT_REVOKE_BEST_EFFORT = 1 << 3 -# these are incorrectly defined on Windows by LibCURL: -if Sys.iswindows() - const curl_socket_t = Base.OS_HANDLE - const CURL_SOCKET_TIMEOUT = Base.INVALID_OS_HANDLE -else - const curl_socket_t = Cint - const CURL_SOCKET_TIMEOUT = -1 -end - -# definitions affected by incorrect curl_socket_t (copied verbatim): -function curl_multi_socket_action(multi_handle, s, ev_bitmask, running_handles) - ccall((:curl_multi_socket_action, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Cint, Ptr{Cint}), multi_handle, s, ev_bitmask, running_handles) -end -function curl_multi_assign(multi_handle, sockfd, sockp) - ccall((:curl_multi_assign, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Ptr{Cvoid}), multi_handle, sockfd, sockp) -end - -# additional curl_multi_socket_action method -function curl_multi_socket_action(multi_handle, s, ev_bitmask) - curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}()) -end - -using FileWatching using NetworkOptions -using Base: OS_HANDLE, preserve_handle, unpreserve_handle +using Base: preserve_handle, unpreserve_handle include("utils.jl") diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index c56b94a..eb16d00 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -1,14 +1,17 @@ mutable struct Multi lock :: ReentrantLock handle :: Ptr{Cvoid} - timer :: Timer + timer :: Ptr{Cvoid} easies :: Vector{Easy} grace :: UInt64 function Multi(grace::Integer = typemax(UInt64)) - multi = new(ReentrantLock(), C_NULL, Timer(0), Easy[], grace) + timer = jl_malloc(Base._sizeof_uv_timer) + uv_timer_init(timer) + multi = new(ReentrantLock(), C_NULL, timer, Easy[], grace) finalizer(multi) do multi - close(multi.timer) + uv_timer_stop(multi.timer) + uv_close(multi.timer, cglobal(:jl_free)) done!(multi) end end @@ -29,11 +32,19 @@ end # adding & removing easy handles +function cleanup_callback(uv_timer_p::Ptr{Cvoid})::Cvoid + ## TODO: use a member access API + multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p)) + multi = unsafe_pointer_to_objref(multi_p)::Multi + done!(multi) + return +end + function add_handle(multi::Multi, easy::Easy) lock(multi.lock) do if isempty(multi.easies) preserve_handle(multi) - close(multi.timer) # stop grace timer + uv_timer_stop(multi.timer) # stop grace timer end push!(multi.easies, easy) init!(multi) @@ -46,14 +57,11 @@ function remove_handle(multi::Multi, easy::Easy) @check curl_multi_remove_handle(multi.handle, easy.handle) deleteat!(multi.easies, findlast(==(easy), multi.easies)::Int) !isempty(multi.easies) && return + cleanup_cb = @cfunction(cleanup_callback, Cvoid, (Ptr{Cvoid},)) if multi.grace <= 0 done!(multi) elseif 0 < multi.grace < typemax(multi.grace) - multi.timer = Timer(multi.grace/1000) - @async begin - wait(multi.timer) - isopen(multi.timer) && done!(multi) - end + uv_timer_start(multi.timer, cleanup_cb, multi.grace, 0) end unpreserve_handle(multi) end @@ -65,7 +73,7 @@ function set_defaults(multi::Multi) # currently no defaults end -# multi-socket handle state updates +# libuv callbacks struct CURLMsg msg :: CURLMSG @@ -73,6 +81,7 @@ struct CURLMsg code :: CURLcode end +# should already be locked function check_multi_info(multi::Multi) while true p = curl_multi_info_read(multi.handle, Ref{Cint}()) @@ -95,15 +104,37 @@ function check_multi_info(multi::Multi) end end -# curl callbacks +function event_callback( + uv_poll_p :: Ptr{Cvoid}, + status :: Cint, + events :: Cint, +)::Cvoid + ## TODO: use a member access API + multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_poll_p)) + multi = unsafe_pointer_to_objref(multi_p)::Multi + sock_p = uv_poll_p + Base._sizeof_uv_poll + sock = unsafe_load(convert(Ptr{curl_socket_t}, sock_p)) + flags = 0 + events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN) + events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT) + lock(multi.lock) do + @check curl_multi_socket_action(multi.handle, sock, flags) + check_multi_info(multi) + end +end -function do_multi(multi::Multi) +function timeout_callback(uv_timer_p::Ptr{Cvoid})::Cvoid + ## TODO: use a member access API + multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p)) + multi = unsafe_pointer_to_objref(multi_p)::Multi lock(multi.lock) do @check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0) check_multi_info(multi) end end +# curl callbacks + function timer_callback( multi_h :: Ptr{Cvoid}, timeout_ms :: Clong, @@ -112,13 +143,15 @@ function timer_callback( multi = unsafe_pointer_to_objref(multi_p)::Multi @assert multi_h == multi.handle if timeout_ms == 0 - do_multi(multi) - elseif timeout_ms >= 0 - multi.timer = Timer(timeout_ms/1000) do timer - do_multi(multi) + lock(multi.lock) do + @check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0) + check_multi_info(multi) end + elseif timeout_ms >= 0 + timeout_cb = @cfunction(timeout_callback, Cvoid, (Ptr{Cvoid},)) + uv_timer_start(multi.timer, timeout_cb, max(1, timeout_ms), 0) elseif timeout_ms == -1 - close(multi.timer) + uv_timer_stop(multi.timer) else @async @error("timer_callback: invalid timeout value", timeout_ms) return -1 @@ -131,47 +164,46 @@ function socket_callback( sock :: curl_socket_t, action :: Cint, multi_p :: Ptr{Cvoid}, - watcher_p :: Ptr{Cvoid}, + uv_poll_p :: Ptr{Cvoid}, )::Cint - if action ∉ (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE) - @async @error("socket_callback: unexpected action", action) - return -1 - end multi = unsafe_pointer_to_objref(multi_p)::Multi - if watcher_p != C_NULL - old_watcher = unsafe_pointer_to_objref(watcher_p)::FDWatcher - @check curl_multi_assign(multi.handle, sock, C_NULL) - unpreserve_handle(old_watcher) - end if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT) - readable = action in (CURL_POLL_IN, CURL_POLL_INOUT) - writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT) - watcher = FDWatcher(OS_HANDLE(sock), readable, writable) - preserve_handle(watcher) - watcher_p = pointer_from_objref(watcher) - @check curl_multi_assign(multi.handle, sock, watcher_p) - task = @async while true - events = try wait(watcher) - catch err - err isa EOFError && break - rethrow() + if uv_poll_p == C_NULL + uv_poll_p = uv_poll_alloc() + uv_poll_init(uv_poll_p, sock) + ## TODO: use a member access API + unsafe_store!(convert(Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p) + sock_p = uv_poll_p + Base._sizeof_uv_poll + unsafe_store!(convert(Ptr{curl_socket_t}, sock_p), sock) + lock(multi.lock) do + @check curl_multi_assign(multi.handle, sock, uv_poll_p) end - flags = CURL_CSELECT_IN * isreadable(events) + - CURL_CSELECT_OUT * iswritable(events) + - CURL_CSELECT_ERR * events.disconnect + end + events = 0 + action != CURL_POLL_IN && (events |= UV_WRITABLE) + action != CURL_POLL_OUT && (events |= UV_READABLE) + event_cb = @cfunction(event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint)) + uv_poll_start(uv_poll_p, events, event_cb) + elseif action == CURL_POLL_REMOVE + if uv_poll_p != C_NULL + uv_poll_stop(uv_poll_p) + uv_close(uv_poll_p, cglobal(:jl_free)) lock(multi.lock) do - @check curl_multi_socket_action(multi.handle, sock, flags) - check_multi_info(multi) + @check curl_multi_assign(multi.handle, sock, C_NULL) end end - @isdefined(errormonitor) && errormonitor(task) + else + @async @error("socket_callback: unexpected action", action) + return -1 end - @isdefined(old_watcher) && close(old_watcher) return 0 end function add_callbacks(multi::Multi) + # stash multi handle pointer in timer multi_p = pointer_from_objref(multi) + ## TODO: use a member access API + unsafe_store!(convert(Ptr{Ptr{Cvoid}}, multi.timer), multi_p) # set timer callback timer_cb = @cfunction(timer_callback, Cint, (Ptr{Cvoid}, Clong, Ptr{Cvoid})) diff --git a/src/Curl/utils.jl b/src/Curl/utils.jl index e7aa08a..18b7456 100644 --- a/src/Curl/utils.jl +++ b/src/Curl/utils.jl @@ -1,17 +1,17 @@ +# basic C stuff + if !@isdefined(contains) contains(haystack, needle) = occursin(needle, haystack) export contains end -# basic C stuff - puts(s::Union{String,SubString{String}}) = ccall(:puts, Cint, (Ptr{Cchar},), s) jl_malloc(n::Integer) = ccall(:jl_malloc, Ptr{Cvoid}, (Csize_t,), n) -# check if a call failed +# check if a function or C call failed -macro check(ex::Expr) +function check(ex::Expr, lock::Bool) ex.head == :call || error("@check: not a call: $ex") arg1 = ex.args[1] :: Symbol @@ -24,13 +24,75 @@ macro check(ex::Expr) f = arg1 end prefix = "$f: " + ex = esc(ex) + if lock + ex = quote + Base.iolock_begin() + value = $ex + Base.iolock_end() + value + end + end quote - r = $(esc(ex)) + r = $ex iszero(r) || @async @error($prefix * string(r)) r end end +macro check(ex::Expr) check(ex, false) end +macro check_iolock(ex::Expr) check(ex, true) end + +# some libuv wrappers + +const UV_READABLE = 1 +const UV_WRITABLE = 2 + +function uv_poll_alloc() + # allocate memory for: uv_poll_t struct + extra for curl_socket_t + jl_malloc(Base._sizeof_uv_poll + sizeof(curl_socket_t)) +end + +function uv_poll_init(p::Ptr{Cvoid}, sock::curl_socket_t) + @check_iolock ccall(:uv_poll_init, Cint, + (Ptr{Cvoid}, Ptr{Cvoid}, curl_socket_t), Base.eventloop(), p, sock) +end + +function uv_poll_start(p::Ptr{Cvoid}, events::Integer, cb::Ptr{Cvoid}) + @check_iolock ccall(:uv_poll_start, Cint, + (Ptr{Cvoid}, Cint, Ptr{Cvoid}), p, events, cb) +end + +function uv_poll_stop(p::Ptr{Cvoid}) + @check_iolock ccall(:uv_poll_stop, Cint, (Ptr{Cvoid},), p) +end + +function uv_close(p::Ptr{Cvoid}, cb::Ptr{Cvoid}) + Base.iolock_begin() + ccall(:uv_close, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), p, cb) + Base.iolock_end() +end + +function uv_timer_init(p::Ptr{Cvoid}) + @check_iolock ccall(:uv_timer_init, Cint, + (Ptr{Cvoid}, Ptr{Cvoid}), Base.eventloop(), p) +end + +function uv_timer_start(p::Ptr{Cvoid}, cb::Ptr{Cvoid}, t::Integer, r::Integer) + @check_iolock ccall(:uv_timer_start, Cint, + (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), p, cb, t, r) +end + +function uv_timer_stop(p::Ptr{Cvoid}) + @check_iolock ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), p) +end + +# additional libcurl methods + +function curl_multi_socket_action(multi_handle, s, ev_bitmask) + LibCURL.curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}()) +end + # curl string list structure struct curl_slist_t