+ * This component sets up a {@link TimeZoneAdapter} that is specific to how the MySQL Binary Log client library
+ * works. The {@link AbstractRowsEventDataDeserializer} class has various methods to instantiate the
+ * {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, and {@link java.sql.Timestamp} temporal values,
+ * where the values for {@link java.util.Date}, {@link java.sql.Date}, and {@link java.sql.Time} are all in terms of
+ * the local time zone (since it uses {@link java.util.Calendar#getInstance()}), but where the
+ * {@link java.sql.Timestamp} values are created differently using the milliseconds past epoch and therefore in terms of
+ * the UTC time zone.
+ *
+ * And, because Kafka Connect {@link Time}, {@link Date}, and {@link Timestamp} logical
+ * schema types all expect the {@link java.util.Date} to be in terms of the UTC time zone, the
+ * {@link TimeZoneAdapter} also needs to produce {@link java.util.Date} values that will be correct in UTC.
*
* @param config the connector configuration, which is presumed to be valid
* @param serverName the name of the server
@@ -78,10 +96,18 @@ public MySqlSchema(Configuration config, String serverName) {
this.tables = new Tables();
this.ddlChanges = new DdlChanges(this.ddlParser.terminator());
this.ddlParser.addListener(ddlChanges);
- this.schemaBuilder = new TableSchemaBuilder();
- if ( serverName != null ) serverName = serverName.trim();
+
+ // Specific to how the MySQL Binary Log client library creates temporal values ...
+ TimeZoneAdapter tzAdapter = TimeZoneAdapter.create()
+ .withLocalZoneForUtilDate()
+ .withLocalZoneForSqlDate()
+ .withLocalZoneForSqlTime()
+ .withUtcZoneForSqlTimestamp()
+ .withUtcTargetZone();
+ this.schemaBuilder = new TableSchemaBuilder(tzAdapter);
+ if (serverName != null) serverName = serverName.trim();
this.serverName = serverName;
- if ( this.serverName == null || serverName.isEmpty() ) {
+ if (this.serverName == null || serverName.isEmpty()) {
this.schemaPrefix = "";
} else {
this.schemaPrefix = serverName.endsWith(".") ? serverName : serverName + ".";
@@ -95,7 +121,7 @@ public MySqlSchema(Configuration config, String serverName) {
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false);
- this.dbHistory.configure(dbHistoryConfig,HISTORY_COMPARATOR); // validates
+ this.dbHistory.configure(dbHistoryConfig, HISTORY_COMPARATOR); // validates
}
/**
@@ -210,8 +236,8 @@ protected void changeTablesAndRecordInHistory(SourceInfo source, Callable
changeFunction.call();
} catch (Exception e) {
this.tables = copy;
- if ( e instanceof SQLException) throw (SQLException)e;
- this.logger.error("Unexpected error whle changing model of MySQL schemas: {}",e.getMessage(),e);
+ if (e instanceof SQLException) throw (SQLException) e;
+ this.logger.error("Unexpected error whle changing model of MySQL schemas: {}", e.getMessage(), e);
}
// At least one table has changed or was removed, so first refresh the Kafka Connect schemas ...
@@ -307,12 +333,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
// to the same _affected_ database...
ddlChanges.groupStatementStringsByDatabase((dbName, ddl) -> {
if (filters.databaseFilter().test(dbName)) {
- if ( dbName == null ) dbName = "";
+ if (dbName == null) dbName = "";
statementConsumer.consume(dbName, ddlStatements);
}
});
} else if (filters.databaseFilter().test(databaseName)) {
- if ( databaseName == null ) databaseName = "";
+ if (databaseName == null) databaseName = "";
statementConsumer.consume(databaseName, ddlStatements);
}
}
diff --git a/debezium-connector-mysql/src/test/docker/init/setup.sql b/debezium-connector-mysql/src/test/docker/init/setup.sql
index 1137e917f4c..892443ae190 100644
--- a/debezium-connector-mysql/src/test/docker/init/setup.sql
+++ b/debezium-connector-mysql/src/test/docker/init/setup.sql
@@ -220,3 +220,12 @@ CREATE TABLE dbz84_integer_types_table (
);
INSERT INTO dbz84_integer_types_table
VALUES(127,-128,128,255, default,201,202,203, default,301,302,303, default,401,402,403, default,501,502,503);
+
+-- DBZ-85 handle fractional part of seconds
+CREATE TABLE dbz_85_fractest (
+ c1 DATE,
+ c2 TIME(2),
+ c3 DATETIME(2),
+ c4 TIMESTAMP(2)
+);
+INSERT INTO dbz_85_fractest VALUES ('2014-09-08', '17:51:04.777', '2014-09-08 17:51:04.777', '2014-09-08 17:51:04.777');
\ No newline at end of file
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java
index 9fc98c41b4c..654e191f673 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java
@@ -7,7 +7,13 @@
import java.nio.file.Path;
import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.Month;
+import java.time.ZoneId;
+import org.apache.kafka.connect.data.Struct;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -16,6 +22,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
+import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
@@ -70,21 +77,74 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
- SourceRecords records = consumeRecordsByTopic(3 + 2); // 3 schema change record, 2 inserts
+ SourceRecords records = consumeRecordsByTopic(4 + 3); // 4 schema change record, 3 inserts
stopConnector();
assertThat(records).isNotNull();
- assertThat(records.recordsForTopic("regression").size()).isEqualTo(3);
+ assertThat(records.recordsForTopic("regression").size()).isEqualTo(4);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
- assertThat(records.topics().size()).isEqualTo(3);
+ assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
+ assertThat(records.topics().size()).isEqualTo(4);
assertThat(records.databaseNames().size()).isEqualTo(1);
- assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(3);
+ assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(4);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
+ records.forEach(record->{
+ Struct value = (Struct)record.value();
+ if ( record.topic().endsWith("dbz_85_fractest")) {
+ // The microseconds of all three should be exactly 780
+ Struct after = value.getStruct(Envelope.FieldName.AFTER);
+ java.util.Date c1 = (java.util.Date)after.get("c1");
+ java.util.Date c2 = (java.util.Date)after.get("c2");
+ java.util.Date c3 = (java.util.Date)after.get("c3");
+ java.util.Date c4 = (java.util.Date)after.get("c4");
+ Testing.debug("c1 = " + c1.getTime());
+ Testing.debug("c2 = " + c2.getTime());
+ Testing.debug("c3 = " + c3.getTime());
+ Testing.debug("c4 = " + c4.getTime());
+ assertThat(c1.getTime() % 1000).isEqualTo(0); // date only, no time
+ assertThat(c2.getTime() % 1000).isEqualTo(780);
+ assertThat(c3.getTime() % 1000).isEqualTo(780);
+ assertThat(c4.getTime() % 1000).isEqualTo(780);
+ assertThat(c1.getTime()).isEqualTo(1410134400000L);
+ assertThat(c2.getTime()).isEqualTo(64264780L);
+ assertThat(c3.getTime()).isEqualTo(1410198664780L);
+ assertThat(c4.getTime()).isEqualTo(1410198664780L);
+ // None of these Dates have timezone information, so to convert to locals we have to use our local timezone ...
+ ZoneId utc = ZoneId.of("UTC");
+ LocalDate localC1 = c1.toInstant().atZone(utc).toLocalDate();
+ LocalTime localC2 = c2.toInstant().atZone(utc).toLocalTime();
+ LocalDateTime localC3 = c3.toInstant().atZone(utc).toLocalDateTime();
+ LocalDateTime localC4 = c4.toInstant().atZone(utc).toLocalDateTime();
+ // row is ('2014-09-08', '17:51:04.78', '2014-09-08 17:51:04.78', '2014-09-08 17:51:04.78')
+ final int expectedNanos = 780 * 1000 * 1000;
+ assertThat(localC1.getYear()).isEqualTo(2014);
+ assertThat(localC1.getMonth()).isEqualTo(Month.SEPTEMBER);
+ assertThat(localC1.getDayOfMonth()).isEqualTo(8);
+ assertThat(localC2.getHour()).isEqualTo(17);
+ assertThat(localC2.getMinute()).isEqualTo(51);
+ assertThat(localC2.getSecond()).isEqualTo(4);
+ assertThat(localC2.getNano()).isEqualTo(expectedNanos);
+ assertThat(localC3.getYear()).isEqualTo(2014);
+ assertThat(localC3.getMonth()).isEqualTo(Month.SEPTEMBER);
+ assertThat(localC3.getDayOfMonth()).isEqualTo(8);
+ assertThat(localC3.getHour()).isEqualTo(17);
+ assertThat(localC3.getMinute()).isEqualTo(51);
+ assertThat(localC3.getSecond()).isEqualTo(4);
+ assertThat(localC3.getNano()).isEqualTo(expectedNanos);
+ assertThat(localC4.getYear()).isEqualTo(2014);
+ assertThat(localC4.getMonth()).isEqualTo(Month.SEPTEMBER);
+ assertThat(localC4.getDayOfMonth()).isEqualTo(8);
+ assertThat(localC4.getHour()).isEqualTo(17);
+ assertThat(localC4.getMinute()).isEqualTo(51);
+ assertThat(localC4.getSecond()).isEqualTo(4);
+ assertThat(localC4.getNano()).isEqualTo(expectedNanos);
+ }
+ });
}
}
diff --git a/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java b/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java
index 7c6b6c8f8bd..0370d747e08 100644
--- a/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java
+++ b/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java
@@ -6,6 +6,9 @@
package io.debezium.data;
import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@@ -22,7 +25,7 @@
* @author Randall Hauch
*/
public class SchemaUtil {
-
+
private SchemaUtil() {
}
@@ -228,8 +231,24 @@ public RecordWriter append(Object obj) {
}
appendAdditional("value", record.value());
sb.append('}');
+ } else if ( obj instanceof java.sql.Time ){
+ java.sql.Time time = (java.sql.Time)obj;
+ append(DateTimeFormatter.ISO_LOCAL_TIME.format(time.toLocalTime()));
+ } else if ( obj instanceof java.sql.Date ){
+ java.sql.Date date = (java.sql.Date)obj;
+ append(DateTimeFormatter.ISO_DATE.format(date.toLocalDate()));
+ } else if ( obj instanceof java.sql.Timestamp ){
+ java.sql.Timestamp ts = (java.sql.Timestamp)obj;
+ Instant instant = ts.toInstant();
+ append(DateTimeFormatter.ISO_INSTANT.format(instant));
+ } else if ( obj instanceof java.util.Date ){
+ java.util.Date date = (java.util.Date)obj;
+ append(DateTimeFormatter.ISO_INSTANT.format(date.toInstant()));
+ } else if ( obj instanceof TemporalAccessor ){
+ TemporalAccessor temporal = (TemporalAccessor)obj;
+ append(DateTimeFormatter.ISO_INSTANT.format(temporal));
} else {
- sb.append(obj.toString());
+ append(obj.toString());
}
return this;
}
diff --git a/debezium-core/src/main/java/io/debezium/jdbc/TimeZoneAdapter.java b/debezium-core/src/main/java/io/debezium/jdbc/TimeZoneAdapter.java
new file mode 100644
index 00000000000..fb594ceaa74
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/jdbc/TimeZoneAdapter.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.jdbc;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+
+import io.debezium.annotation.Immutable;
+
+/**
+ * An adapter that can convert {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, and
+ * {@link java.sql.Timestamp} objects to {@link ZonedDateTime} instances, where the time zone in which the temporal objects
+ * were created by the database/driver can be adjusted.
+ *
+ * @author Randall Hauch
+ */
+@Immutable
+public class TimeZoneAdapter {
+
+ private static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
+ public static final ZoneId UTC = ZoneId.of("UTC");
+
+ /**
+ * Create a new adapter with UTC as the target zone and for a database that uses UTC for all temporal values.
+ *
+ * @return the new adapter
+ */
+ public static TimeZoneAdapter create() {
+ return new TimeZoneAdapter(UTC, UTC, UTC, UTC, UTC);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the specified zone for all temporal values.
+ *
+ * @param zoneId the zone in which all temporal values are created by the database; may not be null
+ * @return the new adapter
+ */
+ public static TimeZoneAdapter originatingIn(ZoneId zoneId) {
+ return new TimeZoneAdapter(ZoneId.systemDefault(), zoneId, zoneId, zoneId, zoneId);
+ }
+
+ /**
+ * Create a new adapter for a database that creates all temporal values in UTC.
+ *
+ * @return the new adapter
+ */
+ public static TimeZoneAdapter originatingInUtc() {
+ return originatingIn(UTC);
+ }
+
+ /**
+ * Create a new adapter for a database that creates all temporal values in the local system time zone,
+ * which is the same time zone used by {@link java.util.Calendar#getInstance()}.
+ *
+ * @return the new adapter
+ */
+ public static TimeZoneAdapter originatingInLocal() {
+ return originatingIn(ZoneId.systemDefault()); // same as Calendar.getInstance().getTimeZone().toZoneId()
+ }
+
+ private final ZoneId targetZoneId;
+ private final ZoneId utilDateZoneId;
+ private final ZoneId sqlDateZoneId;
+ private final ZoneId sqlTimeZoneId;
+ private final ZoneId sqlTimestampZoneId;
+
+ /**
+ * Create an adapter for temporal values defined in terms of the given zone.
+ *
+ * @param targetZoneId the zone in which the output temporal values are defined; may not be null
+ * @param utilDateZoneId the zone in which {@link java.util.Date} values are defined; may not be null
+ * @param sqlDateZoneId the zone in which {@link java.sql.Date} values are defined; may not be null
+ * @param sqlTimeZoneId the zone in which {@link java.sql.Time} values are defined; may not be null
+ * @param sqlTimestampZoneId the zone in which {@link java.sql.Timestamp} values are defined; may not be null
+ */
+ protected TimeZoneAdapter(ZoneId targetZoneId, ZoneId utilDateZoneId, ZoneId sqlDateZoneId, ZoneId sqlTimeZoneId,
+ ZoneId sqlTimestampZoneId) {
+ this.targetZoneId = targetZoneId;
+ this.utilDateZoneId = utilDateZoneId;
+ this.sqlDateZoneId = sqlDateZoneId;
+ this.sqlTimeZoneId = sqlTimeZoneId;
+ this.sqlTimestampZoneId = sqlTimestampZoneId;
+ }
+
+ protected ZoneId targetZoneId() {
+ return targetZoneId;
+ }
+
+ /**
+ * Convert the specified database {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, or
+ * {@link java.sql.Timestamp} objects to a date and time in the same time zone in which the database created the
+ * value. If only {@link java.sql.Time time} information is provided in the input value, the date information will
+ * be set to the first day of the epoch. If only {@link java.sql.Date date} information is provided in the input
+ * value, the time information will be at midnight on the specified day.
+ *
+ * @param dbDate the database-generated value; may not be null
+ * @return the date time in the same zone used by the database; never null
+ */
+ public ZonedDateTime toZonedDateTime(java.util.Date dbDate) {
+ if (dbDate instanceof java.sql.Date) {
+ return toZonedDateTime((java.sql.Date) dbDate);
+ }
+ if (dbDate instanceof java.sql.Time) {
+ return toZonedDateTime((java.sql.Time) dbDate);
+ }
+ if (dbDate instanceof java.sql.Timestamp) {
+ return toZonedDateTime((java.sql.Timestamp) dbDate);
+ }
+ return dbDate.toInstant().atZone(UTC) // milliseconds is in terms of UTC
+ .withZoneSameInstant(sqlTimeZoneId) // correct value in the zone where it was created
+ .withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone
+ }
+
+ /**
+ * Convert the specified database {@link java.sql.Date} to a date (at midnight) in the same time zone in which the
+ * database created the value.
+ *
+ * @param dbDate the database-generated value; may not be null
+ * @return the date (at midnight) in the same zone used by the database; never null
+ */
+ public ZonedDateTime toZonedDateTime(java.sql.Date dbDate) {
+ long millis = dbDate.getTime();
+ Instant instant = Instant.ofEpochMilli(millis).truncatedTo(ChronoUnit.DAYS);
+ return instant.atZone(sqlDateZoneId).withZoneSameInstant(targetZoneId);
+ }
+
+ /**
+ * Convert the specified database {@link java.sql.Time} to a time (on the first epoch day) in the same time zone in which
+ * the database created the value.
+ *
+ * @param dbTime the database-generated value; may not be null
+ * @return the time (on the first epoch day) in the same zone used by the database; never null
+ */
+ public ZonedDateTime toZonedDateTime(java.sql.Time dbTime) {
+ long millis = dbTime.getTime();
+ LocalTime local = LocalTime.ofNanoOfDay(millis * 1000 * 1000);
+ return ZonedDateTime.of(EPOCH, local, UTC) // milliseconds is in terms of UTC
+ .withZoneSameInstant(sqlTimeZoneId) // correct value in the zone where it was created
+ .withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone
+ }
+
+ /**
+ * Convert the specified database {@link java.sql.Timestamp} to a timestamp in the same time zone in which
+ * the database created the value.
+ *
+ * @param dbTimestamp the database-generated value; may not be null
+ * @return the timestamp in the same zone used by the database; never null
+ */
+ public ZonedDateTime toZonedDateTime(java.sql.Timestamp dbTimestamp) {
+ return dbTimestamp.toInstant().atZone(UTC) // milliseconds is in terms of UTC
+ .withZoneSameInstant(sqlTimestampZoneId) // correct value in the zone where it was created
+ .withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone
+ }
+
+ /**
+ * Create a new adapter that produces temporal values in the specified time zone.
+ *
+ * @param zoneId the zone in which all temporal values are to be defined; may not be null
+ * @return the new adapter
+ */
+ public TimeZoneAdapter withTargetZone(ZoneId zoneId) {
+ if (targetZoneId.equals(zoneId)) return this;
+ return new TimeZoneAdapter(zoneId, utilDateZoneId, sqlDateZoneId, sqlTimeZoneId, sqlTimestampZoneId);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the specified zone for all temporal values and this adapter's target zone.
+ *
+ * @param zoneId the zone in which all temporal values are created by the database; may not be null
+ * @return the new adapter
+ */
+ public TimeZoneAdapter withZoneForAll(ZoneId zoneId) {
+ return new TimeZoneAdapter(targetZoneId, zoneId, zoneId, zoneId, zoneId);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
+ * zone for {@link java.util.Date} temporal values.
+ *
+ * @param zoneId the zone in which all {@link java.util.Date} values are created by the database; may not be null
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withZoneForUtilDate(ZoneId zoneId) {
+ if (utilDateZoneId.equals(zoneId)) return this;
+ return new TimeZoneAdapter(targetZoneId, zoneId, sqlDateZoneId, sqlTimeZoneId, sqlTimestampZoneId);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
+ * zone for {@link java.sql.Date} temporal values.
+ *
+ * @param zoneId the zone in which all {@link java.sql.Date} values are created by the database; may not be null
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withZoneForSqlDate(ZoneId zoneId) {
+ if (sqlDateZoneId.equals(zoneId)) return this;
+ return new TimeZoneAdapter(targetZoneId, utilDateZoneId, zoneId, sqlTimeZoneId, sqlTimestampZoneId);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
+ * zone for {@link java.sql.Time} temporal values.
+ *
+ * @param zoneId the zone in which all {@link java.sql.Time} values are created by the database; may not be null
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withZoneForSqlTime(ZoneId zoneId) {
+ if (sqlTimeZoneId.equals(zoneId)) return this;
+ return new TimeZoneAdapter(targetZoneId, utilDateZoneId, sqlDateZoneId, zoneId, sqlTimestampZoneId);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
+ * zone for {@link java.sql.Timestamp} temporal values.
+ *
+ * @param zoneId the zone in which all {@link java.sql.Timestamp} values are created by the database; may not be null
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withZoneForSqlTimestamp(ZoneId zoneId) {
+ if (sqlTimestampZoneId.equals(zoneId)) return this;
+ return new TimeZoneAdapter(targetZoneId, utilDateZoneId, sqlDateZoneId, sqlTimeZoneId, zoneId);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for the target.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withUtcTargetZone() {
+ return withTargetZone(UTC);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.util.Date} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withUtcZoneForUtilDate() {
+ return withZoneForUtilDate(UTC);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.sql.Date} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withUtcZoneForSqlDate() {
+ return withZoneForSqlDate(UTC);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.sql.Time} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withUtcZoneForSqlTime() {
+ return withZoneForSqlTime(UTC);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.sql.Timestamp} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withUtcZoneForSqlTimestamp() {
+ return withZoneForSqlTimestamp(UTC);
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for the target.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withLocalTargetZone() {
+ return withTargetZone(ZoneId.systemDefault());
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.util.Date} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withLocalZoneForUtilDate() {
+ return withZoneForUtilDate(ZoneId.systemDefault());
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.sql.Date} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withLocalZoneForSqlDate() {
+ return withZoneForSqlDate(ZoneId.systemDefault());
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.sql.Time} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withLocalZoneForSqlTime() {
+ return withZoneForSqlTime(ZoneId.systemDefault());
+ }
+
+ /**
+ * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
+ * zone for {@link java.sql.Timestamp} temporal values.
+ *
+ * @return the new adapter; never null
+ */
+ public TimeZoneAdapter withLocalZoneForSqlTimestamp() {
+ return withZoneForSqlTimestamp(ZoneId.systemDefault());
+ }
+}
\ No newline at end of file
diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
index ade5ea3fb93..f03eb563261 100644
--- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
+++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java
@@ -10,11 +10,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
-import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -43,6 +43,7 @@
import io.debezium.data.IsoTimestamp;
import io.debezium.data.SchemaUtil;
import io.debezium.jdbc.JdbcConnection;
+import io.debezium.jdbc.TimeZoneAdapter;
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;
@@ -76,10 +77,21 @@ public class TableSchemaBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class);
private static final LocalDate EPOCH_DAY = LocalDate.ofEpochDay(0);
+ private final TimeZoneAdapter timeZoneAdapter;
+
/**
- * Create a new instance of the builder.
+ * Create a new instance of the builder that uses the {@link TimeZoneAdapter#create() default time zone adapter}.
*/
public TableSchemaBuilder() {
+ this(TimeZoneAdapter.create());
+ }
+
+ /**
+ * Create a new instance of the builder.
+ * @param timeZoneAdapter the adapter for temporal objects created by the source database; may not be null
+ */
+ public TableSchemaBuilder(TimeZoneAdapter timeZoneAdapter) {
+ this.timeZoneAdapter = timeZoneAdapter;
}
/**
@@ -606,6 +618,7 @@ protected Object handleUnknownData(Column column, Field fieldDefn, Object data)
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
OffsetDateTime dateTime = null;
+ LOGGER.debug("TimestampWithZone: " + data + " , class=" + data.getClass());
if (data instanceof OffsetDateTime) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
dateTime = (OffsetDateTime) data;
@@ -667,6 +680,7 @@ protected OffsetDateTime unexpectedTimestampWithZone(Object value, Field fieldDe
protected Object convertTimeWithZone(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
OffsetTime time = null;
+ LOGGER.debug("TimeWithZone: " + data + " , class=" + data.getClass());
if (data instanceof OffsetTime) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
time = (OffsetTime) data;
@@ -721,15 +735,10 @@ protected OffsetTime unexpectedTimeWithZone(Object value, Field fieldDefn) {
protected Object convertTimestamp(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
- if (data instanceof java.sql.Timestamp) {
- // JDBC specification indicates that this will be the canonical object for this JDBC type.
- date = (java.util.Date) data;
- } else if (data instanceof java.sql.Date) {
- // This should still work, even though it should have just date info
- date = (java.util.Date) data;
- } else if (data instanceof java.util.Date) {
- // Possible that some implementations might use this.
- date = (java.util.Date) data;
+ LOGGER.debug("Timestamp: " + data + " , class=" + data.getClass());
+ if (data instanceof java.util.Date) {
+ ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data);
+ date = java.util.Date.from(zdt.toInstant());
} else if (data instanceof java.time.LocalDate) {
// If we get a local date (no TZ info), we need to just convert to a util.Date (no TZ info) ...
java.time.LocalDate local = (java.time.LocalDate) data;
@@ -774,16 +783,10 @@ protected java.util.Date unexpectedTimestamp(Object value, Field fieldDefn) {
protected Object convertTime(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
- if (data instanceof java.sql.Time) {
- // JDBC specification indicates that this will be the canonical object for this JDBC type.
- // Contains only time info, with the date set to the epoch day ...
- date = (java.sql.Date) data;
- } else if (data instanceof java.util.Date) {
- // Possible that some implementations might use this. We ignore any date info by converting to an
- // instant and changing the date to the epoch date, and finally creating a new java.util.Date ...
- date = (java.util.Date) data;
- Instant instant = Instant.ofEpochMilli(date.getTime()).with(ChronoField.EPOCH_DAY, 0);
- date = new java.util.Date(instant.toEpochMilli());
+ LOGGER.debug("Time: " + data + " , class=" + data.getClass());
+ if (data instanceof java.util.Date) {
+ ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data);
+ date = java.util.Date.from(zdt.toInstant());
} else if (data instanceof java.time.LocalTime) {
// If we get a local time (no TZ info), we need to just convert to a util.Date (no TZ info) ...
java.time.LocalTime local = (java.time.LocalTime) data;
@@ -828,21 +831,10 @@ protected java.util.Date unexpectedTime(Object value, Field fieldDefn) {
protected Object convertDate(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
- if (data instanceof java.sql.Date) {
- // JDBC specification indicates that this will be the nominal object for this JDBC type.
- // Contains only date info, with all time values set to all zeros (e.g. midnight).
- // However, the java.sql.Date object *may* contain timezone information for some DBMS+Driver combinations.
- // Therefore, first convert it to a local LocalDate, then to a LocalDateTime at midnight, and then to an
- // instant in UTC ...
- java.sql.Date sqlDate = (java.sql.Date) data;
- LocalDate localDate = sqlDate.toLocalDate();
- date = java.util.Date.from(localDate.atStartOfDay().toInstant(ZoneOffset.UTC));
- } else if (data instanceof java.util.Date) {
- // Possible that some implementations might use this. We should be prepared to ignore any time,
- // information by truncating to days and creating a new java.util.Date ...
- date = (java.util.Date) data;
- Instant instant = Instant.ofEpochMilli(date.getTime()).truncatedTo(ChronoUnit.DAYS);
- date = new java.util.Date(instant.toEpochMilli());
+ LOGGER.debug("Date: " + data + " , class=" + data.getClass());
+ if (data instanceof java.util.Date) {
+ ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data);
+ date = java.util.Date.from(zdt.toInstant());
} else if (data instanceof java.time.LocalDate) {
// If we get a local date (no TZ info), we need to just convert to a util.Date (no TZ info) ...
java.time.LocalDate local = (java.time.LocalDate) data;
diff --git a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java
index ec3d36ac237..b06cb83f206 100644
--- a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java
+++ b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java
@@ -207,8 +207,12 @@ public static void isValidTombstone(SourceRecord record, String pkField, int pk)
*/
public static void schemaMatchesStruct(SchemaAndValue value) {
Object val = value.value();
- assertThat(val).isInstanceOf(Struct.class);
- fieldsInSchema((Struct) val, value.schema());
+ if (val == null) {
+ assertThat(value.schema()).isNull();
+ } else {
+ assertThat(val).isInstanceOf(Struct.class);
+ fieldsInSchema((Struct) val, value.schema());
+ }
}
/**
@@ -327,9 +331,9 @@ public static void isValid(SourceRecord record) {
msg = "deserializing value using JSON converter";
valueWithSchema = valueJsonConverter.toConnectData(record.topic(), valueBytes);
msg = "comparing value schema to that serialized/deserialized with JSON converter";
- assertEquals(valueWithSchema.schema(),record.valueSchema());
+ assertEquals(valueWithSchema.schema(), record.valueSchema());
msg = "comparing value to that serialized/deserialized with JSON converter";
- assertEquals(valueWithSchema.value(),record.value());
+ assertEquals(valueWithSchema.value(), record.value());
msg = "comparing value to its schema";
schemaMatchesStruct(valueWithSchema);
@@ -339,9 +343,9 @@ public static void isValid(SourceRecord record) {
msg = "deserializing key using Avro converter";
avroKeyWithSchema = avroValueConverter.toConnectData(record.topic(), avroKeyBytes);
msg = "comparing key schema to that serialized/deserialized with Avro converter";
- assertEquals(keyWithSchema.schema(),record.keySchema());
+ assertEquals(keyWithSchema.schema(), record.keySchema());
msg = "comparing key to that serialized/deserialized with Avro converter";
- assertEquals(keyWithSchema.value(),record.key());
+ assertEquals(keyWithSchema.value(), record.key());
msg = "comparing key to its schema";
schemaMatchesStruct(keyWithSchema);
@@ -351,9 +355,9 @@ public static void isValid(SourceRecord record) {
msg = "deserializing value using Avro converter";
avroValueWithSchema = avroValueConverter.toConnectData(record.topic(), avroValueBytes);
msg = "comparing value schema to that serialized/deserialized with Avro converter";
- assertEquals(valueWithSchema.schema(),record.valueSchema());
+ assertEquals(valueWithSchema.schema(), record.valueSchema());
msg = "comparing value to that serialized/deserialized with Avro converter";
- assertEquals(valueWithSchema.value(),record.value());
+ assertEquals(valueWithSchema.value(), record.value());
msg = "comparing value to its schema";
schemaMatchesStruct(valueWithSchema);
@@ -427,65 +431,65 @@ protected static String prettyJson(JsonNode json) {
return null;
}
}
-
+
// The remaining methods are needed simply because of the KAFKA-3803, so our comparisons cannot rely upon Struct.equals
-
- protected static void assertEquals( Object o1, Object o2 ) {
+
+ protected static void assertEquals(Object o1, Object o2) {
// assertThat(o1).isEqualTo(o2);
- if ( !equals(o1,o2) ) {
+ if (!equals(o1, o2)) {
fail(SchemaUtil.asString(o1) + " was not equal to " + SchemaUtil.asString(o2));
}
}
-
+
@SuppressWarnings("unchecked")
- protected static boolean equals( Object o1, Object o2 ) {
- if ( o1 == o2 ) return true;
+ protected static boolean equals(Object o1, Object o2) {
+ if (o1 == o2) return true;
if (o1 == null) return o2 == null ? true : false;
- if (o2 == null ) return false;
- if ( o1 instanceof ByteBuffer ) {
- o1 = ((ByteBuffer)o1).array();
+ if (o2 == null) return false;
+ if (o1 instanceof ByteBuffer) {
+ o1 = ((ByteBuffer) o1).array();
}
- if ( o2 instanceof ByteBuffer ) {
- o2 = ((ByteBuffer)o2).array();
+ if (o2 instanceof ByteBuffer) {
+ o2 = ((ByteBuffer) o2).array();
}
- if ( o1 instanceof byte[] && o2 instanceof byte[] ) {
- boolean result = Arrays.equals((byte[])o1,(byte[])o2);
+ if (o1 instanceof byte[] && o2 instanceof byte[]) {
+ boolean result = Arrays.equals((byte[]) o1, (byte[]) o2);
return result;
}
- if ( o1 instanceof Object[] && o2 instanceof Object[] ) {
- boolean result = deepEquals((Object[])o1,(Object[])o2);
+ if (o1 instanceof Object[] && o2 instanceof Object[]) {
+ boolean result = deepEquals((Object[]) o1, (Object[]) o2);
return result;
}
- if ( o1 instanceof Map && o2 instanceof Map ) {
- Map m1 = (Map)o1;
- Map m2 = (Map)o2;
- if ( !m1.keySet().equals(m2.keySet())) return false;
- for ( Map.Entry entry : m1.entrySet()) {
+ if (o1 instanceof Map && o2 instanceof Map) {
+ Map m1 = (Map) o1;
+ Map m2 = (Map) o2;
+ if (!m1.keySet().equals(m2.keySet())) return false;
+ for (Map.Entry entry : m1.entrySet()) {
Object v1 = entry.getValue();
Object v2 = m2.get(entry.getKey());
- if ( !equals(v1,v2) ) return false;
+ if (!equals(v1, v2)) return false;
}
return true;
}
- if ( o1 instanceof Collection && o2 instanceof Collection ) {
- Collection