Skip to content

Commit

Permalink
http2: add I/O timeouts
Browse files Browse the repository at this point in the history
Addresses hanging transport when on blocking I/O. There are many scenario where
the roundtrip hangs on write or read and won't be unlocked by current
cancelation systems (context, Request.Cancel, ...).

This adds read and write deadlines support.
The writer disables the read deadline and enables the write deadline, then after
the write is successful, it disables the write deadline and re-enables the read
deadline.
The read loop also sets its read deadline after a successful read since the next
frame is not predictable.
It guarantees that an I/O will not timeout before IOTimeout and will timeout
after a complete block before at least IOTimeout.

See issue: golang/go#23559
  • Loading branch information
gwik committed Feb 3, 2018
1 parent 2fb46b1 commit 29ac661
Show file tree
Hide file tree
Showing 2 changed files with 342 additions and 4 deletions.
68 changes: 65 additions & 3 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type Transport struct {
// to mean no limit.
MaxHeaderListSize uint32

// IOTimeout, if non-zero, enables timeouts on read and write to the
// connection.
IOTimeout time.Duration

// t1, if non-nil, is the standard library Transport using
// this transport. Its settings are used (but not its
// RoundTrip method, etc).
Expand Down Expand Up @@ -321,7 +325,9 @@ func (noCachedConnError) Error() string { return "http2: no cached c
// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
// may coexist in the same running program.
func isNoCachedConnError(err error) bool {
_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
_, ok := err.(interface {
IsHTTP2NoCachedConnError()
})
return ok
}

Expand Down Expand Up @@ -529,6 +535,38 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
return t.newClientConn(c, false)
}

type timeoutWriter struct {
c net.Conn
timeout time.Duration
}

// Write writes to the underlying connection and manages both read and write
// deadlines.
func (w timeoutWriter) Write(p []byte) (int, error) {
// The read deadline is disabled to allow so the reader doesn't timeout
// while there are no pending requests.
// The write deadline is set.
// The write occurs and the connection is closed on timeout interrupting the
// read with an error.
// If the write was successful it sets the deadline for the current read.
now := time.Now()
w.c.SetReadDeadline(time.Time{})
w.c.SetWriteDeadline(now.Add(w.timeout))
n, err := w.c.Write(p)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
// The write end of the connection is no longer in a known
// consistent state, unlock the read loop by closing the connection
// and force a cleanup.
w.c.Close()
return n, err
}
}
w.c.SetWriteDeadline(time.Time{})
w.c.SetReadDeadline(time.Now().Add(w.timeout))
return n, err
}

func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
cc := &ClientConn{
t: t,
Expand All @@ -555,9 +593,14 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(initialWindowSize))

var w io.Writer = cc.tconn
if d := cc.ioTimeout(); d > 0 {
w = timeoutWriter{c: cc.tconn, timeout: d}
}

// TODO: adjust this writer size to account for frame size +
// MTU + crypto/tls record padding.
cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr})
cc.bw = bufio.NewWriter(stickyErrWriter{w, &cc.werr})
cc.br = bufio.NewReader(c)
cc.fr = NewFramer(cc.bw, cc.br)
cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
Expand Down Expand Up @@ -733,6 +776,10 @@ func (cc *ClientConn) responseHeaderTimeout() time.Duration {
return 0
}

func (cc *ClientConn) ioTimeout() time.Duration {
return cc.t.IOTimeout
}

// checkConnHeaders checks whether req has any invalid connection-level headers.
// per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields.
// Certain headers are special-cased as okay but not transmitted later.
Expand Down Expand Up @@ -1473,12 +1520,23 @@ func (rl *clientConnReadLoop) run() error {
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
timeout := cc.ioTimeout()
for {
f, err := cc.fr.ReadFrame()
if err != nil {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(StreamError); ok {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
cc.mu.Lock()
idle := len(cc.streams) == 0
if idle {
// let the idle timer handle the timeout.
cc.tconn.SetReadDeadline(time.Time{})
continue
}
cc.mu.Unlock()
return err
} else if se, ok := err.(StreamError); ok {
if cs := cc.streamByID(se.StreamID, false); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
cs.cc.forgetStreamID(cs.ID)
Expand Down Expand Up @@ -1537,6 +1595,10 @@ func (rl *clientConnReadLoop) run() error {
if rl.closeWhenIdle && gotReply && maybeIdle {
cc.closeIfIdle()
}
if timeout > 0 {
// Upon successful read, set timeout on next read.
cc.tconn.SetReadDeadline(time.Now().Add(timeout))
}
}
}

Expand Down
Loading

0 comments on commit 29ac661

Please sign in to comment.