Skip to content

Commit

Permalink
use FDWatcher instead of libuv poll API
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanKarpinski committed Oct 26, 2021
1 parent 11493ff commit 4c1d2af
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 94 deletions.
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "1.5.1"

[deps]
ArgTools = "0dad84c5-d112-42e6-8d28-ef12dabb789f"
FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"
LibCURL = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21"
NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908"

Expand Down
3 changes: 2 additions & 1 deletion src/Curl/Curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ using LibCURL: curl_off_t
const CURLE_PEER_FAILED_VERIFICATION = 60
const CURLSSLOPT_REVOKE_BEST_EFFORT = 1 << 3

using FileWatching
using NetworkOptions
using Base: preserve_handle, unpreserve_handle
using Base: OS_HANDLE, preserve_handle, unpreserve_handle

include("utils.jl")

Expand Down
76 changes: 30 additions & 46 deletions src/Curl/Multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function remove_handle(multi::Multi, easy::Easy)
multi.timer = Timer(multi.grace/1000)
@async begin
wait(multi.timer)
done!(multi)
isopen(multi.timer) && done!(multi)
end
end
unpreserve_handle(multi)
Expand All @@ -65,15 +65,14 @@ function set_defaults(multi::Multi)
# currently no defaults
end

# libuv callbacks
# multi-socket handle state updates

struct CURLMsg
msg :: CURLMSG
easy :: Ptr{Cvoid}
code :: CURLcode
end

# should already be locked
function check_multi_info(multi::Multi)
while true
p = curl_multi_info_read(multi.handle, Ref{Cint}())
Expand All @@ -96,25 +95,6 @@ function check_multi_info(multi::Multi)
end
end

function event_callback(
uv_poll_p :: Ptr{Cvoid},
status :: Cint,
events :: Cint,
)::Cvoid
## TODO: use a member access API
multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_poll_p))
multi = unsafe_pointer_to_objref(multi_p)::Multi
sock_p = uv_poll_p + Base._sizeof_uv_poll
sock = unsafe_load(convert(Ptr{curl_socket_t}, sock_p))
flags = 0
events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN)
events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT)
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, sock, flags)
check_multi_info(multi)
end
end

# curl callbacks

function do_multi(multi::Multi)
Expand Down Expand Up @@ -151,38 +131,42 @@ function socket_callback(
sock :: curl_socket_t,
action :: Cint,
multi_p :: Ptr{Cvoid},
uv_poll_p :: Ptr{Cvoid},
watcher_p :: Ptr{Cvoid},
)::Cint
if action (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE)
@async @error("socket_callback: unexpected action", action)
return -1
end
multi = unsafe_pointer_to_objref(multi_p)::Multi
if watcher_p != C_NULL
old_watcher = unsafe_pointer_to_objref(watcher_p)::FDWatcher
@check curl_multi_assign(multi.handle, sock, C_NULL)
unpreserve_handle(old_watcher)
end
if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT)
if uv_poll_p == C_NULL
uv_poll_p = uv_poll_alloc()
uv_poll_init(uv_poll_p, sock)
## TODO: use a member access API
unsafe_store!(convert(Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p)
sock_p = uv_poll_p + Base._sizeof_uv_poll
unsafe_store!(convert(Ptr{curl_socket_t}, sock_p), sock)
lock(multi.lock) do
@check curl_multi_assign(multi.handle, sock, uv_poll_p)
readable = action in (CURL_POLL_IN, CURL_POLL_INOUT)
writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT)
watcher = FDWatcher(OS_HANDLE(sock), readable, writable)
preserve_handle(watcher)
watcher_p = pointer_from_objref(watcher)
@check curl_multi_assign(multi.handle, sock, watcher_p)
task = @async while true
events = try wait(watcher)
catch err
err isa EOFError && break
rethrow()
end
end
events = 0
action != CURL_POLL_IN && (events |= UV_WRITABLE)
action != CURL_POLL_OUT && (events |= UV_READABLE)
event_cb = @cfunction(event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint))
uv_poll_start(uv_poll_p, events, event_cb)
elseif action == CURL_POLL_REMOVE
if uv_poll_p != C_NULL
uv_poll_stop(uv_poll_p)
uv_close(uv_poll_p, cglobal(:jl_free))
flags = CURL_CSELECT_IN * isreadable(events) +
CURL_CSELECT_OUT * iswritable(events) +
CURL_CSELECT_ERR * events.disconnect
lock(multi.lock) do
@check curl_multi_assign(multi.handle, sock, C_NULL)
@check curl_multi_socket_action(multi.handle, sock, flags)
check_multi_info(multi)
end
end
else
@async @error("socket_callback: unexpected action", action)
return -1
@isdefined(errormonitor) && errormonitor(task)
end
@isdefined(old_watcher) && close(old_watcher)
return 0
end

Expand Down
52 changes: 5 additions & 47 deletions src/Curl/utils.jl
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# basic C stuff

if !@isdefined(contains)
contains(haystack, needle) = occursin(needle, haystack)
export contains
end

# basic C stuff

puts(s::Union{String,SubString{String}}) = ccall(:puts, Cint, (Ptr{Cchar},), s)

jl_malloc(n::Integer) = ccall(:jl_malloc, Ptr{Cvoid}, (Csize_t,), n)

# check if a function or C call failed
# check if a call failed

function check(ex::Expr, lock::Bool)
macro check(ex::Expr)
ex.head == :call ||
error("@check: not a call: $ex")
arg1 = ex.args[1] :: Symbol
Expand All @@ -24,55 +24,13 @@ function check(ex::Expr, lock::Bool)
f = arg1
end
prefix = "$f: "
ex = esc(ex)
if lock
ex = quote
Base.iolock_begin()
value = $ex
Base.iolock_end()
value
end
end
quote
r = $ex
r = $(esc(ex))
iszero(r) || @async @error($prefix * string(r))
r
end
end

macro check(ex::Expr) check(ex, false) end
macro check_iolock(ex::Expr) check(ex, true) end

# some libuv wrappers

const UV_READABLE = 1
const UV_WRITABLE = 2

function uv_poll_alloc()
# allocate memory for: uv_poll_t struct + extra for curl_socket_t
jl_malloc(Base._sizeof_uv_poll + sizeof(curl_socket_t))
end

function uv_poll_init(p::Ptr{Cvoid}, sock::curl_socket_t)
@check_iolock ccall(:uv_poll_init, Cint,
(Ptr{Cvoid}, Ptr{Cvoid}, curl_socket_t), Base.eventloop(), p, sock)
end

function uv_poll_start(p::Ptr{Cvoid}, events::Integer, cb::Ptr{Cvoid})
@check_iolock ccall(:uv_poll_start, Cint,
(Ptr{Cvoid}, Cint, Ptr{Cvoid}), p, events, cb)
end

function uv_poll_stop(p::Ptr{Cvoid})
@check_iolock ccall(:uv_poll_stop, Cint, (Ptr{Cvoid},), p)
end

function uv_close(p::Ptr{Cvoid}, cb::Ptr{Cvoid})
Base.iolock_begin()
ccall(:uv_close, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), p, cb)
Base.iolock_end()
end

# additional libcurl methods

function curl_multi_socket_action(multi_handle, s, ev_bitmask)
Expand Down

0 comments on commit 4c1d2af

Please sign in to comment.