From d525f1f1e75131ebdcdabd9072fd7d84125c51e5 Mon Sep 17 00:00:00 2001 From: Lake Mossman Date: Thu, 28 Oct 2021 16:00:37 -0700 Subject: [PATCH] Save specs to source/dest definitions on create and update (#7367) * store spec in db * update tests * run gw format * add TODOs * add lmossman to TODOs * run gw format * remove redundant DockerImageValidator * run gw format --- .../airbyte/server/apis/ConfigurationApi.java | 6 +- .../DestinationDefinitionsHandler.java | 35 +++++++--- .../handlers/SourceDefinitionsHandler.java | 33 ++++++--- .../validators/DockerImageValidator.java | 39 ----------- .../DestinationDefinitionsHandlerTest.java | 43 +++++++++--- .../SourceDefinitionsHandlerTest.java | 43 +++++++++--- .../validators/DockerImageValidatorTest.java | 68 ------------------- 7 files changed, 117 insertions(+), 150 deletions(-) delete mode 100644 airbyte-server/src/main/java/io/airbyte/server/validators/DockerImageValidator.java delete mode 100644 airbyte-server/src/test/java/io/airbyte/server/validators/DockerImageValidatorTest.java diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 3b2dc4f0e846..259bbcf2de55 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -113,7 +113,6 @@ import io.airbyte.server.handlers.WebBackendDestinationsHandler; import io.airbyte.server.handlers.WebBackendSourcesHandler; import io.airbyte.server.handlers.WorkspacesHandler; -import io.airbyte.server.validators.DockerImageValidator; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -170,12 +169,11 @@ public ConfigurationApi(final ConfigRepository configRepository, jobNotifier, temporalService, new OAuthConfigSupplier(configRepository, false, trackingClient)); - final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient); final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence); - sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient); + sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, synchronousSchedulerClient); connectionsHandler = new ConnectionsHandler(configRepository, workspaceHelper, trackingClient); operationsHandler = new OperationsHandler(configRepository); - destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient); + destinationDefinitionsHandler = new DestinationDefinitionsHandler(configRepository, synchronousSchedulerClient); destinationHandler = new DestinationHandler(configRepository, schemaValidator, specFetcher, connectionsHandler); sourceHandler = new SourceHandler(configRepository, schemaValidator, specFetcher, connectionsHandler); workspacesHandler = new WorkspacesHandler(configRepository, connectionsHandler, destinationHandler, sourceHandler); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java index db888c58eec9..459e4dae396c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java @@ -10,14 +10,17 @@ import io.airbyte.api.model.DestinationDefinitionRead; import io.airbyte.api.model.DestinationDefinitionReadList; import io.airbyte.api.model.DestinationDefinitionUpdate; +import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; +import io.airbyte.scheduler.client.SynchronousResponse; +import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.InternalServerKnownException; import io.airbyte.server.services.AirbyteGithubStore; -import io.airbyte.server.validators.DockerImageValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.net.URI; @@ -33,26 +36,22 @@ public class DestinationDefinitionsHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DestinationDefinitionsHandler.class); - private final DockerImageValidator imageValidator; private final ConfigRepository configRepository; private final Supplier uuidSupplier; private final CachingSynchronousSchedulerClient schedulerSynchronousClient; private final AirbyteGithubStore githubStore; public DestinationDefinitionsHandler(final ConfigRepository configRepository, - final DockerImageValidator imageValidator, final CachingSynchronousSchedulerClient schedulerSynchronousClient) { - this(configRepository, imageValidator, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production()); + this(configRepository, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production()); } @VisibleForTesting public DestinationDefinitionsHandler(final ConfigRepository configRepository, - final DockerImageValidator imageValidator, final Supplier uuidSupplier, final CachingSynchronousSchedulerClient schedulerSynchronousClient, final AirbyteGithubStore githubStore) { this.configRepository = configRepository; - this.imageValidator = imageValidator; this.uuidSupplier = uuidSupplier; this.schedulerSynchronousClient = schedulerSynchronousClient; this.githubStore = githubStore; @@ -104,7 +103,8 @@ public DestinationDefinitionRead getDestinationDefinition(final DestinationDefin public DestinationDefinitionRead createDestinationDefinition(final DestinationDefinitionCreate destinationDefinitionCreate) throws JsonValidationException, IOException { - imageValidator.assertValidIntegrationImage(destinationDefinitionCreate.getDockerRepository(), + final ConnectorSpecification spec = getSpecForImage( + destinationDefinitionCreate.getDockerRepository(), destinationDefinitionCreate.getDockerImageTag()); final UUID id = uuidSupplier.get(); @@ -114,7 +114,8 @@ public DestinationDefinitionRead createDestinationDefinition(final DestinationDe .withDockerImageTag(destinationDefinitionCreate.getDockerImageTag()) .withDocumentationUrl(destinationDefinitionCreate.getDocumentationUrl().toString()) .withName(destinationDefinitionCreate.getName()) - .withIcon(destinationDefinitionCreate.getIcon()); + .withIcon(destinationDefinitionCreate.getIcon()) + .withSpec(spec); configRepository.writeStandardDestinationDefinition(destinationDefinition); @@ -125,8 +126,13 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe throws ConfigNotFoundException, IOException, JsonValidationException { final StandardDestinationDefinition currentDestination = configRepository .getStandardDestinationDefinition(destinationDefinitionUpdate.getDestinationDefinitionId()); - imageValidator.assertValidIntegrationImage(currentDestination.getDockerRepository(), - destinationDefinitionUpdate.getDockerImageTag()); + + final boolean imageTagHasChanged = !currentDestination.getDockerImageTag().equals(destinationDefinitionUpdate.getDockerImageTag()); + // TODO (lmossman): remove null spec condition when the spec field becomes required on the + // definition struct + final ConnectorSpecification spec = (imageTagHasChanged || currentDestination.getSpec() == null) + ? getSpecForImage(currentDestination.getDockerRepository(), destinationDefinitionUpdate.getDockerImageTag()) + : currentDestination.getSpec(); final StandardDestinationDefinition newDestination = new StandardDestinationDefinition() .withDestinationDefinitionId(currentDestination.getDestinationDefinitionId()) @@ -134,7 +140,8 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe .withDockerRepository(currentDestination.getDockerRepository()) .withName(currentDestination.getName()) .withDocumentationUrl(currentDestination.getDocumentationUrl()) - .withIcon(currentDestination.getIcon()); + .withIcon(currentDestination.getIcon()) + .withSpec(spec); configRepository.writeStandardDestinationDefinition(newDestination); // we want to re-fetch the spec for updated definitions. @@ -142,6 +149,12 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe return buildDestinationDefinitionRead(newDestination); } + private ConnectorSpecification getSpecForImage(final String dockerRepository, final String imageTag) throws IOException { + final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag); + final SynchronousResponse getSpecResponse = schedulerSynchronousClient.createGetSpecJob(imageName); + return SpecFetcher.getSpecFromJob(getSpecResponse); + } + public static String loadIcon(final String name) { try { return name == null ? null : MoreResources.readResource("icons/" + name); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java index ab4b4612bc64..e251ef034a14 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java @@ -10,14 +10,17 @@ import io.airbyte.api.model.SourceDefinitionRead; import io.airbyte.api.model.SourceDefinitionReadList; import io.airbyte.api.model.SourceDefinitionUpdate; +import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.resources.MoreResources; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; +import io.airbyte.scheduler.client.SynchronousResponse; +import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.InternalServerKnownException; import io.airbyte.server.services.AirbyteGithubStore; -import io.airbyte.server.validators.DockerImageValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.net.URI; @@ -29,7 +32,6 @@ public class SourceDefinitionsHandler { - private final DockerImageValidator imageValidator; private final ConfigRepository configRepository; private final Supplier uuidSupplier; private final AirbyteGithubStore githubStore; @@ -37,20 +39,17 @@ public class SourceDefinitionsHandler { public SourceDefinitionsHandler( final ConfigRepository configRepository, - final DockerImageValidator imageValidator, final CachingSynchronousSchedulerClient schedulerSynchronousClient) { - this(configRepository, imageValidator, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production()); + this(configRepository, UUID::randomUUID, schedulerSynchronousClient, AirbyteGithubStore.production()); } public SourceDefinitionsHandler( final ConfigRepository configRepository, - final DockerImageValidator imageValidator, final Supplier uuidSupplier, final CachingSynchronousSchedulerClient schedulerSynchronousClient, final AirbyteGithubStore githubStore) { this.configRepository = configRepository; this.uuidSupplier = uuidSupplier; - this.imageValidator = imageValidator; this.schedulerSynchronousClient = schedulerSynchronousClient; this.githubStore = githubStore; } @@ -100,7 +99,7 @@ public SourceDefinitionRead getSourceDefinition(final SourceDefinitionIdRequestB public SourceDefinitionRead createSourceDefinition(final SourceDefinitionCreate sourceDefinitionCreate) throws JsonValidationException, IOException { - imageValidator.assertValidIntegrationImage(sourceDefinitionCreate.getDockerRepository(), sourceDefinitionCreate.getDockerImageTag()); + final ConnectorSpecification spec = getSpecForImage(sourceDefinitionCreate.getDockerRepository(), sourceDefinitionCreate.getDockerImageTag()); final UUID id = uuidSupplier.get(); final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() @@ -109,7 +108,8 @@ public SourceDefinitionRead createSourceDefinition(final SourceDefinitionCreate .withDockerImageTag(sourceDefinitionCreate.getDockerImageTag()) .withDocumentationUrl(sourceDefinitionCreate.getDocumentationUrl().toString()) .withName(sourceDefinitionCreate.getName()) - .withIcon(sourceDefinitionCreate.getIcon()); + .withIcon(sourceDefinitionCreate.getIcon()) + .withSpec(spec); configRepository.writeStandardSourceDefinition(sourceDefinition); @@ -120,7 +120,13 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate throws ConfigNotFoundException, IOException, JsonValidationException { final StandardSourceDefinition currentSourceDefinition = configRepository.getStandardSourceDefinition(sourceDefinitionUpdate.getSourceDefinitionId()); - imageValidator.assertValidIntegrationImage(currentSourceDefinition.getDockerRepository(), sourceDefinitionUpdate.getDockerImageTag()); + + final boolean imageTagHasChanged = !currentSourceDefinition.getDockerImageTag().equals(sourceDefinitionUpdate.getDockerImageTag()); + // TODO (lmossman): remove null spec condition when the spec field becomes required on the + // definition struct + final ConnectorSpecification spec = (imageTagHasChanged || currentSourceDefinition.getSpec() == null) + ? getSpecForImage(currentSourceDefinition.getDockerRepository(), sourceDefinitionUpdate.getDockerImageTag()) + : currentSourceDefinition.getSpec(); final StandardSourceDefinition newSource = new StandardSourceDefinition() .withSourceDefinitionId(currentSourceDefinition.getSourceDefinitionId()) @@ -128,7 +134,8 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate .withDockerRepository(currentSourceDefinition.getDockerRepository()) .withDocumentationUrl(currentSourceDefinition.getDocumentationUrl()) .withName(currentSourceDefinition.getName()) - .withIcon(currentSourceDefinition.getIcon()); + .withIcon(currentSourceDefinition.getIcon()) + .withSpec(spec); configRepository.writeStandardSourceDefinition(newSource); // we want to re-fetch the spec for updated definitions. @@ -136,6 +143,12 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate return buildSourceDefinitionRead(newSource); } + private ConnectorSpecification getSpecForImage(final String dockerRepository, final String imageTag) throws IOException { + final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag); + final SynchronousResponse getSpecResponse = schedulerSynchronousClient.createGetSpecJob(imageName); + return SpecFetcher.getSpecFromJob(getSpecResponse); + } + public static String loadIcon(final String name) { try { return name == null ? null : MoreResources.readResource("icons/" + name); diff --git a/airbyte-server/src/main/java/io/airbyte/server/validators/DockerImageValidator.java b/airbyte-server/src/main/java/io/airbyte/server/validators/DockerImageValidator.java deleted file mode 100644 index a79bd3a9cff3..000000000000 --- a/airbyte-server/src/main/java/io/airbyte/server/validators/DockerImageValidator.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server.validators; - -import io.airbyte.commons.docker.DockerUtils; -import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.scheduler.client.SynchronousResponse; -import io.airbyte.scheduler.client.SynchronousSchedulerClient; -import io.airbyte.server.converters.SpecFetcher; -import io.airbyte.server.errors.BadObjectSchemaKnownException; - -public class DockerImageValidator { - - private final SynchronousSchedulerClient schedulerClient; - - public DockerImageValidator(final SynchronousSchedulerClient schedulerJobClient) { - this.schedulerClient = schedulerJobClient; - } - - /** - * @throws BadObjectSchemaKnownException if it is unable to verify that the input image is a valid - * connector definition image. - */ - public void assertValidIntegrationImage(final String dockerRepository, final String imageTag) throws BadObjectSchemaKnownException { - // Validates that the docker image exists and can generate a compatible spec by running a getSpec - // job on the provided image. - final String imageName = DockerUtils.getTaggedImageName(dockerRepository, imageTag); - try { - final SynchronousResponse getSpecResponse = schedulerClient.createGetSpecJob(imageName); - SpecFetcher.getSpecFromJob(getSpecResponse); - } catch (final Exception e) { - throw new BadObjectSchemaKnownException( - String.format("Encountered an issue while validating input docker image (%s): %s", imageName, e.getMessage())); - } - } - -} diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java index 2360b84ae3a0..519cfb69188e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java @@ -11,18 +11,24 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.api.model.DestinationDefinitionCreate; import io.airbyte.api.model.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.DestinationDefinitionRead; import io.airbyte.api.model.DestinationDefinitionReadList; import io.airbyte.api.model.DestinationDefinitionUpdate; +import io.airbyte.commons.docker.DockerUtils; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; +import io.airbyte.scheduler.client.SynchronousJobMetadata; +import io.airbyte.scheduler.client.SynchronousResponse; import io.airbyte.server.services.AirbyteGithubStore; -import io.airbyte.server.validators.DockerImageValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.net.URI; @@ -37,7 +43,6 @@ class DestinationDefinitionsHandlerTest { - private DockerImageValidator dockerImageValidator; private ConfigRepository configRepository; private StandardDestinationDefinition destination; private DestinationDefinitionsHandler destinationHandler; @@ -50,23 +55,26 @@ class DestinationDefinitionsHandlerTest { void setUp() { configRepository = mock(ConfigRepository.class); uuidSupplier = mock(Supplier.class); - dockerImageValidator = mock(DockerImageValidator.class); destination = generateDestination(); schedulerSynchronousClient = spy(CachingSynchronousSchedulerClient.class); githubStore = mock(AirbyteGithubStore.class); destinationHandler = - new DestinationDefinitionsHandler(configRepository, dockerImageValidator, uuidSupplier, schedulerSynchronousClient, githubStore); + new DestinationDefinitionsHandler(configRepository, uuidSupplier, schedulerSynchronousClient, githubStore); } private StandardDestinationDefinition generateDestination() { + final ConnectorSpecification spec = new ConnectorSpecification().withConnectionSpecification( + Jsons.jsonNode(ImmutableMap.of("foo", "bar"))); + return new StandardDestinationDefinition() .withDestinationDefinitionId(UUID.randomUUID()) .withName("presto") .withDockerImageTag("12.3") .withDockerRepository("repo") .withDocumentationUrl("https://hulu.com") - .withIcon("http.svg"); + .withIcon("http.svg") + .withSpec(spec); } @Test @@ -125,7 +133,13 @@ void testGetDestination() throws JsonValidationException, ConfigNotFoundExceptio @DisplayName("createDestinationDefinition should correctly create a destinationDefinition") void testCreateDestinationDefinition() throws URISyntaxException, IOException, JsonValidationException { final StandardDestinationDefinition destination = generateDestination(); + final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag()); + when(uuidSupplier.get()).thenReturn(destination.getDestinationDefinitionId()); + when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>( + destination.getSpec(), + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + final DestinationDefinitionCreate create = new DestinationDefinitionCreate() .name(destination.getName()) .dockerRepository(destination.getDockerRepository()) @@ -144,7 +158,8 @@ void testCreateDestinationDefinition() throws URISyntaxException, IOException, J final DestinationDefinitionRead actualRead = destinationHandler.createDestinationDefinition(create); assertEquals(expectedRead, actualRead); - verify(dockerImageValidator).assertValidIntegrationImage(destination.getDockerRepository(), destination.getDockerImageTag()); + verify(schedulerSynchronousClient).createGetSpecJob(imageName); + verify(configRepository).writeStandardDestinationDefinition(destination); } @Test @@ -158,11 +173,21 @@ void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonVa final String newDockerImageTag = "averydifferenttag"; assertNotEquals(newDockerImageTag, currentTag); - final DestinationDefinitionRead sourceRead = destinationHandler.updateDestinationDefinition( + final String newImageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), newDockerImageTag); + final ConnectorSpecification newSpec = new ConnectorSpecification().withConnectionSpecification( + Jsons.jsonNode(ImmutableMap.of("foo2", "bar2"))); + when(schedulerSynchronousClient.createGetSpecJob(newImageName)).thenReturn(new SynchronousResponse<>( + newSpec, + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final StandardDestinationDefinition updatedDestination = Jsons.clone(destination).withDockerImageTag(newDockerImageTag).withSpec(newSpec); + + final DestinationDefinitionRead destinationRead = destinationHandler.updateDestinationDefinition( new DestinationDefinitionUpdate().destinationDefinitionId(this.destination.getDestinationDefinitionId()).dockerImageTag(newDockerImageTag)); - assertEquals(newDockerImageTag, sourceRead.getDockerImageTag()); - verify(dockerImageValidator).assertValidIntegrationImage(dockerRepository, newDockerImageTag); + assertEquals(newDockerImageTag, destinationRead.getDockerImageTag()); + verify(schedulerSynchronousClient).createGetSpecJob(newImageName); + verify(configRepository).writeStandardDestinationDefinition(updatedDestination); verify(schedulerSynchronousClient).resetCache(); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java index 8f2423a37ef1..1c9ae3e9306b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java @@ -12,18 +12,24 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.api.model.SourceDefinitionCreate; import io.airbyte.api.model.SourceDefinitionIdRequestBody; import io.airbyte.api.model.SourceDefinitionRead; import io.airbyte.api.model.SourceDefinitionReadList; import io.airbyte.api.model.SourceDefinitionUpdate; +import io.airbyte.commons.docker.DockerUtils; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; +import io.airbyte.scheduler.client.SynchronousJobMetadata; +import io.airbyte.scheduler.client.SynchronousResponse; import io.airbyte.server.services.AirbyteGithubStore; -import io.airbyte.server.validators.DockerImageValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; import java.net.URI; @@ -39,7 +45,6 @@ class SourceDefinitionsHandlerTest { private ConfigRepository configRepository; - private DockerImageValidator dockerImageValidator; private StandardSourceDefinition source; private SourceDefinitionsHandler sourceHandler; private Supplier uuidSupplier; @@ -51,17 +56,18 @@ class SourceDefinitionsHandlerTest { void setUp() { configRepository = mock(ConfigRepository.class); uuidSupplier = mock(Supplier.class); - dockerImageValidator = mock(DockerImageValidator.class); schedulerSynchronousClient = spy(CachingSynchronousSchedulerClient.class); githubStore = mock(AirbyteGithubStore.class); source = generateSource(); - sourceHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, uuidSupplier, schedulerSynchronousClient, githubStore); + sourceHandler = new SourceDefinitionsHandler(configRepository, uuidSupplier, schedulerSynchronousClient, githubStore); } private StandardSourceDefinition generateSource() { final UUID sourceId = UUID.randomUUID(); + final ConnectorSpecification spec = new ConnectorSpecification().withConnectionSpecification( + Jsons.jsonNode(ImmutableMap.of("foo", "bar"))); return new StandardSourceDefinition() .withSourceDefinitionId(sourceId) @@ -69,7 +75,8 @@ private StandardSourceDefinition generateSource() { .withDocumentationUrl("https://netflix.com") .withDockerRepository("dockerstuff") .withDockerImageTag("12.3") - .withIcon("http.svg"); + .withIcon("http.svg") + .withSpec(spec); } @Test @@ -97,7 +104,8 @@ void testListSourceDefinitions() throws JsonValidationException, IOException, UR final SourceDefinitionReadList actualSourceDefinitionReadList = sourceHandler.listSourceDefinitions(); - assertEquals(Lists.newArrayList(expectedSourceDefinitionRead1, expectedSourceDefinitionRead2), + assertEquals( + Lists.newArrayList(expectedSourceDefinitionRead1, expectedSourceDefinitionRead2), actualSourceDefinitionReadList.getSourceDefinitions()); } @@ -127,7 +135,13 @@ void testGetSourceDefinition() throws JsonValidationException, ConfigNotFoundExc @DisplayName("createSourceDefinition should correctly create a sourceDefinition") void testCreateSourceDefinition() throws URISyntaxException, IOException, JsonValidationException { final StandardSourceDefinition source = generateSource(); + final String imageName = DockerUtils.getTaggedImageName(source.getDockerRepository(), source.getDockerImageTag()); + when(uuidSupplier.get()).thenReturn(source.getSourceDefinitionId()); + when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>( + source.getSpec(), + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + final SourceDefinitionCreate create = new SourceDefinitionCreate() .name(source.getName()) .dockerRepository(source.getDockerRepository()) @@ -146,12 +160,13 @@ void testCreateSourceDefinition() throws URISyntaxException, IOException, JsonVa final SourceDefinitionRead actualRead = sourceHandler.createSourceDefinition(create); assertEquals(expectedRead, actualRead); - verify(dockerImageValidator).assertValidIntegrationImage(source.getDockerRepository(), source.getDockerImageTag()); + verify(schedulerSynchronousClient).createGetSpecJob(imageName); + verify(configRepository).writeStandardSourceDefinition(source); } @Test @DisplayName("updateSourceDefinition should correctly update a sourceDefinition") - void testUpdateSourceDefinition() throws ConfigNotFoundException, IOException, JsonValidationException { + void testUpdateSourceDefinition() throws ConfigNotFoundException, IOException, JsonValidationException, URISyntaxException { when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())).thenReturn(source); final String newDockerImageTag = "averydifferenttag"; final SourceDefinitionRead sourceDefinition = sourceHandler @@ -160,11 +175,21 @@ void testUpdateSourceDefinition() throws ConfigNotFoundException, IOException, J final String currentTag = sourceDefinition.getDockerImageTag(); assertNotEquals(newDockerImageTag, currentTag); + final String newImageName = DockerUtils.getTaggedImageName(source.getDockerRepository(), newDockerImageTag); + final ConnectorSpecification newSpec = new ConnectorSpecification().withConnectionSpecification( + Jsons.jsonNode(ImmutableMap.of("foo2", "bar2"))); + when(schedulerSynchronousClient.createGetSpecJob(newImageName)).thenReturn(new SynchronousResponse<>( + newSpec, + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final StandardSourceDefinition updatedSource = Jsons.clone(source).withDockerImageTag(newDockerImageTag).withSpec(newSpec); + final SourceDefinitionRead sourceDefinitionRead = sourceHandler .updateSourceDefinition(new SourceDefinitionUpdate().sourceDefinitionId(source.getSourceDefinitionId()).dockerImageTag(newDockerImageTag)); assertEquals(newDockerImageTag, sourceDefinitionRead.getDockerImageTag()); - verify(dockerImageValidator).assertValidIntegrationImage(dockerRepository, newDockerImageTag); + verify(schedulerSynchronousClient).createGetSpecJob(newImageName); + verify(configRepository).writeStandardSourceDefinition(updatedSource); verify(schedulerSynchronousClient).resetCache(); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/validators/DockerImageValidatorTest.java b/airbyte-server/src/test/java/io/airbyte/server/validators/DockerImageValidatorTest.java deleted file mode 100644 index d094c3426074..000000000000 --- a/airbyte-server/src/test/java/io/airbyte/server/validators/DockerImageValidatorTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server.validators; - -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import io.airbyte.commons.docker.DockerUtils; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.scheduler.client.SynchronousResponse; -import io.airbyte.scheduler.client.SynchronousSchedulerClient; -import io.airbyte.server.errors.KnownException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -class DockerImageValidatorTest { - - private SynchronousSchedulerClient schedulerJobClient; - private DockerImageValidator validator; - - @BeforeEach - public void init() { - schedulerJobClient = mock(SynchronousSchedulerClient.class); - validator = new DockerImageValidator(schedulerJobClient); - } - - @SuppressWarnings("unchecked") - @Test - public void testAssertImageIsValid() throws URISyntaxException, IOException { - final String repo = "repo"; - final String tag = "tag"; - final String imageName = DockerUtils.getTaggedImageName(repo, tag); - - final SynchronousResponse specResponse = mock(SynchronousResponse.class); - final ConnectorSpecification connectorSpecification = new ConnectorSpecification() - .withDocumentationUrl(new URI("https://google.com")) - .withChangelogUrl(new URI("https://google.com")) - .withConnectionSpecification(Jsons.jsonNode(new HashMap<>())); - when(schedulerJobClient.createGetSpecJob(imageName)).thenReturn(specResponse); - when(specResponse.getOutput()).thenReturn(connectorSpecification); - when(specResponse.isSuccess()).thenReturn(true); - - assertDoesNotThrow(() -> validator.assertValidIntegrationImage(repo, tag)); - } - - @SuppressWarnings("unchecked") - @Test - public void testThrowsOnInvalidImage() throws IOException { - final String repo = "repo"; - final String tag = "tag"; - final String imageName = DockerUtils.getTaggedImageName(repo, tag); - final SynchronousResponse specResponse = mock(SynchronousResponse.class); - when(specResponse.isSuccess()).thenReturn(false); - when(schedulerJobClient.createGetSpecJob(imageName)).thenReturn(specResponse); - - assertThrows(KnownException.class, () -> validator.assertValidIntegrationImage(repo, tag)); - } - -}