Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@async HTTP.get returns empty body. #117

Closed
samoconnor opened this issue Nov 15, 2017 · 3 comments
Closed

@async HTTP.get returns empty body. #117

samoconnor opened this issue Nov 15, 2017 · 3 comments

Comments

@samoconnor
Copy link
Contributor

samoconnor commented Nov 15, 2017

[ Update: See PR #119 ]

@sync for i in 1:3
    @async @show String(HTTP.get("http://octe.ch"))
end

String(HTTP.get("http://octe.ch")) = "<HEAD>\n<TITLE>octe.ch</TITLE> ... [snip] ... </BODY>\n"
String(HTTP.get("http://octe.ch")) = ""
String(HTTP.get("http://octe.ch")) = ""

I've also tried this with my branch where HTTP.FIFOBuffer is replaced by Base.BufferStream. The behaviour is the same. #110

Thanks @jingpengw for reporting this in JuliaCloud/AWSS3.jl#19 (comment),
and for contributing a test case in JuliaCloud/AWSS3.jl@5abbcaa.

@samoconnor samoconnor changed the title @sync HTTP.get returns empty body. @async HTTP.get returns empty body. Nov 15, 2017
@samoconnor
Copy link
Contributor Author

It looks like there is non-task-safe global state in HTTP.DEFAULT_CLIENT.
Constructing a new HTTP.Client for each get works:

@sync for i in 1:3
           @async @show String(HTTP.get(HTTP.Client(), "http://octe.ch"))
       end
String(HTTP.get(HTTP.Client(), "http://octe.ch")) = "<HEAD>\n<TITLE>octe.ch</TITLE>... "
String(HTTP.get(HTTP.Client(), "http://octe.ch")) = "<HEAD>\n<TITLE>octe.ch</TITLE>... "
String(HTTP.get(HTTP.Client(), "http://octe.ch")) = "<HEAD>\n<TITLE>octe.ch</TITLE>... "

@samoconnor
Copy link
Contributor Author

samoconnor commented Nov 16, 2017

The patch below puts a ReentrantLock around connection pool access and moves the Parser object from the Client struct to the Connection struct.

i.e., don't share a connection between tasks, and don't share a parser between connections.

This works, but I'm not sure it is the best fix.

diff --git a/src/client.jl b/src/client.jl
index 7f774bb..cae412c 100644
--- a/src/client.jl
+++ b/src/client.jl
@@ -11,10 +11,11 @@ mutable struct Connection{I <: IO}
     id::Int
     socket::I
     state::ConnectionState
+    parser::Parser
 end
 
-Connection(tcp::IO) = Connection(0, tcp, Busy)
-Connection(id::Int, tcp::IO) = Connection(id, tcp, Busy)
+Connection(tcp::IO) = Connection(0, tcp, Busy, Parser())
+Connection(id::Int, tcp::IO) = Connection(id, tcp, Busy, Parser())
 busy!(conn::Connection) = (conn.state == Dead || (conn.state = Busy); return nothing)
 idle!(conn::Connection) = (conn.state == Dead || (conn.state = Idle); return nothing)
 dead!(conn::Connection) = (conn.state == Dead || (conn.state = Dead; close(conn.socket)); return nothing)
@@ -42,22 +43,23 @@ Additional keyword arguments can be passed that will get transmitted with each H
 """
 mutable struct Client
     # connection pools for keep-alive; key is host
+    poollock::ReentrantLock
     httppool::Dict{String, Vector{Connection{TCPSocket}}}
     httpspool::Dict{String, Vector{Connection{TLS.SSLContext}}}
     # cookies are stored in-memory per host and automatically sent when appropriate
     cookies::Dict{String, Set{Cookie}}
     # buffer::Vector{UInt8} #TODO: create a fixed size buffer for reading bytes off the wire and having http_parser use, this should keep allocations down, need to make sure MbedTLS supports blocking readbytes!
-    parser::Parser
     logger::Option{IO}
     # global request settings
     options::RequestOptions
     connectioncount::Int
 end
 
-Client(logger::Option{IO}, options::RequestOptions) = Client(Dict{String, Vector{Connection{TCPSocket}}}(),
+Client(logger::Option{IO}, options::RequestOptions) = Client(ReentrantLock(),
+                                                     Dict{String, Vector{Connection{TCPSocket}}}(),
                                                      Dict{String, Vector{Connection{TLS.SSLContext}}}(),
                                                      Dict{String, Set{Cookie}}(),
-                                                     Parser(), logger, options, 1)
+                                                     logger, options, 1)
 
 # this is where we provide all the default request options
 const DEFAULT_OPTIONS = :((nothing, true, 15.0, 15.0, nothing, 5, true, false, 3, true, true, false, true, true))
@@ -153,7 +155,9 @@ function stalebytes!(c::TCPSocket)
 end
 stalebytes!(c::TLS.SSLContext) = stalebytes!(c.bio)
 
 function connect(client, sch, hostname, port, opts, verbose)
+    @lock client.poollock begin
     logger = client.logger
     if haskey(sch, client, hostname)
         @log "checking if any existing connections to '$hostname' are re-usable"
@@ -199,6 +203,7 @@ function connect(client, sch, hostname, port, opts, verbose)
     catch e
         throw(ConnectError(e, backtrace()))
     end
+    end
 end
 
 function addcookies!(client, host, req, verbose)
@@ -282,7 +287,7 @@ function processresponse!(client, conn, response, host, method, maintask, stream
         buffer, err = getbytes(conn.socket, tm)
         @log "received bytes from the wire, processing"
         # EH: throws a couple of "shouldn't get here" errors; probably not much we can do
-        errno, headerscomplete, messagecomplete, upgrade = HTTP.parse!(response, client.parser, buffer; host=host, method=method, maintask=maintask, canonicalizeheaders=canonicalizeheaders)
+        errno, headerscomplete, messagecomplete, upgrade = HTTP.parse!(response, conn.parser, buffer; host=host, method=method, maintask=maintask, canonicalizeheaders=canonicalizeheaders)
         @log "parsed bytes received from wire"
         if length(buffer) == 0 && !isopen(conn.socket) && !messagecomplete
             @log "socket closed before full response received"
@@ -295,7 +300,7 @@ function processresponse!(client, conn, response, host, method, maintask, stream
             dead!(conn)
             throw(ParsingError("error parsing response: $(ParsingErrorCodeMap[errno])\nCurrent response buffer contents: $(String(buffer))"))
         elseif messagecomplete
-            http_should_keep_alive(client.parser, response) || (@log("closing connection (no keep-alive)"); dead!(conn))
+            http_should_keep_alive(conn.parser, response) || (@log("closing connection (no keep-alive)"); dead!(conn))
             # idle! on a Dead will stay Dead
             idle!(conn)
             return true, StatusError(status(response), response)
@@ -322,7 +327,7 @@ function request(client::Client, req::Request, opts::RequestOptions, stream::Boo
     conn = @retryif ClosedError 4 connectandsend(client, sch, host, ifelse(p == "", "80", p), req, opts, verbose)
     
     response = Response(stream ? 2^24 : FIFOBuffers.DEFAULT_MAX, req)
-    reset!(client.parser)
+    reset!(conn.parser)
     success, err = processresponse!(client, conn, response, host, HTTP.method(req), current_task(), stream, opts.readtimeout::Float64, opts.canonicalizeheaders::Bool, verbose)
     if !success
         retry >= opts.retries::Int && throw(err)
diff --git a/src/utils.jl b/src/utils.jl
index 33d4561..125ba7a 100644
--- a/src/utils.jl
+++ b/src/utils.jl
@@ -190,3 +190,20 @@ function iso8859_1_to_utf8(bytes::Vector{UInt8})
     end
     return String(take!(io))
 end
+
+macro lock(l, expr)
+    esc(quote
+        lock($l)
+        try
+            $expr
+        catch
+            rethrow()
+        finally
+            unlock($l)
+        end
+    end)
+end

@xiuliren
Copy link

xiuliren commented Nov 20, 2017

@samoconnor I just tested it; your branch works for me. Thanks!

Can anyone take a look at PR #119?
cc @quinnj @shashi

@quinnj quinnj closed this as completed Nov 26, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants