diff --git a/api/go.mod b/api/go.mod
index 77d29f4fa7f..0a721b1286c 100644
--- a/api/go.mod
+++ b/api/go.mod
@@ -20,7 +20,7 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	golang.org/x/net v0.19.0 // indirect
+	golang.org/x/net v0.23.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/api/go.sum b/api/go.sum
index 6658a97cf54..2c22d3d6dd4 100644
--- a/api/go.sum
+++ b/api/go.sum
@@ -49,8 +49,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
-golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
+golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
+golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
diff --git a/api/vendor/golang.org/x/net/http2/frame.go b/api/vendor/golang.org/x/net/http2/frame.go
index c1f6b90dc32..43557ab7e97 100644
--- a/api/vendor/golang.org/x/net/http2/frame.go
+++ b/api/vendor/golang.org/x/net/http2/frame.go
@@ -1510,13 +1510,12 @@ func (mh *MetaHeadersFrame) checkPseudos() error {
 }
 
 func (fr *Framer) maxHeaderStringLen() int {
-	v := fr.maxHeaderListSize()
-	if uint32(int(v)) == v {
-		return int(v)
+	v := int(fr.maxHeaderListSize())
+	if v < 0 {
+		// If maxHeaderListSize overflows an int, use no limit (0).
+		return 0
 	}
-	// They had a crazy big number for MaxHeaderBytes anyway,
-	// so give them unlimited header lengths:
-	return 0
+	return v
 }
 
 // readMetaFrame returns 0 or more CONTINUATION frames from fr and
@@ -1565,6 +1564,7 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
 		if size > remainSize {
 			hdec.SetEmitEnabled(false)
 			mh.Truncated = true
+			remainSize = 0
 			return
 		}
 		remainSize -= size
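
The rewritten maxHeaderStringLen above swaps a uint32 round-trip comparison for a direct int conversion: on 32-bit platforms a SETTINGS_MAX_HEADER_LIST_SIZE larger than math.MaxInt32 wraps to a negative int, which the new code maps to 0 ("no limit"). A minimal standalone sketch of the same guard (a hypothetical free function, not part of the patch):

```go
package main

import (
	"fmt"
	"math"
)

// maxHeaderStringLen mirrors the patched logic: convert the uint32
// setting to int and treat a negative result (only possible where int
// is 32 bits and the conversion wrapped) as "no limit".
func maxHeaderStringLen(maxHeaderListSize uint32) int {
	v := int(maxHeaderListSize)
	if v < 0 {
		return 0 // overflowed an int: no limit
	}
	return v
}

func main() {
	fmt.Println(maxHeaderStringLen(16 << 10))       // 16384 on every platform
	fmt.Println(maxHeaderStringLen(math.MaxUint32)) // 4294967295 on 64-bit, 0 on 32-bit
}
```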
@@ -1577,6 +1577,36 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
 	var hc headersOrContinuation = hf
 	for {
 		frag := hc.HeaderBlockFragment()
+
+		// Avoid parsing large amounts of headers that we will then discard.
+		// If the sender exceeds the max header list size by too much,
+		// skip parsing the fragment and close the connection.
+		//
+		// "Too much" is either any CONTINUATION frame after we've already
+		// exceeded the max header list size (in which case remainSize is 0),
+		// or a frame whose encoded size is more than twice the remaining
+		// header list bytes we're willing to accept.
+		if int64(len(frag)) > int64(2*remainSize) {
+			if VerboseLogs {
+				log.Printf("http2: header list too large")
+			}
+			// It would be nice to send a RST_STREAM before sending the GOAWAY,
+			// but the structure of the server's frame writer makes this difficult.
+			return nil, ConnectionError(ErrCodeProtocol)
+		}
+
+		// Also close the connection after any CONTINUATION frame following an
+		// invalid header, since we stop tracking the size of the headers after
+		// an invalid one.
+		if invalid != nil {
+			if VerboseLogs {
+				log.Printf("http2: invalid header: %v", invalid)
+			}
+			// It would be nice to send a RST_STREAM before sending the GOAWAY,
+			// but the structure of the server's frame writer makes this difficult.
+			return nil, ConnectionError(ErrCodeProtocol)
+		}
+
 		if _, err := hdec.Write(frag); err != nil {
 			return nil, ConnectionError(ErrCodeCompression)
 		}
diff --git a/api/vendor/golang.org/x/net/http2/pipe.go b/api/vendor/golang.org/x/net/http2/pipe.go
index 684d984fd96..3b9f06b9624 100644
--- a/api/vendor/golang.org/x/net/http2/pipe.go
+++ b/api/vendor/golang.org/x/net/http2/pipe.go
@@ -77,7 +77,10 @@ func (p *pipe) Read(d []byte) (n int, err error) {
 	}
 }
 
-var errClosedPipeWrite = errors.New("write on closed buffer")
+var (
+	errClosedPipeWrite        = errors.New("write on closed buffer")
+	errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
+)
 
 // Write copies bytes from p into the buffer and wakes a reader.
 // It is an error to write more data than the buffer can hold.
@@ -91,6 +94,12 @@ func (p *pipe) Write(d []byte) (n int, err error) {
 	if p.err != nil || p.breakErr != nil {
 		return 0, errClosedPipeWrite
 	}
+	// pipe.setBuffer is never invoked, leaving the buffer uninitialized.
+	// We shouldn't try to write to an uninitialized pipe,
+	// but returning an error is better than panicking.
+	if p.b == nil {
+		return 0, errUninitializedPipeWrite
+	}
 	return p.b.Write(d)
 }
 
diff --git a/api/vendor/golang.org/x/net/http2/server.go b/api/vendor/golang.org/x/net/http2/server.go
index ae94c6408d5..ce2e8b40eee 100644
--- a/api/vendor/golang.org/x/net/http2/server.go
+++ b/api/vendor/golang.org/x/net/http2/server.go
@@ -124,6 +124,7 @@ type Server struct {
 	// IdleTimeout specifies how long until idle clients should be
 	// closed with a GOAWAY frame. PING frames are not considered
 	// activity for the purposes of IdleTimeout.
+	// If zero or negative, there is no timeout.
 	IdleTimeout time.Duration
 
 	// MaxUploadBufferPerConnection is the size of the initial flow
@@ -434,7 +435,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
 	// passes the connection off to us with the deadline already set.
 	// Write deadlines are set per stream in serverConn.newStream.
 	// Disarm the net.Conn write deadline here.
-	if sc.hs.WriteTimeout != 0 {
+	if sc.hs.WriteTimeout > 0 {
 		sc.conn.SetWriteDeadline(time.Time{})
 	}
 
@@ -924,7 +925,7 @@ func (sc *serverConn) serve() {
 	sc.setConnState(http.StateActive)
 	sc.setConnState(http.StateIdle)
 
-	if sc.srv.IdleTimeout != 0 {
+	if sc.srv.IdleTimeout > 0 {
 		sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer)
 		defer sc.idleTimer.Stop()
 	}
@@ -1637,7 +1638,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
 	delete(sc.streams, st.id)
 	if len(sc.streams) == 0 {
 		sc.setConnState(http.StateIdle)
-		if sc.srv.IdleTimeout != 0 {
+		if sc.srv.IdleTimeout > 0 {
 			sc.idleTimer.Reset(sc.srv.IdleTimeout)
 		}
 		if h1ServerKeepAlivesDisabled(sc.hs) {
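
The server.go hunks in this region all replace `!= 0` with `> 0` when deciding whether to arm a timeout. The practical difference: a negative http.Server timeout previously armed a timer, and time.AfterFunc with a non-positive duration runs its callback almost immediately. A sketch of the corrected guard, using a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// armIdleTimer is a hypothetical helper showing the corrected guard:
// a zero or negative timeout now means "no timeout", whereas
// time.AfterFunc with d <= 0 would fire the callback right away.
func armIdleTimer(d time.Duration, onIdle func()) *time.Timer {
	if d <= 0 {
		return nil // no timeout configured
	}
	return time.AfterFunc(d, onIdle)
}

func main() {
	if armIdleTimer(-time.Second, func() {}) == nil {
		fmt.Println("negative timeout: no timer armed")
	}
	t := armIdleTimer(10*time.Millisecond, func() { fmt.Println("idle") })
	defer t.Stop()
	time.Sleep(20 * time.Millisecond)
}
```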
@@ -2017,7 +2018,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
 	// similar to how the http1 server works. Here it's
 	// technically more like the http1 Server's ReadHeaderTimeout
 	// (in Go 1.8), though. That's a more sane option anyway.
-	if sc.hs.ReadTimeout != 0 {
+	if sc.hs.ReadTimeout > 0 {
 		sc.conn.SetReadDeadline(time.Time{})
 		st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
 	}
@@ -2038,7 +2039,7 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
 
 	// Disable any read deadline set by the net/http package
 	// prior to the upgrade.
-	if sc.hs.ReadTimeout != 0 {
+	if sc.hs.ReadTimeout > 0 {
 		sc.conn.SetReadDeadline(time.Time{})
 	}
 
@@ -2116,7 +2117,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream
 	st.flow.conn = &sc.flow // link to conn-level counter
 	st.flow.add(sc.initialStreamSendWindowSize)
 	st.inflow.init(sc.srv.initialStreamRecvWindowSize())
-	if sc.hs.WriteTimeout != 0 {
+	if sc.hs.WriteTimeout > 0 {
 		st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout)
 	}
 
diff --git a/api/vendor/golang.org/x/net/http2/testsync.go b/api/vendor/golang.org/x/net/http2/testsync.go
new file mode 100644
index 00000000000..61075bd16d3
--- /dev/null
+++ b/api/vendor/golang.org/x/net/http2/testsync.go
@@ -0,0 +1,331 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+package http2
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+// testSyncHooks coordinates goroutines in tests.
+//
+// For example, a call to ClientConn.RoundTrip involves several goroutines, including:
+//   - the goroutine running RoundTrip;
+//   - the clientStream.doRequest goroutine, which writes the request; and
+//   - the clientStream.readLoop goroutine, which reads the response.
+//
+// Using testSyncHooks, a test can start a RoundTrip and identify when all these goroutines
+// are blocked waiting for some condition such as reading the Request.Body or waiting for
+// flow control to become available.
+//
+// The testSyncHooks also manage timers and synthetic time in tests.
+// This permits us to, for example, start a request and cause it to time out waiting for
+// response headers without resorting to time.Sleep calls.
+type testSyncHooks struct {
+	// active/inactive act as a mutex and condition variable.
+	//
+	//   - neither chan contains a value: testSyncHooks is locked.
+	//   - active contains a value: unlocked, and at least one goroutine is not blocked
+	//   - inactive contains a value: unlocked, and all goroutines are blocked
+	active   chan struct{}
+	inactive chan struct{}
+
+	// goroutine counts
+	total    int                     // total goroutines
+	condwait map[*sync.Cond]int      // blocked in sync.Cond.Wait
+	blocked  []*testBlockedGoroutine // otherwise blocked
+
+	// fake time
+	now    time.Time
+	timers []*fakeTimer
+
+	// Transport testing: Report various events.
+	newclientconn func(*ClientConn)
+	newstream     func(*clientStream)
+}
+
+// testBlockedGoroutine is a blocked goroutine.
+type testBlockedGoroutine struct {
+	f  func() bool   // blocked until f returns true
+	ch chan struct{} // closed when unblocked
+}
+
+func newTestSyncHooks() *testSyncHooks {
+	h := &testSyncHooks{
+		active:   make(chan struct{}, 1),
+		inactive: make(chan struct{}, 1),
+		condwait: map[*sync.Cond]int{},
+	}
+	h.inactive <- struct{}{}
+	h.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
+	return h
+}
+
+// lock acquires the testSyncHooks mutex.
+func (h *testSyncHooks) lock() {
+	select {
+	case <-h.active:
+	case <-h.inactive:
+	}
+}
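
The lock implementation just above is the heart of testsync.go: two one-element channels jointly hold a single token, so locking drains whichever channel has it, and unlocking re-encodes whether any goroutine is still runnable. A stripped-down, runnable sketch of the pattern (names are hypothetical):

```go
package main

import "fmt"

// hooks reduces testSyncHooks to its locking skeleton: exactly one of
// the two 1-buffered channels holds a token while unlocked.
type hooks struct {
	active   chan struct{} // token here: some goroutine can still run
	inactive chan struct{} // token here: every goroutine is blocked
}

func newHooks() *hooks {
	h := &hooks{
		active:   make(chan struct{}, 1),
		inactive: make(chan struct{}, 1),
	}
	h.inactive <- struct{}{} // start unlocked and idle
	return h
}

// lock takes the token from whichever channel currently has it.
func (h *hooks) lock() {
	select {
	case <-h.active:
	case <-h.inactive:
	}
}

// unlock returns the token to the channel matching the current state.
func (h *hooks) unlock(anyRunnable bool) {
	if anyRunnable {
		h.active <- struct{}{}
	} else {
		h.inactive <- struct{}{}
	}
}

func main() {
	h := newHooks()
	h.lock()
	h.unlock(false)
	<-h.inactive // receiving here doubles as "wait until quiescent"
	h.inactive <- struct{}{}
	fmt.Println("observed quiescence")
}
```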
+// waitInactive waits for all goroutines to become inactive.
+func (h *testSyncHooks) waitInactive() {
+	for {
+		<-h.inactive
+		if !h.unlock() {
+			break
+		}
+	}
+}
+
+// unlock releases the testSyncHooks mutex.
+// It reports whether any goroutines are active.
+func (h *testSyncHooks) unlock() (active bool) {
+	// Look for a blocked goroutine which can be unblocked.
+	blocked := h.blocked[:0]
+	unblocked := false
+	for _, b := range h.blocked {
+		if !unblocked && b.f() {
+			unblocked = true
+			close(b.ch)
+		} else {
+			blocked = append(blocked, b)
+		}
+	}
+	h.blocked = blocked
+
+	// Count goroutines blocked on condition variables.
+	condwait := 0
+	for _, count := range h.condwait {
+		condwait += count
+	}
+
+	if h.total > condwait+len(blocked) {
+		h.active <- struct{}{}
+		return true
+	} else {
+		h.inactive <- struct{}{}
+		return false
+	}
+}
+
+// goRun starts a new goroutine.
+func (h *testSyncHooks) goRun(f func()) {
+	h.lock()
+	h.total++
+	h.unlock()
+	go func() {
+		defer func() {
+			h.lock()
+			h.total--
+			h.unlock()
+		}()
+		f()
+	}()
+}
+
+// blockUntil indicates that a goroutine is blocked waiting for some condition to become true.
+// It waits until f returns true before proceeding.
+//
+// Example usage:
+//
+//	h.blockUntil(func() bool {
+//		// Is the context done yet?
+//		select {
+//		case <-ctx.Done():
+//		default:
+//			return false
+//		}
+//		return true
+//	})
+//	// Wait for the context to become done.
+//	<-ctx.Done()
+//
+// The function f passed to blockUntil must be non-blocking and idempotent.
+func (h *testSyncHooks) blockUntil(f func() bool) {
+	if f() {
+		return
+	}
+	ch := make(chan struct{})
+	h.lock()
+	h.blocked = append(h.blocked, &testBlockedGoroutine{
+		f:  f,
+		ch: ch,
+	})
+	h.unlock()
+	<-ch
+}
+
+// broadcast is sync.Cond.Broadcast.
+func (h *testSyncHooks) condBroadcast(cond *sync.Cond) {
+	h.lock()
+	delete(h.condwait, cond)
+	h.unlock()
+	cond.Broadcast()
+}
+
+// broadcast is sync.Cond.Wait.
+func (h *testSyncHooks) condWait(cond *sync.Cond) {
+	h.lock()
+	h.condwait[cond]++
+	h.unlock()
+}
+
+// newTimer creates a new fake timer.
+func (h *testSyncHooks) newTimer(d time.Duration) timer {
+	h.lock()
+	defer h.unlock()
+	t := &fakeTimer{
+		hooks: h,
+		when:  h.now.Add(d),
+		c:     make(chan time.Time),
+	}
+	h.timers = append(h.timers, t)
+	return t
+}
+
+// afterFunc creates a new fake AfterFunc timer.
+func (h *testSyncHooks) afterFunc(d time.Duration, f func()) timer {
+	h.lock()
+	defer h.unlock()
+	t := &fakeTimer{
+		hooks: h,
+		when:  h.now.Add(d),
+		f:     f,
+	}
+	h.timers = append(h.timers, t)
+	return t
+}
+
+func (h *testSyncHooks) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithCancel(ctx)
+	t := h.afterFunc(d, cancel)
+	return ctx, func() {
+		t.Stop()
+		cancel()
+	}
+}
+
+func (h *testSyncHooks) timeUntilEvent() time.Duration {
+	h.lock()
+	defer h.unlock()
+	var next time.Time
+	for _, t := range h.timers {
+		if next.IsZero() || t.when.Before(next) {
+			next = t.when
+		}
+	}
+	if d := next.Sub(h.now); d > 0 {
+		return d
+	}
+	return 0
+}
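
Every callback handed to blockUntil in this patch follows the contract in the doc comment above: it must be a non-blocking, idempotent poll, which in Go is a select with a default arm. Extracted into a runnable form:

```go
package main

import "fmt"

// anyReady reports whether any channel a blockUntil callback watches
// is ready, without blocking: the default arm turns the select into a
// poll. Closed channels stay readable, which keeps the poll idempotent.
func anyReady(done, abort <-chan struct{}) bool {
	select {
	case <-done:
	case <-abort:
	default:
		return false
	}
	return true
}

func main() {
	done, abort := make(chan struct{}), make(chan struct{})
	fmt.Println(anyReady(done, abort)) // false: nothing ready yet
	close(abort)
	fmt.Println(anyReady(done, abort)) // true: abort is readable
}
```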
+
+// advance advances time and causes synthetic timers to fire.
+func (h *testSyncHooks) advance(d time.Duration) {
+	h.lock()
+	defer h.unlock()
+	h.now = h.now.Add(d)
+	timers := h.timers[:0]
+	for _, t := range h.timers {
+		t := t // remove after go.mod depends on go1.22
+		t.mu.Lock()
+		switch {
+		case t.when.After(h.now):
+			timers = append(timers, t)
+		case t.when.IsZero():
+			// stopped timer
+		default:
+			t.when = time.Time{}
+			if t.c != nil {
+				close(t.c)
+			}
+			if t.f != nil {
+				h.total++
+				go func() {
+					defer func() {
+						h.lock()
+						h.total--
+						h.unlock()
+					}()
+					t.f()
+				}()
+			}
+		}
+		t.mu.Unlock()
+	}
+	h.timers = timers
+}
+
+// A timer wraps a time.Timer, or a synthetic equivalent in tests.
+// Unlike time.Timer, timer is single-use: The timer channel is closed when the timer expires.
+type timer interface {
+	C() <-chan time.Time
+	Stop() bool
+	Reset(d time.Duration) bool
+}
+
+// timeTimer implements timer using real time.
+type timeTimer struct {
+	t *time.Timer
+	c chan time.Time
+}
+
+// newTimeTimer creates a new timer using real time.
+func newTimeTimer(d time.Duration) timer {
+	ch := make(chan time.Time)
+	t := time.AfterFunc(d, func() {
+		close(ch)
+	})
+	return &timeTimer{t, ch}
+}
+
+// newTimeAfterFunc creates an AfterFunc timer using real time.
+func newTimeAfterFunc(d time.Duration, f func()) timer {
+	return &timeTimer{
+		t: time.AfterFunc(d, f),
+	}
+}
+
+func (t timeTimer) C() <-chan time.Time        { return t.c }
+func (t timeTimer) Stop() bool                 { return t.t.Stop() }
+func (t timeTimer) Reset(d time.Duration) bool { return t.t.Reset(d) }
+
+// fakeTimer implements timer using fake time.
+type fakeTimer struct {
+	hooks *testSyncHooks
+
+	mu   sync.Mutex
+	when time.Time      // when the timer will fire
+	c    chan time.Time // closed when the timer fires; mutually exclusive with f
+	f    func()         // called when the timer fires; mutually exclusive with c
+}
+
+func (t *fakeTimer) C() <-chan time.Time { return t.c }
+
+func (t *fakeTimer) Stop() bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	stopped := t.when.IsZero()
+	t.when = time.Time{}
+	return stopped
+}
+
+func (t *fakeTimer) Reset(d time.Duration) bool {
+	if t.c != nil || t.f == nil {
+		panic("fakeTimer only supports Reset on AfterFunc timers")
+	}
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.hooks.lock()
+	defer t.hooks.unlock()
+	active := !t.when.IsZero()
+	t.when = t.hooks.now.Add(d)
+	if !active {
+		t.hooks.timers = append(t.hooks.timers, t)
+	}
+	return active
+}
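
The timer interface above is deliberately single-use: newTimeTimer closes its channel from an AfterFunc rather than sending one value the way time.Timer does, so repeated receives (for example from a re-polled blockUntil callback) all observe expiry. A small demonstration:

```go
package main

import (
	"fmt"
	"time"
)

// expiringTimer mimics newTimeTimer: expiry closes the channel, so any
// number of receives succeed afterwards. A plain time.Timer sends one
// value, which only a single receiver can ever observe.
func expiringTimer(d time.Duration) <-chan time.Time {
	ch := make(chan time.Time)
	time.AfterFunc(d, func() { close(ch) })
	return ch
}

func main() {
	c := expiringTimer(10 * time.Millisecond)
	<-c
	<-c // still succeeds: receives on a closed channel never block
	fmt.Println("both receives observed expiry")
}
```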
diff --git a/api/vendor/golang.org/x/net/http2/transport.go b/api/vendor/golang.org/x/net/http2/transport.go
index df578b86c65..ce375c8c753 100644
--- a/api/vendor/golang.org/x/net/http2/transport.go
+++ b/api/vendor/golang.org/x/net/http2/transport.go
@@ -147,6 +147,12 @@ type Transport struct {
 	// waiting for their turn.
 	StrictMaxConcurrentStreams bool
 
+	// IdleConnTimeout is the maximum amount of time an idle
+	// (keep-alive) connection will remain idle before closing
+	// itself.
+	// Zero means no limit.
+	IdleConnTimeout time.Duration
+
 	// ReadIdleTimeout is the timeout after which a health check using ping
 	// frame will be carried out if no frame is received on the connection.
 	// Note that a ping response will is considered a received frame, so if
@@ -178,6 +184,8 @@ type Transport struct {
 
 	connPoolOnce  sync.Once
 	connPoolOrDef ClientConnPool // non-nil version of ConnPool
+
+	syncHooks *testSyncHooks
 }
 
 func (t *Transport) maxHeaderListSize() uint32 {
@@ -302,7 +310,7 @@ type ClientConn struct {
 	readerErr  error // set before readerDone is closed
 
 	idleTimeout time.Duration // or 0 for never
-	idleTimer   *time.Timer
+	idleTimer   timer
 
 	mu              sync.Mutex // guards following
 	cond            *sync.Cond // hold mu; broadcast on flow/closed changes
@@ -344,6 +352,60 @@ type ClientConn struct {
 	werr error        // first write error that has occurred
 	hbuf bytes.Buffer // HPACK encoder writes into this
 	henc *hpack.Encoder
+
+	syncHooks *testSyncHooks // can be nil
+}
+
+// Hook points used for testing.
+// Outside of tests, cc.syncHooks is nil and these all have minimal implementations.
+// Inside tests, see the testSyncHooks function docs.
+
+// goRun starts a new goroutine.
+func (cc *ClientConn) goRun(f func()) {
+	if cc.syncHooks != nil {
+		cc.syncHooks.goRun(f)
+		return
+	}
+	go f()
+}
+
+// condBroadcast is cc.cond.Broadcast.
+func (cc *ClientConn) condBroadcast() {
+	if cc.syncHooks != nil {
+		cc.syncHooks.condBroadcast(cc.cond)
+	}
+	cc.cond.Broadcast()
+}
+
+// condWait is cc.cond.Wait.
+func (cc *ClientConn) condWait() {
+	if cc.syncHooks != nil {
+		cc.syncHooks.condWait(cc.cond)
+	}
+	cc.cond.Wait()
+}
+
+// newTimer creates a new time.Timer, or a synthetic timer in tests.
+func (cc *ClientConn) newTimer(d time.Duration) timer {
+	if cc.syncHooks != nil {
+		return cc.syncHooks.newTimer(d)
+	}
+	return newTimeTimer(d)
+}
+
+// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
+func (cc *ClientConn) afterFunc(d time.Duration, f func()) timer {
+	if cc.syncHooks != nil {
+		return cc.syncHooks.afterFunc(d, f)
+	}
+	return newTimeAfterFunc(d, f)
+}
+
+func (cc *ClientConn) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
+	if cc.syncHooks != nil {
+		return cc.syncHooks.contextWithTimeout(ctx, d)
+	}
+	return context.WithTimeout(ctx, d)
 }
 
 // clientStream is the state for a single HTTP/2 stream. One of these
@@ -425,7 +487,7 @@ func (cs *clientStream) abortStreamLocked(err error) {
 	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
 	if cs.cc.cond != nil {
 		// Wake up writeRequestBody if it is waiting on flow control.
-		cs.cc.cond.Broadcast()
+		cs.cc.condBroadcast()
 	}
 }
 
@@ -435,7 +497,7 @@ func (cs *clientStream) abortRequestBodyWrite() {
 	defer cc.mu.Unlock()
 	if cs.reqBody != nil && cs.reqBodyClosed == nil {
 		cs.closeReqBodyLocked()
-		cc.cond.Broadcast()
+		cc.condBroadcast()
 	}
 }
 
@@ -445,10 +507,10 @@ func (cs *clientStream) closeReqBodyLocked() {
 	}
 	cs.reqBodyClosed = make(chan struct{})
 	reqBodyClosed := cs.reqBodyClosed
-	go func() {
+	cs.cc.goRun(func() {
 		cs.reqBody.Close()
 		close(reqBodyClosed)
-	}()
+	})
 }
 
 type stickyErrWriter struct {
@@ -537,15 +599,6 @@ func authorityAddr(scheme string, authority string) (addr string) {
 	return net.JoinHostPort(host, port)
 }
 
-var retryBackoffHook func(time.Duration) *time.Timer
-
-func backoffNewTimer(d time.Duration) *time.Timer {
-	if retryBackoffHook != nil {
-		return retryBackoffHook(d)
-	}
-	return time.NewTimer(d)
-}
-
 // RoundTripOpt is like RoundTrip, but takes options.
 func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
 	if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) {
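
Each ClientConn helper above is a test seam: production code pays one nil check, and tests redirect goroutine creation, condition variables, timers, and context timeouts through testSyncHooks (replacing the narrower retryBackoffHook that the hunk above deletes). The dispatch shape in isolation, with hypothetical names:

```go
package main

import (
	"fmt"
	"time"
)

// sleeper dispatches through an optional hook, mirroring how
// cc.newTimer and cc.afterFunc pick the synthetic implementation only
// when syncHooks is non-nil.
type sleeper struct {
	sleepHook func(time.Duration) // nil outside tests
}

func (s *sleeper) sleep(d time.Duration) {
	if s.sleepHook != nil {
		s.sleepHook(d) // test-controlled, e.g. advance a fake clock
		return
	}
	time.Sleep(d) // production: real time
}

func main() {
	s := &sleeper{sleepHook: func(d time.Duration) {
		fmt.Println("pretending to sleep", d)
	}}
	s.sleep(time.Hour) // returns instantly under the hook
}
```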
@@ -573,13 +626,27 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
 			backoff := float64(uint(1) << (uint(retry) - 1))
 			backoff += backoff * (0.1 * mathrand.Float64())
 			d := time.Second * time.Duration(backoff)
-			timer := backoffNewTimer(d)
+			var tm timer
+			if t.syncHooks != nil {
+				tm = t.syncHooks.newTimer(d)
+				t.syncHooks.blockUntil(func() bool {
+					select {
+					case <-tm.C():
+					case <-req.Context().Done():
+					default:
+						return false
+					}
+					return true
+				})
+			} else {
+				tm = newTimeTimer(d)
+			}
 			select {
-			case <-timer.C:
+			case <-tm.C():
 				t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
 				continue
 			case <-req.Context().Done():
-				timer.Stop()
+				tm.Stop()
 				err = req.Context().Err()
 			}
 		}
@@ -658,6 +725,9 @@ func canRetryError(err error) bool {
 }
 
 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
+	if t.syncHooks != nil {
+		return t.newClientConn(nil, singleUse, t.syncHooks)
+	}
 	host, _, err := net.SplitHostPort(addr)
 	if err != nil {
 		return nil, err
@@ -666,7 +736,7 @@ func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse b
 	if err != nil {
 		return nil, err
 	}
-	return t.newClientConn(tconn, singleUse)
+	return t.newClientConn(tconn, singleUse, nil)
 }
 
 func (t *Transport) newTLSConfig(host string) *tls.Config {
@@ -732,10 +802,10 @@ func (t *Transport) maxEncoderHeaderTableSize() uint32 {
 }
 
 func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
-	return t.newClientConn(c, t.disableKeepAlives())
+	return t.newClientConn(c, t.disableKeepAlives(), nil)
 }
 
-func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
+func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHooks) (*ClientConn, error) {
 	cc := &ClientConn{
 		t:     t,
 		tconn: c,
@@ -750,10 +820,15 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
 		wantSettingsAck:       true,
 		pings:                 make(map[[8]byte]chan struct{}),
 		reqHeaderMu:           make(chan struct{}, 1),
+		syncHooks:             hooks,
+	}
+	if hooks != nil {
+		hooks.newclientconn(cc)
+		c = cc.tconn
 	}
 	if d := t.idleConnTimeout(); d != 0 {
 		cc.idleTimeout = d
-		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
+		cc.idleTimer = cc.afterFunc(d, cc.onIdleTimeout)
 	}
 	if VerboseLogs {
 		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
@@ -818,7 +893,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
 		return nil, cc.werr
 	}
 
-	go cc.readLoop()
+	cc.goRun(cc.readLoop)
 	return cc, nil
 }
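
The retry path above waits 2^(retry-1) seconds plus up to 10% random jitter before re-dialing. The arithmetic on its own, mirroring the source (assumes retry >= 1):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// retryBackoff reproduces the calculation in RoundTripOpt: exponential
// in the retry count, with up to 10% jitter to spread out reconnection
// storms. As in the source, time.Duration(backoff) truncates to whole
// seconds before the multiply.
func retryBackoff(retry uint) time.Duration {
	backoff := float64(uint(1) << (retry - 1))
	backoff += backoff * (0.1 * rand.Float64())
	return time.Second * time.Duration(backoff)
}

func main() {
	for retry := uint(1); retry <= 4; retry++ {
		fmt.Printf("retry %d: %v\n", retry, retryBackoff(retry))
	}
}
```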
@@ -826,7 +901,7 @@ func (cc *ClientConn) healthCheck() {
 	pingTimeout := cc.t.pingTimeout()
 	// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
 	// trigger the healthCheck again if there is no frame received.
-	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
+	ctx, cancel := cc.contextWithTimeout(context.Background(), pingTimeout)
 	defer cancel()
 	cc.vlogf("http2: Transport sending health check")
 	err := cc.Ping(ctx)
@@ -1056,7 +1131,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
 	// Wait for all in-flight streams to complete or connection to close
 	done := make(chan struct{})
 	cancelled := false // guarded by cc.mu
-	go func() {
+	cc.goRun(func() {
 		cc.mu.Lock()
 		defer cc.mu.Unlock()
 		for {
@@ -1068,9 +1143,9 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
 			if cancelled {
 				break
 			}
-			cc.cond.Wait()
+			cc.condWait()
 		}
-	}()
+	})
 	shutdownEnterWaitStateHook()
 	select {
 	case <-done:
@@ -1080,7 +1155,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error {
 	cc.mu.Lock()
 	// Free the goroutine above
 	cancelled = true
-	cc.cond.Broadcast()
+	cc.condBroadcast()
 	cc.mu.Unlock()
 	return ctx.Err()
 }
@@ -1118,7 +1193,7 @@ func (cc *ClientConn) closeForError(err error) {
 	for _, cs := range cc.streams {
 		cs.abortStreamLocked(err)
 	}
-	cc.cond.Broadcast()
+	cc.condBroadcast()
 	cc.mu.Unlock()
 	cc.closeConn()
 }
@@ -1215,6 +1290,10 @@ func (cc *ClientConn) decrStreamReservationsLocked() {
 }
 
 func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
+	return cc.roundTrip(req, nil)
+}
+
+func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) (*http.Response, error) {
 	ctx := req.Context()
 	cs := &clientStream{
 		cc:                   cc,
@@ -1229,9 +1308,23 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 		respHeaderRecv:       make(chan struct{}),
 		donec:                make(chan struct{}),
 	}
-	go cs.doRequest(req)
+	cc.goRun(func() {
+		cs.doRequest(req)
+	})
 
 	waitDone := func() error {
+		if cc.syncHooks != nil {
+			cc.syncHooks.blockUntil(func() bool {
+				select {
+				case <-cs.donec:
+				case <-ctx.Done():
+				case <-cs.reqCancel:
+				default:
+					return false
+				}
+				return true
+			})
+		}
 		select {
 		case <-cs.donec:
 			return nil
@@ -1292,7 +1385,24 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 		return err
 	}
 
+	if streamf != nil {
+		streamf(cs)
+	}
+
 	for {
+		if cc.syncHooks != nil {
+			cc.syncHooks.blockUntil(func() bool {
+				select {
+				case <-cs.respHeaderRecv:
+				case <-cs.abort:
+				case <-ctx.Done():
+				case <-cs.reqCancel:
+				default:
+					return false
+				}
+				return true
+			})
+		}
 		select {
 		case <-cs.respHeaderRecv:
 			return handleResponseHeaders()
@@ -1348,6 +1458,21 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
 	if cc.reqHeaderMu == nil {
 		panic("RoundTrip on uninitialized ClientConn") // for tests
 	}
+	var newStreamHook func(*clientStream)
+	if cc.syncHooks != nil {
+		newStreamHook = cc.syncHooks.newstream
+		cc.syncHooks.blockUntil(func() bool {
+			select {
+			case cc.reqHeaderMu <- struct{}{}:
+				<-cc.reqHeaderMu
+			case <-cs.reqCancel:
+			case <-ctx.Done():
+			default:
+				return false
+			}
+			return true
+		})
+	}
 	select {
 	case cc.reqHeaderMu <- struct{}{}:
 	case <-cs.reqCancel:
@@ -1372,6 +1497,10 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
 	}
 	cc.mu.Unlock()
 
+	if newStreamHook != nil {
+		newStreamHook(cs)
+	}
+
 	// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
 	if !cc.t.disableCompression() &&
 		req.Header.Get("Accept-Encoding") == "" &&
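
The writeRequest hunk above probes reqHeaderMu — a 1-buffered channel used as a mutex — inside a blockUntil callback by acquiring and immediately releasing it. That answers "would the real select below succeed right now?" without holding the lock; the answer can go stale, but blockUntil re-polls the callback. The probe in isolation:

```go
package main

import "fmt"

// tryLockProbe reports whether a channel-based mutex is currently
// available. The immediate release matters: the caller only wants to
// know that a later acquire would succeed, not to hold the lock here.
func tryLockProbe(mu chan struct{}) bool {
	select {
	case mu <- struct{}{}: // acquired...
		<-mu // ...and released right away
		return true
	default:
		return false
	}
}

func main() {
	mu := make(chan struct{}, 1)
	fmt.Println(tryLockProbe(mu)) // true: lock is free
	mu <- struct{}{}              // someone holds it
	fmt.Println(tryLockProbe(mu)) // false
}
```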
@@ -1452,15 +1581,30 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) {
 	var respHeaderTimer <-chan time.Time
 	var respHeaderRecv chan struct{}
 	if d := cc.responseHeaderTimeout(); d != 0 {
-		timer := time.NewTimer(d)
+		timer := cc.newTimer(d)
 		defer timer.Stop()
-		respHeaderTimer = timer.C
+		respHeaderTimer = timer.C()
 		respHeaderRecv = cs.respHeaderRecv
 	}
 	// Wait until the peer half-closes its end of the stream,
 	// or until the request is aborted (via context, error, or otherwise),
 	// whichever comes first.
 	for {
+		if cc.syncHooks != nil {
+			cc.syncHooks.blockUntil(func() bool {
+				select {
+				case <-cs.peerClosed:
+				case <-respHeaderTimer:
+				case <-respHeaderRecv:
+				case <-cs.abort:
+				case <-ctx.Done():
+				case <-cs.reqCancel:
+				default:
+					return false
+				}
+				return true
+			})
+		}
 		select {
 		case <-cs.peerClosed:
 			return nil
@@ -1609,7 +1753,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
 			return nil
 		}
 		cc.pendingRequests++
-		cc.cond.Wait()
+		cc.condWait()
 		cc.pendingRequests--
 		select {
 		case <-cs.abort:
@@ -1871,8 +2015,24 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
 			cs.flow.take(take)
 			return take, nil
 		}
-		cc.cond.Wait()
+		cc.condWait()
+	}
+}
+
+func validateHeaders(hdrs http.Header) string {
+	for k, vv := range hdrs {
+		if !httpguts.ValidHeaderFieldName(k) {
+			return fmt.Sprintf("name %q", k)
+		}
+		for _, v := range vv {
+			if !httpguts.ValidHeaderFieldValue(v) {
+				// Don't include the value in the error,
+				// because it may be sensitive.
+				return fmt.Sprintf("value for header %q", k)
+			}
+		}
 	}
+	return ""
 }
 
 var errNilRequestURL = errors.New("http2: Request.URI is nil")
@@ -1912,19 +2072,14 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
 		}
 	}
 
-	// Check for any invalid headers and return an error before we
+	// Check for any invalid headers+trailers and return an error before we
 	// potentially pollute our hpack state. (We want to be able to
 	// continue to reuse the hpack encoder for future requests)
-	for k, vv := range req.Header {
-		if !httpguts.ValidHeaderFieldName(k) {
-			return nil, fmt.Errorf("invalid HTTP header name %q", k)
-		}
-		for _, v := range vv {
-			if !httpguts.ValidHeaderFieldValue(v) {
-				// Don't include the value in the error, because it may be sensitive.
-				return nil, fmt.Errorf("invalid HTTP header value for header %q", k)
-			}
-		}
+	if err := validateHeaders(req.Header); err != "" {
+		return nil, fmt.Errorf("invalid HTTP header %s", err)
+	}
+	if err := validateHeaders(req.Trailer); err != "" {
+		return nil, fmt.Errorf("invalid HTTP trailer %s", err)
 	}
 
 	enumerateHeaders := func(f func(name, value string)) {
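
The new validateHeaders helper lets encodeHeaders check trailers as well as headers before touching the connection's shared HPACK encoder state; previously only headers were checked. Roughly how it behaves (same logic, hypothetical wrapper name):

```go
package main

import (
	"fmt"
	"net/http"

	"golang.org/x/net/http/httpguts"
)

// describeInvalid mirrors validateHeaders from the patch: it returns a
// description of the first invalid name or value, or "" when all are
// valid. Values are never echoed back, since they may be sensitive.
func describeInvalid(hdrs http.Header) string {
	for k, vv := range hdrs {
		if !httpguts.ValidHeaderFieldName(k) {
			return fmt.Sprintf("name %q", k)
		}
		for _, v := range vv {
			if !httpguts.ValidHeaderFieldValue(v) {
				return fmt.Sprintf("value for header %q", k)
			}
		}
	}
	return ""
}

func main() {
	fmt.Println(describeInvalid(http.Header{"X-Ok": {"fine"}}))    // ""
	fmt.Println(describeInvalid(http.Header{"X-Bad": {"a\x00b"}})) // value for header "X-Bad"
}
```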
@@ -2143,7 +2298,7 @@ func (cc *ClientConn) forgetStreamID(id uint32) {
 	}
 	// Wake up writeRequestBody via clientStream.awaitFlowControl and
 	// wake up RoundTrip if there is a pending request.
-	cc.cond.Broadcast()
+	cc.condBroadcast()
 
 	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
 	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
@@ -2231,7 +2386,7 @@ func (rl *clientConnReadLoop) cleanup() {
 			cs.abortStreamLocked(err)
 		}
 	}
-	cc.cond.Broadcast()
+	cc.condBroadcast()
 	cc.mu.Unlock()
 }
 
@@ -2266,10 +2421,9 @@ func (rl *clientConnReadLoop) run() error {
 	cc := rl.cc
 	gotSettings := false
 	readIdleTimeout := cc.t.ReadIdleTimeout
-	var t *time.Timer
+	var t timer
 	if readIdleTimeout != 0 {
-		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
-		defer t.Stop()
+		t = cc.afterFunc(readIdleTimeout, cc.healthCheck)
 	}
 	for {
 		f, err := cc.fr.ReadFrame()
@@ -2684,7 +2838,7 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
 		})
 		return nil
 	}
-	if !cs.firstByte {
+	if !cs.pastHeaders {
 		cc.logf("protocol error: received DATA before a HEADERS frame")
 		rl.endStreamError(cs, StreamError{
 			StreamID: f.StreamID,
@@ -2867,7 +3021,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
 			for _, cs := range cc.streams {
 				cs.flow.add(delta)
 			}
-			cc.cond.Broadcast()
+			cc.condBroadcast()
 
 			cc.initialWindowSize = s.Val
 		case SettingHeaderTableSize:
@@ -2911,9 +3065,18 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
 		fl = &cs.flow
 	}
 	if !fl.add(int32(f.Increment)) {
+		// For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
+		if cs != nil {
+			rl.endStreamError(cs, StreamError{
+				StreamID: f.StreamID,
+				Code:     ErrCodeFlowControl,
+			})
+			return nil
+		}
+
 		return ConnectionError(ErrCodeFlowControl)
 	}
-	cc.cond.Broadcast()
+	cc.condBroadcast()
 
 	return nil
 }
@@ -2955,24 +3118,38 @@ func (cc *ClientConn) Ping(ctx context.Context) error {
 		}
 		cc.mu.Unlock()
 	}
-	errc := make(chan error, 1)
-	go func() {
+	var pingError error
+	errc := make(chan struct{})
+	cc.goRun(func() {
 		cc.wmu.Lock()
 		defer cc.wmu.Unlock()
-		if err := cc.fr.WritePing(false, p); err != nil {
-			errc <- err
+		if pingError = cc.fr.WritePing(false, p); pingError != nil {
+			close(errc)
 			return
 		}
-		if err := cc.bw.Flush(); err != nil {
-			errc <- err
+		if pingError = cc.bw.Flush(); pingError != nil {
+			close(errc)
 			return
 		}
-	}()
+	})
+	if cc.syncHooks != nil {
+		cc.syncHooks.blockUntil(func() bool {
+			select {
+			case <-c:
+			case <-errc:
+			case <-ctx.Done():
+			case <-cc.readerDone:
+			default:
+				return false
+			}
+			return true
+		})
+	}
 	select {
 	case <-c:
 		return nil
-	case err := <-errc:
-		return err
+	case <-errc:
		return pingError
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-cc.readerDone:
@@ -3141,9 +3318,17 @@ func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, err
 }
 
 func (t *Transport) idleConnTimeout() time.Duration {
+	// to keep things backwards compatible, we use non-zero values of
+	// IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
+	// http1 transport, followed by 0
+	if t.IdleConnTimeout != 0 {
+		return t.IdleConnTimeout
+	}
+
 	if t.t1 != nil {
 		return t.t1.IdleConnTimeout
 	}
+
 	return 0
 }
 
diff --git a/api/vendor/modules.txt b/api/vendor/modules.txt
index c48f7837ab4..8ff83bfdf16 100644
--- a/api/vendor/modules.txt
+++ b/api/vendor/modules.txt
@@ -35,7 +35,7 @@ github.com/pmezard/go-difflib/difflib
 ## explicit; go 1.17
 github.com/stretchr/testify/assert
 github.com/stretchr/testify/require
-# golang.org/x/net v0.19.0
+# golang.org/x/net v0.23.0
 ## explicit; go 1.18
 golang.org/x/net/http/httpguts
 golang.org/x/net/http2
diff --git a/build.env b/build.env
index 751463687e8..65d945682e6 100644
--- a/build.env
+++ b/build.env
@@ -30,7 +30,7 @@ GOLANGCI_VERSION=v1.57.2
 
 # external snapshotter version
 # Refer: https://github.com/kubernetes-csi/external-snapshotter/releases
-SNAPSHOT_VERSION=v7.0.1
+SNAPSHOT_VERSION=v7.0.2
 
 # "go test" configuration
 # set to stdout or html to enable coverage reporting, disabled by default
@@ -54,11 +54,11 @@ ROOK_VERSION=v1.12.1
 ROOK_CEPH_CLUSTER_IMAGE=quay.io/ceph/ceph:v18
 
 # CSI sidecar version
-CSI_ATTACHER_VERSION=v4.5.0
-CSI_SNAPSHOTTER_VERSION=v7.0.0
-CSI_RESIZER_VERSION=v1.10.0
-CSI_PROVISIONER_VERSION=v4.0.0
-CSI_NODE_DRIVER_REGISTRAR_VERSION=v2.10.0
+CSI_ATTACHER_VERSION=v4.5.1
+CSI_SNAPSHOTTER_VERSION=v7.0.2
+CSI_RESIZER_VERSION=v1.10.1
+CSI_PROVISIONER_VERSION=v4.0.1
+CSI_NODE_DRIVER_REGISTRAR_VERSION=v2.10.1
 
 # e2e settings
 # - enable CEPH_CSI_RUN_ALL_TESTS when running tests with if it has root
diff --git a/charts/ceph-csi-cephfs/README.md b/charts/ceph-csi-cephfs/README.md
index 5ee44bde0fb..21a04bca765 100644
--- a/charts/ceph-csi-cephfs/README.md
+++ b/charts/ceph-csi-cephfs/README.md
@@ -124,7 +124,7 @@ charts and their default values.
 | `nodeplugin.imagePullSecrets` | Specifies imagePullSecrets for containers | `[]` |
 | `nodeplugin.profiling.enabled` | Specifies whether profiling should be enabled | `false` |
 | `nodeplugin.registrar.image.repository` | Node-Registrar image repository URL | `registry.k8s.io/sig-storage/csi-node-driver-registrar` |
-| `nodeplugin.registrar.image.tag` | Image tag | `v2.10.0` |
+| `nodeplugin.registrar.image.tag` | Image tag | `v2.10.1` |
 | `nodeplugin.registrar.image.pullPolicy` | Image pull policy | `IfNotPresent` |
 | `nodeplugin.plugin.image.repository` | Nodeplugin image repository URL | `quay.io/cephcsi/cephcsi` |
 | `nodeplugin.plugin.image.tag` | Image tag | `canary` |
@@ -145,17 +145,17 @@ charts and their default values.
 | `provisioner.imagePullSecrets` | Specifies imagePullSecrets for containers | `[]` |
 | `provisioner.profiling.enabled` | Specifies whether profiling should be enabled | `false` |
 | `provisioner.provisioner.image.repository` | Specifies the csi-provisioner image repository URL | `registry.k8s.io/sig-storage/csi-provisioner` |
-| `provisioner.provisioner.image.tag` | Specifies image tag | `v4.0.0` |
+| `provisioner.provisioner.image.tag` | Specifies image tag | `v4.0.1` |
 | `provisioner.provisioner.image.pullPolicy` | Specifies pull policy | `IfNotPresent` |
 | `provisioner.provisioner.image.extraArgs` | Specifies extra arguments for the provisioner sidecar | `[]` |
 | `provisioner.resizer.image.repository` | Specifies the csi-resizer image repository URL | `registry.k8s.io/sig-storage/csi-resizer` |
-| `provisioner.resizer.image.tag` | Specifies image tag | `v1.10.0` |
+| `provisioner.resizer.image.tag` | Specifies image tag | `v1.10.1` |
 | `provisioner.resizer.image.pullPolicy` | Specifies pull policy | `IfNotPresent` |
 | `provisioner.resizer.image.extraArgs` | Specifies extra arguments for the resizer sidecar | `[]` |
 | `provisioner.resizer.name` | Specifies the name of csi-resizer sidecar | `resizer` |
 | `provisioner.resizer.enabled` | Specifies whether resizer sidecar is enabled | `true` |
 | `provisioner.snapshotter.image.repository` | Specifies the csi-snapshotter image repository URL | `registry.k8s.io/sig-storage/csi-snapshotter` |
-| `provisioner.snapshotter.image.tag` | Specifies image tag | `v7.0.0` |
+| `provisioner.snapshotter.image.tag` | Specifies image tag | `v7.0.2` |
 | `provisioner.snapshotter.image.pullPolicy` | Specifies pull policy | `IfNotPresent` |
 | `provisioner.snapshotter.image.extraArgs` | Specifies extra arguments for the snapshotter sidecar | `[]` |
 | `provisioner.snapshotter.args.enableVolumeGroupSnapshots` | enables the creation of volume group snapshots | `false` |
diff --git a/charts/ceph-csi-cephfs/values.yaml b/charts/ceph-csi-cephfs/values.yaml
index 36d350b7ce1..b13b505248d 100644
--- a/charts/ceph-csi-cephfs/values.yaml
+++ b/charts/ceph-csi-cephfs/values.yaml
@@ -110,7 +110,7 @@ nodeplugin:
   registrar:
     image:
       repository: registry.k8s.io/sig-storage/csi-node-driver-registrar
-      tag: v2.10.0
+      tag: v2.10.1
       pullPolicy: IfNotPresent
     resources: {}
 
@@ -202,7 +202,7 @@ provisioner:
   provisioner:
     image:
       repository: registry.k8s.io/sig-storage/csi-provisioner
-      tag: v4.0.0
+      tag: v4.0.1
       pullPolicy: IfNotPresent
     resources: {}
     ## For further options, check
@@ -217,7 +217,7 @@ provisioner:
     enabled: true
     image:
       repository: registry.k8s.io/sig-storage/csi-resizer
-      tag: v1.10.0
+      tag: v1.10.1
       pullPolicy: IfNotPresent
     resources: {}
     ## For further options, check
@@ -227,7 +227,7 @@ provisioner:
   snapshotter:
     image:
       repository: registry.k8s.io/sig-storage/csi-snapshotter
-      tag: v7.0.0
+      tag: v7.0.2
       pullPolicy: IfNotPresent
     resources: {}
     ## For further options, check
diff --git a/charts/ceph-csi-rbd/README.md b/charts/ceph-csi-rbd/README.md
index def55d6f4f4..a5850f04612 100644
--- a/charts/ceph-csi-rbd/README.md
+++ b/charts/ceph-csi-rbd/README.md
@@ -126,7 +126,7 @@ charts and their default values.
 | `nodeplugin.imagePullSecrets` | Specifies imagePullSecrets for containers | `[]` |
 | `nodeplugin.profiling.enabled` | Specifies whether profiling should be enabled | `false` |
 | `nodeplugin.registrar.image.repository` | Node Registrar image repository URL | `registry.k8s.io/sig-storage/csi-node-driver-registrar` |
-| `nodeplugin.registrar.image.tag` | Image tag | `v2.10.0` |
+| `nodeplugin.registrar.image.tag` | Image tag | `v2.10.1` |
 | `nodeplugin.registrar.image.pullPolicy` | Image pull policy | `IfNotPresent` |
 | `nodeplugin.plugin.image.repository` | Nodeplugin image repository URL | `quay.io/cephcsi/cephcsi` |
 | `nodeplugin.plugin.image.tag` | Image tag | `canary` |
@@ -151,7 +151,7 @@ charts and their default values.
 | `provisioner.imagePullSecrets` | Specifies imagePullSecrets for containers | `[]` |
 | `provisioner.profiling.enabled` | Specifies whether profiling should be enabled | `false` |
 | `provisioner.provisioner.image.repository` | Specifies the csi-provisioner image repository URL | `registry.k8s.io/sig-storage/csi-provisioner` |
-| `provisioner.provisioner.image.tag` | Specifies image tag | `v4.0.0` |
+| `provisioner.provisioner.image.tag` | Specifies image tag | `v4.0.1` |
 | `provisioner.provisioner.image.pullPolicy` | Specifies pull policy | `IfNotPresent` |
 | `provisioner.provisioner.image.extraArgs` | Specifies extra arguments for the provisioner sidecar | `[]` |
 | `provisioner.snapshotter.args.enableVolumeGroupSnapshots` | enables the creation of volume group snapshots | `false` |
@@ -162,13 +162,13 @@ charts and their default values.
 | `provisioner.attacher.name` | Specifies the name of csi-attacher sidecar | `attacher` |
 | `provisioner.attacher.enabled` | Specifies whether attacher sidecar is enabled | `true` |
 | `provisioner.resizer.image.repository` | Specifies the csi-resizer image repository URL | `registry.k8s.io/sig-storage/csi-resizer` |
-| `provisioner.resizer.image.tag` | Specifies image tag | `v1.10.0` |
+| `provisioner.resizer.image.tag` | Specifies image tag | `v1.10.1` |
 | `provisioner.resizer.image.pullPolicy` | Specifies pull policy | `IfNotPresent` |
 | `provisioner.resizer.image.extraArgs` | Specifies extra arguments for the resizer sidecar | `[]` |
 | `provisioner.resizer.name` | Specifies the name of csi-resizer sidecar | `resizer` |
 | `provisioner.resizer.enabled` | Specifies whether resizer sidecar is enabled | `true` |
 | `provisioner.snapshotter.image.repository` | Specifies the csi-snapshotter image repository URL | `registry.k8s.io/sig-storage/csi-snapshotter` |
-| `provisioner.snapshotter.image.tag` | Specifies image tag | `v7.0.0` |
+| `provisioner.snapshotter.image.tag` | Specifies image tag | `v7.0.2` |
 | `provisioner.snapshotter.image.pullPolicy` | Specifies pull policy | `IfNotPresent` |
 | `provisioner.snapshotter.image.extraArgs` | Specifies extra arguments for the snapshotter sidecar | `[]` |
 | `provisioner.nodeSelector` | Specifies the node selector for provisioner deployment | `{}` |
diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml
index a3821ff7bb1..5af637132b8 100644
--- a/charts/ceph-csi-rbd/values.yaml
+++ b/charts/ceph-csi-rbd/values.yaml
@@ -139,7 +139,7 @@ nodeplugin:
   registrar:
     image:
       repository: registry.k8s.io/sig-storage/csi-node-driver-registrar
-      tag: v2.10.0
+      tag: v2.10.1
       pullPolicy: IfNotPresent
     resources: {}
 
@@ -241,7 +241,7 @@ provisioner:
   provisioner:
     image:
       repository: registry.k8s.io/sig-storage/csi-provisioner
-      tag: v4.0.0
+      tag: v4.0.1
       pullPolicy: IfNotPresent
     resources: {}
     ## For further options, check
@@ -256,7 +256,7 @@ provisioner:
     enabled: true
     image:
       repository: registry.k8s.io/sig-storage/csi-attacher
-      tag: v4.5.0
+      tag: v4.5.1
       pullPolicy: IfNotPresent
     resources: {}
     ## For further options, check
@@ -268,7 +268,7 @@ provisioner:
     enabled: true
     image:
       repository: registry.k8s.io/sig-storage/csi-resizer
-      tag: v1.10.0
+      tag: v1.10.1
       pullPolicy: IfNotPresent
     resources: {}
     ## For further options, check
@@ -278,7 +278,7 @@ provisioner:
   snapshotter:
     image:
       repository: registry.k8s.io/sig-storage/csi-snapshotter
-      tag: v7.0.0
+      tag: v7.0.2
       pullPolicy: IfNotPresent
     resources: {}
     ## For further options, check
diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml
index bd27e309d77..911e6f89b29 100644
--- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml
+++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml
@@ -92,7 +92,7 @@ spec:
             - name: ceph-csi-encryption-kms-config
               mountPath: /etc/ceph-csi-encryption-kms-config/
         - name: csi-provisioner
-          image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.0
+          image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.1
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
@@ -111,7 +111,7 @@ spec:
             - name: socket-dir
               mountPath: /csi
         - name: csi-resizer
-          image: registry.k8s.io/sig-storage/csi-resizer:v1.10.0
+          image: registry.k8s.io/sig-storage/csi-resizer:v1.10.1
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
@@ -128,7 +128,7 @@ spec:
            - name: socket-dir
              mountPath: /csi
        - name: csi-snapshotter
-          image: registry.k8s.io/sig-storage/csi-snapshotter:v7.0.0
+          image: registry.k8s.io/sig-storage/csi-snapshotter:v7.0.2
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml
index 16ea3dcb7ac..36382407950 100644
--- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml
+++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml
@@ -106,7 +106,7 @@ spec:
         securityContext:
           privileged: true
           allowPrivilegeEscalation: true
-        image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.0
+        image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.1
         args:
           - "--v=1"
           - "--csi-address=/csi/csi.sock"
diff --git a/deploy/nfs/kubernetes/csi-nfsplugin-provisioner.yaml b/deploy/nfs/kubernetes/csi-nfsplugin-provisioner.yaml
index a861e0d2d1f..47c974d1f15 100644
--- a/deploy/nfs/kubernetes/csi-nfsplugin-provisioner.yaml
+++ b/deploy/nfs/kubernetes/csi-nfsplugin-provisioner.yaml
@@ -73,7 +73,7 @@ spec:
             - name: keys-tmp-dir
               mountPath: /tmp/csi/keys
         - name: csi-provisioner
-          image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.0
+          image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.1
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
@@ -90,7 +90,7 @@ spec:
             - name: socket-dir
               mountPath: /csi
         - name: csi-resizer
-          image: registry.k8s.io/sig-storage/csi-resizer:v1.10.0
+          image: registry.k8s.io/sig-storage/csi-resizer:v1.10.1
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
@@ -107,7 +107,7 @@ spec:
             - name: socket-dir
               mountPath: /csi
         - name: csi-snapshotter
-          image: registry.k8s.io/sig-storage/csi-snapshotter:v7.0.0
+          image: registry.k8s.io/sig-storage/csi-snapshotter:v7.0.2
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
diff --git a/deploy/nfs/kubernetes/csi-nfsplugin.yaml b/deploy/nfs/kubernetes/csi-nfsplugin.yaml
index 557a3598039..1ec01b137fe 100644
--- a/deploy/nfs/kubernetes/csi-nfsplugin.yaml
+++ b/deploy/nfs/kubernetes/csi-nfsplugin.yaml
@@ -80,7 +80,7 @@ spec:
         securityContext:
           privileged: true
           allowPrivilegeEscalation: true
-        image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.0
+        image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.1
         args:
           - "--v=1"
           - "--csi-address=/csi/csi.sock"
diff --git a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml
index da8991612cc..9f8081f0d9c 100644
--- a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml
+++ b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml
@@ -104,7 +104,7 @@ spec:
               mountPath: /run/secrets/tokens
               readOnly: true
         - name: csi-provisioner
-          image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.0
+          image: registry.k8s.io/sig-storage/csi-provisioner:v4.0.1
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
@@ -126,7 +126,7 @@ spec:
             - name: socket-dir
               mountPath: /csi
         - name: csi-snapshotter
-          image: registry.k8s.io/sig-storage/csi-snapshotter:v7.0.0
+          image: registry.k8s.io/sig-storage/csi-snapshotter:v7.0.2
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
@@ -142,7 +142,7 @@ spec:
             - name: socket-dir
               mountPath: /csi
         - name: csi-attacher
-          image: registry.k8s.io/sig-storage/csi-attacher:v4.5.0
+          image: registry.k8s.io/sig-storage/csi-attacher:v4.5.1
           args:
             - "--v=1"
             - "--csi-address=$(ADDRESS)"
@@ -157,7 +157,7 @@ spec:
             - name: socket-dir
               mountPath: /csi
         - name: csi-resizer
-          image: registry.k8s.io/sig-storage/csi-resizer:v1.10.0
+          image: registry.k8s.io/sig-storage/csi-resizer:v1.10.1
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=1"
diff --git a/deploy/rbd/kubernetes/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/csi-rbdplugin.yaml
index 0d0165f063b..2ac61eeb1ff 100644
--- a/deploy/rbd/kubernetes/csi-rbdplugin.yaml
+++ b/deploy/rbd/kubernetes/csi-rbdplugin.yaml
@@ -116,7 +116,7 @@ spec:
         securityContext:
           privileged: true
           allowPrivilegeEscalation: true
-        image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.0
+        image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.10.1
         args:
           - "--v=1"
           - "--csi-address=/csi/csi.sock"
diff --git a/e2e/rbd.go b/e2e/rbd.go
index 9c0cf68f5b4..1710156973a 100644
--- a/e2e/rbd.go
+++ b/e2e/rbd.go
@@ -3553,7 +3553,7 @@ var _ = Describe("RBD", func() {
 				validateRBDImageCount(f, 1, defaultRBDPool)
 				validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType)
 				for i := 0; i < snapChainDepth; i++ {
-					var pvcClone *v1.PersistentVolumeClaim
+					var pvcClone, smartClonePVC *v1.PersistentVolumeClaim
 					snap := getSnapshot(snapshotPath)
 					snap.Name = fmt.Sprintf("%s-%d", snap.Name, i)
 					snap.Namespace = f.UniqueName
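
The new e2e steps in the next hunk create a PVC-PVC ("smart") clone of a PVC that was itself restored from a now-deleted snapshot. In API terms such a clone is simply a PVC whose dataSource names another PVC; a minimal sketch with hypothetical names, assuming a k8s.io/api release (v0.29+) where PVC resources use VolumeResourceRequirements:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	storageClass := "csi-rbd-sc" // hypothetical StorageClass name
	clone := &corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "smart-clone-0"},
		Spec: corev1.PersistentVolumeClaimSpec{
			StorageClassName: &storageClass,
			AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
			// A PVC-PVC ("smart") clone names an existing PVC as its
			// data source instead of a VolumeSnapshot.
			DataSource: &corev1.TypedLocalObjectReference{
				Kind: "PersistentVolumeClaim",
				Name: "restored-from-snapshot", // the PVC being cloned
			},
			Resources: corev1.VolumeResourceRequirements{
				Requests: corev1.ResourceList{
					corev1.ResourceStorage: resource.MustParse("1Gi"),
				},
			},
		},
	}
	fmt.Println(clone.Name, "clones", clone.Spec.DataSource.Name)
}
```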
@@ -3610,6 +3610,41 @@ var _ = Describe("RBD", func() {
 					validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType)
 					validateOmapCount(f, 0, rbdType, defaultRBDPool, snapsType)
 
+					// create pvc-pvc clone to validate pvc-pvc clone creation
+					// of child PVC created from a snapshot which no longer exists.
+					// Snapshot -> restore PVC -> delete Snapshot -> PVC-PVC clone.
+					smartClonePVC, err = loadPVC(pvcSmartClonePath)
+					if err != nil {
+						framework.Failf("failed to load smart clone PVC: %v", err)
+					}
+
+					smartClonePVC.Name = fmt.Sprintf("%s-%d", smartClonePVC.Name, i)
+					smartClonePVC.Namespace = f.UniqueName
+					smartClonePVC.Spec.DataSource.Name = pvcClone.Name
+					err = createPVCAndvalidatePV(f.ClientSet, smartClonePVC, deployTimeout)
+					if err != nil {
+						framework.Failf("failed to create smart clone PVC %q: %v",
+							smartClonePVC.Name, err)
+					}
+
+					// validate created backend rbd images = clone + smart clone + temp image
+					totalImages = 3
+					validateRBDImageCount(f, totalImages, defaultRBDPool)
+					validateOmapCount(f, 2, rbdType, defaultRBDPool, volumesType)
+					validateOmapCount(f, 0, rbdType, defaultRBDPool, snapsType)
+
+					err = deletePVCAndValidatePV(f.ClientSet, smartClonePVC, deployTimeout)
+					if err != nil {
+						framework.Failf("failed to delete smart clone PVC %q: %v",
+							smartClonePVC.Name, err)
+					}
+
+					// validate created backend rbd images = clone
+					totalImages = 1
+					validateRBDImageCount(f, totalImages, defaultRBDPool)
+					validateOmapCount(f, 1, rbdType, defaultRBDPool, volumesType)
+					validateOmapCount(f, 0, rbdType, defaultRBDPool, snapsType)
+
 					app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = pvcClone.Name
 					// create application
 					err = createApp(f.ClientSet, app, deployTimeout)
diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go
index fa3f68a7749..7b37a70eb61 100644
--- a/internal/rbd/rbd_util.go
+++ b/internal/rbd/rbd_util.go
@@ -146,6 +146,8 @@ type rbdImage struct {
 
 	// Set metadata on volume
 	EnableMetadata bool
+	// ParentInTrash indicates the parent image is in trash.
+	ParentInTrash bool
 }
 
 // rbdVolume represents a CSI volume and its RBD image specifics.
@@ -1615,6 +1617,7 @@ func (ri *rbdImage) getImageInfo() error {
 	} else {
 		ri.ParentName = parentInfo.Image.ImageName
 		ri.ParentPool = parentInfo.Image.PoolName
+		ri.ParentInTrash = parentInfo.Image.Trash
 	}
 	// Get image creation time
 	tm, err := image.GetCreateTimestamp()
@@ -1633,7 +1636,9 @@ func (ri *rbdImage) getParent() (*rbdImage, error) {
 	if err != nil {
 		return nil, err
 	}
-	if ri.ParentName == "" {
+	// The image may not have a parent, or the parent may be in trash.
+	// Return nil in both cases.
+	if ri.ParentName == "" || ri.ParentInTrash {
 		return nil, nil
 	}
 
diff --git a/scripts/k8s-storage/driver-cephfs.yaml b/scripts/k8s-storage/driver-cephfs.yaml
index 7c7de2c2f44..5a93d80a4ca 100644
--- a/scripts/k8s-storage/driver-cephfs.yaml
+++ b/scripts/k8s-storage/driver-cephfs.yaml
@@ -36,6 +36,18 @@ DriverInfo:
     # Volume ownership via fsGroup
     fsGroup: false
 
+    # multiple pods on a node can use the same volume concurrently
+    multipods: true
+
+    # support online expansion
+    onlineExpansion: true
+
+    # supports ReadWriteOncePod pod
+    readWriteOncePod: true
+
+    # supports ROX AccessMode in PVC
+    capReadOnlyMany: true
+
     # Raw block mode
     block: false
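
Finally, the rbd_util.go change above makes getParent treat a trashed parent image like a missing one, which is what lets the new e2e scenario clone a PVC whose parent was deleted along with its snapshot. A trimmed sketch of the resulting control flow (types reduced to the two relevant fields):

```go
package main

import "fmt"

// rbdImage keeps only the fields the getParent change touches.
type rbdImage struct {
	ParentName    string
	ParentInTrash bool
}

// getParent mirrors the patched logic: callers get nil (with no error)
// both when there is no parent and when the parent image sits in the
// RBD trash, so clone code no longer trips over trashed parents.
func (ri *rbdImage) getParent() (*rbdImage, error) {
	if ri.ParentName == "" || ri.ParentInTrash {
		return nil, nil
	}
	return &rbdImage{ParentName: ri.ParentName}, nil
}

func main() {
	img := &rbdImage{ParentName: "csi-vol-parent", ParentInTrash: true}
	p, err := img.getParent()
	fmt.Println(p, err) // <nil> <nil>: trashed parent treated as absent
}
```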