Skip to content

Commit

Permalink
Xds client split (grpc#11484)
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran authored Aug 23, 2024
1 parent ee3ffef commit d034a56
Show file tree
Hide file tree
Showing 15 changed files with 284 additions and 93 deletions.
80 changes: 53 additions & 27 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.client.XdsResourceType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -117,53 +119,77 @@ public void onCompleted() {

private boolean handleRequest(
ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
StatusException error;
try {
responseObserver.onNext(getConfigDumpForRequest(request));
return true;
} catch (StatusException e) {
error = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
error =
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
}
responseObserver.onError(error);
return false;
}
StatusException error = null;

private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
throws StatusException, InterruptedException {
if (request.getNodeMatchersCount() > 0) {
throw new StatusException(
error = new StatusException(
Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
} else {
List<String> targets = xdsClientPoolFactory.getTargets();
List<ClientConfig> clientConfigs = new ArrayList<>(targets.size());

for (int i = 0; i < targets.size() && error == null; i++) {
try {
ClientConfig clientConfig = getConfigForRequest(targets.get(i));
if (clientConfig != null) {
clientConfigs.add(clientConfig);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();
}
}

try {
responseObserver.onNext(getStatusResponse(clientConfigs));
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while processing CSDS config dump", e);
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();
}
}

ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get();
if (error == null) {
return true; // All clients reported without error
}
responseObserver.onError(error);
return false;
}

private ClientConfig getConfigForRequest(String target) throws InterruptedException {
ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get(target);
if (xdsClientPool == null) {
return ClientStatusResponse.getDefaultInstance();
return null;
}

XdsClient xdsClient = null;
try {
xdsClient = xdsClientPool.getObject();
return ClientStatusResponse.newBuilder()
.addConfig(getClientConfigForXdsClient(xdsClient))
.build();
return getClientConfigForXdsClient(xdsClient, target);
} finally {
if (xdsClient != null) {
xdsClientPool.returnObject(xdsClient);
}
}
}

private ClientStatusResponse getStatusResponse(List<ClientConfig> clientConfigs) {
if (clientConfigs.isEmpty()) {
return ClientStatusResponse.getDefaultInstance();
}
return ClientStatusResponse.newBuilder().addAllConfig(clientConfigs).build();
}

@VisibleForTesting
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient, String target)
throws InterruptedException {
ClientConfig.Builder builder = ClientConfig.newBuilder()
.setClientScope(target)
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());

Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public static void setDefaultProviderBootstrapOverride(Map<String, ?> bootstrap)

public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate();
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
}
}
37 changes: 28 additions & 9 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand All @@ -32,6 +33,7 @@
import io.grpc.xds.client.XdsInitializationException;
import io.grpc.xds.internal.security.TlsContextManagerImpl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
Expand All @@ -53,7 +55,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private final Bootstrapper bootstrapper;
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
private volatile ObjectPool<XdsClient> xdsClientPool;
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();

SharedXdsClientPoolProvider() {
this(new GrpcBootstrapperImpl());
Expand All @@ -75,16 +77,16 @@ public void setBootstrapOverride(Map<String, ?> bootstrap) {

@Override
@Nullable
public ObjectPool<XdsClient> get() {
return xdsClientPool;
public ObjectPool<XdsClient> get(String target) {
return targetToXdsClientMap.get(target);
}

@Override
public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
ObjectPool<XdsClient> ref = xdsClientPool;
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
ref = xdsClientPool;
ref = targetToXdsClientMap.get(target);
if (ref == null) {
BootstrapInfo bootstrapInfo;
Map<String, ?> rawBootstrap = bootstrapOverride.get();
Expand All @@ -96,21 +98,32 @@ public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
targetToXdsClientMap.put(target, ref);
}
}
}
return ref;
}

@Override
public ImmutableList<String> getTargets() {
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
}


private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
}

@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
Expand All @@ -120,8 +133,9 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private int refCount;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
}

@Override
Expand All @@ -136,7 +150,7 @@ public XdsClient getObject() {
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
scheduler,
new ExponentialBackoffPolicy.Provider(),
BACKOFF_POLICY_PROVIDER,
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
Expand Down Expand Up @@ -167,5 +181,10 @@ XdsClient getXdsClientForTest() {
return xdsClient;
}
}

public String getTarget() {
return target;
}
}

}
7 changes: 5 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

interface XdsClientPoolFactory {
void setBootstrapOverride(Map<String, ?> bootstrap);

@Nullable
ObjectPool<XdsClient> get();
ObjectPool<XdsClient> get(String target);

ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException;
ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException;

List<String> getTargets();
}
17 changes: 10 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -104,6 +105,7 @@ final class XdsNameResolver extends NameResolver {
private final XdsLogger logger;
@Nullable
private final String targetAuthority;
private final String target;
private final String serviceAuthority;
// Encoded version of the service authority as per
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
Expand Down Expand Up @@ -133,23 +135,24 @@ final class XdsNameResolver extends NameResolver {
private boolean receivedConfig;

XdsNameResolver(
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
URI targetUri, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) {
this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride);
}

@VisibleForTesting
XdsNameResolver(
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser,
URI targetUri, @Nullable String targetAuthority, String name,
@Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
this.targetAuthority = targetAuthority;
target = targetUri.toString();

// The name might have multiple slashes so encode it before verifying.
serviceAuthority = checkNotNull(name, "name");
Expand Down Expand Up @@ -180,7 +183,7 @@ public String getServiceAuthority() {
public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener");
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
xdsClientPool = xdsClientPoolFactory.getOrCreate(target);
} catch (Exception e) {
listener.onError(
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
targetUri);
String name = targetPath.substring(1);
return new XdsNameResolver(
targetUri.getAuthority(), name, args.getOverrideAuthority(),
targetUri, name, args.getOverrideAuthority(),
args.getServiceConfigParser(), args.getSynchronizationContext(),
args.getScheduledExecutorService(),
bootstrapOverride);
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void run() {

private void internalStart() {
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
xdsClientPool = xdsClientPoolFactory.getOrCreate("");
} catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException();
Expand Down
Loading

0 comments on commit d034a56

Please sign in to comment.