Skip to content

Commit

Permalink
Fix Bug Where Weight Is Set to 0 in Ramping-Up Strategy (#6014)
Browse files Browse the repository at this point in the history
Motivation:
When the original weight is less than 10, the initial weight is incorrectly set to 0, potentially leading to `EndpointSelectionTimeoutException`.

Modifications:
- Ensured a minimum weight of 1 is set when the original weight is greater than 1 in the ramping-up strategy.
- Added debugging logs for selector and selection strategy to facilitate troubleshooting.

Result:
- Fix a bug where weights could unintentionally be set to 0 in ramping-up strategies.
  • Loading branch information
minwoox authored Dec 3, 2024
1 parent 5d346b2 commit 62da203
Show file tree
Hide file tree
Showing 11 changed files with 206 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.linecorp.armeria.internal.client.endpoint.EndpointToStringUtil.toShortString;
import static com.linecorp.armeria.internal.common.util.CollectionUtil.truncate;
import static java.util.Objects.requireNonNull;

Expand All @@ -32,6 +33,9 @@
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
Expand All @@ -51,6 +55,8 @@
*/
public class DynamicEndpointGroup extends AbstractEndpointGroup implements ListenableAsyncCloseable {

private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointGroup.class);

/**
* Returns a newly created builder.
*/
Expand Down Expand Up @@ -223,6 +229,8 @@ protected final void addEndpoint(Endpoint e) {
final List<Endpoint> newEndpointsUnsorted = Lists.newArrayList(endpoints);
newEndpointsUnsorted.add(e);
endpoints = newEndpoints = ImmutableList.sortedCopyOf(newEndpointsUnsorted);
logger.info("An endpoint has been added: {}. Current endpoints: {}",
toShortString(e), toShortString(newEndpoints));
} finally {
endpointsLock.unlock();
}
Expand All @@ -238,12 +246,17 @@ protected final void removeEndpoint(Endpoint e) {
final List<Endpoint> newEndpoints;
endpointsLock.lock();
try {
if (!allowEmptyEndpoints && endpoints.size() == 1) {
final List<Endpoint> oldEndpoints = endpoints;
if (!allowEmptyEndpoints && oldEndpoints.size() == 1) {
return;
}
endpoints = newEndpoints = endpoints.stream()
.filter(endpoint -> !endpoint.equals(e))
.collect(toImmutableList());
endpoints = newEndpoints = oldEndpoints.stream()
.filter(endpoint -> !endpoint.equals(e))
.collect(toImmutableList());
if (endpoints.size() != oldEndpoints.size()) {
logger.info("An endpoint has been removed: {}. Current endpoints: {}",
toShortString(e), toShortString(newEndpoints));
}
} finally {
endpointsLock.unlock();
}
Expand All @@ -266,6 +279,7 @@ protected final void setEndpoints(Iterable<Endpoint> endpoints) {
return;
}
this.endpoints = newEndpoints;
logger.info("New endpoints have been set: {}", toShortString(newEndpoints));
} finally {
endpointsLock.unlock();
}
Expand Down Expand Up @@ -376,7 +390,7 @@ public String toString() {
protected final String toString(Consumer<? super StringBuilder> builderMutator) {
final StringBuilder buf = new StringBuilder();
buf.append(getClass().getSimpleName());
buf.append("{selectionStrategy=").append(selectionStrategy.getClass());
buf.append("{selector=").append(toStringSelector());
buf.append(", allowsEmptyEndpoints=").append(allowEmptyEndpoints);
buf.append(", initialized=").append(initialEndpointsFuture.isDone());
buf.append(", numEndpoints=").append(endpoints.size());
Expand All @@ -385,6 +399,21 @@ protected final String toString(Consumer<? super StringBuilder> builderMutator)
return buf.append('}').toString();
}

/**
* Returns the string representation of the {@link EndpointSelector} of this {@link DynamicEndpointGroup}.
* If the {@link EndpointSelector} is not created yet, it returns the class name of the
* {@link EndpointSelectionStrategy}.
*/
protected String toStringSelector() {
final EndpointSelector endpointSelector = selector.get();
if (endpointSelector == null) {
// Return selection strategy if selector is not created yet.
return selectionStrategy.getClass().toString();
}

return endpointSelector.toString();
}

private class InitialEndpointsFuture extends EventLoopCheckingFuture<List<Endpoint>> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.annotation.Nullable;
Expand Down Expand Up @@ -57,5 +59,12 @@ public Endpoint selectNow(ClientRequestContext ctx) {
final int currentSequence = sequence.getAndIncrement();
return endpoints.get(Math.abs(currentSequence % endpoints.size()));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpoints", group().endpoints())
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.function.ToLongFunction;

import com.google.common.base.MoreObjects;
import com.google.common.hash.Hashing;

import com.linecorp.armeria.client.ClientRequestContext;
Expand Down Expand Up @@ -84,7 +85,6 @@ private static final class StickyEndpointSelector extends AbstractEndpointSelect
@Nullable
@Override
public Endpoint selectNow(ClientRequestContext ctx) {

final List<Endpoint> endpoints = group().endpoints();
if (endpoints.isEmpty()) {
return null;
Expand All @@ -94,5 +94,12 @@ public Endpoint selectNow(ClientRequestContext ctx) {
final int nearest = Hashing.consistentHash(key, endpoints.size());
return endpoints.get(nearest);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpoints", group().endpoints())
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import static com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder.defaultTransition;
import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.createdAtNanos;
import static com.linecorp.armeria.internal.client.endpoint.EndpointAttributeKeys.hasCreatedAtNanos;
import static com.linecorp.armeria.internal.client.endpoint.EndpointToStringUtil.toShortString;
import static java.util.Objects.requireNonNull;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -37,6 +36,9 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -76,6 +78,8 @@
*/
final class WeightRampingUpStrategy implements EndpointSelectionStrategy {

private static final Logger logger = LoggerFactory.getLogger(WeightRampingUpStrategy.class);

private static final Ticker defaultTicker = Ticker.systemTicker();
private static final WeightedRandomDistributionEndpointSelector EMPTY_SELECTOR =
new WeightedRandomDistributionEndpointSelector(ImmutableList.of());
Expand Down Expand Up @@ -130,8 +134,6 @@ final class RampingUpEndpointWeightSelector extends AbstractEndpointSelector {

private final List<Endpoint> endpointsFinishedRampingUp = new ArrayList<>();

@VisibleForTesting
final Deque<EndpointsRampingUpEntry> endpointsRampingUp = new ArrayDeque<>();
@VisibleForTesting
final Map<Long, EndpointsRampingUpEntry> rampingUpWindowsMap = new HashMap<>();
private Object2LongOpenHashMap<Endpoint> endpointCreatedTimestamps = new Object2LongOpenHashMap<>();
Expand Down Expand Up @@ -233,7 +235,25 @@ private void buildEndpointSelector() {
endpointAndStep.endpoint().withWeight(endpointAndStep.currentWeight()));
}
}
endpointSelector = new WeightedRandomDistributionEndpointSelector(targetEndpointsBuilder.build());
final List<Endpoint> endpoints = targetEndpointsBuilder.build();
if (rampingUpWindowsMap.isEmpty()) {
logger.info("Finished ramping up. endpoints: {}", toShortString(endpoints));
} else {
logger.debug("Ramping up. endpoints: {}", toShortString(endpoints));
}

boolean found = false;
for (Endpoint endpoint : endpoints) {
if (endpoint.weight() > 0) {
found = true;
break;
}
}
if (!found) {
logger.warn("No valid endpoint with weight > 0. endpoints: {}", toShortString(endpoints));
}

endpointSelector = new WeightedRandomDistributionEndpointSelector(endpoints);
}

@VisibleForTesting
Expand Down Expand Up @@ -288,6 +308,15 @@ private void close() {
lock.unlock();
}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpointSelector", endpointSelector)
.add("endpointsFinishedRampingUp", endpointsFinishedRampingUp)
.add("rampingUpWindowsMap", rampingUpWindowsMap)
.toString();
}
}

private static int numStep(long rampingUpIntervalNanos, Ticker ticker, long createTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,17 @@ public final class WeightRampingUpStrategyBuilder {
static final int DEFAULT_TOTAL_STEPS = 10;
static final int DEFAULT_RAMPING_UP_TASK_WINDOW_MILLIS = 500;
static final EndpointWeightTransition DEFAULT_LINEAR_TRANSITION =
(endpoint, currentStep, totalSteps) ->
// currentStep is never greater than totalSteps so we can cast long to int.
Ints.saturatedCast((long) endpoint.weight() * currentStep / totalSteps);
(endpoint, currentStep, totalSteps) -> {
// currentStep is never greater than totalSteps so we can cast long to int.
final int currentWeight =
Ints.saturatedCast((long) endpoint.weight() * currentStep / totalSteps);
if (endpoint.weight() > 0 && currentWeight == 0) {
// If the original weight is not 0,
// we should return 1 to make sure the endpoint is selected.
return 1;
}
return currentWeight;
};
static final EndpointWeightTransition defaultTransition = EndpointWeightTransition.linear();

private EndpointWeightTransition transition = defaultTransition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
package com.linecorp.armeria.client.endpoint;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.linecorp.armeria.internal.client.endpoint.EndpointToStringUtil.toShortString;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;

Expand All @@ -31,6 +36,8 @@

final class WeightedRoundRobinStrategy implements EndpointSelectionStrategy {

private static final Logger logger = LoggerFactory.getLogger(WeightedRoundRobinStrategy.class);

static final WeightedRoundRobinStrategy INSTANCE = new WeightedRoundRobinStrategy();

private WeightedRoundRobinStrategy() {}
Expand Down Expand Up @@ -63,6 +70,17 @@ private static final class WeightedRoundRobinSelector extends AbstractEndpointSe

@Override
protected void updateNewEndpoints(List<Endpoint> endpoints) {
boolean found = false;
for (Endpoint endpoint : endpoints) {
if (endpoint.weight() > 0) {
found = true;
break;
}
}
if (!found) {
logger.warn("No valid endpoint with weight > 0. endpoints: {}", toShortString(endpoints));
}

final EndpointsAndWeights endpointsAndWeights = this.endpointsAndWeights;
if (endpointsAndWeights == null || endpointsAndWeights.endpoints != endpoints) {
this.endpointsAndWeights = new EndpointsAndWeights(endpoints);
Expand Down Expand Up @@ -94,6 +112,13 @@ private static final class EndpointsGroupByWeight {
}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpointsAndWeights", endpointsAndWeights)
.toString();
}

//
// In general, assume the weights are w0 < w1 < ... < wM where M = N - 1, N is number of endpoints.
//
Expand Down Expand Up @@ -228,6 +253,16 @@ Endpoint selectEndpoint(int currentSequence) {

return endpoints.get(Math.abs(currentSequence % endpoints.size()));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpoints", endpoints)
.add("weighted", weighted)
.add("totalWeight", totalWeight)
.add("accumulatedGroups", accumulatedGroups)
.toString();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public String toString() {
.add("numEndpoints", endpoints.size())
.add("candidates", truncate(delegateEndpoints, 10))
.add("numCandidates", delegateEndpoints.size())
.add("selectionStrategy", selectionStrategy().getClass())
.add("selector", toStringSelector())
.add("initialized", whenReady().isDone())
.add("initialSelectionTimeoutMillis", initialSelectionTimeoutMillis)
.add("selectionTimeoutMillis", selectionTimeoutMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public interface MeterIdPrefixFunction {
* <li>Client-side tags:<ul>
* <li>{@code method} - RPC method name or {@link HttpMethod#name()} if RPC method name is not
* available</li>
* <li>{@code service} - RPC service name or innermost service class name</li>
* <li>{@code httpStatus} - {@link HttpStatus#code()}</li>
* </ul></li>
* </ul>
Expand Down
Loading

0 comments on commit 62da203

Please sign in to comment.