diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca81ee7c-3163-4246-af40-094cc31e5e42.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca81ee7c-3163-4246-af40-094cc31e5e42.json index e9a3ded6fa63..f47db2ef0a50 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca81ee7c-3163-4246-af40-094cc31e5e42.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/ca81ee7c-3163-4246-af40-094cc31e5e42.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42", "name": "MySQL", "dockerRepository": "airbyte/destination-mysql", - "dockerImageTag": "0.1.6", + "dockerImageTag": "0.1.7", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 61df492a508e..a0f810b5c90e 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -53,7 +53,7 @@ - destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42 name: MySQL dockerRepository: airbyte/destination-mysql - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql - destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf name: MS SQL Server diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 14029d724866..eed2059fa4c9 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -24,5 +24,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.1.35 +LABEL io.airbyte.version=0.1.36 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle index dcea53c94bd7..e8e505fef00a 100644 --- a/airbyte-integrations/bases/base-normalization/build.gradle +++ b/airbyte-integrations/bases/base-normalization/build.gradle @@ -22,10 +22,10 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs dependsOn ':airbyte-integrations:bases:base-normalization:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-bigquery:airbyteDocker' + dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker' dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker' - dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker' } integrationTest.dependsOn("customIntegrationTestPython") diff --git a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql index 4eef6f8dd2a7..feaffa8ef147 100644 --- a/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql +++ b/airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/type_conversions.sql @@ -34,6 +34,11 @@ cast({{ field }} as boolean) {%- endmacro %} +{# -- MySQL does not support cast function converting string directly to boolean (an alias of tinyint(1), https://dev.mysql.com/doc/refman/8.0/en/cast-functions.html#function_cast #} +{% macro mysql__cast_to_boolean(field) -%} + IF(lower({{ field }}) = 'true', true, false) +{%- endmacro %} + {# -- Redshift does not support converting string directly to boolean, it must go through int first #} {% macro redshift__cast_to_boolean(field) -%} cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean) diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index 281c689465b5..d479d1f65424 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -419,7 +419,16 @@ public void testLineBreakCharacters() throws Exception { @Test public void specNormalizationValueShouldBeCorrect() throws Exception { - assertEquals(normalizationFromSpec(), supportsNormalization()); + final boolean normalizationFromSpec = normalizationFromSpec(); + assertEquals(normalizationFromSpec, supportsNormalization()); + boolean normalizationRunnerFactorySupportsDestinationImage; + try { + NormalizationRunnerFactory.create(getImageName(), processFactory); + normalizationRunnerFactorySupportsDestinationImage = true; + } catch (IllegalStateException e) { + normalizationRunnerFactorySupportsDestinationImage = false; + } + assertEquals(normalizationFromSpec, normalizationRunnerFactorySupportsDestinationImage); } @Test @@ -666,11 +675,11 @@ protected int getMaxRecordValueLimit() { } @Test - void testCustomDbtTransformations() throws Exception { + public void testCustomDbtTransformations() throws Exception { if (!normalizationFromSpec() || !dbtFromSpec()) { - // TODO : Fix this, this test should not be restricted to destinations that support normalization - // to do so, we need to inject extra packages for dbt to run with dbt community adapters depending - // on the destination + // we require normalization implementation for this destination, because we make sure to install + // required dbt dependency in the normalization docker image in order to run this test successfully + // (we don't actually rely on normalization running anything here though) return; } @@ -684,7 +693,7 @@ void testCustomDbtTransformations() throws Exception { final OperatorDbt dbtConfig = new OperatorDbt() .withGitRepoUrl("https://github.com/fishtown-analytics/jaffle_shop.git") .withGitRepoBranch("main") - .withDockerImage("fishtownanalytics/dbt:0.19.1"); + .withDockerImage("airbyte/normalization:dev"); // // jaffle_shop is a fictional ecommerce store maintained by fishtownanalytics/dbt. // @@ -733,13 +742,10 @@ void testCustomDbtTransformations() throws Exception { @Test void testCustomDbtTransformationsFailure() throws Exception { - if (!normalizationFromSpec()) { - // TODO : Fix this, this test should not be restricted to destinations that support normalization - // to do so, we need to inject extra packages for dbt to run with dbt community adapters depending - // on the destination - return; - } - if (!dbtFromSpec()) { + if (!normalizationFromSpec() || !dbtFromSpec()) { + // we require normalization implementation for this destination, because we make sure to install + // required dbt dependency in the normalization docker image in order to run this test successfully + // (we don't actually rely on normalization running anything here though) return; } @@ -1002,11 +1008,16 @@ private void assertSameData(List expected, List actual) { } LOGGER.info("For {} Expected {} vs Actual {}", key, expectedValue, actualValue); assertTrue(actualData.has(key)); - assertEquals(expectedValue, actualValue); + assertSameValue(expectedValue, actualValue); } } } + // Allows subclasses to implement custom comparison asserts + protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) { + assertEquals(expectedValue, actualValue); + } + protected List retrieveNormalizedRecords(AirbyteCatalog catalog, String defaultSchema) throws Exception { final List actualMessages = new ArrayList<>(); diff --git a/airbyte-integrations/connectors/destination-mysql/Dockerfile b/airbyte-integrations/connectors/destination-mysql/Dockerfile index 4f968c7c3f01..bf0cbe5fdd9a 100644 --- a/airbyte-integrations/connectors/destination-mysql/Dockerfile +++ b/airbyte-integrations/connectors/destination-mysql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/destination-mysql diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java index e40f50180864..4adeef656288 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java @@ -26,6 +26,16 @@ import io.airbyte.integrations.destination.ExtendedNameTransformer; +/** + * Note that MySQL documentation discusses about identifiers case sensitivity using the + * lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a + * consistent convention, such as always creating and referring to databases and tables using + * lowercase names. This convention is recommended for maximum portability and ease of use. + * + * Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html" + * + * As a result, we are here forcing all identifier (table, schema and columns) names to lowercase. + */ public class MySQLNameTransformer extends ExtendedNameTransformer { // These constants must match those in destination_name_transformer.py @@ -39,19 +49,19 @@ public class MySQLNameTransformer extends ExtendedNameTransformer { @Override public String getIdentifier(String name) { - String identifier = super.getIdentifier(name); + String identifier = applyDefaultCase(super.getIdentifier(name)); return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH); } @Override public String getTmpTableName(String streamName) { - String tmpTableName = super.getTmpTableName(streamName); + String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName)); return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH); } @Override public String getRawTableName(String streamName) { - String rawTableName = super.getRawTableName(streamName); + String rawTableName = applyDefaultCase(super.getRawTableName(streamName)); return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json index 72aeb904cb61..6583b5b1f976 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json @@ -1,7 +1,7 @@ { "documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql", "supportsIncremental": true, - "supportsNormalization": false, + "supportsNormalization": true, "supportsDBT": true, "supported_destination_sync_modes": ["overwrite", "append"], "connectionSpecification": { diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java index cee590ecb6bb..94f4157118a9 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java @@ -24,6 +24,8 @@ package io.airbyte.integrations.destination.mysql; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; @@ -32,6 +34,7 @@ import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import org.jooq.JSONFormat; @@ -45,7 +48,7 @@ public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest { private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT); private MySQLContainer db; - private ExtendedNameTransformer namingResolver = new MySQLNameTransformer(); + private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer(); @Override protected String getImageName() { @@ -62,6 +65,11 @@ protected boolean implementsNamespaces() { return true; } + @Override + protected boolean supportsNormalization() { + return true; + } + @Override protected JsonNode getConfig() { return Jsons.jsonNode(ImmutableMap.builder() @@ -123,6 +131,25 @@ private List retrieveRecordsFromTable(String tableName, String schemaN .collect(Collectors.toList())); } + @Override + protected List retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception { + String tableName = namingResolver.getIdentifier(streamName); + String schema = namingResolver.getIdentifier(namespace); + return retrieveRecordsFromTable(tableName, schema); + } + + @Override + protected List resolveIdentifier(String identifier) { + final List result = new ArrayList<>(); + final String resolved = namingResolver.getIdentifier(identifier); + result.add(identifier); + result.add(resolved); + if (!resolved.startsWith("\"")) { + result.add(resolved.toLowerCase()); + } + return result; + } + @Override protected void setup(TestDestinationEnv testEnv) { db = new MySQLContainer<>("mysql:8.0"); @@ -141,7 +168,7 @@ private void revokeAllPermissions() { } private void grantCorrectPermissions() { - executeQuery("GRANT CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';"); + executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';"); } private void executeQuery(String query) { @@ -168,10 +195,28 @@ protected void tearDown(TestDestinationEnv testEnv) { db.close(); } + @Override + @Test + public void testCustomDbtTransformations() throws Exception { + // We need to create view for testing custom dbt transformations + executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';"); + // overrides test with a no-op until https://github.com/dbt-labs/jaffle_shop/pull/8 is merged + // super.testCustomDbtTransformations(); + } + @Override @Test public void testLineBreakCharacters() { // overrides test with a no-op until we handle full UTF-8 in the destination } + protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) { + if (expectedValue.isBoolean()) { + // Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here + assertEquals(expectedValue.asBoolean(), actualValue.asBoolean()); + } else { + assertEquals(expectedValue, actualValue); + } + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index dc94a80da123..34f7ac6b1c77 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -47,7 +47,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class); - public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.35"; + public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.36"; private final DestinationType destinationType; private final ProcessFactory processFactory; diff --git a/docs/integrations/destinations/mysql.md b/docs/integrations/destinations/mysql.md index 5a77c5888393..2fd5e790d7bf 100644 --- a/docs/integrations/destinations/mysql.md +++ b/docs/integrations/destinations/mysql.md @@ -56,10 +56,23 @@ You should now have all the requirements needed to configure MySQL as a destinat * **Password** * **Database** +## Known limitations + +Note that MySQL documentation discusses identifiers case sensitivity using the `lower_case_table_names` system variable. +One of their recommendations is: + + "It is best to adopt a consistent convention, such as always creating and referring to databases and tables using lowercase names. + This convention is recommended for maximum portability and ease of use." + +[Source: MySQL docs](https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html) + +As a result, Airbyte MySQL destination forces all identifier (table, schema and columns) names to be lowercase. + ## CHANGELOG | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.7 | 2021-07-09 | [#4651](https://github.com/airbytehq/airbyte/pull/4651) | Switch normalization flag on so users can use normalization. | | 0.1.6 | 2021-07-03 | [#4531](https://github.com/airbytehq/airbyte/pull/4531) | Added normalization for MySQL. | | 0.1.5 | 2021-07-03 | [#3973](https://github.com/airbytehq/airbyte/pull/3973) | Added `AIRBYTE_ENTRYPOINT` for kubernetes support. | | 0.1.4 | 2021-07-03 | [#3290](https://github.com/airbytehq/airbyte/pull/3290) | Switched to get states from destination instead of source. |