Skip to content

Commit

Permalink
Remove NormalizationRunnerFactory (#20245)
Browse files Browse the repository at this point in the history
* updated IntegrationLauncherConfig.yaml and added the supportDBT and normalizationImage fields to this class. Added code to the GenerateInputActivityImpl and TemporalClient classes to read destination_definition.yaml and retrieve the supportDBT and normalizationImage fields. Added logging and comparison of normalization images from NormalizationRunnerFactory and destination_definition.yaml

* updated minor remarks

* updated minor remarks

* fixed minor remarks

* added normalization data to the tests

* fixed minor remarks

* removed NormalizationRunnerFactory

* fixed remarks

* fixed remarks

* fixed remarks

* updated acceptance tests

* updated acceptance tests

* updated check_images_exist.sh script

* updated method to get the normalization image name for the destination acceptance test

* fixed code style

* fixed code style and removed tests data

* updated JobErrorReporterTest.java

* updated JobErrorReporterTest.java

* fixed remarks

* added integration type field to the destination_definition file and actor_definition table

* fixed tests

* fixed tests

* fixed minor changes after pulling master changes

* fixed minor changes after pulling master changes

* renamed integrationType to normalizationIntegrationType/ fixed minor remarks

* renamed extra dependencies

* updated docs

* updated docs

* fixed minor remarks

* added NormalizationDestinationDefinitionConfig.yaml for StandardDestinationDefinition.yaml and updated configuration

* updated normalization tag

* updated DestinationAcceptanceTest.java

* updated DestinationAcceptanceTest.java

* updated imports and descriptions
  • Loading branch information
andriikorotkov authored Dec 15, 2022
1 parent 90c17de commit eecfafd
Show file tree
Hide file tree
Showing 30 changed files with 327 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.40.23.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.23.002", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers.normalization;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
Expand Down Expand Up @@ -46,7 +45,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
.setLogPrefix("normalization")
.setPrefixColor(Color.GREEN_BACKGROUND);

private final DestinationType destinationType;
private final String normalizationIntegrationType;
private final ProcessFactory processFactory;
private final String normalizationImageName;
private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
Expand All @@ -55,31 +54,12 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private Process process = null;

public enum DestinationType {
BIGQUERY,
MSSQL,
MYSQL,
ORACLE,
POSTGRES,
REDSHIFT,
SNOWFLAKE,
CLICKHOUSE,
TIDB
}

public DefaultNormalizationRunner(final DestinationType destinationType,
final ProcessFactory processFactory,
final String normalizationImageName) {
this.destinationType = destinationType;
this.processFactory = processFactory;
this.normalizationImageName = normalizationImageName;
}

public DefaultNormalizationRunner(final ProcessFactory processFactory,
final String normalizationImage) {
final String normalizationImage,
final String normalizationIntegrationType) {
this.processFactory = processFactory;
this.normalizationImageName = normalizationImage;
this.destinationType = null;
this.normalizationIntegrationType = normalizationIntegrationType;
}

@Override
Expand All @@ -99,12 +79,12 @@ public boolean configureDbt(final String jobId,
final String gitRepoBranch = dbtConfig.getGitRepoBranch();
if (Strings.isNullOrEmpty(gitRepoBranch)) {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl);
} else {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl,
"--git-branch", gitRepoBranch);
Expand All @@ -124,7 +104,7 @@ public boolean normalize(final String jobId,
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "run",
"--integration-type", destinationType.toString().toLowerCase(),
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
}
Expand Down Expand Up @@ -233,9 +213,4 @@ private String buildInternalErrorMessageFromDbtStackTrace() {
return errorMap.get(SentryExceptionHelper.ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY);
}

@VisibleForTesting
DestinationType getDestinationType() {
return destinationType;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
Expand All @@ -26,7 +27,6 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.ByteArrayInputStream;
Expand All @@ -49,6 +49,10 @@ class DefaultNormalizationRunnerTest {
private static final String JOB_ID = "0";
private static final int JOB_ATTEMPT = 0;

private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
private static final String NORMALIZATION_TAG = "42.42.42";
private static final String INTEGRATION_TYPE = "postgres";

private static Path logJobRoot;

static {
Expand Down Expand Up @@ -82,14 +86,14 @@ void setup() throws IOException, WorkerException {
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, false, files, null,
DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null,
workerConfigs.getResourceRequirements(),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Map.of(),
Map.of(),
"run",
"--integration-type", "bigquery",
"--integration-type", INTEGRATION_TYPE,
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME))
.thenReturn(process);
Expand All @@ -110,8 +114,7 @@ public void tearDown() throws IOException {
@Test
void test() throws Exception {
final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);

when(process.exitValue()).thenReturn(0);

Expand All @@ -122,8 +125,7 @@ void test() throws Exception {
void testLog() throws Exception {

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);

when(process.exitValue()).thenReturn(0);

Expand All @@ -145,8 +147,7 @@ void testClose() throws Exception {
when(process.isAlive()).thenReturn(true).thenReturn(false);

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements());
runner.close();

Expand All @@ -158,8 +159,7 @@ void testFailure() throws Exception {
when(process.exitValue()).thenReturn(1);

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

verify(process).waitFor();
Expand All @@ -180,8 +180,7 @@ void testFailureWithTraceMessage() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(errorTraceString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
Expand All @@ -207,8 +206,7 @@ void testFailureWithDbtError() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
Expand All @@ -229,8 +227,7 @@ void testFailureWithDbtErrorJsonFormat() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
# JSON Schema (draft-07) for the normalization block nested inside a destination
# definition (referenced from StandardDestinationDefinition via "$ref").
# All three fields are required when a normalizationConfig object is present.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/NormalizationDestinationDefinitionConfig.yaml
title: NormalizationDestinationDefinitionConfig
description: describes a normalization config for destination definition
type: object
required:
- normalizationRepository
- normalizationTag
- normalizationIntegrationType
# Extra keys are tolerated so older/newer definitions can round-trip safely.
additionalProperties: true
properties:
normalizationRepository:
type: string
description: a field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.
normalizationTag:
type: string
description: a field indicating the tag of the docker repository to be used for normalization.
normalizationIntegrationType:
type: string
description: a field indicating the type of integration dialect to use for normalization.
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ properties:
protocolVersion:
type: string
description: the Airbyte Protocol version supported by the connector
normalizationRepository:
type: string
normalizationTag:
type: string
normalizationConfig:
"$ref": NormalizationDestinationDefinitionConfig.yaml
supportsDbt:
type: boolean
default: false
description: an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.NormalizationDestinationDefinitionConfig;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
Expand Down Expand Up @@ -271,7 +272,9 @@ private void writeOrUpdateStandardDefinition(final DSLContext ctx,
sourceDef.withProtocolVersion(getProtocolVersion(sourceDef.getSpec()));
ConfigWriter.writeStandardSourceDefinition(Collections.singletonList(sourceDef), ctx);
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class);
final NormalizationDestinationDefinitionConfig normalizationConfig = Jsons.object(definition, NormalizationDestinationDefinitionConfig.class);
final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class)
.withNormalizationConfig(normalizationConfig);
destDef.withProtocolVersion(getProtocolVersion(destDef.getSpec()));
ConfigWriter.writeStandardDestinationDefinition(Collections.singletonList(destDef), ctx);
} else {
Expand Down
Loading

0 comments on commit eecfafd

Please sign in to comment.