diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java
index 54b1c1f508a0..65566dd091a5 100644
--- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java
+++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java
@@ -142,7 +142,7 @@ void testBootloaderAppBlankDb() throws Exception {
     val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
     // this line should change with every new migration
     // to show that you meant to make a new migration to the prod database
-    assertEquals("0.40.23.001", configsMigrator.getLatestMigration().getVersion().getVersion());
+    assertEquals("0.40.23.002", configsMigrator.getLatestMigration().getVersion().getVersion());
 
     val jobsPersistence = new DefaultJobPersistence(jobDatabase);
     assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java
index 01c392b759b1..e96f9c81d403 100644
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java
+++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java
@@ -5,7 +5,6 @@
 package io.airbyte.workers.normalization;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import io.airbyte.commons.io.IOs;
@@ -46,7 +45,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
       .setLogPrefix("normalization")
       .setPrefixColor(Color.GREEN_BACKGROUND);
 
-  private final DestinationType destinationType;
+  private final String normalizationIntegrationType;
   private final ProcessFactory processFactory;
   private final String normalizationImageName;
   private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
@@ -55,31 +54,12 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
 
   private Process process = null;
 
-  public enum DestinationType {
-    BIGQUERY,
-    MSSQL,
-    MYSQL,
-    ORACLE,
-    POSTGRES,
-    REDSHIFT,
-    SNOWFLAKE,
-    CLICKHOUSE,
-    TIDB
-  }
-
-  public DefaultNormalizationRunner(final DestinationType destinationType,
-                                    final ProcessFactory processFactory,
-                                    final String normalizationImageName) {
-    this.destinationType = destinationType;
-    this.processFactory = processFactory;
-    this.normalizationImageName = normalizationImageName;
-  }
-
   public DefaultNormalizationRunner(final ProcessFactory processFactory,
-                                    final String normalizationImage) {
+                                    final String normalizationImage,
+                                    final String normalizationIntegrationType) {
     this.processFactory = processFactory;
     this.normalizationImageName = normalizationImage;
-    this.destinationType = null;
+    this.normalizationIntegrationType = normalizationIntegrationType;
   }
 
   @Override
@@ -99,12 +79,12 @@ public boolean configureDbt(final String jobId,
     final String gitRepoBranch = dbtConfig.getGitRepoBranch();
     if (Strings.isNullOrEmpty(gitRepoBranch)) {
       return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
-          "--integration-type", destinationType.toString().toLowerCase(),
+          "--integration-type", normalizationIntegrationType.toLowerCase(),
           "--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
           "--git-repo", gitRepoUrl);
     } else {
       return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
-          "--integration-type", destinationType.toString().toLowerCase(),
+          "--integration-type", normalizationIntegrationType.toLowerCase(),
           "--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
           "--git-repo", gitRepoUrl,
           "--git-branch", gitRepoBranch);
@@ -124,7 +104,7 @@ public boolean normalize(final String jobId,
         WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));
 
     return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "run",
-        "--integration-type", destinationType.toString().toLowerCase(),
+        "--integration-type", normalizationIntegrationType.toLowerCase(),
         "--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
         "--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
   }
@@ -233,9 +213,4 @@ private String buildInternalErrorMessageFromDbtStackTrace() {
     return errorMap.get(SentryExceptionHelper.ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY);
   }
 
-  @VisibleForTesting
-  DestinationType getDestinationType() {
-    return destinationType;
-  }
-
 }
diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java
deleted file mode 100644
index c11b2662bb59..000000000000
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.workers.normalization;
-
-import com.google.common.collect.ImmutableMap;
-import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
-import io.airbyte.workers.process.ProcessFactory;
-import java.util.Map;
-import java.util.Objects;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-
-@Slf4j
-public class NormalizationRunnerFactory {
-
-  public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization";
-  public static final String NORMALIZATION_VERSION = "0.2.25";
-
-  static final Map<String, ImmutablePair<String, DestinationType>> NORMALIZATION_MAPPING =
-      ImmutableMap.<String, ImmutablePair<String, DestinationType>>builder()
-          // map destination connectors (alphabetically) to their expected normalization settings
-          .put("airbyte/destination-bigquery", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DefaultNormalizationRunner.DestinationType.BIGQUERY))
-          .put("airbyte/destination-bigquery-denormalized",
-              ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DefaultNormalizationRunner.DestinationType.BIGQUERY))
-          .put("airbyte/destination-clickhouse", ImmutablePair.of("airbyte/normalization-clickhouse", DestinationType.CLICKHOUSE))
-          .put("airbyte/destination-clickhouse-strict-encrypt", ImmutablePair.of("airbyte/normalization-clickhouse", DestinationType.CLICKHOUSE))
-          .put("airbyte/destination-mssql", ImmutablePair.of("airbyte/normalization-mssql", DestinationType.MSSQL))
-          .put("airbyte/destination-mssql-strict-encrypt", ImmutablePair.of("airbyte/normalization-mssql", DestinationType.MSSQL))
-          .put("airbyte/destination-mysql", ImmutablePair.of("airbyte/normalization-mysql", DestinationType.MYSQL))
-          .put("airbyte/destination-mysql-strict-encrypt", ImmutablePair.of("airbyte/normalization-mysql", DestinationType.MYSQL))
-          .put("airbyte/destination-oracle", ImmutablePair.of("airbyte/normalization-oracle", DestinationType.ORACLE))
-          .put("airbyte/destination-oracle-strict-encrypt", ImmutablePair.of("airbyte/normalization-oracle", DestinationType.ORACLE))
-          .put("airbyte/destination-postgres", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.POSTGRES))
-          .put("airbyte/destination-postgres-strict-encrypt", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.POSTGRES))
-          .put("airbyte/destination-redshift", ImmutablePair.of("airbyte/normalization-redshift", DestinationType.REDSHIFT))
-          .put("airbyte/destination-snowflake", ImmutablePair.of("airbyte/normalization-snowflake", DestinationType.SNOWFLAKE))
-          .put("airbyte/destination-tidb", ImmutablePair.of("airbyte/normalization-tidb", DestinationType.TIDB))
-          .build();
-
-  public static NormalizationRunner create(final String connectorImageName,
-                                           final ProcessFactory processFactory,
-                                           final String normalizationVersion,
-                                           final String normalizationImage) {
-    final var valuePair = getNormalizationInfoForConnector(connectorImageName);
-    final String factoryNormalizationImage = String.format("%s:%s", valuePair.getLeft(), normalizationVersion);
-    if (Objects.nonNull(normalizationImage)
-        && !normalizationImage.equalsIgnoreCase(factoryNormalizationImage)) {
-      log.error(
-          "The normalization image name or tag in the definition file is different from the normalization image or tag in the NormalizationRunnerFactory!");
-      log.error(
-          "the definition file value - {}, the NormalizationRunnerFactory value - {}", normalizationImage, factoryNormalizationImage);
-    }
-    return new DefaultNormalizationRunner(
-        valuePair.getRight(),
-        processFactory,
-        factoryNormalizationImage);
-  }
-
-  public static ImmutablePair<String, DestinationType> getNormalizationInfoForConnector(final String connectorImageName) {
-    final String imageNameWithoutTag = connectorImageName.contains(":") ? connectorImageName.split(":")[0] : connectorImageName;
-    if (NORMALIZATION_MAPPING.containsKey(imageNameWithoutTag)) {
-      return NORMALIZATION_MAPPING.get(imageNameWithoutTag);
-    } else {
-      throw new IllegalStateException(
-          String.format("Requested normalization for %s, but it is not included in the normalization mappings.", connectorImageName));
-    }
-  }
-
-}
diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java
index 63f798bf141c..0c3eb0633c06 100644
--- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java
+++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java
@@ -15,6 +15,7 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.docker.DockerUtils;
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
 import io.airbyte.commons.logging.LoggingHelper.Color;
