Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xds client split #11484

Merged
merged 10 commits into from
Aug 23, 2024
80 changes: 53 additions & 27 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.client.XdsResourceType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -117,53 +119,77 @@

private boolean handleRequest(
ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
StatusException error;
try {
responseObserver.onNext(getConfigDumpForRequest(request));
return true;
} catch (StatusException e) {
error = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
error =
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
}
responseObserver.onError(error);
return false;
}
StatusException error = null;

private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
throws StatusException, InterruptedException {
if (request.getNodeMatchersCount() > 0) {
throw new StatusException(
error = new StatusException(
Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
} else {
List<String> targets = xdsClientPoolFactory.getTargets();
List<ClientConfig> clientConfigs = new ArrayList<>(targets.size());

for (int i = 0; i < targets.size() && error == null; i++) {
try {
ClientConfig clientConfig = getConfigForRequest(targets.get(i));
if (clientConfig != null) {
clientConfigs.add(clientConfig);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();
}
}

try {
responseObserver.onNext(getStatusResponse(clientConfigs));
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while processing CSDS config dump", e);
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();

Check warning on line 153 in xds/src/main/java/io/grpc/xds/CsdsService.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/CsdsService.java#L150-L153

Added lines #L150 - L153 were not covered by tests
}
}

ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get();
if (error == null) {
return true; // All clients reported without error
}
responseObserver.onError(error);
return false;
}

private ClientConfig getConfigForRequest(String target) throws InterruptedException {
ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get(target);
if (xdsClientPool == null) {
return ClientStatusResponse.getDefaultInstance();
return null;
}

XdsClient xdsClient = null;
try {
xdsClient = xdsClientPool.getObject();
return ClientStatusResponse.newBuilder()
.addConfig(getClientConfigForXdsClient(xdsClient))
.build();
return getClientConfigForXdsClient(xdsClient, target);
} finally {
if (xdsClient != null) {
xdsClientPool.returnObject(xdsClient);
}
}
}

private ClientStatusResponse getStatusResponse(List<ClientConfig> clientConfigs) {
if (clientConfigs.isEmpty()) {
return ClientStatusResponse.getDefaultInstance();
}
return ClientStatusResponse.newBuilder().addAllConfig(clientConfigs).build();
}

@VisibleForTesting
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient, String target)
throws InterruptedException {
ClientConfig.Builder builder = ClientConfig.newBuilder()
.setClientScope(target)
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());

Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@

public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate();
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);

Check warning on line 39 in xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java#L39

Added line #L39 was not covered by tests
}
}
37 changes: 28 additions & 9 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand All @@ -32,6 +33,7 @@
import io.grpc.xds.client.XdsInitializationException;
import io.grpc.xds.internal.security.TlsContextManagerImpl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
Expand All @@ -53,7 +55,7 @@
private final Bootstrapper bootstrapper;
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
private volatile ObjectPool<XdsClient> xdsClientPool;
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();

SharedXdsClientPoolProvider() {
this(new GrpcBootstrapperImpl());
Expand All @@ -75,16 +77,16 @@

@Override
@Nullable
public ObjectPool<XdsClient> get() {
return xdsClientPool;
public ObjectPool<XdsClient> get(String target) {
return targetToXdsClientMap.get(target);
}

@Override
public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
ObjectPool<XdsClient> ref = xdsClientPool;
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
ref = xdsClientPool;
ref = targetToXdsClientMap.get(target);
if (ref == null) {
BootstrapInfo bootstrapInfo;
Map<String, ?> rawBootstrap = bootstrapOverride.get();
Expand All @@ -96,21 +98,32 @@
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
targetToXdsClientMap.put(target, ref);
}
}
}
return ref;
}

@Override
public ImmutableList<String> getTargets() {
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
}


private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
}

@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
Expand All @@ -120,8 +133,9 @@
private int refCount;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
}

@Override
Expand All @@ -136,7 +150,7 @@
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
scheduler,
new ExponentialBackoffPolicy.Provider(),
BACKOFF_POLICY_PROVIDER,
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
Expand Down Expand Up @@ -167,5 +181,10 @@
return xdsClient;
}
}

public String getTarget() {
return target;

Check warning on line 186 in xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java#L186

Added line #L186 was not covered by tests
}
}

}
7 changes: 5 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

interface XdsClientPoolFactory {
void setBootstrapOverride(Map<String, ?> bootstrap);

@Nullable
ObjectPool<XdsClient> get();
ObjectPool<XdsClient> get(String target);

ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException;
ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException;

List<String> getTargets();
}
17 changes: 10 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -104,6 +105,7 @@ final class XdsNameResolver extends NameResolver {
private final XdsLogger logger;
@Nullable
private final String targetAuthority;
private final String target;
private final String serviceAuthority;
// Encoded version of the service authority as per
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
Expand Down Expand Up @@ -133,23 +135,24 @@ final class XdsNameResolver extends NameResolver {
private boolean receivedConfig;

XdsNameResolver(
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
URI targetUri, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) {
this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride);
}

@VisibleForTesting
XdsNameResolver(
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser,
URI targetUri, @Nullable String targetAuthority, String name,
@Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
this.targetAuthority = targetAuthority;
target = targetUri.toString();

// The name might have multiple slashes so encode it before verifying.
serviceAuthority = checkNotNull(name, "name");
Expand Down Expand Up @@ -180,7 +183,7 @@ public String getServiceAuthority() {
public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener");
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
xdsClientPool = xdsClientPoolFactory.getOrCreate(target);
} catch (Exception e) {
listener.onError(
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
targetUri);
String name = targetPath.substring(1);
return new XdsNameResolver(
targetUri.getAuthority(), name, args.getOverrideAuthority(),
targetUri, name, args.getOverrideAuthority(),
args.getServiceConfigParser(), args.getSynchronizationContext(),
args.getScheduledExecutorService(),
bootstrapOverride);
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void run() {

private void internalStart() {
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
xdsClientPool = xdsClientPoolFactory.getOrCreate("");
} catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException();
Expand Down
Loading