diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 5a4bb0f270b2..c049852f3193 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -20,9 +20,11 @@ package clusterimpl import ( "context" + "encoding/json" "errors" "fmt" "strings" + "sync/atomic" "testing" "time" @@ -40,6 +42,7 @@ import ( "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" @@ -820,6 +823,108 @@ func (s) TestUpdateLRSServer(t *testing.T) { } } +// Test verifies that child policies was updated on receipt of +// configuration update. +func (s) TestChildPolicyUpdatedOnConfigUpdate(t *testing.T) { + defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) + xdsC := fakeclient.NewClient() + + builder := balancer.Get(Name) + cc := testutils.NewBalancerClientConn(t) + b := builder.Build(cc, balancer.BuildOptions{}) + defer b.Close() + + childConfigUpdated := atomic.Bool{} + // Create a stub balancer which updates picker on receipt of config + // update, and notifies when child policy was updated via + // childConfigUpdated. + const childPolicyName = "stubBalancer-ChildPolicyUpdatedSynchronouslyOnConfigUpdate" + stub.Register(childPolicyName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + bd.ClientConn.UpdateState(balancer.State{ + Picker: base.NewErrPicker(errors.New("dummy error picker")), + }) + childConfigUpdated.Store(true) + return nil + }, + }) + + testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ + URI: "trafficdirector.googleapis.com:443", + ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}}, + }) + if err != nil { + t.Fatalf("Failed to create LRS server config for testing: %v", err) + } + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + BalancerConfig: &LBConfig{ + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: childPolicyName, + }, + }, + }); err != nil { + t.Fatalf("Error updating the config: %v", err) + } + + if !childConfigUpdated.Load() { + t.Fatal("Child policy was not updated on receipt of configuration update.") + } +} + +// Test verifies that config update fails if child policy config +// failed to parse. +func (s) TestFailedToParseChildPolicyConfig(t *testing.T) { + defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) + xdsC := fakeclient.NewClient() + + builder := balancer.Get(Name) + cc := testutils.NewBalancerClientConn(t) + b := builder.Build(cc, balancer.BuildOptions{}) + defer b.Close() + + // Create a stub balancer which fails to ParseConfig. + const parseConfigError = "failed to parse config" + const childPolicyName = "stubBalancer-FailedToParseChildPolicyConfig" + stub.Register(childPolicyName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + bd.ClientConn.UpdateState(balancer.State{ + Picker: base.NewErrPicker(errors.New("dummy error picker")), + }) + return nil + }, + ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return nil, errors.New(parseConfigError) + }, + }) + + testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ + URI: "trafficdirector.googleapis.com:443", + ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}}, + }) + if err != nil { + t.Fatalf("Failed to create LRS server config for testing: %v", err) + } + err = b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC), + BalancerConfig: &LBConfig{ + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: childPolicyName, + }, + }, + }) + + if err == nil || !strings.Contains(err.Error(), parseConfigError) { + t.Errorf("Got error: %v, want error: parsed config to fail.", err) + } +} + func assertString(f func() (string, error)) string { s, err := f() if err != nil { diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 9058f0d01fc8..de3b9f023c4a 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -247,8 +247,22 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) } if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { - if err := b.child.SwitchTo(bb); err != nil { - return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err) + if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name { + strCfg, err := newConfig.ChildPolicy.MarshalJSON() + if err != nil { + return fmt.Errorf("error marshaling config: %v", err) + } + r := json.RawMessage(strCfg) + sc, err := gracefulswitch.ParseConfig(r) + if err != nil { + return fmt.Errorf("error parsing child config: %v", err) + } + if err := b.child.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: s.ResolverState, + BalancerConfig: sc, + }); err != nil { + return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err) + } } } b.config = newConfig