Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Turn on MYSQL normalization flag. #4651

Merged
merged 11 commits into from
Jul 9, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.35
LABEL io.airbyte.version=0.1.36
LABEL io.airbyte.name=airbyte/normalization
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs

dependsOn ':airbyte-integrations:bases:base-normalization:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-bigquery:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
}

integrationTest.dependsOn("customIntegrationTestPython")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
cast({{ field }} as boolean)
{%- endmacro %}

{# -- MySQL does not support cast function converting string directly to boolean (an alias of tinyint(1), https://dev.mysql.com/doc/refman/8.0/en/cast-functions.html#function_cast #}
{% macro mysql__cast_to_boolean(field) -%}
IF(lower({{ field }}) = 'true', true, false)
{%- endmacro %}

{# -- Redshift does not support converting string directly to boolean, it must go through int first #}
{% macro redshift__cast_to_boolean(field) -%}
cast(decode({{ field }}, 'true', '1', 'false', '0')::integer as boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,16 @@ public void testLineBreakCharacters() throws Exception {

@Test
public void specNormalizationValueShouldBeCorrect() throws Exception {
assertEquals(normalizationFromSpec(), supportsNormalization());
final boolean normalizationFromSpec = normalizationFromSpec();
assertEquals(normalizationFromSpec, supportsNormalization());
boolean normalizationRunnerFactorySupportsDestinationImage;
try {
NormalizationRunnerFactory.create(getImageName(), processFactory);
normalizationRunnerFactorySupportsDestinationImage = true;
} catch (IllegalStateException e) {
normalizationRunnerFactorySupportsDestinationImage = false;
}
assertEquals(normalizationFromSpec, normalizationRunnerFactorySupportsDestinationImage);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should make sure the spec.json boolean is properly updated for future normalization implementation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💪🏼

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're too quick!!

}

@Test
Expand Down Expand Up @@ -666,11 +675,11 @@ protected int getMaxRecordValueLimit() {
}

@Test
void testCustomDbtTransformations() throws Exception {
public void testCustomDbtTransformations() throws Exception {
if (!normalizationFromSpec() || !dbtFromSpec()) {
// TODO : Fix this, this test should not be restricted to destinations that support normalization
// to do so, we need to inject extra packages for dbt to run with dbt community adapters depending
// on the destination
// we require normalization implementation for this destination, because we make sure to install
// required dbt dependency in the normalization docker image in order to run this test successfully
// (we don't actually rely on normalization running anything here though)
return;
}

Expand All @@ -684,7 +693,7 @@ void testCustomDbtTransformations() throws Exception {
final OperatorDbt dbtConfig = new OperatorDbt()
.withGitRepoUrl("https://github.com/fishtown-analytics/jaffle_shop.git")
.withGitRepoBranch("main")
.withDockerImage("fishtownanalytics/dbt:0.19.1");
.withDockerImage("airbyte/normalization:dev");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just making sure i follow here...so we are using airbyte normalization because it has the right dbt deps. But when we hit dbt run below what are we actually running then? Airbyte normalization?

Copy link
Contributor

@ChristopheDuong ChristopheDuong Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's just using the dbt binary and packages from the docker image, the models are coming from what is defined in GitRepoUrl (ie https://github.com/fishtown-analytics/jaffle_shop.git)

p.s. it turns out the SQL files in the jaffle shop are not compatible with MySQL ... that's why we had to disable the test until we merge a PR into the jaffle_shop repo... or we could make a fork instead...

//
// jaffle_shop is a fictional ecommerce store maintained by fishtownanalytics/dbt.
//
Expand Down Expand Up @@ -733,13 +742,10 @@ void testCustomDbtTransformations() throws Exception {

@Test
void testCustomDbtTransformationsFailure() throws Exception {
if (!normalizationFromSpec()) {
// TODO : Fix this, this test should not be restricted to destinations that support normalization
// to do so, we need to inject extra packages for dbt to run with dbt community adapters depending
// on the destination
return;
}
if (!dbtFromSpec()) {
if (!normalizationFromSpec() || !dbtFromSpec()) {
// we require normalization implementation for this destination, because we make sure to install
// required dbt dependency in the normalization docker image in order to run this test successfully
// (we don't actually rely on normalization running anything here though)
return;
}

Expand Down Expand Up @@ -1002,11 +1008,16 @@ private void assertSameData(List<JsonNode> expected, List<JsonNode> actual) {
}
LOGGER.info("For {} Expected {} vs Actual {}", key, expectedValue, actualValue);
assertTrue(actualData.has(key));
assertEquals(expectedValue, actualValue);
assertSameValue(expectedValue, actualValue);
}
}
}

// Allows subclasses to implement custom comparison asserts
protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
ChristopheDuong marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(expectedValue, actualValue);
}

protected List<AirbyteRecordMessage> retrieveNormalizedRecords(AirbyteCatalog catalog, String defaultSchema) throws Exception {
final List<AirbyteRecordMessage> actualMessages = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@

import io.airbyte.integrations.destination.ExtendedNameTransformer;

/**
* Note that MySQL documentation discusses about identifiers case sensitivity using the
* lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a
* consistent convention, such as always creating and referring to databases and tables using
* lowercase names. This convention is recommended for maximum portability and ease of use.
*
* Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html"
*
* As a result, we are here forcing all identifier (table, schema and columns) names to lowercase.
*/
public class MySQLNameTransformer extends ExtendedNameTransformer {

// These constants must match those in destination_name_transformer.py
Expand All @@ -39,19 +49,19 @@ public class MySQLNameTransformer extends ExtendedNameTransformer {

@Override
public String getIdentifier(String name) {
String identifier = super.getIdentifier(name);
String identifier = applyDefaultCase(super.getIdentifier(name));
return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH);
}

@Override
public String getTmpTableName(String streamName) {
String tmpTableName = super.getTmpTableName(streamName);
String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName));
return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH);
}

@Override
public String getRawTableName(String streamName) {
String rawTableName = super.getRawTableName(streamName);
String rawTableName = applyDefaultCase(super.getRawTableName(streamName));
return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql",
"supportsIncremental": true,
"supportsNormalization": false,
"supportsNormalization": true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was it turned off?

Copy link
Contributor

@ChristopheDuong ChristopheDuong Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if has never been ON

Therefore it was OFF because the implementation of normalization for MySQL was not implemented yet when the destination was first implemented

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but why didn't the PR for normalization turn it on? just missed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it missed it

Copy link
Contributor

@ChristopheDuong ChristopheDuong Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
Expand All @@ -32,6 +34,7 @@
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
Expand All @@ -45,7 +48,7 @@ public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

@Override
protected String getImageName() {
Expand All @@ -62,6 +65,11 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected boolean supportsNormalization() {
return true;
}

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
Expand Down Expand Up @@ -123,6 +131,25 @@ private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaN
.collect(Collectors.toList()));
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
String schema = namingResolver.getIdentifier(namespace);
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected void setup(TestDestinationEnv testEnv) {
db = new MySQLContainer<>("mysql:8.0");
Expand All @@ -141,7 +168,7 @@ private void revokeAllPermissions() {
}

private void grantCorrectPermissions() {
executeQuery("GRANT CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
}

private void executeQuery(String query) {
Expand All @@ -168,10 +195,28 @@ protected void tearDown(TestDestinationEnv testEnv) {
db.close();
}

@Override
@Test
public void testCustomDbtTransformations() throws Exception {
// We need to create view for testing custom dbt transformations
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
// overrides test with a no-op until https://github.com/dbt-labs/jaffle_shop/pull/8 is merged
// super.testCustomDbtTransformations();
}

@Override
@Test
public void testLineBreakCharacters() {
// overrides test with a no-op until we handle full UTF-8 in the destination
}

protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
if (expectedValue.isBoolean()) {
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here
assertEquals(expectedValue.asBoolean(), actualValue.asBoolean());
} else {
assertEquals(expectedValue, actualValue);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
Expand Down Expand Up @@ -71,10 +68,7 @@
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_at"]
},
Expand Down Expand Up @@ -105,10 +99,7 @@
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
Expand Down Expand Up @@ -139,10 +130,7 @@
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
Expand Down Expand Up @@ -217,10 +205,7 @@
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
Expand All @@ -245,10 +230,7 @@
}
}
},
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": []
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,7 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator

from .streams import (
Limits,
ObjectRecords,
ObjectTypePolicies,
ObjectTypes,
RelationshipRecords,
RelationshipTypes,
)
from .streams import Limits, ObjectRecords, ObjectTypePolicies, ObjectTypes, RelationshipRecords, RelationshipTypes


class Base64HttpAuthenticator(TokenAuthenticator):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Zendesk Sunshine Spec",
"type": "object",
"required": ["api_token", "email", "start_date" ,"subdomain"],
"required": ["api_token", "email", "start_date", "subdomain"],
"additionalProperties": false,
"properties": {
"api_token": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ObjectRecords(IncrementalSunshineStream):
To support Incremental for this stream I had to use `query` endpoint instead of `objects/records` -
this allows me to use date filters. This is the only way to have incremental support.
"""

http_method = "POST"

def request_body_json(
Expand Down Expand Up @@ -213,6 +214,7 @@ class Jobs(SunshineStream):
This stream is dynamic. The data can exist today, but may be absent tomorrow.
Since we need to have some data in the stream this stream is disabled.
"""

def path(self, **kwargs) -> str:
return "jobs"

Expand Down
Loading