Revert "Remove NormalizationRunnerFactory" #20560

Closed · wants to merge 1 commit
@@ -142,7 +142,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.40.23.002", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.23.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
@@ -5,6 +5,7 @@
package io.airbyte.workers.normalization;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
@@ -45,7 +46,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
.setLogPrefix("normalization")
.setPrefixColor(Color.GREEN_BACKGROUND);

private final String normalizationIntegrationType;
private final DestinationType destinationType;
private final ProcessFactory processFactory;
private final String normalizationImageName;
private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
@@ -54,12 +55,31 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private Process process = null;

public enum DestinationType {
BIGQUERY,
MSSQL,
MYSQL,
ORACLE,
POSTGRES,
REDSHIFT,
SNOWFLAKE,
CLICKHOUSE,
TIDB
}

public DefaultNormalizationRunner(final DestinationType destinationType,
final ProcessFactory processFactory,
final String normalizationImageName) {
this.destinationType = destinationType;
this.processFactory = processFactory;
this.normalizationImageName = normalizationImageName;
}

public DefaultNormalizationRunner(final ProcessFactory processFactory,
final String normalizationImage,
final String normalizationIntegrationType) {
final String normalizationImage) {
this.processFactory = processFactory;
this.normalizationImageName = normalizationImage;
this.normalizationIntegrationType = normalizationIntegrationType;
this.destinationType = null;
}

@Override
@@ -79,12 +99,12 @@ public boolean configureDbt(final String jobId,
final String gitRepoBranch = dbtConfig.getGitRepoBranch();
if (Strings.isNullOrEmpty(gitRepoBranch)) {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl);
} else {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl,
"--git-branch", gitRepoBranch);
@@ -104,7 +124,7 @@ public boolean normalize(final String jobId,
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "run",
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
}
@@ -213,4 +233,9 @@ private String buildInternalErrorMessageFromDbtStackTrace() {
return errorMap.get(SentryExceptionHelper.ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY);
}

@VisibleForTesting
DestinationType getDestinationType() {
return destinationType;
}

}
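
Note: the block below is an illustrative sketch, not part of this diff. It shows how the restored constructor and the DestinationType enum fit together; the helper class name is hypothetical, and the image tag comes from NORMALIZATION_VERSION in the factory restored below.

import io.airbyte.workers.normalization.DefaultNormalizationRunner;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.ProcessFactory;

class NormalizationRunnerSketch {

  // Builds a runner the way the restored factory does: destination type,
  // then the process factory, then the fully tagged normalization image.
  static DefaultNormalizationRunner buildRunner(final ProcessFactory processFactory) {
    return new DefaultNormalizationRunner(
        DestinationType.POSTGRES,
        processFactory,
        "airbyte/normalization:0.2.25");
  }

  // Mirrors how the runner derives the "--integration-type" argument:
  // DestinationType.POSTGRES becomes "postgres".
  static String integrationTypeFlag(final DestinationType destinationType) {
    return destinationType.toString().toLowerCase();
  }
}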
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.normalization;

import com.google.common.collect.ImmutableMap;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.ProcessFactory;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;

@Slf4j
public class NormalizationRunnerFactory {

public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization";
public static final String NORMALIZATION_VERSION = "0.2.25";

static final Map<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>> NORMALIZATION_MAPPING =
ImmutableMap.<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>>builder()
// map destination connectors (alphabetically) to their expected normalization settings
.put("airbyte/destination-bigquery", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DefaultNormalizationRunner.DestinationType.BIGQUERY))
.put("airbyte/destination-bigquery-denormalized",
ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DefaultNormalizationRunner.DestinationType.BIGQUERY))
.put("airbyte/destination-clickhouse", ImmutablePair.of("airbyte/normalization-clickhouse", DestinationType.CLICKHOUSE))
.put("airbyte/destination-clickhouse-strict-encrypt", ImmutablePair.of("airbyte/normalization-clickhouse", DestinationType.CLICKHOUSE))
.put("airbyte/destination-mssql", ImmutablePair.of("airbyte/normalization-mssql", DestinationType.MSSQL))
.put("airbyte/destination-mssql-strict-encrypt", ImmutablePair.of("airbyte/normalization-mssql", DestinationType.MSSQL))
.put("airbyte/destination-mysql", ImmutablePair.of("airbyte/normalization-mysql", DestinationType.MYSQL))
.put("airbyte/destination-mysql-strict-encrypt", ImmutablePair.of("airbyte/normalization-mysql", DestinationType.MYSQL))
.put("airbyte/destination-oracle", ImmutablePair.of("airbyte/normalization-oracle", DestinationType.ORACLE))
.put("airbyte/destination-oracle-strict-encrypt", ImmutablePair.of("airbyte/normalization-oracle", DestinationType.ORACLE))
.put("airbyte/destination-postgres", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.POSTGRES))
.put("airbyte/destination-postgres-strict-encrypt", ImmutablePair.of(BASE_NORMALIZATION_IMAGE_NAME, DestinationType.POSTGRES))
.put("airbyte/destination-redshift", ImmutablePair.of("airbyte/normalization-redshift", DestinationType.REDSHIFT))
.put("airbyte/destination-snowflake", ImmutablePair.of("airbyte/normalization-snowflake", DestinationType.SNOWFLAKE))
.put("airbyte/destination-tidb", ImmutablePair.of("airbyte/normalization-tidb", DestinationType.TIDB))
.build();

public static NormalizationRunner create(final String connectorImageName,
final ProcessFactory processFactory,
final String normalizationVersion,
final String normalizationImage) {
final var valuePair = getNormalizationInfoForConnector(connectorImageName);
final String factoryNormalizationImage = String.format("%s:%s", valuePair.getLeft(), normalizationVersion);
if (Objects.nonNull(normalizationImage)
&& !normalizationImage.equalsIgnoreCase(factoryNormalizationImage)) {
log.error(
"The normalization image name or tag in the definition file is different from the normalization image or tag in the NormalizationRunnerFactory!");
log.error(
"the definition file value - {}, the NormalizationRunnerFactory value - {}", normalizationImage, factoryNormalizationImage);
}
return new DefaultNormalizationRunner(
valuePair.getRight(),
processFactory,
factoryNormalizationImage);
}

public static ImmutablePair<String, DestinationType> getNormalizationInfoForConnector(final String connectorImageName) {
final String imageNameWithoutTag = connectorImageName.contains(":") ? connectorImageName.split(":")[0] : connectorImageName;
if (NORMALIZATION_MAPPING.containsKey(imageNameWithoutTag)) {
return NORMALIZATION_MAPPING.get(imageNameWithoutTag);
} else {
throw new IllegalStateException(
String.format("Requested normalization for %s, but it is not included in the normalization mappings.", connectorImageName));
}
}

}
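
Note: another illustrative sketch, not part of this diff, showing how the restored factory is typically called. The connector image tag "1.0.0", the null normalizationImage argument, and the class name are assumptions; the rest follows the code above.

import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.ProcessFactory;
import org.apache.commons.lang3.tuple.ImmutablePair;

class NormalizationRunnerFactorySketch {

  // "airbyte/destination-postgres" maps to the base image, so this resolves to
  // "airbyte/normalization:0.2.25" and DestinationType.POSTGRES.
  static NormalizationRunner forPostgres(final ProcessFactory processFactory) {
    return NormalizationRunnerFactory.create(
        "airbyte/destination-postgres:1.0.0",
        processFactory,
        NormalizationRunnerFactory.NORMALIZATION_VERSION,
        null); // a null definition-file image skips the mismatch warning in create()
  }

  // Unmapped connectors (e.g. "airbyte/destination-csv") throw IllegalStateException,
  // exactly what NormalizationRunnerFactoryTest asserts below.
  static DestinationType destinationTypeFor(final String connectorImageName) {
    final ImmutablePair<String, DestinationType> info =
        NormalizationRunnerFactory.getNormalizationInfoForConnector(connectorImageName);
    return info.getRight();
  }
}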
@@ -15,7 +15,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
@@ -27,6 +26,7 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.ByteArrayInputStream;
@@ -49,10 +49,6 @@ class DefaultNormalizationRunnerTest {
private static final String JOB_ID = "0";
private static final int JOB_ATTEMPT = 0;

private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
private static final String NORMALIZATION_TAG = "42.42.42";
private static final String INTEGRATION_TYPE = "postgres";

private static Path logJobRoot;

static {
@@ -86,14 +82,14 @@ void setup() throws IOException, WorkerException {
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, false, files, null,
workerConfigs.getResourceRequirements(),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Map.of(),
Map.of(),
"run",
"--integration-type", INTEGRATION_TYPE,
"--integration-type", "bigquery",
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME))
.thenReturn(process);
@@ -114,7 +110,8 @@ public void tearDown() throws IOException {
@Test
void test() throws Exception {
final NormalizationRunner runner =
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);

when(process.exitValue()).thenReturn(0);

@@ -125,7 +122,8 @@
void testLog() throws Exception {

final NormalizationRunner runner =
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);

when(process.exitValue()).thenReturn(0);

@@ -147,7 +145,8 @@ void testClose() throws Exception {
when(process.isAlive()).thenReturn(true).thenReturn(false);

final NormalizationRunner runner =
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements());
runner.close();

@@ -159,7 +158,8 @@ void testFailure() throws Exception {
when(process.exitValue()).thenReturn(1);

final NormalizationRunner runner =
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

verify(process).waitFor();
@@ -180,7 +180,8 @@ void testFailureWithTraceMessage() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(errorTraceString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
@@ -206,7 +207,8 @@ void testFailureWithDbtError() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
@@ -227,7 +229,8 @@ void testFailureWithDbtErrorJsonFormat() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.normalization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.ProcessFactory;
import java.util.Map.Entry;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class NormalizationRunnerFactoryTest {

public static final String NORMALIZATION_VERSION = "dev";
private ProcessFactory processFactory;

@BeforeEach
void setup() {
processFactory = mock(ProcessFactory.class);
}

@Test
void testMappings() {
for (final Entry<String, ImmutablePair<String, DestinationType>> entry : NormalizationRunnerFactory.NORMALIZATION_MAPPING.entrySet()) {
assertEquals(entry.getValue().getValue(),
((DefaultNormalizationRunner) NormalizationRunnerFactory.create(
String.format("%s:0.1.0", entry.getKey()), processFactory, NORMALIZATION_VERSION, String.format("%s:0.1.0", entry.getKey())))
.getDestinationType());
}
assertThrows(IllegalStateException.class,
() -> NormalizationRunnerFactory.create("airbyte/destination-csv:0.1.0", processFactory,
NORMALIZATION_VERSION, "airbyte/destination-csv:0.1.0"));
}

}

This file was deleted.

@@ -58,8 +58,10 @@ properties:
protocolVersion:
type: string
description: the Airbyte Protocol version supported by the connector
normalizationConfig:
"$ref": NormalizationDestinationDefinitionConfig.yaml
normalizationRepository:
type: string
normalizationTag:
type: string
supportsDbt:
type: boolean
description: an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.
default: false
@@ -18,7 +18,6 @@
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.NormalizationDestinationDefinitionConfig;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
@@ -272,9 +271,7 @@ private void writeOrUpdateStandardDefinition(final DSLContext ctx,
sourceDef.withProtocolVersion(getProtocolVersion(sourceDef.getSpec()));
ConfigWriter.writeStandardSourceDefinition(Collections.singletonList(sourceDef), ctx);
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
final NormalizationDestinationDefinitionConfig normalizationConfig = Jsons.object(definition, NormalizationDestinationDefinitionConfig.class);
final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class)
.withNormalizationConfig(normalizationConfig);
final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class);
destDef.withProtocolVersion(getProtocolVersion(destDef.getSpec()));
ConfigWriter.writeStandardDestinationDefinition(Collections.singletonList(destDef), ctx);
} else {
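
Note: a final illustrative sketch, not part of this diff, for the reverted definition-loading change just above: a destination definition node is mapped straight onto StandardDestinationDefinition with no NormalizationDestinationDefinitionConfig wrapping, so the normalizationRepository and normalizationTag fields restored in the YAML schema are read as plain properties. The JsonNode parameter type and the class name are assumptions.

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardDestinationDefinition;

class DestinationDefinitionParseSketch {

  // After the revert: a single direct mapping call, mirroring the replaced lines above.
  static StandardDestinationDefinition parse(final JsonNode definition) {
    return Jsons.object(definition, StandardDestinationDefinition.class);
  }
}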