diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go index 186409bf9bc7..a29658ec3141 100644 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -251,9 +251,6 @@ func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresourc var priorityIntSlice []int priorities := make(map[int][]xdsresource.Locality) for _, locality := range localities { - if locality.Weight == 0 { - continue - } priority := int(locality.Priority) priorities[priority] = append(priorities[priority], locality) priorityIntSlice = append(priorityIntSlice, priority) diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go index ddafa18a6100..2621b791494b 100644 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/eds_impl_test.go @@ -295,31 +295,6 @@ func (s) TestEDS_TwoLocalities(t *testing.T) { if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}); err != nil { t.Fatal(err) } - - // Change weight of the locality[1] to 0, it should never be picked. - clab6 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil) - clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil) - - // Changing weight of locality[1] to 0 caused it to be removed. It's subconn - // should also be removed. - // - // NOTE: this is because we handle locality with weight 0 same as the - // locality doesn't exist. If this changes in the future, this removeSubConn - // behavior will also change. - scToRemove2 := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove2, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove2) - } - - // Test pick with two subconns different locality weight. - // - // Locality-1 will be not be picked, and locality-2 will be picked. - // Locality-2 contains sc3 and sc4. So expect sc3, sc4. - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3, sc4}); err != nil { - t.Fatal(err) - } } // The EDS balancer gets EDS resp with unhealthy endpoints. Test that only diff --git a/xds/internal/xdsclient/xdsresource/type_eds.go b/xds/internal/xdsclient/xdsresource/type_eds.go index ad590160f6af..ec70f32ca436 100644 --- a/xds/internal/xdsclient/xdsresource/type_eds.go +++ b/xds/internal/xdsclient/xdsresource/type_eds.go @@ -64,7 +64,10 @@ type Locality struct { // EndpointsUpdate contains an EDS update. type EndpointsUpdate struct { - Drops []OverloadDropConfig + Drops []OverloadDropConfig + // Localities in the EDS response with `load_balancing_weight` field not set + // or explicitly set to 0 are ignored while parsing the resource, and + // therefore do not show up here. Localities []Locality // Raw is the resource from the xds response. diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go index 9eb7117d9a22..7cc12d73d6d5 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds.go @@ -57,7 +57,7 @@ func unmarshalEndpointsResource(r *anypb.Any, logger *grpclog.PrefixLogger) (str } logger.Infof("Resource with name: %v, type: %T, contains: %v", cla.GetClusterName(), cla, pretty.ToJSON(cla)) - u, err := parseEDSRespProto(cla) + u, err := parseEDSRespProto(cla, logger) if err != nil { return cla.GetClusterName(), EndpointsUpdate{}, err } @@ -102,7 +102,7 @@ func parseEndpoints(lbEndpoints []*v3endpointpb.LbEndpoint) []Endpoint { return endpoints } -func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, error) { +func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment, logger *grpclog.PrefixLogger) (EndpointsUpdate, error) { ret := EndpointsUpdate{} for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) @@ -113,6 +113,11 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate, if l == nil { return EndpointsUpdate{}, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) } + weight := locality.GetLoadBalancingWeight().GetValue() + if weight == 0 { + logger.Warningf("Ignoring locality %s with weight 0", pretty.ToJSON(l)) + continue + } lid := internal.LocalityID{ Region: l.Region, Zone: l.Zone, diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go index db9d4f52896b..3ce069c29664 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_eds_test.go @@ -75,6 +75,20 @@ func (s) TestEDSParseRespProto(t *testing.T) { want: EndpointsUpdate{}, wantErr: true, }, + { + name: "missing locality weight", + m: func() *v3endpointpb.ClusterLoadAssignment { + clab0 := newClaBuilder("test", nil) + clab0.addLocality("locality-1", 0, 1, []string{"addr1:314"}, &addLocalityOptions{ + Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY}, + }) + clab0.addLocality("locality-2", 0, 0, []string{"addr2:159"}, &addLocalityOptions{ + Health: []v3corepb.HealthStatus{v3corepb.HealthStatus_HEALTHY}, + }) + return clab0.Build() + }(), + want: EndpointsUpdate{}, + }, { name: "good", m: func() *v3endpointpb.ClusterLoadAssignment { @@ -161,7 +175,7 @@ func (s) TestEDSParseRespProto(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseEDSRespProto(tt.m) + got, err := parseEDSRespProto(tt.m, nil) if (err != nil) != tt.wantErr { t.Errorf("parseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr) return