diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index eb1110ecc4c9..cfe6fc865aff 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -108,7 +108,7 @@ func (c *clientImpl) newAuthority(config *bootstrap.ServerConfig) (_ *authority, ret.close() } }() - ctr, err := newController(config, ret.pubsub, c.updateValidator, c.logger) + ctr, err := newController(config, ret.pubsub, c.updateValidator, c.logger, nil) if err != nil { return nil, err } diff --git a/xds/internal/xdsclient/client_test.go b/xds/internal/xdsclient/client_test.go index 31f6d466fbea..d496aa59adc3 100644 --- a/xds/internal/xdsclient/client_test.go +++ b/xds/internal/xdsclient/client_test.go @@ -102,7 +102,7 @@ type testController struct { func overrideNewController(t *testing.T) *testutils.Channel { origNewController := newController ch := testutils.NewChannel() - newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (controllerInterface, error) { + newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, _ func(int) time.Duration) (controllerInterface, error) { ret := newTestController(config) ch.Send(ret) return ret, nil diff --git a/xds/internal/xdsclient/controller.go b/xds/internal/xdsclient/controller.go index 431a14498e1f..a99f4b164949 100644 --- a/xds/internal/xdsclient/controller.go +++ b/xds/internal/xdsclient/controller.go @@ -18,6 +18,8 @@ package xdsclient import ( + "time" + "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/controller" @@ -33,6 +35,6 @@ type controllerInterface interface { Close() } -var newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (controllerInterface, error) { - return controller.New(config, pubsub, validator, logger) +var newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, boff func(int) time.Duration) (controllerInterface, error) { + return controller.New(config, pubsub, validator, logger, boff) } diff --git a/xds/internal/xdsclient/controller/controller.go b/xds/internal/xdsclient/controller/controller.go index 3f7371ae63c7..d48297145472 100644 --- a/xds/internal/xdsclient/controller/controller.go +++ b/xds/internal/xdsclient/controller/controller.go @@ -100,7 +100,7 @@ func SetGRPCDial(dialer func(target string, opts ...grpc.DialOption) (*grpc.Clie } // New creates a new controller. -func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (_ *Controller, retErr error) { +func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, boff func(int) time.Duration) (_ *Controller, retErr error) { switch { case config == nil: return nil, errors.New("xds: no xds_server provided") @@ -120,12 +120,15 @@ func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, val }), } + if boff == nil { + boff = backoff.DefaultExponential.Backoff + } ret := &Controller{ config: config, updateValidator: validator, updateHandler: updateHandler, - backoff: backoff.DefaultExponential.Backoff, // TODO: should this be configurable? + backoff: boff, streamCh: make(chan grpc.ClientStream, 1), sendCh: buffer.NewUnbounded(), watchMap: make(map[xdsresource.ResourceType]map[string]bool), diff --git a/xds/internal/xdsclient/controller/controller_test.go b/xds/internal/xdsclient/controller/controller_test.go index 644698cc26f7..599afb3a3e94 100644 --- a/xds/internal/xdsclient/controller/controller_test.go +++ b/xds/internal/xdsclient/controller/controller_test.go @@ -95,7 +95,7 @@ func (s) TestNew(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - c, err := New(test.config, noopUpdateHandler, nil, nil) // Only testing the config, other inputs are left as nil. + c, err := New(test.config, noopUpdateHandler, nil, nil, nil) // Only testing the config, other inputs are left as nil. defer func() { if c != nil { c.Close() @@ -123,7 +123,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) { // Set the dialer and make sure it is called. SetGRPCDial(customDialer) - c, err := New(config, noopUpdateHandler, nil, nil) + c, err := New(config, noopUpdateHandler, nil, nil, nil) if err != nil { t.Fatalf("New(%+v) = %v, want no error", config, err) } @@ -138,7 +138,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) { // Reset the dialer and make sure it is not called. SetGRPCDial(grpc.Dial) - c, err = New(config, noopUpdateHandler, nil, nil) + c, err = New(config, noopUpdateHandler, nil, nil, nil) defer func() { if c != nil { c.Close() diff --git a/xds/internal/xdsclient/controller/transport.go b/xds/internal/xdsclient/controller/transport.go index e97717d974c6..9e9836512725 100644 --- a/xds/internal/xdsclient/controller/transport.go +++ b/xds/internal/xdsclient/controller/transport.go @@ -59,26 +59,21 @@ func (t *Controller) run(ctx context.Context) { // report error (and log) when stats is transient failure. retries := 0 - for { - select { - case <-ctx.Done(): - return - default: - } - - if retries != 0 { - timer := time.NewTimer(t.backoff(retries)) + lastStreamStartTime := time.Time{} + for ctx.Err() == nil { + dur := time.Until(lastStreamStartTime.Add(t.backoff(retries))) + if dur > 0 { + timer := time.NewTimer(dur) select { case <-timer.C: case <-ctx.Done(): - if !timer.Stop() { - <-timer.C - } + timer.Stop() return } } retries++ + lastStreamStartTime = time.Now() stream, err := t.vClient.NewStream(ctx, t.cc) if err != nil { t.updateHandler.NewConnectionError(err) @@ -370,24 +365,21 @@ func (t *Controller) processAckInfo(ack *ackAction, stream grpc.ClientStream) (t // It blocks until the context is cancelled. func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts controllerversion.LoadReportingOptions) { retries := 0 - for { - if ctx.Err() != nil { - return - } - - if retries != 0 { - timer := time.NewTimer(t.backoff(retries)) + lastStreamStartTime := time.Time{} + for ctx.Err() == nil { + dur := time.Until(lastStreamStartTime.Add(t.backoff(retries))) + if dur > 0 { + timer := time.NewTimer(dur) select { case <-timer.C: case <-ctx.Done(): - if !timer.Stop() { - <-timer.C - } + timer.Stop() return } } retries++ + lastStreamStartTime = time.Now() stream, err := t.vClient.NewLoadStatsStream(ctx, cc) if err != nil { t.logger.Warningf("lrs: failed to create stream: %v", err) diff --git a/xds/internal/xdsclient/controller/v2_testutils_test.go b/xds/internal/xdsclient/controller/v2_testutils_test.go index dfd195827d46..de147d480e54 100644 --- a/xds/internal/xdsclient/controller/v2_testutils_test.go +++ b/xds/internal/xdsclient/controller/v2_testutils_test.go @@ -458,13 +458,10 @@ func newTestController(p pubsub.UpdateHandler, controlPlanAddr string, n *basepb Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), TransportAPI: version.TransportV2, NodeProto: n, - }, p, nil, l) + }, p, nil, l, b) if err != nil { return nil, err } - // This direct setting backoff seems a bit hacky, but should be OK for the - // tests. Or we need to make it configurable in New(). - c.backoff = b return c, nil }