@@ -26,7 +27,6 @@
 import io.airbyte.workers.WorkerConfigs;
 import io.airbyte.workers.WorkerConstants;
 import io.airbyte.workers.exception.WorkerException;
-import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
 import io.airbyte.workers.process.AirbyteIntegrationLauncher;
 import io.airbyte.workers.process.ProcessFactory;
 import java.io.ByteArrayInputStream;
@@ -49,6 +49,10 @@ class DefaultNormalizationRunnerTest {
   private static final String JOB_ID = "0";
   private static final int JOB_ATTEMPT = 0;
 
+  private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
+  private static final String NORMALIZATION_TAG = "42.42.42";
+  private static final String INTEGRATION_TYPE = "postgres";
+
   private static Path logJobRoot;
 
   static {
@@ -82,14 +86,14 @@ void setup() throws IOException, WorkerException {
         WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));
 
     when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
-        NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, false, files, null,
+        DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null,
         workerConfigs.getResourceRequirements(),
         Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
             AirbyteIntegrationLauncher.NORMALIZE_STEP),
         Map.of(),
         Map.of(),
         "run",
-        "--integration-type", "bigquery",
+        "--integration-type", INTEGRATION_TYPE,
         "--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
         "--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME))
             .thenReturn(process);
@@ -110,8 +114,7 @@ public void tearDown() throws IOException {
   @Test
   void test() throws Exception {
     final NormalizationRunner runner =
-        new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
-            NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
+        new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
 
     when(process.exitValue()).thenReturn(0);
 
@@ -122,8 +125,7 @@ void testLog() throws Exception {
 
     final NormalizationRunner runner =
-        new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
-            NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
+        new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
 
     when(process.exitValue()).thenReturn(0);
 
@@ -145,8 +147,7 @@ void testClose() throws Exception {
     when(process.isAlive()).thenReturn(true).thenReturn(false);
 
     final NormalizationRunner runner =
-        new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
-            NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
+        new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
 
     runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements());
     runner.close();
@@ -158,8 +159,7 @@ void testFailure() throws Exception {
     when(process.exitValue()).thenReturn(1);
 
     final NormalizationRunner runner =
-        new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
-            NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
+        new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
 
     assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));
     verify(process).waitFor();
@@ -180,8 +180,7 @@ void testFailureWithTraceMessage() throws Exception {
     when(process.getInputStream()).thenReturn(new ByteArrayInputStream(errorTraceString.getBytes(StandardCharsets.UTF_8)));
 
     final NormalizationRunner runner =
-        new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
-            NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
+        new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
 
     assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));
     assertEquals(1, runner.getTraceMessages().count());
@@ -207,8 +206,7 @@ void testFailureWithDbtError() throws Exception {
     when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));
 
     final NormalizationRunner runner =
-        new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
-            NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
+        new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
 
     assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));
     assertEquals(1, runner.getTraceMessages().count());
@@ -229,8 +227,7 @@ void testFailureWithDbtErrorJsonFormat() throws Exception {
     when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));
 
     final NormalizationRunner runner =
-        new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
-            NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
+        new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
 
     assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));
     assertEquals(1, runner.getTraceMessages().count());
diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/NormalizationRunnerFactoryTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/NormalizationRunnerFactoryTest.java
deleted file mode 100644
index bc1420c6d1a5..000000000000
--- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/NormalizationRunnerFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
- */
-
-package io.airbyte.workers.normalization;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
-
-import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
-import io.airbyte.workers.process.ProcessFactory;
-import java.util.Map.Entry;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-class NormalizationRunnerFactoryTest {
-
-  public static final String NORMALIZATION_VERSION = "dev";
-  private ProcessFactory processFactory;
-
-  @BeforeEach
-  void setup() {
-    processFactory = mock(ProcessFactory.class);
-  }
-
-  @Test
-  void testMappings() {
-    for (final Entry<String, ImmutablePair<String, DestinationType>> entry : NormalizationRunnerFactory.NORMALIZATION_MAPPING.entrySet()) {
-      assertEquals(entry.getValue().getValue(),
-          ((DefaultNormalizationRunner) NormalizationRunnerFactory.create(
-              String.format("%s:0.1.0", entry.getKey()), processFactory, NORMALIZATION_VERSION, String.format("%s:0.1.0", entry.getKey())))
-                  .getDestinationType());
-    }
-    assertThrows(IllegalStateException.class,
-        () -> NormalizationRunnerFactory.create("airbyte/destination-csv:0.1.0", processFactory,
-            NORMALIZATION_VERSION, "airbyte/destination-csv:0.1.0"));
-  }
-
-}
diff --git a/airbyte-config/config-models/src/main/resources/types/NormalizationDestinationDefinitionConfig.yaml b/airbyte-config/config-models/src/main/resources/types/NormalizationDestinationDefinitionConfig.yaml
new file mode 100644
index 000000000000..69aacfa2dea8
--- /dev/null
+++ b/airbyte-config/config-models/src/main/resources/types/NormalizationDestinationDefinitionConfig.yaml
@@ -0,0 +1,21 @@
+---
+"$schema": http://json-schema.org/draft-07/schema#
+"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/NormalizationDestinationDefinitionConfig.yaml
+title: NormalizationDestinationDefinitionConfig
+description: describes a normalization config for destination definition
+type: object
+required:
+  - normalizationRepository
+  - normalizationTag
+  - normalizationIntegrationType
+additionalProperties: true
+properties:
+  normalizationRepository:
+    type: string
+    description: a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.
+  normalizationTag:
+    type: string
+    description: a field indicating the tag of the docker repository to be used for normalization.
+  normalizationIntegrationType:
+    type: string
+    description: a field indicating the type of integration dialect to use for normalization.
diff --git a/airbyte-config/config-models/src/main/resources/types/StandardDestinationDefinition.yaml b/airbyte-config/config-models/src/main/resources/types/StandardDestinationDefinition.yaml
index f3f37ebb3971..103530d33b50 100644
--- a/airbyte-config/config-models/src/main/resources/types/StandardDestinationDefinition.yaml
+++ b/airbyte-config/config-models/src/main/resources/types/StandardDestinationDefinition.yaml
@@ -58,10 +58,8 @@ properties:
   protocolVersion:
     type: string
    description: the Airbyte Protocol version supported by the connector
