Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix temporal type default value bug mysql #15917

Merged
merged 6 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.4
dockerImageTag: 0.6.5
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6046,7 +6046,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.4"
- dockerImage: "airbyte/source-mysql:0.6.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
import io.airbyte.db.jdbc.DateTimeConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.time.Conversions;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -42,6 +46,12 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
}
}

private int getTimePrecision(final RelationalColumn field) {
return field.length().orElse(-1);
}

// Ref :
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final var fieldType = field.typeName();

Expand All @@ -50,13 +60,33 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
return DebeziumConverterUtils.convertDefaultValue(field);
}

return switch (fieldType.toUpperCase(Locale.ROOT)) {
case "DATETIME" -> DateTimeConverter.convertToTimestamp(x);
case "DATE" -> DateTimeConverter.convertToDate(x);
case "TIME" -> DateTimeConverter.convertToTime(x);
case "TIMESTAMP" -> DateTimeConverter.convertToTimestampWithTimezone(x);
default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT));
};
switch (fieldType.toUpperCase(Locale.ROOT)) {
case "DATETIME":
if (x instanceof final Long l) {
if (getTimePrecision(field) <= 3) {
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMillis(l));
}
if (getTimePrecision(field) <= 6) {
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMicros(l));
}
}
return DateTimeConverter.convertToTimestamp(x);
case "DATE":
if (x instanceof final Integer i) {
return DateTimeConverter.convertToDate(LocalDate.ofEpochDay(i));
}
return DateTimeConverter.convertToDate(x);
case "TIME":
if (x instanceof Long) {
long l = Math.multiplyExact((Long) x, TimeUnit.MICROSECONDS.toNanos(1));
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
}
return DateTimeConverter.convertToTime(x);
case "TIMESTAMP":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: did you test with different precisions? It seems kind of weird that debezium would have special handling for datetime but not timestamp (and that timestamp isn't actually in their table >.> )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. They have listed it in the docs as well

Excluding the TIMESTAMP data type, MySQL temporal types depend on the value of the time.precision.mode connector configuration property.

return DateTimeConverter.convertToTimestampWithTimezone(x);
default:
throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT));
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.version=0.6.5
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.4
LABEL io.airbyte.version=0.6.5
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -232,31 +232,66 @@ protected void initTests() {
.addExpectedValues("1700000.01")
.build());

for (final String type : Set.of("date", "date not null default '0000-00-00'")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.fullSourceDataType(type)
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("'1999-01-08'", "'2021-01-01'")
.addExpectedValues("1999-01-08", "2021-01-01")
.build());
}

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("null", "'2021-01-01'")
.addExpectedValues(null, "2021-01-01")
.addInsertValues("null")
.addExpectedValues((String) null)
.build());

for (final String fullSourceType : Set.of("datetime", "datetime not null default now()")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'")
.addExpectedValues("2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000")
.build());
}

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'")
.addExpectedValues(null, "2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000")
.addInsertValues("null")
.addExpectedValues((String) null)
.build());

addTimestampDataTypeTest();

for (final String fullSourceType : Set.of("time", "time not null default '00:00:00'")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
.addInsertValues("'-22:59:59'", "'23:59:59'", "'00:00:00'")
.addExpectedValues("22:59:59.000000", "23:59:59.000000", "00:00:00.000000")
.build());

}

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
.addInsertValues("null", "'-22:59:59'", "'23:59:59'", "'00:00:00'")
.addExpectedValues(null, "22:59:59.000000", "23:59:59.000000", "00:00:00.000000")
.addInsertValues("null")
.addExpectedValues((String) null)
.build());

addDataTypeTestData(
Expand Down