Skip to content

Commit

Permalink
Call gracefulswitch UpdateClientConnState at the end of config update
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Sep 5, 2024
1 parent 51d0d75 commit 943294d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 50 deletions.
76 changes: 46 additions & 30 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -834,44 +833,69 @@ func (s) TestChildPolicyUpdatedOnConfigUpdate(t *testing.T) {
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()

childConfigUpdated := atomic.Bool{}
// Create a stub balancer which updates picker on receipt of config
// update, and notifies when child policy was updated via
// childConfigUpdated.
const childPolicyName = "stubBalancer-ChildPolicyUpdatedSynchronouslyOnConfigUpdate"
stub.Register(childPolicyName, stub.BalancerFuncs{
// Keep track of which child policy was updated
updatedChildPolicy := ""

// Create stub balancers to track config updates
const (
childPolicyName1 = "stubBalancer1"
childPolicyName2 = "stubBalancer2"
)

stub.Register(childPolicyName1, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
updatedChildPolicy = childPolicyName1
bd.ClientConn.UpdateState(balancer.State{
Picker: base.NewErrPicker(errors.New("dummy error picker")),
})
childConfigUpdated.Store(true)
return nil
},
})

testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
URI: "trafficdirector.googleapis.com:443",
ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
stub.Register(childPolicyName2, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
updatedChildPolicy = childPolicyName2
bd.ClientConn.UpdateState(balancer.State{
Picker: base.NewErrPicker(errors.New("dummy error picker")),
})
return nil
},
})
if err != nil {
t.Fatalf("Failed to create LRS server config for testing: %v", err)

// Initial config update with childPolicyName1

Check failure on line 865 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

parameter 'ccs' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: childPolicyName1,
},
},
}); err != nil {

Check failure on line 875 in xds/internal/balancer/clusterimpl/balancer_test.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

parameter 'ccs' seems to be unused, consider removing or renaming it as _ https://revive.run/r#unused-parameter
t.Fatalf("Error updating the config: %v", err)
}

if updatedChildPolicy != childPolicyName1 {
t.Fatal("Child policy 1 was not updated on initial configuration update.")
}

// Second config update with childPolicyName2
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: childPolicyName,
Name: childPolicyName2,
},
},
}); err != nil {
t.Fatalf("Error updating the config: %v", err)
}

if !childConfigUpdated.Load() {
t.Fatal("Child policy was not updated on receipt of configuration update.")
if updatedChildPolicy != childPolicyName2 {
t.Fatal("Child policy 2 was not updated after child policy name change.")
}
}

Expand Down Expand Up @@ -901,19 +925,11 @@ func (s) TestFailedToParseChildPolicyConfig(t *testing.T) {
},
})

testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
URI: "trafficdirector.googleapis.com:443",
ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
})
if err != nil {
t.Fatalf("Failed to create LRS server config for testing: %v", err)
}
err = b.UpdateClientConnState(balancer.ClientConnState{
err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: childPolicyName,
},
Expand Down
30 changes: 10 additions & 20 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,35 +246,25 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
return err
}

if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
strCfg, err := newConfig.ChildPolicy.MarshalJSON()
if err != nil {
return fmt.Errorf("error marshaling config: %v", err)
}
r := json.RawMessage(strCfg)
sc, err := gracefulswitch.ParseConfig(r)
if err != nil {
return fmt.Errorf("error parsing child config: %v", err)
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: sc,
}); err != nil {
return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err)
}
}
}
b.config = newConfig

// Notify run() of this new config, in case drop and request counter need
// update (which means a new picker needs to be generated).
b.pickerUpdateCh.Put(newConfig)

// Build config for the gracefulswitch balancer. It is safe to ignore JSON
// marshaling errors here, since the config was already validated as part of
// ParseConfig().
cfg := []map[string]any{{newConfig.ChildPolicy.Name: newConfig.ChildPolicy.Config}}
cfgJSON, _ := json.Marshal(cfg)
parsedCfg, err := gracefulswitch.ParseConfig(cfgJSON)
if err != nil {
return err
}
// Addresses and sub-balancer config are sent to sub-balancer.
return b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: b.config.ChildPolicy.Config,
BalancerConfig: parsedCfg,
})
}

Expand Down

0 comments on commit 943294d

Please sign in to comment.