-  normalizationRepository:
-    type: string
-  normalizationTag:
-    type: string
+  normalizationConfig:
+    "$ref": NormalizationDestinationDefinitionConfig.yaml
   supportsDbt:
     type: boolean
-    default: false
+    description: an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.
diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java
index 51b37e9d7795..dbcf9dc3985e 100644
--- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java
+++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ActorDefinitionMigrator.java
@@ -18,6 +18,7 @@
 import io.airbyte.commons.version.AirbyteVersion;
 import io.airbyte.config.AirbyteConfig;
 import io.airbyte.config.ConfigSchema;
+import io.airbyte.config.NormalizationDestinationDefinitionConfig;
 import io.airbyte.config.StandardDestinationDefinition;
 import io.airbyte.config.StandardSourceDefinition;
 import io.airbyte.config.StandardSourceDefinition.SourceType;
@@ -271,7 +272,9 @@ private void writeOrUpdateStandardDefinition(final DSLContext ctx,
       sourceDef.withProtocolVersion(getProtocolVersion(sourceDef.getSpec()));
       ConfigWriter.writeStandardSourceDefinition(Collections.singletonList(sourceDef), ctx);
     } else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
-      final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class);
+      final NormalizationDestinationDefinitionConfig normalizationConfig = Jsons.object(definition, NormalizationDestinationDefinitionConfig.class);
+      final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class)
+          .withNormalizationConfig(normalizationConfig);
       destDef.withProtocolVersion(getProtocolVersion(destDef.getSpec()));
       ConfigWriter.writeStandardDestinationDefinition(Collections.singletonList(destDef), ctx);
     } else {
diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java
index cb836f63fbb9..00640eadfd9b 100644
--- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java
+++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigWriter.java
@@ -19,11 +19,8 @@
 import io.airbyte.db.instance.configs.jooq.generated.enums.SourceType;
 import java.time.LocalDate;
 import java.time.OffsetDateTime;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.jooq.DSLContext;
@@ -175,9 +172,19 @@ static void writeStandardDestinationDefinition(final List runJob() throws Exception {
     Math.toIntExact(jobRunConfig.getAttemptId()),
     workerConfigs.getResourceRequirements(),
     new DbtTransformationRunner(
-        processFactory, NormalizationRunnerFactory.create(
-            destinationLauncherConfig.getDockerImage(),
+        processFactory, new DefaultNormalizationRunner(
             processFactory,
-            NormalizationRunnerFactory.NORMALIZATION_VERSION,
-            destinationLauncherConfig.getNormalizationDockerImage())));
+            destinationLauncherConfig.getNormalizationDockerImage(),
+            destinationLauncherConfig.getNormalizationIntegrationType())));
 
     log.info("Running dbt worker...");
 
     final Path jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(),
diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/NormalizationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/NormalizationJobOrchestrator.java
index aec91a39b78b..5c0b2e178ad2 100644
--- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/NormalizationJobOrchestrator.java
+++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/NormalizationJobOrchestrator.java
@@ -18,7 +18,7 @@
 import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
 import io.airbyte.persistence.job.models.JobRunConfig;
 import io.airbyte.workers.general.DefaultNormalizationWorker;
-import io.airbyte.workers.normalization.NormalizationRunnerFactory;
+import io.airbyte.workers.normalization.DefaultNormalizationRunner;
 import io.airbyte.workers.normalization.NormalizationWorker;
 import io.airbyte.workers.process.KubePodProcess;
 import io.airbyte.workers.process.ProcessFactory;
@@ -70,11 +70,10 @@ public Optional runJob() throws Exception {
     final NormalizationWorker normalizationWorker = new DefaultNormalizationWorker(
         jobRunConfig.getJobId(),
         Math.toIntExact(jobRunConfig.getAttemptId()),
-        NormalizationRunnerFactory.create(
-            destinationLauncherConfig.getDockerImage(),
+        new DefaultNormalizationRunner(
             processFactory,
-            NormalizationRunnerFactory.NORMALIZATION_VERSION,
-            destinationLauncherConfig.getNormalizationDockerImage()),
+            destinationLauncherConfig.getNormalizationDockerImage(),
+            destinationLauncherConfig.getNormalizationIntegrationType()),
         configs.getWorkerEnvironment());
 
     log.info("Running normalization worker...");
diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_40_23_002__AddNormalizationIntegrationTypeToActorDefinition.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_40_23_002__AddNormalizationIntegrationTypeToActorDefinition.java
new file mode 100644
index 000000000000..1f8f1726c897
--- /dev/null
+++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_40_23_002__AddNormalizationIntegrationTypeToActorDefinition.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.db.instance.configs.migrations;
+
+import org.flywaydb.core.api.migration.BaseJavaMigration;
+import org.flywaydb.core.api.migration.Context;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.jooq.impl.SQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class V0_40_23_002__AddNormalizationIntegrationTypeToActorDefinition extends BaseJavaMigration {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      V0_40_23_002__AddNormalizationIntegrationTypeToActorDefinition.class);
+
+  @Override
+  public void migrate(final Context context) throws Exception {
+    LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
+    final DSLContext ctx = DSL.using(context.getConnection());
+    addIntegrationTypeColumn(ctx);
+  }
+
+  static void addIntegrationTypeColumn(final DSLContext ctx) {
+    ctx.alterTable("actor_definition")
+        .addColumnIfNotExists(DSL.field(
+            "normalization_integration_type",
+            SQLDataType.VARCHAR(255).nullable(true)))
+        .execute();
+  }
+
+}
diff --git a/airbyte-db/db-lib/src/main/resources/configs_database/normalized_tables_schema.txt b/airbyte-db/db-lib/src/main/resources/configs_database/normalized_tables_schema.txt
index fa9c708d43d9..82533ef0e397 100644
--- a/airbyte-db/db-lib/src/main/resources/configs_database/normalized_tables_schema.txt
+++ b/airbyte-db/db-lib/src/main/resources/configs_database/normalized_tables_schema.txt
@@ -71,26 +71,27 @@ Referenced by:
                                      Table "public.actor_definition"
