diff --git a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java index cdbb0e3be08..16a990ed739 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java @@ -82,6 +82,7 @@ final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer { private final AtomicInteger sequence; private final long infTime; private final Ticker ticker; + private String locality = ""; // The metric instruments are only registered once and shared by all instances of this LB. static { @@ -147,6 +148,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { handleNameResolutionError(unavailableStatus); return unavailableStatus; } + String locality = resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME); + if (locality != null) { + this.locality = locality; + } else { + this.locality = ""; + } config = (WeightedRoundRobinLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); AcceptResolvedAddrRetVal acceptRetVal; @@ -179,7 +186,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { @Override public SubchannelPicker createReadyPicker(Collection activeList) { return new WeightedRoundRobinPicker(ImmutableList.copyOf(activeList), - config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper()); + config.enableOobLoadReport, config.errorUtilizationPenalty, sequence, getHelper(), + locality); } @VisibleForTesting @@ -373,10 +381,12 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker { private final AtomicInteger sequence; private final int hashCode; private final LoadBalancer.Helper helper; + private final String locality; private volatile StaticStrideScheduler scheduler; WeightedRoundRobinPicker(List children, boolean enableOobLoadReport, - float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper) { + float errorUtilizationPenalty, AtomicInteger sequence, LoadBalancer.Helper helper, + String locality) { checkNotNull(children, "children"); Preconditions.checkArgument(!children.isEmpty(), "empty child list"); this.children = children; @@ -391,6 +401,7 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker { this.errorUtilizationPenalty = errorUtilizationPenalty; this.sequence = checkNotNull(sequence, "sequence"); this.helper = helper; + this.locality = checkNotNull(locality, "locality"); // For equality we treat children as a set; use hash code as defined by Set int sum = 0; @@ -434,7 +445,7 @@ private void updateWeight() { helper.getMetricRecorder() .recordDoubleHistogram(ENDPOINT_WEIGHTS_HISTOGRAM, newWeight, ImmutableList.of(helper.getChannelTarget()), - ImmutableList.of("")); + ImmutableList.of(locality)); newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f; } if (staleEndpoints.get() > 0) { @@ -442,13 +453,13 @@ private void updateWeight() { helper.getMetricRecorder() .addLongCounter(ENDPOINT_WEIGHT_STALE_COUNTER, staleEndpoints.get(), ImmutableList.of(helper.getChannelTarget()), - ImmutableList.of("")); + ImmutableList.of(locality)); } if (notYetUsableEndpoints.get() > 0) { // TODO: add locality label once available helper.getMetricRecorder() .addLongCounter(ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER, notYetUsableEndpoints.get(), - ImmutableList.of(helper.getChannelTarget()), ImmutableList.of("")); + ImmutableList.of(helper.getChannelTarget()), ImmutableList.of(locality)); } this.scheduler = new StaticStrideScheduler(newWeights, sequence); @@ -456,7 +467,7 @@ private void updateWeight() { // TODO: locality label once available helper.getMetricRecorder() .addLongCounter(RR_FALLBACK_COUNTER, 1, ImmutableList.of(helper.getChannelTarget()), - ImmutableList.of("")); + ImmutableList.of(locality)); } } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index f71400ba46d..7f06d08998b 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -145,6 +145,7 @@ public void uncaughtException(Thread t, Throwable e) { }); private String channelTarget = "channel-target"; + private String locality = "locality"; public WeightedRoundRobinLoadBalancerTest() { testHelperInstance = new TestHelper(); @@ -1135,9 +1136,11 @@ public void removingAddressShutsdownSubchannel() { @Test public void metrics() { // Give WRR some valid addresses to work with. + Attributes attributesWithLocality = Attributes.newBuilder() + .set(WeightedTargetLoadBalancer.CHILD_NAME, locality).build(); syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder() .setAddresses(servers).setLoadBalancingPolicyConfig(weightedConfig) - .setAttributes(affinity).build())); + .setAttributes(attributesWithLocality).build())); // Flip the three subchannels to READY state to initiate the WRR logic Iterator it = subchannels.values().iterator(); @@ -1240,7 +1243,7 @@ private void verifyLongCounterRecord(String name, int times, long value) { public boolean matches(LongCounterMetricInstrument longCounterInstrument) { return longCounterInstrument.getName().equals(name); } - }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(""))); + }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality))); } // Verifies that the MetricRecorder has been called to record a given double histogram value the @@ -1252,7 +1255,7 @@ private void verifyDoubleHistogramRecord(String name, int times, double value) { public boolean matches(DoubleHistogramMetricInstrument doubleHistogramInstrument) { return doubleHistogramInstrument.getName().equals(name); } - }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(""))); + }), eq(value), eq(Lists.newArrayList(channelTarget)), eq(Lists.newArrayList(locality))); } private int getNumFilteredPendingTasks() {