Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xds client split #11484

Merged
merged 10 commits into from
Aug 23, 2024
9 changes: 8 additions & 1 deletion census/src/test/java/io/grpc/census/CensusModulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -99,6 +100,7 @@
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,7 +138,7 @@ public class CensusModulesTest {
ClientStreamTracer.StreamInfo.newBuilder()
.setCallOptions(CallOptions.DEFAULT.withOption(NAME_RESOLUTION_DELAYED, 10L)).build();

private static class StringInputStream extends InputStream {
private static class StringInputStream extends InputStream implements KnownLength {
larry-safran marked this conversation as resolved.
Show resolved Hide resolved
final String string;

StringInputStream(String string) {
Expand All @@ -149,6 +151,11 @@ public int read() {
// passed to the InProcess server and consumed by MARSHALLER.parse().
throw new UnsupportedOperationException("Should not be called");
}

@Override
public int available() throws IOException {
return string == null ? 0 : string.length();
}
}

private static final MethodDescriptor.Marshaller<String> MARSHALLER =
Expand Down
80 changes: 53 additions & 27 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.client.XdsResourceType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -117,53 +119,77 @@ public void onCompleted() {

private boolean handleRequest(
ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
StatusException error;
try {
responseObserver.onNext(getConfigDumpForRequest(request));
return true;
} catch (StatusException e) {
error = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
error =
Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
}
responseObserver.onError(error);
return false;
}
StatusException error = null;

private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
throws StatusException, InterruptedException {
if (request.getNodeMatchersCount() > 0) {
throw new StatusException(
error = new StatusException(
Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
} else {
List<String> targets = xdsClientPoolFactory.getTargets();
List<ClientConfig> clientConfigs = new ArrayList<>(targets.size());

for (int i = 0; i < targets.size() && error == null; i++) {
try {
ClientConfig clientConfig = getConfigForRequest(targets.get(i));
if (clientConfig != null) {
clientConfigs.add(clientConfig);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();
}
}

try {
responseObserver.onNext(getStatusResponse(clientConfigs));
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Unexpected error while processing CSDS config dump", e);
error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
.asException();
}
}

ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get();
if (error == null) {
return true; // All clients reported without error
}
responseObserver.onError(error);
return false;
}

private ClientConfig getConfigForRequest(String target) throws InterruptedException {
ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get(target);
if (xdsClientPool == null) {
return ClientStatusResponse.getDefaultInstance();
return null;
}

XdsClient xdsClient = null;
try {
xdsClient = xdsClientPool.getObject();
return ClientStatusResponse.newBuilder()
.addConfig(getClientConfigForXdsClient(xdsClient))
.build();
return getClientConfigForXdsClient(xdsClient, target);
} finally {
if (xdsClient != null) {
xdsClientPool.returnObject(xdsClient);
}
}
}

private ClientStatusResponse getStatusResponse(List<ClientConfig> clientConfigs) {
if (clientConfigs.isEmpty()) {
return ClientStatusResponse.getDefaultInstance();
}
return ClientStatusResponse.newBuilder().addAllConfig(clientConfigs).build();
}

@VisibleForTesting
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient, String target)
throws InterruptedException {
ClientConfig.Builder builder = ClientConfig.newBuilder()
.setClientScope(target)
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
.setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());

Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public static void setDefaultProviderBootstrapOverride(Map<String, ?> bootstrap)

public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate();
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
}
}
57 changes: 40 additions & 17 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand All @@ -32,6 +33,7 @@
import io.grpc.xds.client.XdsInitializationException;
import io.grpc.xds.internal.security.TlsContextManagerImpl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
Expand All @@ -53,7 +55,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private final Bootstrapper bootstrapper;
private final Object lock = new Object();
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
private volatile ObjectPool<XdsClient> xdsClientPool;
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();

SharedXdsClientPoolProvider() {
this(new GrpcBootstrapperImpl());
Expand All @@ -75,16 +77,16 @@ public void setBootstrapOverride(Map<String, ?> bootstrap) {

@Override
@Nullable
public ObjectPool<XdsClient> get() {
return xdsClientPool;
public ObjectPool<XdsClient> get(String target) {
return targetToXdsClientMap.get(target);
}

@Override
public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
ObjectPool<XdsClient> ref = xdsClientPool;
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
ref = xdsClientPool;
ref = targetToXdsClientMap.get(target);
if (ref == null) {
BootstrapInfo bootstrapInfo;
Map<String, ?> rawBootstrap = bootstrapOverride.get();
Expand All @@ -96,21 +98,32 @@ public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = xdsClientPool = new RefCountedXdsClientObjectPool(bootstrapInfo);
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
targetToXdsClientMap.put(target, ref);
}
}
}
return ref;
}

@Override
public ImmutableList<String> getTargets() {
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
}


private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
}

@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
Expand All @@ -120,8 +133,9 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private int refCount;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
}

@Override
Expand All @@ -132,15 +146,19 @@ public XdsClient getObject() {
log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
}
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
scheduler,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
try {
xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
scheduler,
BACKOFF_POLICY_PROVIDER,
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this catching? I don't see any new exceptions in #11254 either.

throw new RuntimeException(e);
}
}
refCount++;
return xdsClient;
Expand All @@ -167,5 +185,10 @@ XdsClient getXdsClientForTest() {
return xdsClient;
}
}

public String getTarget() {
return target;
}
}

}
7 changes: 5 additions & 2 deletions xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

interface XdsClientPoolFactory {
void setBootstrapOverride(Map<String, ?> bootstrap);

@Nullable
ObjectPool<XdsClient> get();
ObjectPool<XdsClient> get(String target);

ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException;
ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException;

List<String> getTargets();
}
17 changes: 10 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -104,6 +105,7 @@ final class XdsNameResolver extends NameResolver {
private final XdsLogger logger;
@Nullable
private final String targetAuthority;
private final String target;
private final String serviceAuthority;
// Encoded version of the service authority as per
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
Expand Down Expand Up @@ -133,23 +135,24 @@ final class XdsNameResolver extends NameResolver {
private boolean receivedConfig;

XdsNameResolver(
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
URI targetUri, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) {
this(targetAuthority, name, overrideAuthority, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride);
}

@VisibleForTesting
XdsNameResolver(
@Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
ServiceConfigParser serviceConfigParser,
URI targetUri, @Nullable String targetAuthority, String name,
@Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
this.targetAuthority = targetAuthority;
target = targetUri.toString();

// The name might have multiple slashes so encode it before verifying.
serviceAuthority = checkNotNull(name, "name");
Expand Down Expand Up @@ -180,7 +183,7 @@ public String getServiceAuthority() {
public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener");
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
xdsClientPool = xdsClientPoolFactory.getOrCreate(target);
} catch (Exception e) {
listener.onError(
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
targetUri);
String name = targetPath.substring(1);
return new XdsNameResolver(
targetUri.getAuthority(), name, args.getOverrideAuthority(),
targetUri, name, args.getOverrideAuthority(),
args.getServiceConfigParser(), args.getSynchronizationContext(),
args.getScheduledExecutorService(),
bootstrapOverride);
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void run() {

private void internalStart() {
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
xdsClientPool = xdsClientPoolFactory.getOrCreate("");
} catch (Exception e) {
StatusException statusException = Status.UNAVAILABLE.withDescription(
"Failed to initialize xDS").withCause(e).asException();
Expand Down
Loading
Loading