-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
make 'read!(::Base.LibuvStream, ::Array{UInt8, 1})' indicate the number of bytes actually received on EOFError #14630
Conversation
This is one of several methods that is broken for UDPSocket or any other subtype of LibuvStream that does not have a buffer field. |
a LibuvStream should be assumed to implement fields of those names if meaningful, or to fail to find them if not meaningful (for example: |
Without the fields that "the same operations" are assuming to be present, UDPSocket doesn't support the same operations. Ref #14552, it should probably not be a subtype if it doesn't implement the assumed interface. |
56a0c62
to
67b2f9d
Compare
Thanks for explanation. I little expected that it may have something to do with UDPSocket. I thought it is something concerned with GC and the stream buffer substitution trick in the read!(::Base.LibuvStream, ::Array{UInt8, 1}) for array size greater than SZ_UNBUFFERED_IO (65535 bytes) . Of course UDP socket is not of a "stream" type since it is not involve establishing a connection between both sides of communication. And in libuv itself uv_udp_t is not a "subtype" of uv_stream_t like uv_tcp_t is. Perhaps a more general supertype for all libuv-backed IO objects should be considered, e.g:
|
67b2f9d
to
b3f693a
Compare
if !isempty(s.readnotify.waitq) | ||
start_reading(x) # resume reading iff there are currently other read clients of the stream | ||
start_reading(s) # resume reading iff there are currently other read clients of the stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where did x come from? seems like like a typo and missing test coverage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think no one having ever seen a UndefVarError or suchlike from here suggests that this branch has very little chance to be executed in concurrency context.
I also suspect that after the stream's buffer has been switched, all other simultaneous pended reads from the same socket that are waiting for the readnotify event in wait_readnb() loop begin to check the loop condition while isopen(x) && nb_available(x.buffer) < nb
against the substituted buffer. But when they return from the wait_readnb() and strat copying data (i.e. read!(sbuf, a)
) they do it from their saved local reference sbuf
rather than from the actual stream's buffer.
Yeah, I like that idea. Can be separate from this PR of course. Adding backport pending label for the typo fix commit. If you can come up with a test that hits that branch that'd be much appreciated. |
rethrow() | ||
end | ||
finally | ||
close(c) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indent
b3f693a
to
8985362
Compare
It seems to me that The Note that there is ongoing discussion about the names of these functions. See also Jeff's comment here: #14660 (comment) |
If we want to merge |
There is no method of readbytes!() for
Which delegates the actual byte by byte read operation to
Apparently this is slow. (This have just now recalled me the PR #14667 which improves the performance for |
I was thinking that something like this might be a better way to get the effect you want: --- a/base/stream.jl
+++ b/base/stream.jl
@@ -881,8 +881,7 @@ function readbytes(stream::LibuvStream)
return takebuf_array(stream.buffer)
end
-function read!(s::LibuvStream, a::Array{UInt8, 1})
- nb = length(a)
+function readbytes!(s::LibuvStream, a::Array{UInt8, 1}, nb=length(a))
sbuf = s.buffer
@assert sbuf.seekable == false
@assert sbuf.maxsize >= nb
@@ -893,7 +892,7 @@ function read!(s::LibuvStream, a::Array{UInt8, 1})
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buf
wait_readnb(s, nb)
- read!(sbuf, a)
+ r = readbytes!(sbuf, a, nb)
else
try
stop_reading(s) # Just playing it safe, since we are going to switch buffers.
@@ -902,6 +901,7 @@ function read!(s::LibuvStream, a::Array{UInt8, 1})
s.buffer = newbuf
write(newbuf, sbuf)
wait_readnb(s, nb)
+ r = nb_available(newbuf)
finally
s.buffer = sbuf
if !isempty(s.readnotify.waitq)
@@ -909,7 +909,16 @@ function read!(s::LibuvStream, a::Array{UInt8, 1})
end
end
end
- return a
+ return r
+end
+
+function read!(s::LibuvStream, b::Array{UInt8, 1})
+ nb = length(b)
+ r = readbytes!(s, a, nb)
+ if r < nb
+ throw(EOFError())
+ end
+ return b
end |
I'm working on a test case to compare the behaviour of various |
It seems that there is, but that it maybe doesn't play nicely with the SZ_UNBUFFERED_IO optimisation... |
As far as I've understood you suggest to turn |
8985362
to
bcc6e2f
Compare
Taken from: JuliaLang#14630
make 'read!(::Base.LibuvStream, ::Array{UInt8, 1})' indicate the number of bytes actually received on EOFError
... and perhaps not lose data on UVError.
Before doing the changes I want to ask - what is the purpose of employing an additional reference to the LibuvStream internal buffer in the following excerpts?
Why are the functions not like just:
I want to change
read!(::Base.LibuvStream, ::Array{UInt8, 1})
so that on exceptions it resizes its array argument to be able somehow indicate the number of bytes actually received.