Remove NormalizationRunnerFactory #20245

Merged
merged 51 commits into from
Dec 15, 2022
Changes from 49 commits
fadad7f
updated IntegrationLauncherConfig.yaml and added to this class suport…
andriikorotkov Nov 24, 2022
7caa00c
updated minor remarks
andriikorotkov Nov 25, 2022
8c46fd3
updated minor remarks
andriikorotkov Nov 25, 2022
8c7f464
fixed minor remarks
andriikorotkov Nov 28, 2022
1d90e5d
added normalization data to the tests
andriikorotkov Nov 29, 2022
7f97179
fixed minor remarks
andriikorotkov Nov 29, 2022
980459c
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Nov 29, 2022
6360b55
removed NormalizationRunnerFactory
andriikorotkov Nov 29, 2022
e04c848
fixed remarks
andriikorotkov Nov 29, 2022
6a4bebb
Merge branch 'akorotkov/18232' of github.com:airbytehq/airbyte into a…
andriikorotkov Nov 29, 2022
a035df6
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Nov 29, 2022
cd65ebb
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Nov 29, 2022
7f72805
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Nov 30, 2022
f77d768
Merge branch 'akorotkov/18232' of github.com:airbytehq/airbyte into a…
andriikorotkov Nov 30, 2022
f1cf84d
fixed remarks
andriikorotkov Nov 30, 2022
0b0cfce
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Nov 30, 2022
d972133
fixed remarks
andriikorotkov Dec 1, 2022
f97da1d
updated acceptance tests
andriikorotkov Dec 1, 2022
ee82e54
updated acceptance tests
andriikorotkov Dec 2, 2022
d6203ba
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 4, 2022
77fe436
updated check_images_exist.sh script
andriikorotkov Dec 4, 2022
3afde21
updated method for get normalization image name for destination accep…
andriikorotkov Dec 4, 2022
f91c1b1
fixed code style
andriikorotkov Dec 4, 2022
002d253
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 5, 2022
48077f7
fixed code style and removed tests data
andriikorotkov Dec 5, 2022
ccc42c7
updated JobErrorReporterTest.java
andriikorotkov Dec 5, 2022
57f5087
updated JobErrorReporterTest.java
andriikorotkov Dec 5, 2022
413e91b
fixed remarks
andriikorotkov Dec 6, 2022
d2ac3ae
added integration type field to the dectination_definition file and a…
andriikorotkov Dec 8, 2022
afc8afa
fixed tests
andriikorotkov Dec 8, 2022
c60b268
Merge branch 'master' into akorotkov/remove_factory_
andriikorotkov Dec 8, 2022
213290a
fixed tests
andriikorotkov Dec 8, 2022
040f2a2
fixed minor changes after pulling master changes
andriikorotkov Dec 9, 2022
5ea2e14
fixed minor changes after pulling master changes
andriikorotkov Dec 9, 2022
534ea58
renamed integrationType to normalizationIntegrationType/ fixed minor …
andriikorotkov Dec 12, 2022
85ce1f5
renamed extra dependencies
andriikorotkov Dec 12, 2022
1dff835
updated docs
andriikorotkov Dec 12, 2022
8f035c2
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 12, 2022
53f9640
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 12, 2022
16c364d
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 12, 2022
8aa5eff
updated docs
andriikorotkov Dec 13, 2022
fb92225
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 13, 2022
bfe5c9d
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 13, 2022
f196bea
fixed minor remarks
andriikorotkov Dec 14, 2022
1df7802
added NormalizationDestinationDefinitionConfig.yaml for StandardDesti…
andriikorotkov Dec 14, 2022
33fba46
updated normalization tag
andriikorotkov Dec 14, 2022
b6fb7f0
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 14, 2022
3e0a6b3
updated DestinationAcceptanceTest.java
andriikorotkov Dec 14, 2022
811e855
updated DestinationAcceptanceTest.java
andriikorotkov Dec 14, 2022
3a09ee0
updated imports and descriptions
andriikorotkov Dec 15, 2022
d0c94b1
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Dec 15, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.40.23.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.23.002", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers.normalization;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
Expand Down Expand Up @@ -46,7 +45,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
.setLogPrefix("normalization")
.setPrefixColor(Color.GREEN_BACKGROUND);

private final DestinationType destinationType;
private final String normalizationIntegrationType;
private final ProcessFactory processFactory;
private final String normalizationImageName;
private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
Expand All @@ -55,31 +54,12 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private Process process = null;

public enum DestinationType {
BIGQUERY,
MSSQL,
MYSQL,
ORACLE,
POSTGRES,
REDSHIFT,
SNOWFLAKE,
CLICKHOUSE,
TIDB
}

public DefaultNormalizationRunner(final DestinationType destinationType,
final ProcessFactory processFactory,
final String normalizationImageName) {
this.destinationType = destinationType;
this.processFactory = processFactory;
this.normalizationImageName = normalizationImageName;
}

public DefaultNormalizationRunner(final ProcessFactory processFactory,
final String normalizationImage) {
final String normalizationImage,
final String normalizationIntegrationType) {
this.processFactory = processFactory;
this.normalizationImageName = normalizationImage;
this.destinationType = null;
this.normalizationIntegrationType = normalizationIntegrationType;
}

@Override
Expand All @@ -99,12 +79,12 @@ public boolean configureDbt(final String jobId,
final String gitRepoBranch = dbtConfig.getGitRepoBranch();
if (Strings.isNullOrEmpty(gitRepoBranch)) {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl);
} else {
return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", gitRepoUrl,
"--git-branch", gitRepoBranch);
Expand All @@ -124,7 +104,7 @@ public boolean normalize(final String jobId,
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

return runProcess(jobId, attempt, jobRoot, files, resourceRequirements, "run",
"--integration-type", destinationType.toString().toLowerCase(),
"--integration-type", normalizationIntegrationType.toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
}
Expand Down Expand Up @@ -233,9 +213,4 @@ private String buildInternalErrorMessageFromDbtStackTrace() {
return errorMap.get(SentryExceptionHelper.ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY);
}

@VisibleForTesting
DestinationType getDestinationType() {
return destinationType;
}

}
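
The change above replaces the `DestinationType` enum with a plain string that is lowercased and passed as `--integration-type`. A minimal sketch of that argument assembly follows. The class `NormalizationArgsSketch`, the helper `buildRunArgs`, and the file names are hypothetical and not part of the PR. Lowercasing with `Locale.ROOT` and the explicit null check are defensive choices of this sketch, not something the diff does.

```java
import java.util.List;
import java.util.Locale;
import java.util.Objects;

// Hypothetical illustration; class and method names are not from the PR.
class NormalizationArgsSketch {

  // Builds the CLI arguments for the "run" step from a string integration type.
  static List<String> buildRunArgs(final String integrationType,
                                   final String configFile,
                                   final String catalogFile) {
    // Fail early with a clear message instead of a bare NullPointerException later.
    Objects.requireNonNull(integrationType, "integrationType must not be null");
    return List.of(
        "run",
        // Locale.ROOT keeps lowercasing independent of the JVM's default locale.
        "--integration-type", integrationType.toLowerCase(Locale.ROOT),
        "--config", configFile,
        "--catalog", catalogFile);
  }

  public static void main(final String[] args) {
    // File names here are placeholders chosen for the example.
    System.out.println(buildRunArgs("Postgres", "destination_config.json", "destination_catalog.json"));
  }
}
```

Because the value is now a free-form string rather than an enum constant, a misspelled integration type is no longer caught at compile time, so validating it where it is read may be worth considering.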

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
Expand All @@ -26,7 +27,6 @@
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.normalization.DefaultNormalizationRunner.DestinationType;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.ByteArrayInputStream;
Expand All @@ -49,6 +49,10 @@ class DefaultNormalizationRunnerTest {
private static final String JOB_ID = "0";
private static final int JOB_ATTEMPT = 0;

private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
private static final String NORMALIZATION_TAG = "42.42.42";
private static final String INTEGRATION_TYPE = "postgres";

private static Path logJobRoot;

static {
Expand Down Expand Up @@ -82,14 +86,14 @@ void setup() throws IOException, WorkerException {
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));

when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, false, false, files, null,
DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null,
workerConfigs.getResourceRequirements(),
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
AirbyteIntegrationLauncher.NORMALIZE_STEP),
Map.of(),
Map.of(),
"run",
"--integration-type", "bigquery",
"--integration-type", INTEGRATION_TYPE,
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME))
.thenReturn(process);
Expand All @@ -110,8 +114,7 @@ public void tearDown() throws IOException {
@Test
void test() throws Exception {
final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);

when(process.exitValue()).thenReturn(0);

Expand All @@ -122,8 +125,7 @@ void test() throws Exception {
void testLog() throws Exception {

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);

when(process.exitValue()).thenReturn(0);

Expand All @@ -145,8 +147,7 @@ void testClose() throws Exception {
when(process.isAlive()).thenReturn(true).thenReturn(false);

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements());
runner.close();

Expand All @@ -158,8 +159,7 @@ void testFailure() throws Exception {
when(process.exitValue()).thenReturn(1);

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

verify(process).waitFor();
Expand All @@ -180,8 +180,7 @@ void testFailureWithTraceMessage() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(errorTraceString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
Expand All @@ -207,8 +206,7 @@ void testFailureWithDbtError() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
Expand All @@ -229,8 +227,7 @@ void testFailureWithDbtErrorJsonFormat() throws Exception {
when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8)));

final NormalizationRunner runner =
new DefaultNormalizationRunner(DestinationType.BIGQUERY, processFactory,
NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME);
new DefaultNormalizationRunner(processFactory, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), INTEGRATION_TYPE);
assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements()));

assertEquals(1, runner.getTraceMessages().count());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/NormalizationDestinationDefinitionConfig.yaml
title: NormalizationDestinationDefinitionConfig
description: describes a normalization config for destination definition
type: object
required:
- normalizationRepository
- normalizationTag
- normalizationIntegrationType
additionalProperties: true
properties:
normalizationRepository:
type: string
description: an optional field indicating the name of the repository to be used for normalization. If the value of the flag is NULL - normalization is not used.
normalizationTag:
type: string
description: an optional field indicating the tag of the docker repository to be used for normalization.
normalizationIntegrationType:
type: string
description: an optional field indicating the type of integration dialect to use for normalization.
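
The three fields in this schema can be read together as "which image, which tag, which dialect". A rough sketch of how a caller might turn them into a tagged image name is given below. It assumes a hypothetical holder class `NormalizationConfig` (the real class is generated from the schema) and assumes the tagged name has the form `repository:tag`, which is how the test's `DockerUtils.getTaggedImageName` call appears to be used.

```java
import java.util.Optional;

// Hypothetical illustration; these classes are stand-ins, not the generated Airbyte types.
class NormalizationConfigSketch {

  // Plain holder mirroring the three schema fields.
  static final class NormalizationConfig {
    final String repository;
    final String tag;
    final String integrationType;

    NormalizationConfig(final String repository, final String tag, final String integrationType) {
      this.repository = repository;
      this.tag = tag;
      this.integrationType = integrationType;
    }
  }

  // Returns "repository:tag", or empty when normalization is not configured.
  // Assumption: a missing config, repository, or tag means "no normalization".
  static Optional<String> taggedImage(final NormalizationConfig config) {
    if (config == null || config.repository == null || config.tag == null) {
      return Optional.empty();
    }
    return Optional.of(config.repository + ":" + config.tag);
  }

  public static void main(final String[] args) {
    final NormalizationConfig config =
        new NormalizationConfig("airbyte/normalization", "42.42.42", "postgres");
    System.out.println(taggedImage(config).orElse("normalization disabled"));
  }
}
```

Returning an `Optional` makes the "normalization is not used" case explicit at the call site instead of relying on a null image name.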
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ properties:
protocolVersion:
type: string
description: the Airbyte Protocol version supported by the connector
normalizationRepository:
type: string
normalizationTag:
type: string
normalizationConfig:
"$ref": NormalizationDestinationDefinitionConfig.yaml
supportsDbt:
type: boolean
default: false
description: an optional flag indicating whether DBT is used in the normalization. If the flag value is NULL - DBT is not used.
Contributor

Should we default to false? The behavior would be the same, and it could remove some null pointer exceptions.

Contributor Author

I tried leaving the default value here, but it overwrites the values that come from the definition file. I haven't figured out why yet, but it writes the wrong values to the actor_definition table at startup, when the information is read from the file.

If you are worried about NPEs: I added a null check in every place where one could occur.
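
The null checks mentioned in this reply could take roughly the following shape. This is a sketch under assumptions: `DestinationDefinition` is a hypothetical stand-in for the generated config class, and the helper name `supportsDbt` is invented for the example. The idea is that a boxed `Boolean` with no schema default is treated as `false` when it is null.

```java
// Hypothetical illustration; not the code from the PR.
class SupportsDbtSketch {

  // Stand-in for a generated class whose Boolean field has no schema default.
  static final class DestinationDefinition {
    private final Boolean supportsDbt; // null when the definition file omits the flag

    DestinationDefinition(final Boolean supportsDbt) {
      this.supportsDbt = supportsDbt;
    }

    Boolean getSupportsDbt() {
      return supportsDbt;
    }
  }

  // Null-safe read: Boolean.TRUE.equals(null) is false, so no unboxing NPE can occur.
  static boolean supportsDbt(final DestinationDefinition definition) {
    return definition != null && Boolean.TRUE.equals(definition.getSupportsDbt());
  }

  public static void main(final String[] args) {
    System.out.println(supportsDbt(new DestinationDefinition(null)));
    System.out.println(supportsDbt(new DestinationDefinition(true)));
  }
}
```

Centralizing the check in one helper keeps the "null means false" rule in a single place, which gives the same effective behavior as a schema default without changing what is written to the database.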

Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.*;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.configs.jooq.generated.enums.ActorType;
Expand Down Expand Up @@ -271,7 +268,9 @@ private void writeOrUpdateStandardDefinition(final DSLContext ctx,
sourceDef.withProtocolVersion(getProtocolVersion(sourceDef.getSpec()));
ConfigWriter.writeStandardSourceDefinition(Collections.singletonList(sourceDef), ctx);
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class);
final NormalizationDestinationDefinitionConfig normalizationConfig = Jsons.object(definition, NormalizationDestinationDefinitionConfig.class);
final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class)
.withNormalizationConfig(normalizationConfig);
destDef.withProtocolVersion(getProtocolVersion(destDef.getSpec()));
ConfigWriter.writeStandardDestinationDefinition(Collections.singletonList(destDef), ctx);
} else {
Expand Down