diff --git a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java index 4c4e19642c..f7b2fd0b35 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/CentralDogma.java @@ -810,11 +810,11 @@ private void configureHttpApi(ServerBuilder sb, apiV1ServiceBuilder .annotatedService(new AdministrativeService(executor, statusManager)) .annotatedService(new ProjectServiceV1(projectApiManager, executor)) - .annotatedService(new RepositoryServiceV1(executor, mds)); + .annotatedService(new RepositoryServiceV1(executor, mds)) + .annotatedService(new CredentialServiceV1(projectApiManager, executor)); if (GIT_MIRROR_ENABLED) { - apiV1ServiceBuilder.annotatedService(new MirroringServiceV1(projectApiManager, executor)) - .annotatedService(new CredentialServiceV1(projectApiManager, executor)); + apiV1ServiceBuilder.annotatedService(new MirroringServiceV1(projectApiManager, executor)); } apiV1ServiceBuilder.annotatedService() diff --git a/server/src/main/java/com/linecorp/centraldogma/server/internal/credential/AbstractCredential.java b/server/src/main/java/com/linecorp/centraldogma/server/internal/credential/AbstractCredential.java index cdbd946e91..0e928c2ddd 100644 --- a/server/src/main/java/com/linecorp/centraldogma/server/internal/credential/AbstractCredential.java +++ b/server/src/main/java/com/linecorp/centraldogma/server/internal/credential/AbstractCredential.java @@ -81,6 +81,7 @@ public int hashCode() { public final String toString() { final ToStringHelper helper = MoreObjects.toStringHelper(this); helper.add("id", id); + helper.add("type", type); helper.add("enabled", enabled); addProperties(helper); return helper.toString(); diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java index 1435dcb6f4..5fe2f98ca6 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlaneService.java @@ -96,9 +96,9 @@ public final class ControlPlaneService extends XdsResourceWatchingService { void start(PluginInitContext pluginInitContext) { init(); + cache.setSnapshot(DEFAULT_GROUP, centralDogmaXdsResources.snapshot()); final CommandExecutor commandExecutor = pluginInitContext.commandExecutor(); - final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(), - cache); + final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(), cache); final GrpcService grpcService = GrpcService.builder() .addService(server.getClusterDiscoveryServiceImpl()) .addService(server.getEndpointDiscoveryServiceImpl()) diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java index 64bf98f64d..4310359df5 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsResourceManager.java @@ -128,6 +128,10 @@ public XdsResourceManager(Project xdsProject, CommandExecutor commandExecutor) { this.commandExecutor = requireNonNull(commandExecutor, "commandExecutor"); } + public Project xdsProject() { + return xdsProject; + } + public void checkGroup(String group) { checkGroupId(group); // TODO(minwoox): check the write permission. diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java index 9aedb7a42f..007287b170 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesEndpointFetchingService.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -125,7 +126,8 @@ protected void handleXdsResource(String path, String contentAsText, String group final ServiceEndpointWatcher endpointWatcher = watcherBuilder.build(); final String watcherName = endpointWatcher.getName(); logger.info("Creating a service endpoint watcher: {}", watcherName); - final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(endpointWatcher); + final CompletableFuture future = + createKubernetesEndpointGroup(endpointWatcher, xdsProject().metaRepo(), executorService); final Map updaters = kubernetesEndpointsUpdaters.computeIfAbsent(groupName, unused -> new HashMap<>()); @@ -134,15 +136,26 @@ protected void handleXdsResource(String path, String contentAsText, String group oldUpdater.close(); } final KubernetesEndpointsUpdater updater = - new KubernetesEndpointsUpdater(commandExecutor, kubernetesEndpointGroup, executorService, + new KubernetesEndpointsUpdater(commandExecutor, future, executorService, groupName, watcherName, endpointWatcher.getClusterName()); updaters.put(watcherName, updater); - kubernetesEndpointGroup.addListener(endpoints -> { - if (endpoints.isEmpty()) { - return; + future.handle((kubernetesEndpointGroup, cause) -> { + if (cause != null) { + logger.warn("Unexpected exception while creating a KubernetesEndpointGroup in fetching service", + cause); + // Do not remove the updater from updaters because it can remove the updater that is created + // by the next commit. The updater will be removed only when the file or group is removed. + updater.close(); + return null; } - executorService.execute(updater::maybeSchedule); - }, true); + kubernetesEndpointGroup.addListener(endpoints -> { + if (endpoints.isEmpty()) { + return; + } + executorService.execute(() -> updater.maybeSchedule(kubernetesEndpointGroup)); + }, true); + return null; + }); } @Override @@ -200,7 +213,7 @@ protected boolean isStopped() { private static class KubernetesEndpointsUpdater { private final CommandExecutor commandExecutor; - private final KubernetesEndpointGroup kubernetesEndpointGroup; + private final CompletableFuture kubernetesEndpointGroupFuture; private final ScheduledExecutorService executorService; private final String groupName; private final String watcherName; @@ -209,18 +222,18 @@ private static class KubernetesEndpointsUpdater { private ScheduledFuture scheduledFuture; KubernetesEndpointsUpdater(CommandExecutor commandExecutor, - KubernetesEndpointGroup kubernetesEndpointGroup, + CompletableFuture kubernetesEndpointGroupFuture, ScheduledExecutorService executorService, String groupName, String watcherName, String clusterName) { this.commandExecutor = commandExecutor; - this.kubernetesEndpointGroup = kubernetesEndpointGroup; + this.kubernetesEndpointGroupFuture = kubernetesEndpointGroupFuture; this.executorService = executorService; this.groupName = groupName; this.watcherName = watcherName; this.clusterName = clusterName; } - void maybeSchedule() { + void maybeSchedule(KubernetesEndpointGroup kubernetesEndpointGroup) { if (scheduledFuture != null) { return; } @@ -231,11 +244,11 @@ void maybeSchedule() { if (kubernetesEndpointGroup.isClosing()) { return; } - pushK8sEndpoints(); + pushK8sEndpoints(kubernetesEndpointGroup); }, 1, TimeUnit.SECONDS); } - private void pushK8sEndpoints() { + private void pushK8sEndpoints(KubernetesEndpointGroup kubernetesEndpointGroup) { final List endpoints = kubernetesEndpointGroup.endpoints(); if (endpoints.isEmpty()) { @@ -298,7 +311,7 @@ void close() { if (scheduledFuture != null) { scheduledFuture.cancel(true); } - kubernetesEndpointGroup.closeAsync(); + kubernetesEndpointGroupFuture.thenAccept(KubernetesEndpointGroup::closeAsync); } } } diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java index 07ed87e980..be9b63f5a6 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesService.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; @@ -36,9 +38,14 @@ import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroup; import com.linecorp.armeria.client.kubernetes.endpoints.KubernetesEndpointGroupBuilder; +import com.linecorp.armeria.common.ContextAwareBlockingTaskExecutor; +import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.annotation.Blocking; import com.linecorp.centraldogma.common.Author; +import com.linecorp.centraldogma.common.EntryNotFoundException; +import com.linecorp.centraldogma.server.internal.credential.AccessTokenCredential; +import com.linecorp.centraldogma.server.storage.repository.MetaRepository; import com.linecorp.centraldogma.xds.internal.XdsResourceManager; import com.linecorp.centraldogma.xds.k8s.v1.XdsKubernetesServiceGrpc.XdsKubernetesServiceImplBase; @@ -46,7 +53,6 @@ import io.fabric8.kubernetes.client.ConfigBuilder; import io.grpc.Status; import io.grpc.stub.StreamObserver; -import io.netty.util.concurrent.ScheduledFuture; /** * A gRPC service that handles Kubernetes resources. @@ -96,43 +102,57 @@ public void createServiceEndpointWatcher(CreateServiceEndpointWatcherRequest req "Create watcher: " + watcherName, watcher, author, true)); } - private static void validateWatcherAndPush( + private void validateWatcherAndPush( StreamObserver responseObserver, ServiceEndpointWatcher watcher, Runnable onSuccess) { // Create a KubernetesEndpointGroup to check if the watcher is valid. // We use KubernetesEndpointGroup for simplicity, but we will implement a custom implementation // for better debugging and error handling in the future. - final KubernetesEndpointGroup kubernetesEndpointGroup = createKubernetesEndpointGroup(watcher); - - final AtomicBoolean completed = new AtomicBoolean(); - final CompletableFuture> whenReady = kubernetesEndpointGroup.whenReady(); - final ServiceRequestContext ctx = ServiceRequestContext.current(); - - // Use a schedule to time out the watcher creation until we implement a custom implementation. - final ScheduledFuture scheduledFuture = ctx.eventLoop().schedule(() -> { - if (!completed.compareAndSet(false, true)) { - return; - } - kubernetesEndpointGroup.closeAsync(); - responseObserver.onError( - Status.INTERNAL.withDescription( - "Failed to retrieve k8s endpoints within 5 seconds. watcherName: " + - watcher.getName()).asRuntimeException()); - }, 5, TimeUnit.SECONDS); - - whenReady.handle((endpoints, cause) -> { - if (!completed.compareAndSet(false, true)) { - return null; - } - scheduledFuture.cancel(false); - kubernetesEndpointGroup.closeAsync(); + final ContextAwareBlockingTaskExecutor taskExecutor = + ServiceRequestContext.current().blockingTaskExecutor(); + final CompletableFuture future = + createKubernetesEndpointGroup(watcher, xdsResourceManager.xdsProject().metaRepo(), + taskExecutor); + future.handle((kubernetesEndpointGroup, cause) -> { if (cause != null) { - // Specific types. - responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException()); + cause = Exceptions.peel(cause); + if (cause instanceof IllegalArgumentException || cause instanceof EntryNotFoundException) { + responseObserver.onError(Status.INVALID_ARGUMENT.withCause(cause).asRuntimeException()); + } else { + responseObserver.onError(Status.INTERNAL.withCause(cause).asRuntimeException()); + } return null; } - logger.debug("Successfully retrieved k8s endpoints: {}", endpoints); - onSuccess.run(); + final AtomicBoolean completed = new AtomicBoolean(); + final CompletableFuture> whenReady = kubernetesEndpointGroup.whenReady(); + + // Use a schedule to time out the watcher creation until we implement a custom implementation. + final ScheduledFuture scheduledFuture = taskExecutor.schedule(() -> { + if (!completed.compareAndSet(false, true)) { + return; + } + kubernetesEndpointGroup.closeAsync(); + responseObserver.onError( + Status.INTERNAL.withDescription( + "Failed to retrieve k8s endpoints within 5 seconds. watcherName: " + + watcher.getName()).asRuntimeException()); + }, 5, TimeUnit.SECONDS); + + whenReady.handle((endpoints, cause1) -> { + if (!completed.compareAndSet(false, true)) { + return null; + } + scheduledFuture.cancel(false); + kubernetesEndpointGroup.closeAsync(); + if (cause1 != null) { + // Specific types. + responseObserver.onError(Status.INTERNAL.withCause(cause1).asRuntimeException()); + return null; + } + logger.debug("Successfully retrieved k8s endpoints: {}", endpoints); + onSuccess.run(); + return null; + }); return null; }); } @@ -142,32 +162,47 @@ private static void validateWatcherAndPush( * This method must be executed in a blocking thread because * {@link KubernetesEndpointGroupBuilder#build()} blocks the execution thread. */ - public static KubernetesEndpointGroup createKubernetesEndpointGroup(ServiceEndpointWatcher watcher) { + public static CompletableFuture createKubernetesEndpointGroup( + ServiceEndpointWatcher watcher, MetaRepository metaRepository, Executor executor) { final KubernetesConfig kubernetesConfig = watcher.getKubernetesConfig(); final String serviceName = watcher.getServiceName(); - final KubernetesEndpointGroupBuilder kubernetesEndpointGroupBuilder = - KubernetesEndpointGroup.builder(toConfig(kubernetesConfig)).serviceName(serviceName); - if (!isNullOrEmpty(kubernetesConfig.getNamespace())) { - kubernetesEndpointGroupBuilder.namespace(kubernetesConfig.getNamespace()); - } - if (!isNullOrEmpty(watcher.getPortName())) { - kubernetesEndpointGroupBuilder.portName(watcher.getPortName()); - } - - return kubernetesEndpointGroupBuilder.build(); + return toConfig(kubernetesConfig, metaRepository).thenApplyAsync(config -> { + final KubernetesEndpointGroupBuilder kubernetesEndpointGroupBuilder = + KubernetesEndpointGroup.builder(config).serviceName(serviceName); + if (!isNullOrEmpty(kubernetesConfig.getNamespace())) { + kubernetesEndpointGroupBuilder.namespace(kubernetesConfig.getNamespace()); + } + if (!isNullOrEmpty(watcher.getPortName())) { + kubernetesEndpointGroupBuilder.portName(watcher.getPortName()); + } + // This callback can be executed by an event loop from CachingRepository, so we should use the + // specified executor to avoid blocking the event loop below. + return kubernetesEndpointGroupBuilder.build(); + }, executor); } - private static Config toConfig(KubernetesConfig kubernetesConfig) { + private static CompletableFuture toConfig(KubernetesConfig kubernetesConfig, + MetaRepository metaRepository) { final ConfigBuilder configBuilder = new ConfigBuilder() .withMasterUrl(kubernetesConfig.getControlPlaneUrl()) .withTrustCerts(kubernetesConfig.getTrustCerts()); - if (!isNullOrEmpty(kubernetesConfig.getOauthToken())) { - configBuilder.withOauthToken(kubernetesConfig.getOauthToken()); + final String credentialId = kubernetesConfig.getCredentialId(); + if (isNullOrEmpty(credentialId)) { + return CompletableFuture.completedFuture(configBuilder.build()); } - return configBuilder.build(); + return metaRepository.credential(credentialId) + .thenApply(credential -> { + if (!(credential instanceof AccessTokenCredential)) { + throw new IllegalArgumentException( + "credential must be an access token: " + credential); + } + + return configBuilder.withOauthToken( + ((AccessTokenCredential) credential).accessToken()).build(); + }); } @Blocking diff --git a/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto b/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto index 10dbba95f9..17873fe0ca 100644 --- a/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto +++ b/xds/src/main/proto/centraldogma/xds/k8s/v1/xds_kubernetes.proto @@ -73,7 +73,7 @@ message DeleteServiceEndpointWatcherRequest { message KubernetesConfig { string control_plane_url = 1 [(google.api.field_behavior) = REQUIRED]; string namespace = 2 [(google.api.field_behavior) = OPTIONAL]; - string oauth_token = 3 [(google.api.field_behavior) = OPTIONAL]; + string credential_id = 3 [(google.api.field_behavior) = OPTIONAL]; bool trust_certs = 4 [(google.api.field_behavior) = OPTIONAL]; } diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java index e38528a7c4..07e35ad0a7 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java @@ -80,9 +80,18 @@ public void onCompleted() {} requestStreamObserver.onNext(DiscoveryRequest.newBuilder() .setTypeUrl(Resources.V3.CLUSTER_TYPE_URL) .build()); + + int expectedNonce = 0; DiscoveryResponse discoveryResponse = queue.take(); + while (discoveryResponse.getResourcesList().isEmpty()) { + // The commited cluster is not yet available. Send ack and receive the next discovery response. + sendAck(requestStreamObserver, discoveryResponse); + discoveryResponse = queue.take(); + expectedNonce++; + } final String versionInfo1 = discoveryResponse.getVersionInfo(); - assertDiscoveryResponse(versionInfo1, discoveryResponse, fooCluster, queue, "0"); + assertDiscoveryResponse(versionInfo1, discoveryResponse, fooCluster, + queue, Integer.toString(expectedNonce)); // Send ack sendAck(requestStreamObserver, discoveryResponse); // No discovery response because there's no change. @@ -91,15 +100,16 @@ public void onCompleted() {} // Change the configuration. fooCluster = cluster(fooClusterName, 2); updateCluster(fooGroupName, fooClusterId, fooCluster, webClient); + expectedNonce++; discoveryResponse = queue.take(); final String versionInfo2 = discoveryResponse.getVersionInfo(); assertThat(versionInfo2).isNotEqualTo(versionInfo1); - assertDiscoveryResponse(versionInfo2, discoveryResponse, fooCluster, queue, "1"); + assertDiscoveryResponse(versionInfo2, discoveryResponse, fooCluster, + queue, Integer.toString(expectedNonce)); // Send ack sendAck(requestStreamObserver, discoveryResponse); // No discovery response because there's no change. assertThat(queue.poll(300, TimeUnit.MILLISECONDS)).isNull(); - // Add another cluster final String barGroupName = "groups/bar"; createGroup("bar", webClient); @@ -108,6 +118,7 @@ public void onCompleted() {} final Cluster barCluster = cluster(barClusterName, 1); createCluster(barGroupName, barClusterId, barCluster, webClient); + expectedNonce++; discoveryResponse = queue.take(); final String versionInfo3 = discoveryResponse.getVersionInfo(); assertThat(versionInfo3.length()).isEqualTo(64); @@ -129,9 +140,11 @@ public void onCompleted() {} // Remove bar group. deleteGroup(barGroupName, webClient); + expectedNonce++; discoveryResponse = queue.take(); final String versionInfo4 = discoveryResponse.getVersionInfo(); - assertDiscoveryResponse(versionInfo4, discoveryResponse, fooCluster, queue, "3"); + assertDiscoveryResponse(versionInfo4, discoveryResponse, fooCluster, + queue, Integer.toString(expectedNonce)); assertThat(versionInfo4).isEqualTo(versionInfo2); // Send ack sendAck(requestStreamObserver, discoveryResponse); @@ -143,7 +156,8 @@ private static void assertDiscoveryResponse( String versionInfo, DiscoveryResponse discoveryResponse, Cluster fooCluster, BlockingQueue queue, String nonce) throws InvalidProtocolBufferException, InterruptedException { - assertThat(versionInfo.length()).isEqualTo(64); // sha 256 hash length is 64. 256/4 + assertThat(versionInfo.length()).as("The length of versionInfo is not 64: " + versionInfo) + .isEqualTo(64); // sha 256 hash length is 64. 256/4 assertThat(discoveryResponse.getNonce()).isEqualTo(nonce); final List resources = discoveryResponse.getResourcesList(); assertThat(resources.size()).isOne(); diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java index cef04de485..7790d3a6ce 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/k8s/v1/XdsKubernetesServiceTest.java @@ -25,10 +25,16 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -80,19 +86,30 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMixedDispatcher; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import io.fabric8.mockwebserver.Context; +import io.fabric8.mockwebserver.ServerRequest; +import io.fabric8.mockwebserver.ServerResponse; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; -@EnableKubernetesMockClient(crud = true) class XdsKubernetesServiceTest { @RegisterExtension static final CentralDogmaExtension dogma = new CentralDogmaExtension(); - static KubernetesClient client; + private static CapturingKubernetesMixedDispatcher dispatcher; + private static KubernetesMockServer mock; + private static NamespacedKubernetesClient client; @BeforeAll static void setup() { + setUpK8s(); + putCredential(); + final AggregatedHttpResponse response = createGroup("foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); final List nodes = ImmutableList.of(newNode("1.1.1.1"), newNode("2.2.2.2")); @@ -113,6 +130,24 @@ static void setup() { client.services().resource(service).create(); } + private static void setUpK8s() { + final Map> responses = new HashMap<>(); + dispatcher = new CapturingKubernetesMixedDispatcher(responses); + mock = new KubernetesMockServer(new Context(), new MockWebServer(), responses, dispatcher, false); + mock.init(); + client = mock.createClient(); + } + + private static void putCredential() { + final ImmutableMap credential = ImmutableMap.of("type", "access_token", + "id", "my-credential", + "accessToken", "secret"); + dogma.httpClient().prepare() + .post("/api/v1/projects/@xds/credentials") + .header(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentJson(credential).execute().aggregate().join(); + } + @AfterAll static void cleanupK8sResources() { client.nodes().list().getItems().forEach( @@ -125,15 +160,26 @@ static void cleanupK8sResources() { client.services().list().getItems().forEach( service -> client.services().withName(service.getMetadata().getName()).delete()); client.close(); + mock.destroy(); + } + + @BeforeEach + void clearQueue() { + dispatcher.queue().clear(); } @Test void invalidProperty() throws IOException { final String watcherId = "foo-cluster"; - final ServiceEndpointWatcher watcher = watcher(watcherId, "invalid-service-name"); - final AggregatedHttpResponse response = createWatcher(watcher, watcherId); + ServiceEndpointWatcher watcher = watcher(watcherId, "invalid-service-name", "my-credential"); + AggregatedHttpResponse response = createWatcher(watcher, watcherId); assertThat(response.status()).isSameAs(HttpStatus.INTERNAL_SERVER_ERROR); assertThat(response.contentUtf8()).contains("Failed to retrieve k8s endpoints"); + + watcher = watcher(watcherId, "nginx-service", "invalid-credential-id"); + response = createWatcher(watcher, watcherId); + assertThat(response.status()).isSameAs(HttpStatus.BAD_REQUEST); + assertThat(response.contentUtf8()).contains("failed to find credential 'invalid-credential-id'"); } @Test @@ -164,17 +210,24 @@ void createWatcherRequest() throws IOException { final ClusterLoadAssignment.Builder clusterLoadAssignmentBuilder = ClusterLoadAssignment.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(clusterEntry.contentAsText(), clusterLoadAssignmentBuilder); assertThat(clusterLoadAssignmentBuilder.build()).isEqualTo(loadAssignment); + + dispatcher.queue().forEach(req -> { + // All requests contain the credential. + assertThat(req.getHeaders().get(HttpHeaderNames.AUTHORIZATION.toString())) + .isEqualTo("Bearer secret"); + }); } private static ServiceEndpointWatcher watcher(String watcherId) { - return watcher(watcherId, "nginx-service"); + return watcher(watcherId, "nginx-service", "my-credential"); } - private static ServiceEndpointWatcher watcher(String watcherId, String serviceName) { + private static ServiceEndpointWatcher watcher(String watcherId, String serviceName, String credentialId) { final KubernetesConfig kubernetesConfig = KubernetesConfig.newBuilder() .setControlPlaneUrl(client.getMasterUrl().toString()) .setNamespace(client.getNamespace()) + .setCredentialId(credentialId) .setTrustCerts(true) .build(); return ServiceEndpointWatcher.newBuilder() @@ -371,4 +424,23 @@ static Pod newPod(PodTemplateSpec template, String newNodeName) { .withSpec(spec) .build(); } + + private static class CapturingKubernetesMixedDispatcher extends KubernetesMixedDispatcher { + + private final BlockingQueue queue = new ArrayBlockingQueue<>(16); + + CapturingKubernetesMixedDispatcher(Map> responses) { + super(responses); + } + + BlockingQueue queue() { + return queue; + } + + @Override + public MockResponse dispatch(RecordedRequest request) throws InterruptedException { + queue.add(request); + return super.dispatch(request); + } + } }