diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/BucketSpecCacheSchedulerClient.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/BucketSpecCacheSchedulerClient.java index e48915686276..a615643d0830 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/BucketSpecCacheSchedulerClient.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/BucketSpecCacheSchedulerClient.java @@ -23,9 +23,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.time.Instant; import java.util.Optional; -import java.util.UUID; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,16 +81,7 @@ public SynchronousResponse createGetSpecJob(final String if (cachedSpecOptional.isPresent()) { LOGGER.debug("Spec bucket cache: Cache hit."); - final long now = Instant.now().toEpochMilli(); - final SynchronousJobMetadata mockMetadata = new SynchronousJobMetadata( - UUID.randomUUID(), - ConfigType.GET_SPEC, - null, - now, - now, - true, - null); - return new SynchronousResponse<>(cachedSpecOptional.get(), mockMetadata); + return new SynchronousResponse<>(cachedSpecOptional.get(), SynchronousJobMetadata.mock(ConfigType.GET_SPEC)); } else { LOGGER.debug("Spec bucket cache: Cache miss."); return client.createGetSpecJob(dockerImage); diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousJobMetadata.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousJobMetadata.java index 2b80602018c0..8f7d70390919 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousJobMetadata.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SynchronousJobMetadata.java @@ -7,6 +7,7 @@ import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.workers.temporal.JobMetadata; import java.nio.file.Path; +import java.time.Instant; import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -114,4 +115,20 @@ public String toString() { '}'; } + public static SynchronousJobMetadata mock(final ConfigType configType) { + final long now = Instant.now().toEpochMilli(); + final UUID configId = null; + final boolean succeeded = true; + final Path logPath = null; + + return new SynchronousJobMetadata( + UUID.randomUUID(), + configType, + configId, + now, + now, + succeeded, + logPath); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 6d03bd2c040e..bf6f19b4674d 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -219,7 +219,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient); // required before migration - configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.execute(dockerImage))); + // TODO: remove this specFetcherFn logic once file migrations are deprecated + configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage))); Optional airbyteDatabaseVersion = jobPersistence.getVersion(); if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java index f615d6ef7ae1..1fecb41fb577 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ConfigurationUpdate.java @@ -5,7 +5,6 @@ package io.airbyte.server.converters; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; @@ -42,8 +41,7 @@ public SourceConnection source(final UUID sourceId, final String sourceName, fin persistedSource.setName(sourceName); // get spec final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(persistedSource.getSourceDefinitionId()); - final String imageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag()); - final ConnectorSpecification spec = specFetcher.execute(imageName); + final ConnectorSpecification spec = specFetcher.getSpec(sourceDefinition); // copy any necessary secrets from the current source to the incoming updated source final JsonNode updatedConfiguration = secretsProcessor.copySecrets( persistedSource.getConfiguration(), @@ -61,8 +59,7 @@ public DestinationConnection destination(final UUID destinationId, final String // get spec final StandardDestinationDefinition destinationDefinition = configRepository .getStandardDestinationDefinition(persistedDestination.getDestinationDefinitionId()); - final String imageName = DockerUtils.getTaggedImageName(destinationDefinition.getDockerRepository(), destinationDefinition.getDockerImageTag()); - final ConnectorSpecification spec = specFetcher.execute(imageName); + final ConnectorSpecification spec = specFetcher.getSpec(destinationDefinition); // copy any necessary secrets from the current destination to the incoming updated destination final JsonNode updatedConfiguration = secretsProcessor.copySecrets( persistedDestination.getConfiguration(), diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/SpecFetcher.java b/airbyte-server/src/main/java/io/airbyte/server/converters/SpecFetcher.java index 24e8446c9ada..bd9a10d40183 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/SpecFetcher.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/SpecFetcher.java @@ -5,24 +5,78 @@ package io.airbyte.server.converters; import com.google.common.base.Preconditions; +import io.airbyte.commons.docker.DockerUtils; +import io.airbyte.config.JobConfig.ConfigType; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.client.SynchronousJobMetadata; import io.airbyte.scheduler.client.SynchronousResponse; import io.airbyte.scheduler.client.SynchronousSchedulerClient; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SpecFetcher { + private static final Logger LOGGER = LoggerFactory.getLogger(SpecFetcher.class); + private final SynchronousSchedulerClient schedulerJobClient; public SpecFetcher(final SynchronousSchedulerClient schedulerJobClient) { this.schedulerJobClient = schedulerJobClient; } - public ConnectorSpecification execute(final String dockerImage) throws IOException { + // TODO: remove this once file migrations are deprecated, as that is the only time this function is + // used + @Deprecated + public ConnectorSpecification getSpec(final String dockerImage) throws IOException { return getSpecFromJob(schedulerJobClient.createGetSpecJob(dockerImage)); } - private static ConnectorSpecification getSpecFromJob(final SynchronousResponse response) { + public ConnectorSpecification getSpec(final StandardSourceDefinition sourceDefinition) throws IOException { + return getSpecFromJob(getSpecJobResponse(sourceDefinition)); + } + + public ConnectorSpecification getSpec(final StandardDestinationDefinition destinationDefinition) throws IOException { + return getSpecFromJob(getSpecJobResponse(destinationDefinition)); + } + + // TODO: remove this method once the spec is a required field on the StandardSourceDefinition struct + public SynchronousResponse getSpecJobResponse(final StandardSourceDefinition sourceDefinition) throws IOException { + LOGGER.debug("Spec Fetcher: Getting spec for Source Definition."); + final ConnectorSpecification spec = sourceDefinition.getSpec(); + + if (spec != null) { + LOGGER.debug("Spec Fetcher: Spec found in Source Definition."); + return new SynchronousResponse<>(spec, SynchronousJobMetadata.mock(ConfigType.GET_SPEC)); + } + + LOGGER.debug("Spec Fetcher: Spec not found in Source Definition, fetching with scheduler job instead."); + final String dockerImageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag()); + return schedulerJobClient.createGetSpecJob(dockerImageName); + } + + // TODO: remove this method once the spec is a required field on the StandardDestinationDefinition + // struct + public SynchronousResponse getSpecJobResponse(final StandardDestinationDefinition destinationDefinition) + throws IOException { + LOGGER.debug("Spec Fetcher: Getting spec for Destination Definition."); + final ConnectorSpecification spec = destinationDefinition.getSpec(); + + if (spec != null) { + LOGGER.debug("Spec Fetcher: Spec found in Destination Definition."); + return new SynchronousResponse<>(spec, SynchronousJobMetadata.mock(ConfigType.GET_SPEC)); + } + + LOGGER.debug("Spec Fetcher: Spec not found in Destination Definition, fetching with scheduler job instead."); + final String dockerImageName = DockerUtils.getTaggedImageName( + destinationDefinition.getDockerRepository(), + destinationDefinition.getDockerImageTag()); + return schedulerJobClient.createGetSpecJob(dockerImageName); + } + + public static ConnectorSpecification getSpecFromJob(final SynchronousResponse response) { Preconditions.checkState(response.isSuccess(), "Get Spec job failed."); Preconditions.checkNotNull(response.getOutput(), "Get Spec job return null spec"); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index 6548c7b32b7a..5c85065240a4 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -15,7 +15,6 @@ import io.airbyte.api.model.DestinationSearch; import io.airbyte.api.model.DestinationUpdate; import io.airbyte.api.model.WorkspaceIdRequestBody; -import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; @@ -213,7 +212,7 @@ public ConnectorSpecification getSpec(final UUID destinationDefinitionId) public static ConnectorSpecification getSpec(final SpecFetcher specFetcher, final StandardDestinationDefinition destinationDef) throws JsonValidationException, IOException, ConfigNotFoundException { - return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag())); + return specFetcher.getSpec(destinationDef); } private void persistDestinationConnection(final String name, diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index b9709985d165..a80fe8699df5 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -136,11 +136,9 @@ public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdReque public CheckConnectionRead checkSourceConnectionFromSourceCreate(final SourceCoreConfig sourceConfig) throws ConfigNotFoundException, IOException, JsonValidationException { final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceConfig.getSourceDefinitionId()); - final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag()); - final var partialConfig = configRepository.statefulSplitEphemeralSecrets( sourceConfig.getConnectionConfiguration(), - specFetcher.execute(imageName)); + specFetcher.getSpec(sourceDef)); // todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are // technically declared as required. @@ -148,6 +146,7 @@ public CheckConnectionRead checkSourceConnectionFromSourceCreate(final SourceCor .withSourceDefinitionId(sourceConfig.getSourceDefinitionId()) .withConfiguration(partialConfig); + final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag()); return reportConnectionStatus(synchronousSchedulerClient.createSourceCheckConnectionJob(source, imageName)); } @@ -177,11 +176,9 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationId(final Des public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final DestinationCoreConfig destinationConfig) throws ConfigNotFoundException, IOException, JsonValidationException { final StandardDestinationDefinition destDef = configRepository.getStandardDestinationDefinition(destinationConfig.getDestinationDefinitionId()); - final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag()); - final var partialConfig = configRepository.statefulSplitEphemeralSecrets( destinationConfig.getConnectionConfiguration(), - specFetcher.execute(imageName)); + specFetcher.getSpec(destDef)); // todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are // technically declared as required. @@ -189,6 +186,7 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final .withDestinationDefinitionId(destinationConfig.getDestinationDefinitionId()) .withConfiguration(partialConfig); + final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag()); return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName)); } @@ -244,8 +242,7 @@ public SourceDefinitionSpecificationRead getSourceDefinitionSpecification(final throws ConfigNotFoundException, IOException, JsonValidationException { final UUID sourceDefinitionId = sourceDefinitionIdRequestBody.getSourceDefinitionId(); final StandardSourceDefinition source = configRepository.getStandardSourceDefinition(sourceDefinitionId); - final String imageName = DockerUtils.getTaggedImageName(source.getDockerRepository(), source.getDockerImageTag()); - final SynchronousResponse response = getConnectorSpecification(imageName); + final SynchronousResponse response = specFetcher.getSpecJobResponse(source); final ConnectorSpecification spec = response.getOutput(); final SourceDefinitionSpecificationRead specRead = new SourceDefinitionSpecificationRead() .jobInfo(JobConverter.getSynchronousJobRead(response)) @@ -265,8 +262,7 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(final throws ConfigNotFoundException, IOException, JsonValidationException { final UUID destinationDefinitionId = destinationDefinitionIdRequestBody.getDestinationDefinitionId(); final StandardDestinationDefinition destination = configRepository.getStandardDestinationDefinition(destinationDefinitionId); - final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag()); - final SynchronousResponse response = getConnectorSpecification(imageName); + final SynchronousResponse response = specFetcher.getSpecJobResponse(destination); final ConnectorSpecification spec = response.getOutput(); final DestinationDefinitionSpecificationRead specRead = new DestinationDefinitionSpecificationRead() @@ -286,10 +282,6 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(final return specRead; } - public SynchronousResponse getConnectorSpecification(final String dockerImage) throws IOException { - return synchronousSchedulerClient.createGetSpecJob(dockerImage); - } - public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException { final UUID connectionId = connectionIdRequestBody.getConnectionId(); @@ -378,9 +370,9 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE } private void cancelTemporalWorkflowIfPresent(final long jobId) throws IOException { - final var latestAttemptId = jobPersistence.getJob(jobId).getAttempts().size() - 1; // attempts ids are monotonically increasing starting from 0 - // and - // specific to a job id, allowing us to do this. + // attempts ids are monotonically increasing starting from 0 and specific to a job id, allowing us + // to do this. + final var latestAttemptId = jobPersistence.getJob(jobId).getAttempts().size() - 1; final var workflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, latestAttemptId); if (workflowId.isPresent()) { @@ -416,15 +408,13 @@ private CheckConnectionRead reportConnectionStatus(final SynchronousResponse getSpecResponse = schedulerClient.createGetSpecJob(imageName); + SpecFetcher.getSpecFromJob(getSpecResponse); } catch (final Exception e) { throw new BadObjectSchemaKnownException( String.format("Encountered an issue while validating input docker image (%s): %s", imageName, e.getMessage())); diff --git a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java b/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java index f50e59614414..1359555b3d99 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/ConfigDumpImporterTest.java @@ -70,7 +70,8 @@ public void setup() throws IOException, JsonValidationException, ConfigNotFoundE specFetcher = mock(SpecFetcher.class); emptyConnectorSpec = mock(ConnectorSpecification.class); when(emptyConnectorSpec.getConnectionSpecification()).thenReturn(Jsons.emptyObject()); - when(specFetcher.execute(any())).thenReturn(emptyConnectorSpec); + when(specFetcher.getSpec(any(StandardSourceDefinition.class))).thenReturn(emptyConnectorSpec); + when(specFetcher.getSpec(any(StandardDestinationDefinition.class))).thenReturn(emptyConnectorSpec); configDumpImporter = new ConfigDumpImporter(configRepository, jobPersistence, workspaceHelper, mock(JsonSchemaValidator.class), specFetcher, true); diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java index 52f639d60214..e5173a306368 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/ConfigurationUpdateTest.java @@ -88,7 +88,7 @@ void setup() { void testSourceUpdate() throws JsonValidationException, IOException, ConfigNotFoundException { when(configRepository.getSourceConnectionWithSecrets(UUID1)).thenReturn(ORIGINAL_SOURCE_CONNECTION); when(configRepository.getStandardSourceDefinition(UUID2)).thenReturn(SOURCE_DEFINITION); - when(specFetcher.execute(IMAGE_NAME)).thenReturn(CONNECTOR_SPECIFICATION); + when(specFetcher.getSpec(SOURCE_DEFINITION)).thenReturn(CONNECTOR_SPECIFICATION); when(secretsProcessor.copySecrets(ORIGINAL_CONFIGURATION, NEW_CONFIGURATION, SPEC)).thenReturn(NEW_CONFIGURATION); final SourceConnection actual = configurationUpdate.source(UUID1, ORIGINAL_SOURCE_CONNECTION.getName(), NEW_CONFIGURATION); @@ -100,7 +100,7 @@ void testSourceUpdate() throws JsonValidationException, IOException, ConfigNotFo void testDestinationUpdate() throws JsonValidationException, IOException, ConfigNotFoundException { when(configRepository.getDestinationConnectionWithSecrets(UUID1)).thenReturn(ORIGINAL_DESTINATION_CONNECTION); when(configRepository.getStandardDestinationDefinition(UUID2)).thenReturn(DESTINATION_DEFINITION); - when(specFetcher.execute(IMAGE_NAME)).thenReturn(CONNECTOR_SPECIFICATION); + when(specFetcher.getSpec(DESTINATION_DEFINITION)).thenReturn(CONNECTOR_SPECIFICATION); when(secretsProcessor.copySecrets(ORIGINAL_CONFIGURATION, NEW_CONFIGURATION, SPEC)).thenReturn(NEW_CONFIGURATION); final DestinationConnection actual = configurationUpdate.destination(UUID1, ORIGINAL_DESTINATION_CONNECTION.getName(), NEW_CONFIGURATION); diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/SpecFetcherTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/SpecFetcherTest.java index 079aedd8061c..21dc94fdafe3 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/SpecFetcherTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/SpecFetcherTest.java @@ -11,20 +11,28 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.JobConfig.ConfigType; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.scheduler.client.SynchronousResponse; import io.airbyte.scheduler.client.SynchronousSchedulerClient; import java.io.IOException; +import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class SpecFetcherTest { - private static final String IMAGE_NAME = "foo:bar"; + private static final String DOCKER_REPOSITORY = "foo"; + private static final String DOCKER_IMAGE_TAG = "bar"; + private static final String IMAGE_NAME = DOCKER_REPOSITORY + ":" + DOCKER_IMAGE_TAG; private SynchronousSchedulerClient schedulerJobClient; private SynchronousResponse response; private ConnectorSpecification connectorSpecification; + private StandardSourceDefinition sourceDefinition; + private StandardDestinationDefinition destinationDefinition; @SuppressWarnings("unchecked") @BeforeEach @@ -32,25 +40,87 @@ void setup() { schedulerJobClient = mock(SynchronousSchedulerClient.class); response = mock(SynchronousResponse.class); connectorSpecification = new ConnectorSpecification().withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar"))); + sourceDefinition = new StandardSourceDefinition().withDockerRepository(DOCKER_REPOSITORY).withDockerImageTag(DOCKER_IMAGE_TAG); + destinationDefinition = new StandardDestinationDefinition().withDockerRepository(DOCKER_REPOSITORY).withDockerImageTag(DOCKER_IMAGE_TAG); } @Test - void testFetch() throws IOException { + void testGetSpecFromDockerImageSuccess() throws IOException { when(schedulerJobClient.createGetSpecJob(IMAGE_NAME)).thenReturn(response); when(response.isSuccess()).thenReturn(true); when(response.getOutput()).thenReturn(connectorSpecification); final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); - assertEquals(connectorSpecification, specFetcher.execute(IMAGE_NAME)); + assertEquals(connectorSpecification, specFetcher.getSpec(IMAGE_NAME)); } @Test - void testFetchEmpty() throws IOException { + void testGetSpecFromDockerImageFail() throws IOException { when(schedulerJobClient.createGetSpecJob(IMAGE_NAME)).thenReturn(response); when(response.isSuccess()).thenReturn(false); final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); - assertThrows(IllegalStateException.class, () -> specFetcher.execute(IMAGE_NAME)); + assertThrows(IllegalStateException.class, () -> specFetcher.getSpec(IMAGE_NAME)); + } + + @Test + void testGetSpecFromSourceDefinitionNotNull() throws IOException { + final StandardSourceDefinition sourceDefinitionWithSpec = Jsons.clone(sourceDefinition).withSpec(connectorSpecification); + + final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); + assertEquals(connectorSpecification, specFetcher.getSpec(sourceDefinitionWithSpec)); + } + + @Test + void testGetSpecFromSourceDefinitionNull() throws IOException { + when(schedulerJobClient.createGetSpecJob(IMAGE_NAME)).thenReturn(response); + when(response.isSuccess()).thenReturn(true); + when(response.getOutput()).thenReturn(connectorSpecification); + + final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); + assertEquals(connectorSpecification, specFetcher.getSpec(sourceDefinition)); + } + + @Test + void testGetSpecFromDestinationDefinitionNotNull() throws IOException { + final StandardDestinationDefinition destinationDefinitionWithSpec = Jsons.clone(destinationDefinition).withSpec(connectorSpecification); + + final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); + assertEquals(connectorSpecification, specFetcher.getSpec(destinationDefinitionWithSpec)); + } + + @Test + void testGetSpecFromDestinationDefinitionNull() throws IOException { + when(schedulerJobClient.createGetSpecJob(IMAGE_NAME)).thenReturn(response); + when(response.isSuccess()).thenReturn(true); + when(response.getOutput()).thenReturn(connectorSpecification); + + final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); + assertEquals(connectorSpecification, specFetcher.getSpec(destinationDefinition)); + } + + @Test + void testGetSpecJobResponseFromSourceReturnsMockedJobMetadata() throws IOException { + final StandardSourceDefinition sourceDefinitionWithSpec = Jsons.clone(sourceDefinition).withSpec(connectorSpecification); + + final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); + final SynchronousResponse response = specFetcher.getSpecJobResponse(sourceDefinitionWithSpec); + + assertEquals(ConfigType.GET_SPEC, response.getMetadata().getConfigType()); + assertEquals(Optional.empty(), response.getMetadata().getConfigId()); + assertEquals(connectorSpecification, response.getOutput()); + } + + @Test + void testGetSpecJobResponseFromDestinationReturnsMockedJobMetadata() throws IOException { + final StandardDestinationDefinition destinationDefinitionWithSpec = Jsons.clone(destinationDefinition).withSpec(connectorSpecification); + + final SpecFetcher specFetcher = new SpecFetcher(schedulerJobClient); + final SynchronousResponse response = specFetcher.getSpecJobResponse(destinationDefinitionWithSpec); + + assertEquals(ConfigType.GET_SPEC, response.getMetadata().getConfigType()); + assertEquals(Optional.empty(), response.getMetadata().getConfigId()); + assertEquals(connectorSpecification, response.getOutput()); } } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index 2aa2546a191d..9ffe0bec0355 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -23,6 +23,7 @@ import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardWorkspace; import io.airbyte.config.persistence.ConfigPersistence; @@ -116,7 +117,8 @@ public void setup() throws Exception { final SpecFetcher specFetcher = mock(SpecFetcher.class); final ConnectorSpecification emptyConnectorSpec = mock(ConnectorSpecification.class); when(emptyConnectorSpec.getConnectionSpecification()).thenReturn(Jsons.emptyObject()); - when(specFetcher.execute(any())).thenReturn(emptyConnectorSpec); + when(specFetcher.getSpec(any(StandardSourceDefinition.class))).thenReturn(emptyConnectorSpec); + when(specFetcher.getSpec(any(StandardDestinationDefinition.class))).thenReturn(emptyConnectorSpec); archiveHandler = new ArchiveHandler( VERSION, diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java index 5ea870cd8229..fee914ef03b4 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationHandlerTest.java @@ -110,7 +110,7 @@ void testCreateDestination() throws JsonValidationException, ConfigNotFoundExcep .thenReturn(destinationConnection); when(configRepository.getStandardDestinationDefinition(standardDestinationDefinition.getDestinationDefinitionId())) .thenReturn(standardDestinationDefinition); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardDestinationDefinition)).thenReturn(connectorSpecification); when(secretsProcessor.maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification())) .thenReturn(destinationConnection.getConfiguration()); @@ -159,7 +159,7 @@ void testDeleteDestination() throws JsonValidationException, ConfigNotFoundExcep .thenReturn(expectedDestinationConnection); when(configRepository.getStandardDestinationDefinition(standardDestinationDefinition.getDestinationDefinitionId())) .thenReturn(standardDestinationDefinition); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardDestinationDefinition)).thenReturn(connectorSpecification); when(connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)).thenReturn(connectionReadList); destinationHandler.deleteDestination(destinationId); @@ -196,7 +196,7 @@ void testUpdateDestination() throws JsonValidationException, ConfigNotFoundExcep .thenReturn(standardDestinationDefinition); when(configRepository.getDestinationConnection(destinationConnection.getDestinationId())) .thenReturn(expectedDestinationConnection); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardDestinationDefinition)).thenReturn(connectorSpecification); when(configurationUpdate.destination(destinationConnection.getDestinationId(), updatedDestName, newConfiguration)) .thenReturn(expectedDestinationConnection); @@ -229,7 +229,7 @@ void testGetDestination() throws JsonValidationException, ConfigNotFoundExceptio when(configRepository.getDestinationConnection(destinationConnection.getDestinationId())).thenReturn(destinationConnection); when(configRepository.getStandardDestinationDefinition(standardDestinationDefinition.getDestinationDefinitionId())) .thenReturn(standardDestinationDefinition); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardDestinationDefinition)).thenReturn(connectorSpecification); final DestinationRead actualDestinationRead = destinationHandler.getDestination(destinationIdRequestBody); @@ -263,7 +263,7 @@ void testListDestinationForWorkspace() throws JsonValidationException, ConfigNot when(configRepository.getDestinationConnection(destinationConnection.getDestinationId())).thenReturn(destinationConnection); when(configRepository.listDestinationConnection()).thenReturn(Lists.newArrayList(destinationConnection)); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardDestinationDefinition)).thenReturn(connectorSpecification); when(configRepository.getStandardDestinationDefinition(standardDestinationDefinition.getDestinationDefinitionId())) .thenReturn(standardDestinationDefinition); when(secretsProcessor.maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification())) @@ -288,7 +288,7 @@ void testSearchDestinations() throws JsonValidationException, ConfigNotFoundExce when(configRepository.getDestinationConnection(destinationConnection.getDestinationId())).thenReturn(destinationConnection); when(configRepository.listDestinationConnection()).thenReturn(Lists.newArrayList(destinationConnection)); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardDestinationDefinition)).thenReturn(connectorSpecification); when(configRepository.getStandardDestinationDefinition(standardDestinationDefinition.getDestinationDefinitionId())) .thenReturn(standardDestinationDefinition); when(secretsProcessor.maskSecrets(destinationConnection.getConfiguration(), destinationDefinitionSpecificationRead.getConnectionSpecification())) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 85c1b0b76ba5..3f86d2d6cc93 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -209,14 +209,15 @@ void testCheckSourceConnectionFromUpdate() throws IOException, JsonValidationExc .name(source.getName()) .sourceId(source.getSourceId()) .connectionConfiguration(source.getConfiguration()); + final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() + .withDockerRepository(DESTINATION_DOCKER_REPO) + .withDockerImageTag(DESTINATION_DOCKER_TAG) + .withSourceDefinitionId(source.getSourceDefinitionId()); when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) - .thenReturn(new StandardSourceDefinition() - .withDockerRepository(DESTINATION_DOCKER_REPO) - .withDockerImageTag(DESTINATION_DOCKER_TAG) - .withSourceDefinitionId(source.getSourceDefinitionId())); + .thenReturn(sourceDefinition); when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); when(configurationUpdate.source(source.getSourceId(), source.getName(), sourceUpdate.getConnectionConfiguration())).thenReturn(source); - when(specFetcher.execute(DESTINATION_DOCKER_IMAGE)).thenReturn(CONNECTION_SPECIFICATION); + when(specFetcher.getSpec(sourceDefinition)).thenReturn(CONNECTION_SPECIFICATION); final SourceConnection submittedSource = new SourceConnection() .withSourceDefinitionId(source.getSourceDefinitionId()) .withConfiguration(source.getConfiguration()); @@ -236,13 +237,14 @@ void testGetSourceSpec() throws JsonValidationException, IOException, ConfigNotF final SourceDefinitionIdRequestBody sourceDefinitionIdRequestBody = new SourceDefinitionIdRequestBody().sourceDefinitionId(UUID.randomUUID()); final SynchronousResponse specResponse = (SynchronousResponse) jobResponse; + final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition() + .withName("name") + .withDockerRepository(SOURCE_DOCKER_REPO) + .withDockerImageTag(SOURCE_DOCKER_TAG) + .withSourceDefinitionId(sourceDefinitionIdRequestBody.getSourceDefinitionId()); when(configRepository.getStandardSourceDefinition(sourceDefinitionIdRequestBody.getSourceDefinitionId())) - .thenReturn(new StandardSourceDefinition() - .withName("name") - .withDockerRepository(SOURCE_DOCKER_REPO) - .withDockerImageTag(SOURCE_DOCKER_TAG) - .withSourceDefinitionId(sourceDefinitionIdRequestBody.getSourceDefinitionId())); - when(synchronousSchedulerClient.createGetSpecJob(SOURCE_DOCKER_IMAGE)) + .thenReturn(sourceDefinition); + when(specFetcher.getSpecJobResponse(sourceDefinition)) .thenReturn(specResponse); when(specResponse.getOutput()).thenReturn(CONNECTION_SPECIFICATION); @@ -257,13 +259,14 @@ void testGetDestinationSpec() throws JsonValidationException, IOException, Confi new DestinationDefinitionIdRequestBody().destinationDefinitionId(UUID.randomUUID()); final SynchronousResponse specResponse = (SynchronousResponse) this.jobResponse; + final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() + .withName("name") + .withDockerRepository(DESTINATION_DOCKER_REPO) + .withDockerImageTag(DESTINATION_DOCKER_TAG) + .withDestinationDefinitionId(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); when(configRepository.getStandardDestinationDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) - .thenReturn(new StandardDestinationDefinition() - .withName("name") - .withDockerRepository(DESTINATION_DOCKER_REPO) - .withDockerImageTag(DESTINATION_DOCKER_TAG) - .withDestinationDefinitionId(destinationDefinitionIdRequestBody.getDestinationDefinitionId())); - when(synchronousSchedulerClient.createGetSpecJob(DESTINATION_DOCKER_IMAGE)) + .thenReturn(destinationDefinition); + when(specFetcher.getSpecJobResponse(destinationDefinition)) .thenReturn(specResponse); when(specResponse.getOutput()).thenReturn(CONNECTION_SPECIFICATION); @@ -272,16 +275,6 @@ void testGetDestinationSpec() throws JsonValidationException, IOException, Confi verify(configRepository).getStandardDestinationDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId()); } - @Test - public void testGetConnectorSpec() throws IOException { - final SynchronousResponse specResponse = (SynchronousResponse) jobResponse; - when(specResponse.getOutput()).thenReturn(CONNECTION_SPECIFICATION); - when(synchronousSchedulerClient.createGetSpecJob(SOURCE_DOCKER_IMAGE)) - .thenReturn(specResponse); - - assertEquals(CONNECTION_SPECIFICATION, schedulerHandler.getConnectorSpecification(SOURCE_DOCKER_IMAGE).getOutput()); - } - @Test void testCheckDestinationConnectionFromDestinationId() throws IOException, JsonValidationException, ConfigNotFoundException { final DestinationConnection destination = DestinationHelpers.generateDestination(UUID.randomUUID()); @@ -335,15 +328,16 @@ void testCheckDestinationConnectionFromUpdate() throws IOException, JsonValidati .name(destination.getName()) .destinationId(destination.getDestinationId()) .connectionConfiguration(destination.getConfiguration()); + final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition() + .withDockerRepository(DESTINATION_DOCKER_REPO) + .withDockerImageTag(DESTINATION_DOCKER_TAG) + .withDestinationDefinitionId(destination.getDestinationDefinitionId()); when(configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId())) - .thenReturn(new StandardDestinationDefinition() - .withDockerRepository(DESTINATION_DOCKER_REPO) - .withDockerImageTag(DESTINATION_DOCKER_TAG) - .withDestinationDefinitionId(destination.getDestinationDefinitionId())); + .thenReturn(destinationDefinition); when(configRepository.getDestinationConnection(destination.getDestinationId())).thenReturn(destination); when(configurationUpdate.destination(destination.getDestinationId(), destination.getName(), destinationUpdate.getConnectionConfiguration())) .thenReturn(destination); - when(specFetcher.execute(DESTINATION_DOCKER_IMAGE)).thenReturn(CONNECTION_SPECIFICATION); + when(specFetcher.getSpec(destinationDefinition)).thenReturn(CONNECTION_SPECIFICATION); final DestinationConnection submittedDestination = new DestinationConnection() .withDestinationDefinitionId(destination.getDestinationDefinitionId()) .withConfiguration(destination.getConfiguration()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java index 87fcde92f1a9..e63074131d28 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java @@ -105,7 +105,7 @@ void testCreateSource() throws JsonValidationException, ConfigNotFoundException, when(uuidGenerator.get()).thenReturn(sourceConnection.getSourceId()); when(configRepository.getSourceConnection(sourceConnection.getSourceId())).thenReturn(sourceConnection); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardSourceDefinition)).thenReturn(connectorSpecification); when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId())) .thenReturn(standardSourceDefinition); when(secretsProcessor.maskSecrets(sourceCreate.getConnectionConfiguration(), sourceDefinitionSpecificationRead.getConnectionSpecification())) @@ -150,7 +150,7 @@ void testUpdateSource() throws JsonValidationException, ConfigNotFoundException, when(configRepository.getSourceConnection(sourceConnection.getSourceId())) .thenReturn(sourceConnection) .thenReturn(expectedSourceConnection); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardSourceDefinition)).thenReturn(connectorSpecification); when(configurationUpdate.source(sourceConnection.getSourceId(), updatedSourceName, newConfiguration)) .thenReturn(expectedSourceConnection); @@ -174,7 +174,7 @@ void testGetSource() throws JsonValidationException, ConfigNotFoundException, IO when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId())) .thenReturn(standardSourceDefinition); when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardSourceDefinition)).thenReturn(connectorSpecification); when(secretsProcessor.maskSecrets(sourceConnection.getConfiguration(), sourceDefinitionSpecificationRead.getConnectionSpecification())) .thenReturn(sourceConnection.getConfiguration()); @@ -206,7 +206,7 @@ void testListSourcesForWorkspace() throws JsonValidationException, ConfigNotFoun when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId())) .thenReturn(standardSourceDefinition); when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardSourceDefinition)).thenReturn(connectorSpecification); when(secretsProcessor.maskSecrets(sourceConnection.getConfiguration(), sourceDefinitionSpecificationRead.getConnectionSpecification())) .thenReturn(sourceConnection.getConfiguration()); @@ -225,7 +225,7 @@ void testSearchSources() throws JsonValidationException, ConfigNotFoundException when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId())) .thenReturn(standardSourceDefinition); when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardSourceDefinition)).thenReturn(connectorSpecification); when(secretsProcessor.maskSecrets(sourceConnection.getConfiguration(), sourceDefinitionSpecificationRead.getConnectionSpecification())) .thenReturn(sourceConnection.getConfiguration()); @@ -261,7 +261,7 @@ void testDeleteSource() throws JsonValidationException, ConfigNotFoundException, when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId())) .thenReturn(standardSourceDefinition); when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition); - when(specFetcher.execute(imageName)).thenReturn(connectorSpecification); + when(specFetcher.getSpec(standardSourceDefinition)).thenReturn(connectorSpecification); when(connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)).thenReturn(connectionReadList); when(secretsProcessor.maskSecrets(sourceConnection.getConfiguration(), sourceDefinitionSpecificationRead.getConnectionSpecification())) .thenReturn(sourceConnection.getConfiguration());