diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 6a55217e7a87..84c5753adf61 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -22,12 +22,14 @@ import ( "encoding/json" "fmt" + wrapperspb "github.com/golang/protobuf/ptypes/wrappers" xdsclient "google.golang.org/grpc/xds/internal/client" ) const ( cdsName = "cds_experimental" weightedTargetName = "weighted_target_experimental" + xdsRoutingName = "xds_routing_experimental" ) type serviceConfig struct { @@ -53,24 +55,91 @@ type cdsBalancerConfig struct { Cluster string `json:"cluster"` } -func serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) { +type route struct { + Path *string `json:"path,omitempty"` + Prefix *string `json:"prefix,omitempty"` + Regex *string `json:"regex,omitempty"` + Headers []*xdsclient.HeaderMatcher `json:"headers,omitempty"` + Fraction *wrapperspb.UInt32Value `json:"matchFraction,omitempty"` + Action string `json:"action"` +} + +type xdsActionConfig struct { + ChildPolicy balancerConfig `json:"childPolicy"` +} + +type xdsRoutingBalancerConfig struct { + Action map[string]xdsActionConfig `json:"action"` + Route []*route `json:"route"` +} + +func (r *xdsResolver) routesToJSON(routes []*xdsclient.Route) (string, error) { + r.updateActions(newActionsFromRoutes(routes)) + + // Generate routes. + var rts []*route + for _, rt := range routes { + t := &route{ + Path: rt.Path, + Prefix: rt.Prefix, + Regex: rt.Regex, + Headers: rt.Headers, + } + + if f := rt.Fraction; f != nil { + t.Fraction = &wrapperspb.UInt32Value{Value: *f} + } + + t.Action = r.getActionAssignedName(rt.Action) + rts = append(rts, t) + } + + // Generate actions. + action := make(map[string]xdsActionConfig) + for _, act := range r.actions { + action[act.assignedName] = xdsActionConfig{ + ChildPolicy: weightedClusterToBalancerConfig(act.clustersWithWeights), + } + } + + sc := serviceConfig{ + LoadBalancingConfig: newBalancerConfig( + xdsRoutingName, xdsRoutingBalancerConfig{ + Route: rts, + Action: action, + }, + ), + } + + bs, err := json.Marshal(sc) + if err != nil { + return "", fmt.Errorf("failed to marshal json: %v", err) + } + return string(bs), nil +} + +func weightedClusterToBalancerConfig(wc map[string]uint32) balancerConfig { // Even if WeightedCluster has only one entry, we still use weighted_target // as top level balancer, to avoid switching top policy between CDS and // weighted_target, causing TCP connection to be recreated. targets := make(map[string]cdsWithWeight) - for name, weight := range su.WeightedCluster { + for name, weight := range wc { targets[name] = cdsWithWeight{ Weight: weight, ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: name}), } } + bc := newBalancerConfig( + weightedTargetName, weightedCDSBalancerConfig{ + Targets: targets, + }, + ) + return bc +} +func weightedClusterToJSON(wc map[string]uint32) (string, error) { sc := serviceConfig{ - LoadBalancingConfig: newBalancerConfig( - weightedTargetName, weightedCDSBalancerConfig{ - Targets: targets, - }, - ), + LoadBalancingConfig: weightedClusterToBalancerConfig(wc), } bs, err := json.Marshal(sc) if err != nil { @@ -78,3 +147,13 @@ func serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) { } return string(bs), nil } + +func (r *xdsResolver) serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) { + // If WeightedClusters is set, routing is disabled (by env variable). Use + // weighted target only. + if su.WeightedCluster != nil { + return weightedClusterToJSON(su.WeightedCluster) + } + + return r.routesToJSON(su.Routes) +} diff --git a/xds/internal/resolver/serviceconfig_action.go b/xds/internal/resolver/serviceconfig_action.go new file mode 100644 index 000000000000..d582048fda09 --- /dev/null +++ b/xds/internal/resolver/serviceconfig_action.go @@ -0,0 +1,186 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +import ( + "fmt" + "math" + "sort" + "strconv" + + "google.golang.org/grpc/internal/grpcrand" + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +type actionWithAssignedName struct { + // cluster:weight, "A":40, "B":60 + clustersWithWeights map[string]uint32 + // clusterNames, without weights, sorted and hashed, "A_B_" + clusterNames string + // The assigned name, clusters plus a random number, "A_B_1" + assignedName string + // randomNumber is the number appended to assignedName. + randomNumber int64 +} + +// newActionsFromRoutes gets actions from the routes, and turns them into a map +// keyed by the hash of the clusters. +// +// In the returned map, all actions don't have assignedName. The assignedName +// will be filled in after comparing the new actions with the existing actions, +// so when a new and old action only diff in weights, the new action can reuse +// the old action's name. +// +// from +// {B:60, A:40}, {A:30, B:70}, {B:90, C:10} +// +// to +// A40_B60_: {{A:40, B:60}, "A_B_", ""} +// A30_B70_: {{A:30, B:70}, "A_B_", ""} +// B90_C10_: {{B:90, C:10}, "B_C_", ""} +func newActionsFromRoutes(routes []*xdsclient.Route) map[string]actionWithAssignedName { + newActions := make(map[string]actionWithAssignedName) + for _, route := range routes { + var clusterNames []string + for n := range route.Action { + clusterNames = append(clusterNames, n) + } + + // Sort names to be consistent. + sort.Strings(clusterNames) + clustersOnly := "" + clustersWithWeight := "" + for _, c := range clusterNames { + // Generates A_B_ + clustersOnly = clustersOnly + c + "_" + // Generates A40_B60_ + clustersWithWeight = clustersWithWeight + c + strconv.FormatUint(uint64(route.Action[c]), 10) + "_" + } + + if _, ok := newActions[clustersWithWeight]; !ok { + newActions[clustersWithWeight] = actionWithAssignedName{ + clustersWithWeights: route.Action, + clusterNames: clustersOnly, + } + } + } + return newActions +} + +// updateActions takes a new map of actions, and updates the existing action map in the resolver. +// +// In the old map, all actions have assignedName set. +// In the new map, all actions have no assignedName. +// +// After the update, the action map is updated to have all actions from the new +// map, with assignedName: +// - if the new action exists in old, get the old name +// - if the new action doesn't exist in old +// - if there is an old action that will be removed, and has the same set of +// clusters, reuse the old action's name +// - otherwise, generate a new name +func (r *xdsResolver) updateActions(newActions map[string]actionWithAssignedName) { + if r.actions == nil { + r.actions = make(map[string]actionWithAssignedName) + } + + // Delete actions from existingActions if they are not in newActions. Keep + // the removed actions in a map, with key as clusterNames without weights, + // so their assigned names can be reused. + existingActions := r.actions + actionsRemoved := make(map[string][]string) + for actionHash, act := range existingActions { + if _, ok := newActions[actionHash]; !ok { + actionsRemoved[act.clusterNames] = append(actionsRemoved[act.clusterNames], act.assignedName) + delete(existingActions, actionHash) + } + } + + // Find actions in newActions but not in oldActions. Add them, and try to + // reuse assigned names from actionsRemoved. + if r.usedActionNameRandomNumber == nil { + r.usedActionNameRandomNumber = make(map[int64]bool) + } + for actionHash, act := range newActions { + if _, ok := existingActions[actionHash]; !ok { + if assignedNamed, ok := actionsRemoved[act.clusterNames]; ok { + // Reuse the first assigned name from actionsRemoved. + act.assignedName = assignedNamed[0] + // If there are more names to reuse after this, update the slice + // in the map. Otherwise, remove the entry from the map. + if len(assignedNamed) > 1 { + actionsRemoved[act.clusterNames] = assignedNamed[1:] + } else { + delete(actionsRemoved, act.clusterNames) + } + existingActions[actionHash] = act + continue + } + // Generate a new name. + act.randomNumber = r.nextAssignedNameRandomNumber() + act.assignedName = fmt.Sprintf("%s%d", act.clusterNames, act.randomNumber) + existingActions[actionHash] = act + } + } + + // Delete entry from nextIndex if all actions with the clusters are removed. + remainingRandomNumbers := make(map[int64]bool) + for _, act := range existingActions { + remainingRandomNumbers[act.randomNumber] = true + } + r.usedActionNameRandomNumber = remainingRandomNumbers +} + +var grpcrandInt63n = grpcrand.Int63n + +func (r *xdsResolver) nextAssignedNameRandomNumber() int64 { + for { + t := grpcrandInt63n(math.MaxInt32) + if !r.usedActionNameRandomNumber[t] { + return t + } + } +} + +// getActionAssignedName hashes the clusters from the action, and find the +// assigned action name. The assigned action names are kept in r.actions, with +// the clusters name hash as map key. +// +// The assigned action name is not simply the hash. For example, the hash can be +// "A40_B60_", but the assigned name can be "A_B_0". It's this way so the action +// can be reused if only weights are changing. +func (r *xdsResolver) getActionAssignedName(action map[string]uint32) string { + var clusterNames []string + for n := range action { + clusterNames = append(clusterNames, n) + } + // Hash cluster names. Sort names to be consistent. + sort.Strings(clusterNames) + clustersWithWeight := "" + for _, c := range clusterNames { + // Generates hash "A40_B60_". + clustersWithWeight = clustersWithWeight + c + strconv.FormatUint(uint64(action[c]), 10) + "_" + } + // Look in r.actions for the assigned action name. + if act, ok := r.actions[clustersWithWeight]; ok { + return act.assignedName + } + r.logger.Warningf("no assigned name found for action %v", action) + return "" +} diff --git a/xds/internal/resolver/serviceconfig_action_test.go b/xds/internal/resolver/serviceconfig_action_test.go new file mode 100644 index 000000000000..bfc0e6830155 --- /dev/null +++ b/xds/internal/resolver/serviceconfig_action_test.go @@ -0,0 +1,356 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + xdsclient "google.golang.org/grpc/xds/internal/client" +) + +func TestNewActionsFromRoutes(t *testing.T) { + tests := []struct { + name string + routes []*xdsclient.Route + want map[string]actionWithAssignedName + }{ + { + name: "temp", + routes: []*xdsclient.Route{ + {Action: map[string]uint32{"B": 60, "A": 40}}, + {Action: map[string]uint32{"A": 30, "B": 70}}, + {Action: map[string]uint32{"B": 90, "C": 10}}, + }, + want: map[string]actionWithAssignedName{ + "A40_B60_": {map[string]uint32{"A": 40, "B": 60}, "A_B_", "", 0}, + "A30_B70_": {map[string]uint32{"A": 30, "B": 70}, "A_B_", "", 0}, + "B90_C10_": {map[string]uint32{"B": 90, "C": 10}, "B_C_", "", 0}, + }, + }, + } + + cmpOpts := []cmp.Option{cmp.AllowUnexported(actionWithAssignedName{})} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := newActionsFromRoutes(tt.routes); !cmp.Equal(got, tt.want, cmpOpts...) { + t.Errorf("newActionsFromRoutes() got unexpected result, diff %v", cmp.Diff(got, tt.want, cmpOpts...)) + } + }) + } +} + +func TestRemoveOrReuseName(t *testing.T) { + tests := []struct { + name string + oldActions map[string]actionWithAssignedName + oldRandNums map[int64]bool + newActions map[string]actionWithAssignedName + wantActions map[string]actionWithAssignedName + wantRandNums map[int64]bool + }{ + { + name: "add same cluster", + oldActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + }, + oldRandNums: map[int64]bool{ + 0: true, + }, + newActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + }, + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + }, + }, + wantActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_1000", + randomNumber: 1000, + }, + }, + wantRandNums: map[int64]bool{ + 0: true, + 1000: true, + }, + }, + { + name: "delete same cluster", + oldActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_1", + randomNumber: 1, + }, + }, + oldRandNums: map[int64]bool{ + 0: true, + 1: true, + }, + newActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + }, + }, + wantActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + }, + wantRandNums: map[int64]bool{ + 0: true, + }, + }, + { + name: "add new clusters", + oldActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + }, + oldRandNums: map[int64]bool{ + 0: true, + }, + newActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + }, + "a50_b50_": { + clustersWithWeights: map[string]uint32{"a": 50, "b": 50}, + clusterNames: "a_b_", + }, + }, + wantActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + "a50_b50_": { + clustersWithWeights: map[string]uint32{"a": 50, "b": 50}, + clusterNames: "a_b_", + assignedName: "a_b_1000", + randomNumber: 1000, + }, + }, + wantRandNums: map[int64]bool{ + 0: true, + 1000: true, + }, + }, + { + name: "reuse", + oldActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + }, + oldRandNums: map[int64]bool{ + 0: true, + }, + newActions: map[string]actionWithAssignedName{ + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + }, + }, + wantActions: map[string]actionWithAssignedName{ + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + }, + wantRandNums: map[int64]bool{ + 0: true, + }, + }, + { + name: "add and reuse", + oldActions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_1", + randomNumber: 1, + }, + "a50_b50_": { + clustersWithWeights: map[string]uint32{"a": 50, "b": 50}, + clusterNames: "a_b_", + assignedName: "a_b_2", + randomNumber: 2, + }, + }, + oldRandNums: map[int64]bool{ + 0: true, + 1: true, + 2: true, + }, + newActions: map[string]actionWithAssignedName{ + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + }, + "a30_b30_c40_": { + clustersWithWeights: map[string]uint32{"a": 30, "b": 30, "c": 40}, + clusterNames: "a_b_c_", + }, + "c50_d50_": { + clustersWithWeights: map[string]uint32{"c": 50, "d": 50}, + clusterNames: "c_d_", + }, + }, + wantActions: map[string]actionWithAssignedName{ + "a10_b50_c40_": { + clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_1", + randomNumber: 1, + }, + "a30_b30_c40_": { + clustersWithWeights: map[string]uint32{"a": 30, "b": 30, "c": 40}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + randomNumber: 0, + }, + "c50_d50_": { + clustersWithWeights: map[string]uint32{"c": 50, "d": 50}, + clusterNames: "c_d_", + assignedName: "c_d_1000", + randomNumber: 1000, + }, + }, + wantRandNums: map[int64]bool{ + 0: true, + 1: true, + 1000: true, + }, + }, + } + cmpOpts := []cmp.Option{cmp.AllowUnexported(actionWithAssignedName{})} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer replaceRandNumGenerator(1000)() + r := &xdsResolver{ + actions: tt.oldActions, + usedActionNameRandomNumber: tt.oldRandNums, + } + r.updateActions(tt.newActions) + if !cmp.Equal(r.actions, tt.wantActions, cmpOpts...) { + t.Errorf("removeOrReuseName() got unexpected actions, diff %v", cmp.Diff(r.actions, tt.wantActions, cmpOpts...)) + } + if !cmp.Equal(r.usedActionNameRandomNumber, tt.wantRandNums) { + t.Errorf("removeOrReuseName() got unexpected nextIndex, diff %v", cmp.Diff(r.usedActionNameRandomNumber, tt.wantRandNums)) + } + }) + } +} + +func TestGetActionAssignedName(t *testing.T) { + tests := []struct { + name string + actions map[string]actionWithAssignedName + action map[string]uint32 + want string + }{ + { + name: "good", + actions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + }, + }, + action: map[string]uint32{"a": 20, "b": 30, "c": 50}, + want: "a_b_c_0", + }, + { + name: "two", + actions: map[string]actionWithAssignedName{ + "a20_b30_c50_": { + clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50}, + clusterNames: "a_b_c_", + assignedName: "a_b_c_0", + }, + "c50_d50_": { + clustersWithWeights: map[string]uint32{"c": 50, "d": 50}, + clusterNames: "c_d_", + assignedName: "c_d_0", + }, + }, + action: map[string]uint32{"c": 50, "d": 50}, + want: "c_d_0", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &xdsResolver{ + actions: tt.actions, + } + if got := r.getActionAssignedName(tt.action); got != tt.want { + t.Errorf("getActionAssignedName() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/xds/internal/resolver/serviceconfig_test.go b/xds/internal/resolver/serviceconfig_test.go index 6e83a23cdd5d..4e149893ee70 100644 --- a/xds/internal/resolver/serviceconfig_test.go +++ b/xds/internal/resolver/serviceconfig_test.go @@ -23,9 +23,12 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/serviceconfig" _ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" + _ "google.golang.org/grpc/xds/internal/balancer/xdsrouting" "google.golang.org/grpc/xds/internal/client" + xdsclient "google.golang.org/grpc/xds/internal/client" ) const ( @@ -34,7 +37,7 @@ const ( "weighted_target_experimental": { "targets": { "test-cluster-1" : { "weight":1, "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } } -}]}` + }]}` testWeightedCDSJSON = `{"loadBalancingConfig":[{ "weighted_target_experimental": { "targets": { @@ -48,30 +51,303 @@ const ( } } } -}]}` + }]}` testWeightedCDSNoChildJSON = `{"loadBalancingConfig":[{ "weighted_target_experimental": { "targets": {} } -}]}` + }]}` + testRoutingJSON = `{"loadBalancingConfig":[{ + "xds_routing_experimental": { + "action":{ + "cluster_1_cluster_2_0":{ + "childPolicy":[{ + "weighted_target_experimental": { + "targets": { + "cluster_1" : { + "weight":75, + "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] + }, + "cluster_2" : { + "weight":25, + "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] + } + } + } + }] + } + }, + + "route":[{ + "path":"/service_1/method_1", + "action":"cluster_1_cluster_2_0" + }] + } + }]} +` + testRoutingAllMatchersJSON = `{"loadBalancingConfig":[{ + "xds_routing_experimental": { + "action":{ + "cluster_1_0":{ + "childPolicy":[{ + "weighted_target_experimental": { + "targets": { + "cluster_1" : { + "weight":1, + "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] + } + } + } + }] + }, + "cluster_2_0":{ + "childPolicy":[{ + "weighted_target_experimental": { + "targets": { + "cluster_2" : { + "weight":1, + "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] + } + } + } + }] + }, + "cluster_3_0":{ + "childPolicy":[{ + "weighted_target_experimental": { + "targets": { + "cluster_3" : { + "weight":1, + "childPolicy":[{"cds_experimental":{"cluster":"cluster_3"}}] + } + } + } + }] + } + }, + + "route":[{ + "path":"/service_1/method_1", + "action":"cluster_1_0" + }, + { + "prefix":"/service_2/method_1", + "action":"cluster_1_0" + }, + { + "regex":"^/service_2/method_3$", + "action":"cluster_1_0" + }, + { + "prefix":"", + "headers":[{"name":"header-1", "exactMatch":"value-1", "invertMatch":true}], + "action":"cluster_2_0" + }, + { + "prefix":"", + "headers":[{"name":"header-1", "regexMatch":"^value-1$"}], + "action":"cluster_2_0" + }, + { + "prefix":"", + "headers":[{"name":"header-1", "rangeMatch":{"start":-1, "end":7}}], + "action":"cluster_3_0" + }, + { + "prefix":"", + "headers":[{"name":"header-1", "presentMatch":true}], + "action":"cluster_3_0" + }, + { + "prefix":"", + "headers":[{"name":"header-1", "prefixMatch":"value-1"}], + "action":"cluster_2_0" + }, + { + "prefix":"", + "headers":[{"name":"header-1", "suffixMatch":"value-1"}], + "action":"cluster_2_0" + }, + { + "prefix":"", + "matchFraction":{"value": 31415}, + "action":"cluster_3_0" + }] + } + }]} +` ) -func TestServiceUpdateToJSON(t *testing.T) { +func TestWeightedClusterToJSON(t *testing.T) { tests := []struct { name string - su client.ServiceUpdate + wc map[string]uint32 wantJSON string // wantJSON is not to be compared verbatim. }{ { name: "one cluster only", - su: client.ServiceUpdate{WeightedCluster: map[string]uint32{testCluster1: 1}}, + wc: map[string]uint32{testCluster1: 1}, wantJSON: testClusterOnlyJSON, }, { name: "empty weighted clusters", - su: client.ServiceUpdate{WeightedCluster: nil}, + wc: nil, wantJSON: testWeightedCDSNoChildJSON, }, + { + name: "weighted clusters", + wc: map[string]uint32{ + "cluster_1": 75, + "cluster_2": 25, + }, + wantJSON: testWeightedCDSJSON, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotJSON, err := weightedClusterToJSON(tt.wc) + if err != nil { + t.Errorf("serviceUpdateToJSON returned error: %v", err) + return + } + + gotParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(gotJSON) + wantParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(tt.wantJSON) + + if !internal.EqualServiceConfigForTesting(gotParsed.Config, wantParsed.Config) { + t.Errorf("serviceUpdateToJSON() = %v, want %v", gotJSON, tt.wantJSON) + t.Error("gotParsed: ", cmp.Diff(nil, gotParsed)) + t.Error("wantParsed: ", cmp.Diff(nil, wantParsed)) + } + }) + } +} + +func TestRoutesToJSON(t *testing.T) { + tests := []struct { + name string + routes []*xdsclient.Route + wantJSON string + wantErr bool + }{ + { + name: "one route", + routes: []*xdsclient.Route{{ + Path: newStringP("/service_1/method_1"), + Action: map[string]uint32{"cluster_1": 75, "cluster_2": 25}, + }}, + wantJSON: testRoutingJSON, + wantErr: false, + }, + { + name: "all matchers", + routes: []*xdsclient.Route{ + { + Path: newStringP("/service_1/method_1"), + Action: map[string]uint32{"cluster_1": 1}, + }, + { + Prefix: newStringP("/service_2/method_1"), + Action: map[string]uint32{"cluster_1": 1}, + }, + { + Regex: newStringP("^/service_2/method_3$"), + Action: map[string]uint32{"cluster_1": 1}, + }, + { + Prefix: newStringP(""), + Headers: []*xdsclient.HeaderMatcher{{ + Name: "header-1", + InvertMatch: newBoolP(true), + ExactMatch: newStringP("value-1"), + }}, + Action: map[string]uint32{"cluster_2": 1}, + }, + { + Prefix: newStringP(""), + Headers: []*xdsclient.HeaderMatcher{{ + Name: "header-1", + RegexMatch: newStringP("^value-1$"), + }}, + Action: map[string]uint32{"cluster_2": 1}, + }, + { + Prefix: newStringP(""), + Headers: []*xdsclient.HeaderMatcher{{ + Name: "header-1", + RangeMatch: &xdsclient.Int64Range{Start: -1, End: 7}, + }}, + Action: map[string]uint32{"cluster_3": 1}, + }, + { + Prefix: newStringP(""), + Headers: []*xdsclient.HeaderMatcher{{ + Name: "header-1", + PresentMatch: newBoolP(true), + }}, + Action: map[string]uint32{"cluster_3": 1}, + }, + { + Prefix: newStringP(""), + Headers: []*xdsclient.HeaderMatcher{{ + Name: "header-1", + PrefixMatch: newStringP("value-1"), + }}, + Action: map[string]uint32{"cluster_2": 1}, + }, + { + Prefix: newStringP(""), + Headers: []*xdsclient.HeaderMatcher{{ + Name: "header-1", + SuffixMatch: newStringP("value-1"), + }}, + Action: map[string]uint32{"cluster_2": 1}, + }, + { + Prefix: newStringP(""), + Fraction: newUint32P(31415), + Action: map[string]uint32{"cluster_3": 1}, + }, + }, + wantJSON: testRoutingAllMatchersJSON, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Note this random number function only generates 0. This is + // because the test doesn't handle action update, and there's only + // one action for each cluster bundle. + // + // This is necessary so the output is deterministic. + grpcrandInt63n = func(int64) int64 { return 0 } + defer func() { grpcrandInt63n = grpcrand.Int63n }() + + gotJSON, err := (&xdsResolver{}).routesToJSON(tt.routes) + if err != nil { + t.Errorf("routesToJSON returned error: %v", err) + return + } + + gotParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(gotJSON) + wantParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(tt.wantJSON) + + if !internal.EqualServiceConfigForTesting(gotParsed.Config, wantParsed.Config) { + t.Errorf("serviceUpdateToJSON() = %v, want %v", gotJSON, tt.wantJSON) + t.Error("gotParsed: ", cmp.Diff(nil, gotParsed)) + t.Error("wantParsed: ", cmp.Diff(nil, wantParsed)) + } + }) + } +} + +func TestServiceUpdateToJSON(t *testing.T) { + tests := []struct { + name string + su client.ServiceUpdate + wantJSON string + wantErr bool + }{ { name: "weighted clusters", su: client.ServiceUpdate{WeightedCluster: map[string]uint32{ @@ -79,11 +355,24 @@ func TestServiceUpdateToJSON(t *testing.T) { "cluster_2": 25, }}, wantJSON: testWeightedCDSJSON, + wantErr: false, + }, + { + name: "routing", + su: client.ServiceUpdate{ + Routes: []*xdsclient.Route{{ + Path: newStringP("/service_1/method_1"), + Action: map[string]uint32{"cluster_1": 75, "cluster_2": 25}, + }}, + }, + wantJSON: testRoutingJSON, + wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotJSON, err := serviceUpdateToJSON(tt.su) + defer replaceRandNumGenerator(0)() + gotJSON, err := (&xdsResolver{}).serviceUpdateToJSON(tt.su) if err != nil { t.Errorf("serviceUpdateToJSON returned error: %v", err) return @@ -100,3 +389,19 @@ func TestServiceUpdateToJSON(t *testing.T) { }) } } + +// Two updates to the same resolver, test that action names are reused. +func TestServiceUpdateToJSON_TwoConfig_UpdateActions(t *testing.T) { +} + +func newStringP(s string) *string { + return &s +} + +func newBoolP(b bool) *bool { + return &b +} + +func newUint32P(i uint32) *uint32 { + return &i +} diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index fb46a91603c2..cdd103ef7dc3 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -159,6 +159,16 @@ type xdsResolver struct { updateCh chan suWithError // cancelWatch is the function to cancel the watcher. cancelWatch func() + + // actions is a map from hash of weighted cluster, to the weighted cluster + // map, and it's assigned name. E.g. + // "A40_B60_": {{A:40, B:60}, "A_B_", "A_B_0"} + // "A30_B70_": {{A:30, B:70}, "A_B_", "A_B_1"} + // "B90_C10_": {{B:90, C:10}, "B_C_", "B_C_0"} + actions map[string]actionWithAssignedName + // usedActionNameRandomNumber contains random numbers that have been used in + // assigned names, to avoid collision. + usedActionNameRandomNumber map[int64]bool } // run is a long running goroutine which blocks on receiving service updates @@ -185,7 +195,7 @@ func (r *xdsResolver) run() { r.cc.ReportError(update.err) continue } - sc, err := serviceUpdateToJSON(update.su) + sc, err := r.serviceUpdateToJSON(update.su) if err != nil { r.logger.Warningf("failed to convert update to service config: %v", err) r.cc.ReportError(err) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 025a8b2dbc1a..d84ec44eb8c0 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" xdsinternal "google.golang.org/grpc/xds/internal" @@ -446,3 +447,15 @@ func TestXDSResolverResourceNotFoundError(t *testing.T) { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } } + +func replaceRandNumGenerator(start int64) func() { + nextInt := start + grpcrandInt63n = func(int64) (ret int64) { + ret = nextInt + nextInt++ + return + } + return func() { + grpcrandInt63n = grpcrand.Int63n + } +}