From 898536245ee7b3ceac5037af7dc8eaa2b56472b2 Mon Sep 17 00:00:00 2001 From: Mike Usenko Date: Sat, 16 Jan 2016 19:02:45 +0300 Subject: [PATCH] make read!() resize its array argument on EOFError Make 'read!(::Base.LibuvStream, ::Array{UInt8, 1})' resize its array argument on EOFError to indicate the number of bytes actually received. --- base/stream.jl | 12 ++++++++-- test/choosetests.jl | 4 ++-- test/stream.jl | 57 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 test/stream.jl diff --git a/base/stream.jl b/base/stream.jl index b3669a6b2c7de0..ed32c7259f9cc4 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -913,13 +913,19 @@ function read!(s::LibuvStream, a::Array{UInt8, 1}) @assert sbuf.seekable == false @assert sbuf.maxsize >= nb - if nb_available(sbuf) >= nb + nba = nb_available(sbuf) + if nba >= nb return read!(sbuf, a) end if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer wait_readnb(s, nb) - read!(sbuf, a) + nba = nb_available(sbuf) + try + read!(sbuf, a) + finally + nba < nb && resize!(a, nba) + end else try stop_reading(s) # Just playing it safe, since we are going to switch buffers. @@ -929,7 +935,9 @@ function read!(s::LibuvStream, a::Array{UInt8, 1}) write(newbuf, sbuf) wait_readnb(s, nb) finally + nba = nb_available(s.buffer) s.buffer = sbuf + nba < nb && resize!(a, nba) if !isempty(s.readnotify.waitq) start_reading(s) # resume reading iff there are currently other read clients of the stream end diff --git a/test/choosetests.jl b/test/choosetests.jl index b9346a96758eed..4e4005768b49b9 100644 --- a/test/choosetests.jl +++ b/test/choosetests.jl @@ -33,7 +33,7 @@ function choosetests(choices = []) "base64", "serialize", "functors", "misc", "threads", "enums", "cmdlineargs", "i18n", "workspace", "libdl", "int", "checked", "intset", "floatfuncs", "compile", "parallel", "inline", - "boundscheck" + "boundscheck", "stream" ] if Base.USE_GPL_LIBS @@ -79,7 +79,7 @@ function choosetests(choices = []) prepend!(tests, linalgtests) end - net_required_for = ["socket", "parallel"] + net_required_for = ["socket", "parallel", "stream"] net_on = true try getipaddr() diff --git a/test/stream.jl b/test/stream.jl new file mode 100644 index 00000000000000..28a93d6e418fe4 --- /dev/null +++ b/test/stream.jl @@ -0,0 +1,57 @@ +# This file is a part of Julia. License is MIT: http://julialang.org/license + + +using Base.Test + + +let + # PR#14627 + Base.connect!(sock::TCPSocket, addr::Base.InetAddr) = Base.connect!(sock, addr.host, addr.port) + + addr = Base.InetAddr(ip"127.0.0.1", 4444) + srv = listen(addr) + + function oneshot_accept(_srv::Base.TCPServer, _send::Vector{UInt8}) + @async try + c = accept(_srv) + write(c, _send) + close(c) + end + yield() + nothing + end + + # method 'read!(::Base.LibuvStream, ::Array{UInt8, 1})' + # resizes its array argument on EOFError + # to indicate the number of bytes actually received + send_buf = UInt8[0,1,2] + recv_buf = UInt8[5,5,5,5,5] + oneshot_accept(srv, send_buf) + c = connect(addr) + try + read!(c, recv_buf) + catch x + if isa(x,EOFError) + @test length(recv_buf) == length(send_buf) # receive buffer resized + @test recv_buf == send_buf # check up the content + else + rethrow() + end + finally + close(c) + end + + # test a normal (nonexceptional) case + send_buf = UInt8[0,1,2,3,4] + recv_buf = UInt8[5,5,5] + recvbuf_len = length(recv_buf) + oneshot_accept(srv, send_buf) + c = connect(addr) + read!(c, recv_buf) + @test length(recv_buf) == recvbuf_len # receive buffer's length not changed + @test recv_buf == send_buf[1:recvbuf_len] # check up the content + close(c) + + close(srv) +end +