-           Column            |           Type           | Collation | Nullable |      Default
------------------------------+--------------------------+-----------+----------+-------------------
- id                          | uuid                     |           | not null |
- name                        | character varying(256)   |           | not null |
- docker_repository           | character varying(256)   |           | not null |
- docker_image_tag            | character varying(256)   |           | not null |
- documentation_url           | character varying(256)   |           |          |
- icon                        | character varying(256)   |           |          |
- actor_type                  | actor_type               |           | not null |
- source_type                 | source_type              |           |          |
- spec                        | jsonb                    |           | not null |
- created_at                  | timestamp with time zone |           | not null | CURRENT_TIMESTAMP
- updated_at                  | timestamp with time zone |           | not null | CURRENT_TIMESTAMP
- tombstone                   | boolean                  |           | not null | false
- release_stage               | release_stage            |           |          |
- release_date                | date                     |           |          |
- resource_requirements       | jsonb                    |           |          |
- normalization_repository    | character varying(255)   |           |          |
- normalization_tag           | character varying(255)   |           |          |
- supports_dbt                | boolean                  |           |          |
+             Column              |           Type           | Collation | Nullable |      Default
+---------------------------------+--------------------------+-----------+----------+-------------------
+ id                              | uuid                     |           | not null |
+ name                            | character varying(256)   |           | not null |
+ docker_repository               | character varying(256)   |           | not null |
+ docker_image_tag                | character varying(256)   |           | not null |
+ documentation_url               | character varying(256)   |           |          |
+ icon                            | character varying(256)   |           |          |
+ actor_type                      | actor_type               |           | not null |
+ source_type                     | source_type              |           |          |
+ spec                            | jsonb                    |           | not null |
+ created_at                      | timestamp with time zone |           | not null | CURRENT_TIMESTAMP
+ updated_at                      | timestamp with time zone |           | not null | CURRENT_TIMESTAMP
+ tombstone                       | boolean                  |           | not null | false
+ release_stage                   | release_stage            |           |          |
+ release_date                    | date                     |           |          |
+ resource_requirements           | jsonb                    |           |          |
+ normalization_repository        | character varying(255)   |           |          |
+ normalization_tag               | character varying(255)   |           |          |
+ supports_dbt                    | boolean                  |           |          |
+ normalization_integration_type  | character varying(255)   |           |          |
 Indexes:
