Proposal: io: add a buffered pipe #28790
Comments
I must be missing something. What's lacking from using bufio.NewReader on the read side and bufio.NewWriter on the write side? |
I imagine that would be different; for example, it's not guaranteed that a large enough write could unblock a read, if the writer forgot to flush the buffered writer. This appears to be mentioned in vpnkit's implementation. |
I don't think that's necessary, actually. The implementation of `(*bufio.Writer).Write`:
```go
func (b *Writer) Write(p []byte) (nn int, err error) {
	for len(p) > b.Available() && b.err == nil {
		var n int
		if b.Buffered() == 0 {
			// Large write, empty buffer.
			// Write directly from p to avoid copy.
			n, b.err = b.wr.Write(p)
		} else {
			n = copy(b.buf[b.n:], p)
			b.n += n
			b.Flush()
		}
		nn += n
		p = p[n:]
	}
	if b.err != nil {
		return nn, b.err
	}
	n := copy(b.buf[b.n:], p)
	b.n += n
	nn += n
	return nn, nil
}
```
It flushes automatically whenever a write is larger than the buffer or the buffer has filled up, and there's this check in `bufio.NewWriterSize`:
```go
// Is it already a Writer?
b, ok := w.(*Writer)
if ok && len(b.buf) >= size {
	return b
}
```
Maybe this is primarily a documentation issue. |
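To make the flushing concern concrete, here is a small sketch (not from the thread) that wraps the write end of an `io.Pipe` in a `bufio.Writer`. A write smaller than the buffer stays inside the `bufio.Writer` and never reaches the pipe, so a blocked reader sees nothing until `Flush` is called. The helper name `writeThenFlush` is made up for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
)

// writeThenFlush writes msg through a bufio.Writer layered on the write end
// of an io.Pipe. It reports how many bytes were still held in the bufio
// buffer right after the write, and what the reader eventually received.
// It assumes msg is shorter than bufio's default 4096-byte buffer.
func writeThenFlush(msg string) (pending int, received string, err error) {
	pr, pw := io.Pipe()
	bw := bufio.NewWriter(pw)

	// A short write fits in the bufio buffer, so nothing is written to the
	// pipe yet and this returns even though no reader is running.
	if _, err := bw.WriteString(msg); err != nil {
		return 0, "", err
	}
	pending = bw.Buffered()

	// io.Pipe is synchronous, so Flush must run concurrently with the reader.
	// CloseWithError(nil) closes the pipe so the reader sees io.EOF.
	go func() {
		pw.CloseWithError(bw.Flush())
	}()

	data, err := io.ReadAll(pr)
	return pending, string(data), err
}

func main() {
	pending, got, err := writeThenFlush("hello")
	fmt.Println(pending, got, err) // 5 hello <nil>
}
```

Forgetting the `Flush` call in the goroutine would leave the reader blocked forever, which is the failure mode mentioned for vpnkit's implementation.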
Yes, a An |
“Buffered pipe” seems like a very vague concept — I'd like to understand the use-cases better. How many of the existing implementations you found could this proposal replace, and how much (if any) overhead would it introduce relative to what they're using today? What properties do those implementations share in common, and what variations do they require? |
I wrote up a huge post, complete with playground example, read through the proposal again, and realized that I was completely misunderstanding this. You're asking for a pipe that's not just buffered, but asynchronous. You want the writing side to be able to just write something into a buffer and not have to worry about whether or not a reading side ever sees it, while a reading side would block until something was available, much like a buffered channel. I'm still not exactly sure why just using both a
Not sure, but it's possible that it simply doesn't cover specific edge cases for some of them, which really isn't the job of the standard library, per se. For example, the |
Can you think of a name that would be more specific, whatever its meaning may be?
From what I saw, most of the implementations are just a reimplementation of The only notable outliers were what I covered in the original post;
Correct. This is why I called it "buffered pipe", to mirror "buffered channel". |
I also just realised I forgot to mention the parallel with buffered channels in the original post. That's entirely my bad. |
To take the analogy with buffered channels a bit further, in my experience there are only three generally-useful buffer sizes for channels: 0 (synchronous messages), 1 (mutually-excluded data), and “an upper bound on the total number of elements sent” (asynchronous results). (A program that is micro-optimized for cache locality might use buffers of other sizes, but the code that actually uses those buffers pretty much always ends up equivalent to what you'd write for a buffer size of 0 or 1.) By analogy, the useful buffer sizes for an |
I think the "buffered channel" analogy only goes as far as the name and the high-level idea. It breaks quickly, because a buffered channel always passes messages intact, while an Similarly, a buffered pipe could mean that two pipe writes of 50 bytes end up as a single pipe read of 100 bytes, joining the two sent chunks of bytes. We can agree that As for “an upper bound on the total number of elements sent”, I think the equivalent in a buffered pipe would be "the maximum size of one message sent on the pipe", where one message could consist of many writes. In practice, programs tend to know the maximum message size, so they can use that knowledge to guarantee that they can write a message without blocking. (thanks to @rogpeppe for helping word this paragraph clearly) As for practical examples - net_fake uses |
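The coalescing point can be seen with any byte buffer. In this sketch, a `bytes.Buffer` stands in for a buffered pipe's internal storage (an assumption, not any particular implementation from the thread): two 50-byte writes followed by a single 100-byte read return all 100 bytes at once, so the boundary between the writes is lost. A buffered channel of `[]byte` delivers the same two writes as separate messages.

```go
package main

import (
	"bytes"
	"fmt"
)

// coalesce does two separate 50-byte writes into a byte buffer, then one
// 100-byte read, and returns how many bytes that single read produced.
func coalesce() int {
	var buf bytes.Buffer
	buf.Write(bytes.Repeat([]byte{'a'}, 50))
	buf.Write(bytes.Repeat([]byte{'b'}, 50))

	p := make([]byte, 100)
	n, _ := buf.Read(p)
	return n
}

// chunked sends the same two writes as messages on a buffered channel of
// []byte; the first receive yields only the first write's 50 bytes.
func chunked() int {
	ch := make(chan []byte, 2)
	ch <- bytes.Repeat([]byte{'a'}, 50)
	ch <- bytes.Repeat([]byte{'b'}, 50)
	return len(<-ch)
}

func main() {
	fmt.Println(coalesce(), chunked()) // 100 50
}
```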
FWIW you can get the "exactly one Write at a time" semantic with something like this: https://play.golang.org/p/YQ14XqARLV6 Messages don't have to be written in a single write though. |
Another way to think about it would be that |
If you know that the write won't block, why use a fixed buffer size at all? (That is, why should the pipe block when it reaches For that matter, why use an (If it's important to satisfy the |
If you mean, as opposed to having the buffer grow as needed - I'd personally prefer to be in control of how much memory a program can use. There's no way to create a buffered channel without a fixed size, for example.
Like you say further below, satisfying
I wouldn't be so sure; I imagine that a full implementation with
|
It seems pretty trivial to me: concurrent calls just need a
That may be true, but it doesn't contradict my point about buffer sizes: even if it is useful to provide a standard buffered |
Just FYI. As described in #28650, the existing net_fake.go is broken. Networking code certainly draws on queueing theory by default, but only to support reliable data transfer, and that comes at the cost of various complicated control mechanisms. Without taking that control into account, future buffered pipe users might run into #28650 or similar issues. For adapting the net package, I feel it might be better to look for a cheap way of building a queueing network between the io and net packages, in a separate issue. |
It actually does work: https://play.golang.org/p/iGeuSFtebI- The linked program in the prior example had a subtle bug: the pipe closed before the buffer flushed.
Because the pipe has no internal buffering, calling This is WAD. Edit: Checking the error message on the buffer close method (in the original example) will yield a write on closed pipe error. |
Let me answer that question with another question :) If writing a correct buffered pipe is trivial, why do the implementations out there vary so wildly? In other words, it doesn't look like there's a well-understood canonical way to write a buffered pipe. Perhaps the solution to that will be an example or piece of documentation somewhere; I honestly don't know. This proposal was more about starting a conversation than adding an API.
That's a fair point. I haven't done a deep dive into the implementations and use cases in the wild, so I don't have an answer right now. From what I've seen so far, buffering up to a number of bytes seems to be the most common need.
Thanks for chiming in - it does look like |
I was hoping you could answer from having looked at those implementations already. 🙂 I have a few hypotheses you could check.
I took a quick glance at the links and saw a lot of |
Chiming in as V2Ray is listed in the thread. As @bcmills suggests, the buffered pipe in V2Ray is tailored for V2Ray's use case. It is more or less a The |
While writing |
@neelance, that sounds like (4), but with the “without questioning” part as a very intentional (and reasonable!) decision. 🙂 |
Ah, whoops. Got the |
Thanks all for the input. It seems clear that there are multiple understandings of what a "buffered pipe" should be and how it should behave, so I'm thinking it's best if we don't proceed with the proposal for now. I'll leave it open for a little longer, in case anyone has anything else to add. |
Per @mvdan, closing proposal. |
I implemented (variable-sized) buffered pipe by simply wrapping bytes.Buffer. |
This is necessary to be able to unblock `Buffer` readers that are waiting for the next packet. Note: The way `Buffer` is currently used it acts much like `io.Pipe`, except that it allows packets to be buffered. There's probably a way to achieve this without requiring an allocation on every `Write`. See also golang/go#28790.
I would like to add a use case, and rationale for why having a limit to the input buffer size is important.
Consider a remote data source, streaming bytes via network / Internet. This can be e.g. video, or a large file. The consumer, running locally, receives the byte stream and does some processing on the data. Let's say it compresses the data, or does something else with similarly bursty behaviour. The data source may vary in transfer speed, which happens easily, especially when travelling through a wide area network.
So, you have a data source with varying transfer speed, and a bursty consumer. You want a buffer in between, to avoid stalling either end. And you want the buffer to be asynchronous, so you can have one goroutine reading from the network and another consuming the data.
Think e.g. of a streaming RPC method, where you receive an arbitrarily large file. Your RPC handler spawns a data processing goroutine that reads from the buffered pipe. Then, the handler loops reading chunks of data from the stream and writing them to the buffered pipe. Finally, the handler waits on the data processing goroutine, and returns.
Why a limit to the buffer size? Because RAM has a physical limit, and incoming data doesn't. You may not want to accumulate 1 GiB of streamed input in RAM if the data processor becomes slower than the network for a while. Or maybe you can live with 1 GiB, but not with 10 GiB. Or 100 GiB. So you specify a buffer size. Now, instead of letting one client kill the serving task, or bring the machine to a halt through thrashing, you get automatic throttling. Once the buffer is full, the RPC handler will block on the write to the buffered pipe, so the client will block on sending the stream.
For a more general use case, just think of anything where you have a producer sending arbitrary amounts of data at variable or bursty rates, and a consumer receiving at variable or bursty rates. Even writing to tape drives (or optical discs, for that matter) will benefit from a memory buffer.
Regarding the understanding of what a "buffered pipe" is, I would point to the arguably most basic of interprocess communication mechanisms, after which I presume io.Pipe was named. Having a bounded buffer and being asynchronous (in that writes are decoupled from reads) are pretty much the defining characteristics of a pipe :) Could you please revisit the decision to close? |
@israel-lugo This proposal was closed because it was not clear what it meant. As you seem to have a specific meaning in mind, I recommend that you open a new proposal describing the exact API you are after. Note in particular that |
Summary
Add a way to easily create a buffered `io.Pipe`. Currently, one has to write their own from scratch. This leads to dozens of separate implementations in the wild, and likely many correctness issues and hidden bugs.
Description
`io.Pipe` is synchronous, as per https://golang.org/pkg/io/#Pipe:
This is fine for most use cases. However, a buffered pipe can still be useful or necessary in many situations. Below is a list of third-party implementations I was able to find after a ten-minute search:
- go/src/net/net_fake.go (line 173 at commit 0436b16)
- go/src/net/http/h2_bundle.go (line 3688 at commit f58b02a)
Note the first two, which live in the standard library itself. They're also interesting, because `net`'s implementation requires deadlines, and `http`'s supports any underlying pipe buffer implementation.
The main point of the proposal is that there are dozens of well-used buffered pipe implementations out there. This is particularly worrying, because writing a correct one isn't easy. It's very likely that some of the implementations out there have subtle bugs, such as misusing `sync.Cond` or being racy when many reads and writes happen concurrently.
I'm raising this proposal precisely because I had written my own buggy buffered pipe, which made `go test -race` find data races about 5% of the time. I "fixed" that by rewriting my tests to work around the lack of buffering, and using `io.Pipe` instead: mvdan/sh@5ffc4d9
Granted, my code being incorrect doesn't mean everyone's code is too. But it does help illustrate the point; writing a buffered pipe seems easy, and the lack of an implementation in the standard library seems to encourage people to write their own, likely buggy, implementation.
This has been discussed in golang-dev before, although no common implementation was found or proposed: https://groups.google.com/forum/#!topic/golang-dev/k0bSal8eDyE
There's also a proposal to make `net.Pipe` asynchronous in Go2: #24205. That proposal is somewhat related to this one, since `net.Pipe` currently uses `io.Pipe` under the hood. This could, however, be changed in Go2.
Proposed API
The simplest change would be to add something like:
Ideally `PipeReader` and `PipeWriter` would have been interface types from the start. Within Go1, we can work around that by having the `PipeWriter` struct simply embed the actual pipe implementation, be it synchronous or asynchronous.
Another possible path in Go1 is to go the http2 route, allowing any underlying pipe buffer implementation. I personally think this would be a bit too niche for the `io` package, but perhaps @bradfitz has opinions on that. Aside from http2, all buffered pipe packages seem to build on the current `io.Pipe` semantics.
If the `net` package needs read and write deadlines, they could always be added to the reader and writer returned by the constructors. This should be a backwards compatible change, and could be discussed as a separate proposal.
Finally, if we were to consider this as a Go2 change, the cleaned up version of my first proposed change would be:
We could even go one step further and mirror `make(chan T, size)`, only having `func Pipe(size int)`, where `Pipe(0)` would return a synchronous/unbuffered pipe.
The `net` package could either mirror these proposed Go2 changes, or break with the connection to the `io` package entirely.
/cc @bcmills @dsnet @bradfitz @rogpeppe (and @neelance for `net_fake.go`, and @djs55 for vpnkit's `loopbackconn.go`)