Skip to content

Commit

Permalink
Turn on MYSQL normalization flag. (#4651)
Browse files Browse the repository at this point in the history
* Turn on normalization flag. Bump versions
  • Loading branch information
davinchia authored Jul 9, 2021
1 parent 21c961e commit 830fac6
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.35
LABEL io.airbyte.version=0.1.36
LABEL io.airbyte.name=airbyte/normalization
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs

dependsOn ':airbyte-integrations:bases:base-normalization:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-bigquery:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
cast({{ field }} as boolean)
{%- endmacro %}

{# -- MySQL does not support cast function converting string directly to boolean (an alias of tinyint(1), https://dev.mysql.com/doc/refman/8.0/en/cast-functions.html#function_cast #}
{% macro mysql__cast_to_boolean(field) -%}
IF(lower({{ field }}) = 'true', true, false)
{%- endmacro %}

{# -- Redshift does not support converting string directly to boolean, it must go through int first #}
{% macro redshift__cast_to_boolean(field) -%}
cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,16 @@ public void testLineBreakCharacters() throws Exception {

@Test
public void specNormalizationValueShouldBeCorrect() throws Exception {
assertEquals(normalizationFromSpec(), supportsNormalization());
final boolean normalizationFromSpec = normalizationFromSpec();
assertEquals(normalizationFromSpec, supportsNormalization());
boolean normalizationRunnerFactorySupportsDestinationImage;
try {
NormalizationRunnerFactory.create(getImageName(), processFactory);
normalizationRunnerFactorySupportsDestinationImage = true;
} catch (IllegalStateException e) {
normalizationRunnerFactorySupportsDestinationImage = false;
}
assertEquals(normalizationFromSpec, normalizationRunnerFactorySupportsDestinationImage);
}

@Test
Expand Down Expand Up @@ -666,11 +675,11 @@ protected int getMaxRecordValueLimit() {
}

@Test
void testCustomDbtTransformations() throws Exception {
public void testCustomDbtTransformations() throws Exception {
if (!normalizationFromSpec() || !dbtFromSpec()) {
// TODO : Fix this, this test should not be restricted to destinations that support normalization
// to do so, we need to inject extra packages for dbt to run with dbt community adapters depending
// on the destination
// we require normalization implementation for this destination, because we make sure to install
// required dbt dependency in the normalization docker image in order to run this test successfully
// (we don't actually rely on normalization running anything here though)
return;
}

Expand All @@ -684,7 +693,7 @@ void testCustomDbtTransformations() throws Exception {
final OperatorDbt dbtConfig = new OperatorDbt()
.withGitRepoUrl("https://github.com/fishtown-analytics/jaffle_shop.git")
.withGitRepoBranch("main")
.withDockerImage("fishtownanalytics/dbt:0.19.1");
.withDockerImage("airbyte/normalization:dev");
//
// jaffle_shop is a fictional ecommerce store maintained by fishtownanalytics/dbt.
//
Expand Down Expand Up @@ -733,13 +742,10 @@ void testCustomDbtTransformations() throws Exception {

@Test
void testCustomDbtTransformationsFailure() throws Exception {
if (!normalizationFromSpec()) {
// TODO : Fix this, this test should not be restricted to destinations that support normalization
// to do so, we need to inject extra packages for dbt to run with dbt community adapters depending
// on the destination
return;
}
if (!dbtFromSpec()) {
if (!normalizationFromSpec() || !dbtFromSpec()) {
// we require normalization implementation for this destination, because we make sure to install
// required dbt dependency in the normalization docker image in order to run this test successfully
// (we don't actually rely on normalization running anything here though)
return;
}

Expand Down Expand Up @@ -1002,11 +1008,16 @@ private void assertSameData(List<JsonNode> expected, List<JsonNode> actual) {
}
LOGGER.info("For {} Expected {} vs Actual {}", key, expectedValue, actualValue);
assertTrue(actualData.has(key));
assertEquals(expectedValue, actualValue);
assertSameValue(expectedValue, actualValue);
}
}
}

// Allows subclasses to implement custom comparison asserts
protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
assertEquals(expectedValue, actualValue);
}

protected List<AirbyteRecordMessage> retrieveNormalizedRecords(AirbyteCatalog catalog, String defaultSchema) throws Exception {
final List<AirbyteRecordMessage> actualMessages = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@

import io.airbyte.integrations.destination.ExtendedNameTransformer;

/**
* Note that MySQL documentation discusses about identifiers case sensitivity using the
* lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a
* consistent convention, such as always creating and referring to databases and tables using
* lowercase names. This convention is recommended for maximum portability and ease of use.
*
* Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html"
*
* As a result, we are here forcing all identifier (table, schema and columns) names to lowercase.
*/
public class MySQLNameTransformer extends ExtendedNameTransformer {

// These constants must match those in destination_name_transformer.py
Expand All @@ -39,19 +49,19 @@ public class MySQLNameTransformer extends ExtendedNameTransformer {

@Override
public String getIdentifier(String name) {
String identifier = super.getIdentifier(name);
String identifier = applyDefaultCase(super.getIdentifier(name));
return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH);
}

@Override
public String getTmpTableName(String streamName) {
String tmpTableName = super.getTmpTableName(streamName);
String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName));
return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH);
}

@Override
public String getRawTableName(String streamName) {
String rawTableName = super.getRawTableName(streamName);
String rawTableName = applyDefaultCase(super.getRawTableName(streamName));
return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql",
"supportsIncremental": true,
"supportsNormalization": false,
"supportsNormalization": true,
"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
Expand All @@ -32,6 +34,7 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
Expand All @@ -45,7 +48,7 @@ public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

@Override
protected String getImageName() {
Expand All @@ -62,6 +65,11 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected boolean supportsNormalization() {
return true;
}

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
Expand Down Expand Up @@ -123,6 +131,25 @@ private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaN
.collect(Collectors.toList()));
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
String schema = namingResolver.getIdentifier(namespace);
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected void setup(TestDestinationEnv testEnv) {
db = new MySQLContainer<>("mysql:8.0");
Expand All @@ -141,7 +168,7 @@ private void revokeAllPermissions() {
}

private void grantCorrectPermissions() {
executeQuery("GRANT CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
}

private void executeQuery(String query) {
Expand All @@ -168,10 +195,28 @@ protected void tearDown(TestDestinationEnv testEnv) {
db.close();
}

@Override
@Test
public void testCustomDbtTransformations() throws Exception {
// We need to create view for testing custom dbt transformations
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
// overrides test with a no-op until https://github.com/dbt-labs/jaffle_shop/pull/8 is merged
// super.testCustomDbtTransformations();
}

@Override
@Test
public void testLineBreakCharacters() {
// overrides test with a no-op until we handle full UTF-8 in the destination
}

protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
if (expectedValue.isBoolean()) {
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here
assertEquals(expectedValue.asBoolean(), actualValue.asBoolean());
} else {
assertEquals(expectedValue, actualValue);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);

public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.35";
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.36";

private final DestinationType destinationType;
private final ProcessFactory processFactory;
Expand Down
13 changes: 13 additions & 0 deletions docs/integrations/destinations/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,23 @@ You should now have all the requirements needed to configure MySQL as a destinat
* **Password**
* **Database**

## Known limitations

Note that MySQL documentation discusses identifiers case sensitivity using the `lower_case_table_names` system variable.
One of their recommendations is:

"It is best to adopt a consistent convention, such as always creating and referring to databases and tables using lowercase names.
This convention is recommended for maximum portability and ease of use."

[Source: MySQL docs](https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html)

As a result, Airbyte MySQL destination forces all identifier (table, schema and columns) names to be lowercase.

## CHANGELOG

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.7 | 2021-07-09 | [#4651](https://github.com/airbytehq/airbyte/pull/4651) | Switch normalization flag on so users can use normalization. |
| 0.1.6 | 2021-07-03 | [#4531](https://github.com/airbytehq/airbyte/pull/4531) | Added normalization for MySQL. |
| 0.1.5 | 2021-07-03 | [#3973](https://github.com/airbytehq/airbyte/pull/3973) | Added `AIRBYTE_ENTRYPOINT` for kubernetes support. |
| 0.1.4 | 2021-07-03 | [#3290](https://github.com/airbytehq/airbyte/pull/3290) | Switched to get states from destination instead of source. |
Expand Down

0 comments on commit 830fac6

Please sign in to comment.