Skip to content

Commit

Permalink
make read!() resize its array argument on EOFError
Browse files Browse the repository at this point in the history
Make 'read!(::Base.LibuvStream, ::Array{UInt8, 1})' resize its array
argument on EOFError to indicate the number of bytes actually received.
  • Loading branch information
Mike Usenko committed Jan 16, 2016
1 parent 85512d6 commit 8985362
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 4 deletions.
12 changes: 10 additions & 2 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -913,13 +913,19 @@ function read!(s::LibuvStream, a::Array{UInt8, 1})
@assert sbuf.seekable == false
@assert sbuf.maxsize >= nb

if nb_available(sbuf) >= nb
nba = nb_available(sbuf)
if nba >= nb
return read!(sbuf, a)
end

if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
wait_readnb(s, nb)
read!(sbuf, a)
nba = nb_available(sbuf)
try
read!(sbuf, a)
finally
nba < nb && resize!(a, nba)
end
else
try
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
Expand All @@ -929,7 +935,9 @@ function read!(s::LibuvStream, a::Array{UInt8, 1})
write(newbuf, sbuf)
wait_readnb(s, nb)
finally
nba = nb_available(s.buffer)
s.buffer = sbuf
nba < nb && resize!(a, nba)
if !isempty(s.readnotify.waitq)
start_reading(s) # resume reading iff there are currently other read clients of the stream
end
Expand Down
4 changes: 2 additions & 2 deletions test/choosetests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ function choosetests(choices = [])
"base64", "serialize", "functors", "misc", "threads",
"enums", "cmdlineargs", "i18n", "workspace", "libdl", "int",
"checked", "intset", "floatfuncs", "compile", "parallel", "inline",
"boundscheck"
"boundscheck", "stream"
]

if Base.USE_GPL_LIBS
Expand Down Expand Up @@ -79,7 +79,7 @@ function choosetests(choices = [])
prepend!(tests, linalgtests)
end

net_required_for = ["socket", "parallel"]
net_required_for = ["socket", "parallel", "stream"]
net_on = true
try
getipaddr()
Expand Down
57 changes: 57 additions & 0 deletions test/stream.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license


using Base.Test


let
# PR#14627
Base.connect!(sock::TCPSocket, addr::Base.InetAddr) = Base.connect!(sock, addr.host, addr.port)

addr = Base.InetAddr(ip"127.0.0.1", 4444)
srv = listen(addr)

function oneshot_accept(_srv::Base.TCPServer, _send::Vector{UInt8})
@async try
c = accept(_srv)
write(c, _send)
close(c)
end
yield()
nothing
end

# method 'read!(::Base.LibuvStream, ::Array{UInt8, 1})'
# resizes its array argument on EOFError
# to indicate the number of bytes actually received
send_buf = UInt8[0,1,2]
recv_buf = UInt8[5,5,5,5,5]
oneshot_accept(srv, send_buf)
c = connect(addr)
try
read!(c, recv_buf)
catch x
if isa(x,EOFError)
@test length(recv_buf) == length(send_buf) # receive buffer resized
@test recv_buf == send_buf # check up the content
else
rethrow()
end
finally
close(c)
end

# test a normal (nonexceptional) case
send_buf = UInt8[0,1,2,3,4]
recv_buf = UInt8[5,5,5]
recvbuf_len = length(recv_buf)
oneshot_accept(srv, send_buf)
c = connect(addr)
read!(c, recv_buf)
@test length(recv_buf) == recvbuf_len # receive buffer's length not changed
@test recv_buf == send_buf[1:recvbuf_len] # check up the content
close(c)

close(srv)
end

0 comments on commit 8985362

Please sign in to comment.