diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 57727fe3fb32..c9e0818b85fc 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -647,7 +647,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.6.4 + dockerImageTag: 0.6.5 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 0d2c9a3653e5..5e4f8f499cda 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6046,7 +6046,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.6.4" +- dockerImage: "airbyte/source-mysql:0.6.5" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java index 008167aff336..6ef3033be18a 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java @@ -8,9 +8,13 @@ import io.airbyte.db.jdbc.DateTimeConverter; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; +import io.debezium.time.Conversions; +import java.time.LocalDate; +import java.time.LocalTime; import java.util.Arrays; import java.util.Locale; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,12 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati } } + private int getTimePrecision(final RelationalColumn field) { + return field.length().orElse(-1); + } + + // Ref : + // https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types private void registerDate(final RelationalColumn field, final ConverterRegistration registration) { final var fieldType = field.typeName(); @@ -50,13 +60,33 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat return DebeziumConverterUtils.convertDefaultValue(field); } - return switch (fieldType.toUpperCase(Locale.ROOT)) { - case "DATETIME" -> DateTimeConverter.convertToTimestamp(x); - case "DATE" -> DateTimeConverter.convertToDate(x); - case "TIME" -> DateTimeConverter.convertToTime(x); - case "TIMESTAMP" -> DateTimeConverter.convertToTimestampWithTimezone(x); - default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT)); - }; + switch (fieldType.toUpperCase(Locale.ROOT)) { + case "DATETIME": + if (x instanceof final Long l) { + if (getTimePrecision(field) <= 3) { + return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMillis(l)); + } + if (getTimePrecision(field) <= 6) { + return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMicros(l)); + } + } + return DateTimeConverter.convertToTimestamp(x); + case "DATE": + if (x instanceof final Integer i) { + return DateTimeConverter.convertToDate(LocalDate.ofEpochDay(i)); + } + return DateTimeConverter.convertToDate(x); + case "TIME": + if (x instanceof Long) { + long l = Math.multiplyExact((Long) x, TimeUnit.MICROSECONDS.toNanos(1)); + return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l)); + } + return DateTimeConverter.convertToTime(x); + case "TIMESTAMP": + return DateTimeConverter.convertToTimestampWithTimezone(x); + default: + throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT)); + } }); } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index b739548f1899..ecaa376a49fa 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.4 +LABEL io.airbyte.version=0.6.5 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index e9c0303ccec2..afc64e32dcc5 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.4 +LABEL io.airbyte.version=0.6.5 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java index d50e1b1df010..b93a74e1d2b0 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java @@ -232,31 +232,66 @@ protected void initTests() { .addExpectedValues("1700000.01") .build()); + for (final String type : Set.of("date", "date not null default '0000-00-00'")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("date") + .fullSourceDataType(type) + .airbyteType(JsonSchemaType.STRING_DATE) + .addInsertValues("'1999-01-08'", "'2021-01-01'") + .addExpectedValues("1999-01-08", "2021-01-01") + .build()); + } + addDataTypeTestData( TestDataHolder.builder() .sourceType("date") .airbyteType(JsonSchemaType.STRING_DATE) - .addInsertValues("null", "'2021-01-01'") - .addExpectedValues(null, "2021-01-01") + .addInsertValues("null") + .addExpectedValues((String) null) .build()); + for (final String fullSourceType : Set.of("datetime", "datetime not null default now()")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("datetime") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) + .addInsertValues("'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'") + .addExpectedValues("2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000") + .build()); + } + addDataTypeTestData( TestDataHolder.builder() .sourceType("datetime") .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) - .addInsertValues("null", "'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'") - .addExpectedValues(null, "2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000") + .addInsertValues("null") + .addExpectedValues((String) null) .build()); addTimestampDataTypeTest(); + for (final String fullSourceType : Set.of("time", "time not null default '00:00:00'")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("time") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) + // JDBC driver can process only "clock"(00:00:00-23:59:59) values. + .addInsertValues("'-22:59:59'", "'23:59:59'", "'00:00:00'") + .addExpectedValues("22:59:59.000000", "23:59:59.000000", "00:00:00.000000") + .build()); + + } + addDataTypeTestData( TestDataHolder.builder() .sourceType("time") .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) // JDBC driver can process only "clock"(00:00:00-23:59:59) values. - .addInsertValues("null", "'-22:59:59'", "'23:59:59'", "'00:00:00'") - .addExpectedValues(null, "22:59:59.000000", "23:59:59.000000", "00:00:00.000000") + .addInsertValues("null") + .addExpectedValues((String) null) .build()); addDataTypeTestData( diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index b0a035535645..7d7b985fdb9f 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -188,6 +188,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------| +| 0.6.5 | 2022-08-25 | [15917](https://github.com/airbytehq/airbyte/pull/15917) | Fix temporal data type default value bug | | 0.6.4 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | | 0.6.3 | 2022-08-12 | [15044](https://github.com/airbytehq/airbyte/pull/15044) | Added the ability to connect using different SSL modes and SSL certificates | | 0.6.2 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state |