"actor_definition_pkey" PRIMARY KEY, btree (id) Referenced by: diff --git a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt index b6d17a880f00..4e3b80eae471 100644 --- a/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt @@ -57,6 +57,7 @@ create table "public"."actor_definition"( "normalization_repository" varchar(255) null, "normalization_tag" varchar(255) null, "supports_dbt" bool null, + "normalization_integration_type" varchar(255) null, constraint "actor_definition_pkey" primary key ("id") ); diff --git a/airbyte-integrations/bases/standard-destination-test/build.gradle b/airbyte-integrations/bases/standard-destination-test/build.gradle index d69a666b31fa..bb98faae7b65 100644 --- a/airbyte-integrations/bases/standard-destination-test/build.gradle +++ b/airbyte-integrations/bases/standard-destination-test/build.gradle @@ -5,6 +5,8 @@ dependencies { implementation project(':airbyte-db:db-lib') implementation project(':airbyte-commons-worker') implementation project(':airbyte-config:config-models') + implementation project(':airbyte-config:init') + implementation project(':airbyte-json-validation') implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-protocol:protocol-models') implementation project(':airbyte-commons-worker') diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index 6987641a29a3..04efcf05381a 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -30,7 +30,9 @@ import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.WorkerDestinationConfig; +import io.airbyte.config.init.LocalDefinitionsProvider; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.standardtest.destination.argproviders.DataArgumentsProvider; import io.airbyte.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider; @@ -57,8 +59,8 @@ import io.airbyte.workers.helper.EntrypointEnvChecker; import io.airbyte.workers.internal.AirbyteDestination; import io.airbyte.workers.internal.DefaultAirbyteDestination; +import io.airbyte.workers.normalization.DefaultNormalizationRunner; import io.airbyte.workers.normalization.NormalizationRunner; -import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.DockerProcessFactory; import io.airbyte.workers.process.ProcessFactory; @@ -72,6 +74,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.UUID; @@ -121,6 +124,29 @@ public abstract class 
DestinationAcceptanceTest { */ protected abstract String getImageName(); + private String getImageNameWithoutTag() { + return getImageName().contains(":") ? getImageName().split(":")[0] : getImageName(); + } + + private Optional getOptionalDestinationDefinitionFromProvider(final String imageNameWithoutTag) { + try { + LocalDefinitionsProvider provider = new LocalDefinitionsProvider(LocalDefinitionsProvider.DEFAULT_SEED_DEFINITION_RESOURCE_CLASS); + return provider.getDestinationDefinitions().stream() + .filter(definition -> imageNameWithoutTag.equalsIgnoreCase(definition.getDockerRepository())) + .findFirst(); + } catch (IOException e) { + return Optional.empty(); + } + } + + protected String getNormalizationImageName() { + return getOptionalDestinationDefinitionFromProvider(getImageNameWithoutTag()) + .filter(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig())) + .map(standardDestinationDefinition -> standardDestinationDefinition.getNormalizationConfig().getNormalizationRepository() + ":" + + NORMALIZATION_VERSION) + .orElse(null); + } + /** * Configuration specific to the integration. Will be passed to integration where appropriate in * each test. Should be valid. @@ -200,24 +226,26 @@ protected boolean implementsAppend() throws WorkerException { } } - protected boolean normalizationFromSpec() throws Exception { - final ConnectorSpecification spec = runSpec(); - assertNotNull(spec); - if (spec.getSupportsNormalization() != null) { - return spec.getSupportsNormalization(); - } else { - return false; - } + protected boolean normalizationFromDefinition() { + return getOptionalDestinationDefinitionFromProvider(getImageNameWithoutTag()) + .filter(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig())) + .map(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig().getNormalizationRepository()) + && Objects.nonNull(standardDestinationDefinition.getNormalizationConfig().getNormalizationTag())) + .orElse(false); } - protected boolean dbtFromSpec() throws WorkerException { - final ConnectorSpecification spec = runSpec(); - assertNotNull(spec); - if (spec.getSupportsDBT() != null) { - return spec.getSupportsDBT(); - } else { - return false; - } + protected boolean dbtFromDefinition() { + return getOptionalDestinationDefinitionFromProvider(getImageNameWithoutTag()) + .map(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getSupportsDbt()) + && standardDestinationDefinition.getSupportsDbt()) + .orElse(false); + } + + protected String getNormalizationIntegrationType() { + return getOptionalDestinationDefinitionFromProvider(getImageNameWithoutTag()) + .filter(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig())) + .map(standardDestinationDefinition -> standardDestinationDefinition.getNormalizationConfig().getNormalizationIntegrationType()) + .orElse(null); } /** @@ -278,7 +306,7 @@ protected boolean implementsRecordSizeLimitChecks() { * Same idea as {@link #retrieveRecords(TestDestinationEnv, String, String, JsonNode)}. Except this * method should pull records from the table that contains the normalized records and convert them * back into the data as it would appear in an {@link AirbyteRecordMessage}. Only need to override - * this method if {@link #normalizationFromSpec} returns true. + * this method if {@link #normalizationFromDefinition} returns true. 
   *
   * @param testEnv - information about the test environment.
   * @param streamName - name of the stream for which we are retrieving records.
@@ -537,23 +565,26 @@ public void testLineBreakCharacters() throws Exception {
 
   @Test
   public void specNormalizationValueShouldBeCorrect() throws Exception {
-    final boolean normalizationFromSpec = normalizationFromSpec();
-    assertEquals(normalizationFromSpec, supportsNormalization());
-    if (normalizationFromSpec) {
+    final boolean normalizationFromDefinition = normalizationFromDefinition();
+    assertEquals(normalizationFromDefinition, supportsNormalization());
+    if (normalizationFromDefinition) {
       boolean normalizationRunnerFactorySupportsDestinationImage;
       try {
-        NormalizationRunnerFactory.create(getImageName(), processFactory, NORMALIZATION_VERSION, "");
+        new DefaultNormalizationRunner(
+            processFactory,
+            getNormalizationImageName(),
+            getNormalizationIntegrationType());
         normalizationRunnerFactorySupportsDestinationImage = true;
       } catch (final IllegalStateException e) {
         normalizationRunnerFactorySupportsDestinationImage = false;
       }
-      assertEquals(normalizationFromSpec, normalizationRunnerFactorySupportsDestinationImage);
+      assertEquals(normalizationFromDefinition, normalizationRunnerFactorySupportsDestinationImage);
     }
   }
 
   @Test
-  public void specDBTValueShouldBeCorrect() throws WorkerException {
-    assertEquals(dbtFromSpec(), supportsDBT());
+  public void specDBTValueShouldBeCorrect() {
+    assertEquals(dbtFromDefinition(), supportsDBT());
   }
 
   /**
@@ -622,7 +653,7 @@ public void testIncrementalSync() throws Exception {
   @ArgumentsSource(DataArgumentsProvider.class)
   public void testSyncWithNormalization(final String messagesFilename, final String catalogFilename)
       throws Exception {
-    if (!normalizationFromSpec()) {
+    if (!normalizationFromDefinition()) {
       return;
     }
 
@@ -829,7 +860,7 @@ protected int getMaxRecordValueLimit() {
   @Test
   public void testCustomDbtTransformations() throws Exception {
-    if (!dbtFromSpec()) {
+    if (!dbtFromDefinition()) {
       return;
     }
 
@@ -845,10 +876,10 @@ public void testCustomDbtTransformations() throws Exception {
     // 'profiles.yml'
     // (we don't actually rely on normalization running anything else here though)
     final DbtTransformationRunner runner = new DbtTransformationRunner(processFactory,
-        NormalizationRunnerFactory.create(
-            getImageName(),
+        new DefaultNormalizationRunner(
             processFactory,
-            NORMALIZATION_VERSION, ""));
+            getNormalizationImageName(),
+            getNormalizationIntegrationType()));
     runner.start();
     final Path transformationRoot = Files.createDirectories(jobRoot.resolve("transform"));
     final OperatorDbt dbtConfig = new OperatorDbt()
@@ -861,9 +892,7 @@ public void testCustomDbtTransformations() throws Exception {
         // TODO once we're on DBT 1.x, switch this back to using the main branch
         .withGitRepoUrl("https://github.com/airbytehq/jaffle_shop.git")
         .withGitRepoBranch("pre_dbt_upgrade")
-        .withDockerImage(
-            NormalizationRunnerFactory.getNormalizationInfoForConnector(getImageName()).getLeft()
-                + ":" + NORMALIZATION_VERSION);
+        .withDockerImage(getNormalizationImageName());
     //
     // jaffle_shop is a fictional ecommerce store maintained by fishtownanalytics/dbt.
     //
@@ -913,7 +942,7 @@ public void testCustomDbtTransformations() throws Exception {
 
   @Test
   void testCustomDbtTransformationsFailure() throws Exception {
-    if (!normalizationFromSpec() || !dbtFromSpec()) {
+    if (!normalizationFromDefinition() || !dbtFromDefinition()) {
       // we require normalization implementation for this destination, because we make sure to install
       // required dbt dependency in the normalization docker image in order to run this test successfully
       // (we don't actually rely on normalization running anything here though)
@@ -923,10 +952,10 @@ void testCustomDbtTransformationsFailure() throws Exception {
     final JsonNode config = getConfig();
     final DbtTransformationRunner runner = new DbtTransformationRunner(processFactory,
-        NormalizationRunnerFactory.create(
-            getImageName(),
+        new DefaultNormalizationRunner(
             processFactory,
-            NORMALIZATION_VERSION, ""));
+            getNormalizationImageName(),
+            getNormalizationIntegrationType()));
     runner.start();
     final Path transformationRoot = Files.createDirectories(jobRoot.resolve("transform"));
     final OperatorDbt dbtConfig = new OperatorDbt()
@@ -1270,10 +1299,10 @@ private List runSync(
       return destinationOutput;
     }
 
-    final NormalizationRunner runner = NormalizationRunnerFactory.create(
-        getImageName(),
+    final NormalizationRunner runner = new DefaultNormalizationRunner(
         processFactory,
-        NORMALIZATION_VERSION, "");
+        getNormalizationImageName(),
+        getNormalizationIntegrationType());
     runner.start();
     final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize"));
     if (!runner.normalize(JOB_ID, JOB_ATTEMPT, normalizationRoot,
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinationAcceptanceTest.java
index e73187ed00a8..dd89ebc2d955 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestinationAcceptanceTest.java
@@ -43,7 +43,7 @@ public void testSyncWithNormalizationWithKeyPairEncrypt(final String messagesFil
   }
 
   private void testSyncWithNormalizationWithKeyPairAuth(String messagesFilename, String catalogFilename, String configName) throws Exception {
-    if (!normalizationFromSpec()) {
+    if (!normalizationFromDefinition()) {
       return;
     }
 
diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReporter.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReporter.java
index 64d9996bf3d3..71544885062f 100644
--- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReporter.java
+++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/errorreporter/JobErrorReporter.java
@@ -6,6 +6,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import edu.umd.cs.findbugs.annotations.Nullable;
+import io.airbyte.commons.docker.DockerUtils;
 import io.airbyte.commons.lang.Exceptions;
 import io.airbyte.commons.map.MoreMaps;
 import io.airbyte.config.AttemptFailureSummary;
@@ -55,24 +56,18 @@ public class JobErrorReporter {
   private final ConfigRepository configRepository;
   private final DeploymentMode deploymentMode;
   private final String airbyteVersion;
-  private final String normalizationImage;
-  private final String normalizationVersion;
   private final WebUrlHelper webUrlHelper;
   private final JobErrorReportingClient jobErrorReportingClient;
 
   public JobErrorReporter(final ConfigRepository configRepository,
                           final DeploymentMode deploymentMode,
                           final String airbyteVersion,
-                          final String normalizationImage,
-                          final String normalizationVersion,
                           final WebUrlHelper webUrlHelper,
                           final JobErrorReportingClient jobErrorReportingClient) {
     this.configRepository = configRepository;
     this.deploymentMode = deploymentMode;
     this.airbyteVersion = airbyteVersion;
-    this.normalizationImage = normalizationImage;
-    this.normalizationVersion = normalizationVersion;
     this.webUrlHelper = webUrlHelper;
     this.jobErrorReportingClient = jobErrorReportingClient;
   }
@@ -119,10 +114,11 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu
       // the destination)
       final Map metadata = MoreMaps.merge(
           commonMetadata,
-          getNormalizationMetadata(),
+          getNormalizationMetadata(destinationDefinition.getNormalizationConfig().getNormalizationRepository()),
           prefixConnectorMetadataKeys(getSourceMetadata(sourceDefinition), "source"),
          getDestinationMetadata(destinationDefinition));
-      final String dockerImage = String.format("%s:%s", normalizationImage, normalizationVersion);
+      final String dockerImage = DockerUtils.getTaggedImageName(destinationDefinition.getNormalizationConfig().getNormalizationRepository(),
+          destinationDefinition.getNormalizationConfig().getNormalizationTag());
 
       reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
     }
@@ -229,7 +225,7 @@ private Map getSourceMetadata(final StandardSourceDefinition sou
         Map.entry(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value()));
   }
 
-  private Map getNormalizationMetadata() {
+  private Map getNormalizationMetadata(String normalizationImage) {
     return Map.ofEntries(
         Map.entry(NORMALIZATION_REPOSITORY_META_KEY, normalizationImage));
   }
diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/errorreporter/JobErrorReporterTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/errorreporter/JobErrorReporterTest.java
index ae97e8fbeedb..7f63364c3665 100644
--- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/errorreporter/JobErrorReporterTest.java
+++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/errorreporter/JobErrorReporterTest.java
@@ -12,6 +12,7 @@
 import io.airbyte.config.FailureReason.FailureOrigin;
 import io.airbyte.config.FailureReason.FailureType;
 import io.airbyte.config.Metadata;
+import io.airbyte.config.NormalizationDestinationDefinitionConfig;
 import io.airbyte.config.StandardDestinationDefinition;
 import io.airbyte.config.StandardSourceDefinition;
 import io.airbyte.config.StandardWorkspace;
@@ -38,7 +39,8 @@ class JobErrorReporterTest {
   private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS;
   private static final String AIRBYTE_VERSION = "0.1.40";
   private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
-  private static final String NORMALIZATION_VERSION = "0.2.18";
+  private static final String NORMALIZATION_VERSION = "0.2.24";
+  private static final String NORMALIZATION_INTEGRATION_TYPE = "snowflake";
   private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID();
   private static final String SOURCE_DEFINITION_NAME = "stripe";
   private static final String SOURCE_DOCKER_REPOSITORY = "airbyte/source-stripe";
@@ -85,7 +87,7 @@ void setup() {
     jobErrorReportingClient = mock(JobErrorReportingClient.class);
     webUrlHelper = mock(WebUrlHelper.class);
     jobErrorReporter = new JobErrorReporter(
-        configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, NORMALIZATION_IMAGE, NORMALIZATION_VERSION, webUrlHelper, jobErrorReportingClient);
+        configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, webUrlHelper, jobErrorReportingClient);
 
     Mockito.when(webUrlHelper.getConnectionUrl(WORKSPACE_ID, CONNECTION_ID)).thenReturn(CONNECTION_URL);
     Mockito.when(webUrlHelper.getWorkspaceUrl(WORKSPACE_ID)).thenReturn(WORKSPACE_URL);
@@ -138,6 +140,10 @@ void testReportSyncJobFailure() {
             .withDockerRepository(DESTINATION_DOCKER_REPOSITORY)
             .withReleaseStage(DESTINATION_RELEASE_STAGE)
             .withDestinationDefinitionId(DESTINATION_DEFINITION_ID)
+            .withNormalizationConfig(new NormalizationDestinationDefinitionConfig()
+                .withNormalizationTag(NORMALIZATION_VERSION)
+                .withNormalizationRepository(NORMALIZATION_IMAGE)
+                .withNormalizationIntegrationType(NORMALIZATION_INTEGRATION_TYPE))
             .withName(DESTINATION_DEFINITION_NAME));
 
     final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class);
diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
index 718b52985e71..611466a02c6d 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
@@ -73,7 +73,6 @@
 import io.airbyte.server.services.AirbyteGithubStore;
 import io.airbyte.validation.json.JsonSchemaValidator;
 import io.airbyte.workers.helper.ConnectionHelper;
-import io.airbyte.workers.normalization.NormalizationRunnerFactory;
 import io.temporal.serviceclient.WorkflowServiceStubs;
 import java.net.http.HttpClient;
 import java.util.Map;
@@ -224,8 +223,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
         configRepository,
         configs.getDeploymentMode(),
         configs.getAirbyteVersionOrWarning(),
-        NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME,
-        NormalizationRunnerFactory.NORMALIZATION_VERSION,
         webUrlHelper,
         jobErrorReportingClient);
 
diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java
index 8a4b14d6660d..4b07f3bda272 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java
@@ -251,8 +251,7 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
             .withName(currentDestination.getName())
             .withDocumentationUrl(currentDestination.getDocumentationUrl())
             .withIcon(currentDestination.getIcon())
-            .withNormalizationRepository(currentDestination.getNormalizationRepository())
-            .withNormalizationTag(currentDestination.getNormalizationTag())
+            .withNormalizationConfig(currentDestination.getNormalizationConfig())
             .withSupportsDbt(currentDestination.getSupportsDbt())
             .withSpec(spec)
             .withProtocolVersion(airbyteProtocolVersion.serialize())
diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java
index d53044e95596..cb68ad2b8205 100644
--- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java
+++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java
@@ -10,12 +10,38 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableList;
-import io.airbyte.api.model.generated.*;
+import io.airbyte.api.model.generated.AttemptInfoRead;
+import io.airbyte.api.model.generated.AttemptNormalizationStatusRead;
+import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList;
+import io.airbyte.api.model.generated.AttemptRead;
+import io.airbyte.api.model.generated.ConnectionRead;
+import io.airbyte.api.model.generated.DestinationIdRequestBody;
+import io.airbyte.api.model.generated.DestinationRead;
+import io.airbyte.api.model.generated.JobConfigType;
+import io.airbyte.api.model.generated.JobDebugInfoRead;
+import io.airbyte.api.model.generated.JobDebugRead;
+import io.airbyte.api.model.generated.JobIdRequestBody;
+import io.airbyte.api.model.generated.JobInfoLightRead;
+import io.airbyte.api.model.generated.JobInfoRead;
+import io.airbyte.api.model.generated.JobListRequestBody;
+import io.airbyte.api.model.generated.JobRead;
+import io.airbyte.api.model.generated.JobReadList;
+import io.airbyte.api.model.generated.JobWithAttemptsRead;
+import io.airbyte.api.model.generated.LogRead;
+import io.airbyte.api.model.generated.Pagination;
+import io.airbyte.api.model.generated.SourceIdRequestBody;
+import io.airbyte.api.model.generated.SourceRead;
 import io.airbyte.commons.enums.Enums;
 import io.airbyte.commons.version.AirbyteVersion;
-import io.airbyte.config.*;
 import io.airbyte.config.Configs.WorkerEnvironment;
+import io.airbyte.config.DestinationConnection;
+import io.airbyte.config.JobCheckConnectionConfig;
+import io.airbyte.config.JobConfig;
 import io.airbyte.config.JobConfig.ConfigType;
+import io.airbyte.config.SourceConnection;
+import io.airbyte.config.StandardDestinationDefinition;
+import io.airbyte.config.StandardSourceDefinition;
+import io.airbyte.config.StandardSync;
 import io.airbyte.config.helpers.LogConfigs;
 import io.airbyte.config.persistence.ConfigNotFoundException;
 import io.airbyte.persistence.job.JobPersistence;
@@ -34,7 +60,12 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.junit.jupiter.api.BeforeEach;
diff --git a/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml b/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml
index 1c3d326144cd..e8035d05b6ab 100644
--- a/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml
+++ b/airbyte-worker-models/src/main/resources/workers_models/IntegrationLauncherConfig.yaml
@@ -21,6 +21,8 @@ properties:
   supportsDbt:
     type: boolean
     default: false
+  normalizationIntegrationType:
+    type: string
   protocolVersion:
     type: object
     existingJavaType: io.airbyte.commons.version.Version
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java
index f9f6c8aeb03d..44b1e323408f 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/JobErrorReportingBeanFactory.java
@@ -13,7 +13,6 @@
 import io.airbyte.persistence.job.errorreporter.LoggingJobErrorReportingClient;
 import io.airbyte.persistence.job.errorreporter.SentryExceptionHelper;
 import io.airbyte.persistence.job.errorreporter.SentryJobErrorReportingClient;
-import io.airbyte.workers.normalization.NormalizationRunnerFactory;
 import io.micronaut.context.annotation.Factory;
 import io.micronaut.context.annotation.Requires;
 import io.micronaut.context.annotation.Value;
@@ -59,8 +58,6 @@ public JobErrorReporter jobErrorReporter(
         configRepository,
         deploymentMode,
         airbyteVersion,
-        NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME,
-        NormalizationRunnerFactory.NORMALIZATION_VERSION,
         webUrlHelper,
         jobErrorReportingClient.orElseGet(() -> new LoggingJobErrorReportingClient()));
   }
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java
index 97e1f074a0f6..2a550a62eb36 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java
@@ -31,6 +31,7 @@
 import jakarta.inject.Singleton;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 @Singleton
@@ -95,9 +96,19 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
             .equalsIgnoreCase(
                 DockerUtils.getTaggedImageName(destinationDefinition.getDockerRepository(), destinationDefinition.getDockerImageTag())))
         .findFirst();
-    final String destinationNormalizationDockerImage = optionalDestinationDefinition.map(standardDestinationDefinition -> String.format("%s:%s",
-        standardDestinationDefinition.getNormalizationRepository(), standardDestinationDefinition.getNormalizationTag())).orElse(null);
-    final boolean supportDbt = optionalDestinationDefinition.isPresent() ? optionalDestinationDefinition.get().getSupportsDbt() : false;
+    final String destinationNormalizationDockerImage = optionalDestinationDefinition
+        .filter(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig()))
+        .map(standardDestinationDefinition -> String.format("%s:%s",
+            standardDestinationDefinition.getNormalizationConfig().getNormalizationRepository(),
+            standardDestinationDefinition.getNormalizationConfig().getNormalizationTag()))
+        .orElse(null);
+    final boolean supportstDbt = optionalDestinationDefinition.isPresent() && Objects.nonNull(optionalDestinationDefinition.get().getSupportsDbt())
+        ? optionalDestinationDefinition.get().getSupportsDbt()
+        : false;
+    final String normalizationIntegrationType = optionalDestinationDefinition
+        .filter(standardDestinationDefinition -> Objects.nonNull(standardDestinationDefinition.getNormalizationConfig()))
+        .map(standardDestinationDefinition -> standardDestinationDefinition.getNormalizationConfig().getNormalizationIntegrationType())
+        .orElse(null);
 
     final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig()
         .withJobId(String.valueOf(jobId))
@@ -113,7 +124,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
         .withProtocolVersion(config.getDestinationProtocolVersion())
         .withIsCustomConnector(config.getIsDestinationCustomConnector())
         .withNormalizationDockerImage(destinationNormalizationDockerImage)
-        .withSupportsDbt(supportDbt);
+        .withSupportsDbt(supportstDbt)
+        .withNormalizationIntegrationType(normalizationIntegrationType);
 
     final StandardSyncInput syncInput = new StandardSyncInput()
         .withNamespaceDefinition(config.getNamespaceDefinition())
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java
index 447174e223cf..e0ca2929904a 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java
@@ -31,7 +31,7 @@
 import io.airbyte.workers.WorkerConfigs;
 import io.airbyte.workers.general.DbtTransformationRunner;
 import io.airbyte.workers.general.DbtTransformationWorker;
-import io.airbyte.workers.normalization.NormalizationRunnerFactory;
+import io.airbyte.workers.normalization.DefaultNormalizationRunner;
 import io.airbyte.workers.process.ProcessFactory;
 import io.airbyte.workers.sync.DbtLauncherWorker;
 import io.airbyte.workers.temporal.TemporalAttemptExecution;
@@ -142,11 +142,10 @@ private CheckedSupplier, Exception> getLegacyWork
         Math.toIntExact(jobRunConfig.getAttemptId()),
         resourceRequirements,
         new DbtTransformationRunner(
-            processFactory, NormalizationRunnerFactory.create(
-                destinationLauncherConfig.getDockerImage(),
+            processFactory, new DefaultNormalizationRunner(
                 processFactory,
-                NormalizationRunnerFactory.NORMALIZATION_VERSION,
-                destinationLauncherConfig.getNormalizationDockerImage())));
+                destinationLauncherConfig.getNormalizationDockerImage(),
+                destinationLauncherConfig.getNormalizationIntegrationType())));
   }
 
   private CheckedSupplier, Exception> getContainerLauncherWorkerFactory(
diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java
index 89ca22f61eed..a498162b8eef 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java
@@ -33,7 +33,7 @@
 import io.airbyte.workers.Worker;
 import io.airbyte.workers.WorkerConfigs;
 import io.airbyte.workers.general.DefaultNormalizationWorker;
-import io.airbyte.workers.normalization.NormalizationRunnerFactory;
+import io.airbyte.workers.normalization.DefaultNormalizationRunner;
 import io.airbyte.workers.process.ProcessFactory;
 import io.airbyte.workers.sync.NormalizationLauncherWorker;
 import io.airbyte.workers.temporal.TemporalAttemptExecution;
@@ -150,11 +150,10 @@ private CheckedSupplier, Except
     return () -> new DefaultNormalizationWorker(
         jobRunConfig.getJobId(),
         Math.toIntExact(jobRunConfig.getAttemptId()),
-        NormalizationRunnerFactory.create(
-            destinationLauncherConfig.getDockerImage(),
+        new DefaultNormalizationRunner(
            processFactory,
-            NormalizationRunnerFactory.NORMALIZATION_VERSION,
-            destinationLauncherConfig.getNormalizationDockerImage()),
+            destinationLauncherConfig.getNormalizationDockerImage(),
+            destinationLauncherConfig.getNormalizationIntegrationType()),
         workerEnvironment);
   }
 
diff --git a/tools/bin/check_images_exist.sh b/tools/bin/check_images_exist.sh
index 1ffbf8195d0c..2098a9c73218 100755
--- a/tools/bin/check_images_exist.sh
+++ b/tools/bin/check_images_exist.sh
@@ -82,16 +82,18 @@ checkPlatformImages() {
 checkNormalizationImages() {
   echo -e "$blue_text""Checking Normalization images exist...""$default_text"
 
-  # the only way to know what version of normalization the platform is using is looking in NormalizationRunnerFactory.
   local image_version;
-  factory_path=airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java
+  definition_file_path=airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
   # -f True if file exists and is a regular file
-  if ! test -f $factory_path; then
-    echo -e "$red_text""No NormalizationRunnerFactory found at path! H4LP!!!""$default_text"
+  if ! test -f $definition_file_path; then
+    echo -e "$red_text""Destination definition file not found at path! H4LP!!!""$default_text"
  fi
 
-  image_version=$(cat $factory_path | grep 'NORMALIZATION_VERSION =' | cut -d"=" -f2 | sed 's:;::' | sed -e 's:"::g' | sed -e 's:[[:space:]]::g')
-  echo -e "$blue_text""Checking normalization images with version $image_version exist...""$default_text"
-  VERSION=$image_version
+  normalization_image_versions=$(cat $definition_file_path | grep 'normalizationTag:' | cut -d":" -f2 | sed 's:;::' | sed -e 's:"::g' | sed -e 's:[[:space:]]::g')
+  IFS=' ' read -r -a array <<< "$normalization_image_versions"
+  # Get the first value of the normalization tag
+  normalization_image=${array[0]}
+  echo -e "$blue_text""Checking normalization images with version $normalization_image exist...""$default_text"
+  VERSION=$normalization_image
 
   check_compose_image_exist airbyte-integrations/bases/base-normalization/docker-compose.yaml $VERSION
 }