From b4b20d1fe7a5989fdfb8325cb9b298f67d37943d Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Tue, 19 Jul 2016 14:04:42 -0500 Subject: [PATCH] DBZ-85 Added test case and made correction to temporal values Added an integration test case to diagnose the loss of the fractional seconds from MySQL temporal values. The problem appears to be a bug in the MySQL Binary Log Connector library that we used, and this bug was reported as https://github.com/shyiko/mysql-binlog-connector-java/issues/103. That was fixed in version 0.3.2 of the library, which Stanley was kind enough to release for us. During testing, though, several issues were discovered in how temporal values are handled and converted from the MySQL events, through the MySQL Binary Log client library, and through the Debezium MySQL connector to conform with Kafka Connect's various temporal logical schema types. Most of the issues involved converting most of the temporal values from local time zone (which is how they are created by the MySQL Binary Log client) into UTC (which is how Kafka Connect expects them). Really, java.util.Date doesn't have time zone information and instead tracks the number of milliseconds past epoch, but the conversion of normal timestamp information to the milliseconds past epoch in UTC depends on the time zone in which that conversion happens. --- .../debezium/connector/mysql/MySqlSchema.java | 42 ++- .../src/test/docker/init/setup.sql | 9 + .../mysql/MySqlConnectorRegressionIT.java | 68 +++- .../java/io/debezium/data/SchemaUtil.java | 23 +- .../io/debezium/jdbc/TimeZoneAdapter.java | 330 ++++++++++++++++++ .../relational/TableSchemaBuilder.java | 64 ++-- .../java/io/debezium/data/VerifyRecord.java | 92 ++--- .../io/debezium/jdbc/TimeZoneAdapterTest.java | 208 +++++++++++ pom.xml | 2 +- 9 files changed, 743 insertions(+), 95 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/jdbc/TimeZoneAdapter.java create mode 100644 debezium-core/src/test/java/io/debezium/jdbc/TimeZoneAdapterTest.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 874bc4074f1..df014fe6614 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -11,14 +11,20 @@ import java.util.Set; import java.util.concurrent.Callable; +import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; + import io.debezium.annotation.NotThreadSafe; import io.debezium.config.Configuration; import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.TimeZoneAdapter; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.TableSchema; @@ -68,6 +74,18 @@ public class MySqlSchema { /** * Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}. + *

+ * This component sets up a {@link TimeZoneAdapter} that is specific to how the MySQL Binary Log client library + * works. The {@link AbstractRowsEventDataDeserializer} class has various methods to instantiate the + * {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, and {@link java.sql.Timestamp} temporal values, + * where the values for {@link java.util.Date}, {@link java.sql.Date}, and {@link java.sql.Time} are all in terms of + * the local time zone (since it uses {@link java.util.Calendar#getInstance()}), but where the + * {@link java.sql.Timestamp} values are created differently using the milliseconds past epoch and therefore in terms of + * the UTC time zone. + *

+ * And, because Kafka Connect {@link Time}, {@link Date}, and {@link Timestamp} logical + * schema types all expect the {@link java.util.Date} to be in terms of the UTC time zone, the + * {@link TimeZoneAdapter} also needs to produce {@link java.util.Date} values that will be correct in UTC. * * @param config the connector configuration, which is presumed to be valid * @param serverName the name of the server @@ -78,10 +96,18 @@ public MySqlSchema(Configuration config, String serverName) { this.tables = new Tables(); this.ddlChanges = new DdlChanges(this.ddlParser.terminator()); this.ddlParser.addListener(ddlChanges); - this.schemaBuilder = new TableSchemaBuilder(); - if ( serverName != null ) serverName = serverName.trim(); + + // Specific to how the MySQL Binary Log client library creates temporal values ... + TimeZoneAdapter tzAdapter = TimeZoneAdapter.create() + .withLocalZoneForUtilDate() + .withLocalZoneForSqlDate() + .withLocalZoneForSqlTime() + .withUtcZoneForSqlTimestamp() + .withUtcTargetZone(); + this.schemaBuilder = new TableSchemaBuilder(tzAdapter); + if (serverName != null) serverName = serverName.trim(); this.serverName = serverName; - if ( this.serverName == null || serverName.isEmpty() ) { + if (this.serverName == null || serverName.isEmpty()) { this.schemaPrefix = ""; } else { this.schemaPrefix = serverName.endsWith(".") ? serverName : serverName + "."; @@ -95,7 +121,7 @@ public MySqlSchema(Configuration config, String serverName) { } // Do not remove the prefix from the subset of config properties ... Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false); - this.dbHistory.configure(dbHistoryConfig,HISTORY_COMPARATOR); // validates + this.dbHistory.configure(dbHistoryConfig, HISTORY_COMPARATOR); // validates } /** @@ -210,8 +236,8 @@ protected void changeTablesAndRecordInHistory(SourceInfo source, Callable changeFunction.call(); } catch (Exception e) { this.tables = copy; - if ( e instanceof SQLException) throw (SQLException)e; - this.logger.error("Unexpected error whle changing model of MySQL schemas: {}",e.getMessage(),e); + if (e instanceof SQLException) throw (SQLException) e; + this.logger.error("Unexpected error whle changing model of MySQL schemas: {}", e.getMessage(), e); } // At least one table has changed or was removed, so first refresh the Kafka Connect schemas ... @@ -307,12 +333,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem // to the same _affected_ database... ddlChanges.groupStatementStringsByDatabase((dbName, ddl) -> { if (filters.databaseFilter().test(dbName)) { - if ( dbName == null ) dbName = ""; + if (dbName == null) dbName = ""; statementConsumer.consume(dbName, ddlStatements); } }); } else if (filters.databaseFilter().test(databaseName)) { - if ( databaseName == null ) databaseName = ""; + if (databaseName == null) databaseName = ""; statementConsumer.consume(databaseName, ddlStatements); } } diff --git a/debezium-connector-mysql/src/test/docker/init/setup.sql b/debezium-connector-mysql/src/test/docker/init/setup.sql index 1137e917f4c..892443ae190 100644 --- a/debezium-connector-mysql/src/test/docker/init/setup.sql +++ b/debezium-connector-mysql/src/test/docker/init/setup.sql @@ -220,3 +220,12 @@ CREATE TABLE dbz84_integer_types_table ( ); INSERT INTO dbz84_integer_types_table VALUES(127,-128,128,255, default,201,202,203, default,301,302,303, default,401,402,403, default,501,502,503); + +-- DBZ-85 handle fractional part of seconds +CREATE TABLE dbz_85_fractest ( + c1 DATE, + c2 TIME(2), + c3 DATETIME(2), + c4 TIMESTAMP(2) +); +INSERT INTO dbz_85_fractest VALUES ('2014-09-08', '17:51:04.777', '2014-09-08 17:51:04.777', '2014-09-08 17:51:04.777'); \ No newline at end of file diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java index 9fc98c41b4c..654e191f673 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java @@ -7,7 +7,13 @@ import java.nio.file.Path; import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.Month; +import java.time.ZoneId; +import org.apache.kafka.connect.data.Struct; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -16,6 +22,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; +import io.debezium.data.Envelope; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.relational.history.FileDatabaseHistory; import io.debezium.util.Testing; @@ -70,21 +77,74 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws // Consume all of the events due to startup and initialization of the database // --------------------------------------------------------------------------------------------------------------- // Testing.Debug.enable(); - SourceRecords records = consumeRecordsByTopic(3 + 2); // 3 schema change record, 2 inserts + SourceRecords records = consumeRecordsByTopic(4 + 3); // 4 schema change record, 3 inserts stopConnector(); assertThat(records).isNotNull(); - assertThat(records.recordsForTopic("regression").size()).isEqualTo(3); + assertThat(records.recordsForTopic("regression").size()).isEqualTo(4); assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1); - assertThat(records.topics().size()).isEqualTo(3); + assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1); + assertThat(records.topics().size()).isEqualTo(4); assertThat(records.databaseNames().size()).isEqualTo(1); - assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(3); + assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(4); assertThat(records.ddlRecordsForDatabase("connector_test")).isNull(); assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); records.ddlRecordsForDatabase("regression_test").forEach(this::print); // Check that all records are valid, can be serialized and deserialized ... records.forEach(this::validate); + records.forEach(record->{ + Struct value = (Struct)record.value(); + if ( record.topic().endsWith("dbz_85_fractest")) { + // The microseconds of all three should be exactly 780 + Struct after = value.getStruct(Envelope.FieldName.AFTER); + java.util.Date c1 = (java.util.Date)after.get("c1"); + java.util.Date c2 = (java.util.Date)after.get("c2"); + java.util.Date c3 = (java.util.Date)after.get("c3"); + java.util.Date c4 = (java.util.Date)after.get("c4"); + Testing.debug("c1 = " + c1.getTime()); + Testing.debug("c2 = " + c2.getTime()); + Testing.debug("c3 = " + c3.getTime()); + Testing.debug("c4 = " + c4.getTime()); + assertThat(c1.getTime() % 1000).isEqualTo(0); // date only, no time + assertThat(c2.getTime() % 1000).isEqualTo(780); + assertThat(c3.getTime() % 1000).isEqualTo(780); + assertThat(c4.getTime() % 1000).isEqualTo(780); + assertThat(c1.getTime()).isEqualTo(1410134400000L); + assertThat(c2.getTime()).isEqualTo(64264780L); + assertThat(c3.getTime()).isEqualTo(1410198664780L); + assertThat(c4.getTime()).isEqualTo(1410198664780L); + // None of these Dates have timezone information, so to convert to locals we have to use our local timezone ... + ZoneId utc = ZoneId.of("UTC"); + LocalDate localC1 = c1.toInstant().atZone(utc).toLocalDate(); + LocalTime localC2 = c2.toInstant().atZone(utc).toLocalTime(); + LocalDateTime localC3 = c3.toInstant().atZone(utc).toLocalDateTime(); + LocalDateTime localC4 = c4.toInstant().atZone(utc).toLocalDateTime(); + // row is ('2014-09-08', '17:51:04.78', '2014-09-08 17:51:04.78', '2014-09-08 17:51:04.78') + final int expectedNanos = 780 * 1000 * 1000; + assertThat(localC1.getYear()).isEqualTo(2014); + assertThat(localC1.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(localC1.getDayOfMonth()).isEqualTo(8); + assertThat(localC2.getHour()).isEqualTo(17); + assertThat(localC2.getMinute()).isEqualTo(51); + assertThat(localC2.getSecond()).isEqualTo(4); + assertThat(localC2.getNano()).isEqualTo(expectedNanos); + assertThat(localC3.getYear()).isEqualTo(2014); + assertThat(localC3.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(localC3.getDayOfMonth()).isEqualTo(8); + assertThat(localC3.getHour()).isEqualTo(17); + assertThat(localC3.getMinute()).isEqualTo(51); + assertThat(localC3.getSecond()).isEqualTo(4); + assertThat(localC3.getNano()).isEqualTo(expectedNanos); + assertThat(localC4.getYear()).isEqualTo(2014); + assertThat(localC4.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(localC4.getDayOfMonth()).isEqualTo(8); + assertThat(localC4.getHour()).isEqualTo(17); + assertThat(localC4.getMinute()).isEqualTo(51); + assertThat(localC4.getSecond()).isEqualTo(4); + assertThat(localC4.getNano()).isEqualTo(expectedNanos); + } + }); } } diff --git a/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java b/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java index 7c6b6c8f8bd..0370d747e08 100644 --- a/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java +++ b/debezium-core/src/main/java/io/debezium/data/SchemaUtil.java @@ -6,6 +6,9 @@ package io.debezium.data; import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAccessor; import java.util.Base64; import java.util.List; import java.util.Map; @@ -22,7 +25,7 @@ * @author Randall Hauch */ public class SchemaUtil { - + private SchemaUtil() { } @@ -228,8 +231,24 @@ public RecordWriter append(Object obj) { } appendAdditional("value", record.value()); sb.append('}'); + } else if ( obj instanceof java.sql.Time ){ + java.sql.Time time = (java.sql.Time)obj; + append(DateTimeFormatter.ISO_LOCAL_TIME.format(time.toLocalTime())); + } else if ( obj instanceof java.sql.Date ){ + java.sql.Date date = (java.sql.Date)obj; + append(DateTimeFormatter.ISO_DATE.format(date.toLocalDate())); + } else if ( obj instanceof java.sql.Timestamp ){ + java.sql.Timestamp ts = (java.sql.Timestamp)obj; + Instant instant = ts.toInstant(); + append(DateTimeFormatter.ISO_INSTANT.format(instant)); + } else if ( obj instanceof java.util.Date ){ + java.util.Date date = (java.util.Date)obj; + append(DateTimeFormatter.ISO_INSTANT.format(date.toInstant())); + } else if ( obj instanceof TemporalAccessor ){ + TemporalAccessor temporal = (TemporalAccessor)obj; + append(DateTimeFormatter.ISO_INSTANT.format(temporal)); } else { - sb.append(obj.toString()); + append(obj.toString()); } return this; } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/TimeZoneAdapter.java b/debezium-core/src/main/java/io/debezium/jdbc/TimeZoneAdapter.java new file mode 100644 index 00000000000..fb594ceaa74 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/jdbc/TimeZoneAdapter.java @@ -0,0 +1,330 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.jdbc; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; + +import io.debezium.annotation.Immutable; + +/** + * An adapter that can convert {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, and + * {@link java.sql.Timestamp} objects to {@link ZonedDateTime} instances, where the time zone in which the temporal objects + * were created by the database/driver can be adjusted. + * + * @author Randall Hauch + */ +@Immutable +public class TimeZoneAdapter { + + private static final LocalDate EPOCH = LocalDate.ofEpochDay(0); + public static final ZoneId UTC = ZoneId.of("UTC"); + + /** + * Create a new adapter with UTC as the target zone and for a database that uses UTC for all temporal values. + * + * @return the new adapter + */ + public static TimeZoneAdapter create() { + return new TimeZoneAdapter(UTC, UTC, UTC, UTC, UTC); + } + + /** + * Create a new adapter for a database that uses the specified zone for all temporal values. + * + * @param zoneId the zone in which all temporal values are created by the database; may not be null + * @return the new adapter + */ + public static TimeZoneAdapter originatingIn(ZoneId zoneId) { + return new TimeZoneAdapter(ZoneId.systemDefault(), zoneId, zoneId, zoneId, zoneId); + } + + /** + * Create a new adapter for a database that creates all temporal values in UTC. + * + * @return the new adapter + */ + public static TimeZoneAdapter originatingInUtc() { + return originatingIn(UTC); + } + + /** + * Create a new adapter for a database that creates all temporal values in the local system time zone, + * which is the same time zone used by {@link java.util.Calendar#getInstance()}. + * + * @return the new adapter + */ + public static TimeZoneAdapter originatingInLocal() { + return originatingIn(ZoneId.systemDefault()); // same as Calendar.getInstance().getTimeZone().toZoneId() + } + + private final ZoneId targetZoneId; + private final ZoneId utilDateZoneId; + private final ZoneId sqlDateZoneId; + private final ZoneId sqlTimeZoneId; + private final ZoneId sqlTimestampZoneId; + + /** + * Create an adapter for temporal values defined in terms of the given zone. + * + * @param targetZoneId the zone in which the output temporal values are defined; may not be null + * @param utilDateZoneId the zone in which {@link java.util.Date} values are defined; may not be null + * @param sqlDateZoneId the zone in which {@link java.sql.Date} values are defined; may not be null + * @param sqlTimeZoneId the zone in which {@link java.sql.Time} values are defined; may not be null + * @param sqlTimestampZoneId the zone in which {@link java.sql.Timestamp} values are defined; may not be null + */ + protected TimeZoneAdapter(ZoneId targetZoneId, ZoneId utilDateZoneId, ZoneId sqlDateZoneId, ZoneId sqlTimeZoneId, + ZoneId sqlTimestampZoneId) { + this.targetZoneId = targetZoneId; + this.utilDateZoneId = utilDateZoneId; + this.sqlDateZoneId = sqlDateZoneId; + this.sqlTimeZoneId = sqlTimeZoneId; + this.sqlTimestampZoneId = sqlTimestampZoneId; + } + + protected ZoneId targetZoneId() { + return targetZoneId; + } + + /** + * Convert the specified database {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, or + * {@link java.sql.Timestamp} objects to a date and time in the same time zone in which the database created the + * value. If only {@link java.sql.Time time} information is provided in the input value, the date information will + * be set to the first day of the epoch. If only {@link java.sql.Date date} information is provided in the input + * value, the time information will be at midnight on the specified day. + * + * @param dbDate the database-generated value; may not be null + * @return the date time in the same zone used by the database; never null + */ + public ZonedDateTime toZonedDateTime(java.util.Date dbDate) { + if (dbDate instanceof java.sql.Date) { + return toZonedDateTime((java.sql.Date) dbDate); + } + if (dbDate instanceof java.sql.Time) { + return toZonedDateTime((java.sql.Time) dbDate); + } + if (dbDate instanceof java.sql.Timestamp) { + return toZonedDateTime((java.sql.Timestamp) dbDate); + } + return dbDate.toInstant().atZone(UTC) // milliseconds is in terms of UTC + .withZoneSameInstant(sqlTimeZoneId) // correct value in the zone where it was created + .withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone + } + + /** + * Convert the specified database {@link java.sql.Date} to a date (at midnight) in the same time zone in which the + * database created the value. + * + * @param dbDate the database-generated value; may not be null + * @return the date (at midnight) in the same zone used by the database; never null + */ + public ZonedDateTime toZonedDateTime(java.sql.Date dbDate) { + long millis = dbDate.getTime(); + Instant instant = Instant.ofEpochMilli(millis).truncatedTo(ChronoUnit.DAYS); + return instant.atZone(sqlDateZoneId).withZoneSameInstant(targetZoneId); + } + + /** + * Convert the specified database {@link java.sql.Time} to a time (on the first epoch day) in the same time zone in which + * the database created the value. + * + * @param dbTime the database-generated value; may not be null + * @return the time (on the first epoch day) in the same zone used by the database; never null + */ + public ZonedDateTime toZonedDateTime(java.sql.Time dbTime) { + long millis = dbTime.getTime(); + LocalTime local = LocalTime.ofNanoOfDay(millis * 1000 * 1000); + return ZonedDateTime.of(EPOCH, local, UTC) // milliseconds is in terms of UTC + .withZoneSameInstant(sqlTimeZoneId) // correct value in the zone where it was created + .withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone + } + + /** + * Convert the specified database {@link java.sql.Timestamp} to a timestamp in the same time zone in which + * the database created the value. + * + * @param dbTimestamp the database-generated value; may not be null + * @return the timestamp in the same zone used by the database; never null + */ + public ZonedDateTime toZonedDateTime(java.sql.Timestamp dbTimestamp) { + return dbTimestamp.toInstant().atZone(UTC) // milliseconds is in terms of UTC + .withZoneSameInstant(sqlTimestampZoneId) // correct value in the zone where it was created + .withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone + } + + /** + * Create a new adapter that produces temporal values in the specified time zone. + * + * @param zoneId the zone in which all temporal values are to be defined; may not be null + * @return the new adapter + */ + public TimeZoneAdapter withTargetZone(ZoneId zoneId) { + if (targetZoneId.equals(zoneId)) return this; + return new TimeZoneAdapter(zoneId, utilDateZoneId, sqlDateZoneId, sqlTimeZoneId, sqlTimestampZoneId); + } + + /** + * Create a new adapter for a database that uses the specified zone for all temporal values and this adapter's target zone. + * + * @param zoneId the zone in which all temporal values are created by the database; may not be null + * @return the new adapter + */ + public TimeZoneAdapter withZoneForAll(ZoneId zoneId) { + return new TimeZoneAdapter(targetZoneId, zoneId, zoneId, zoneId, zoneId); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified + * zone for {@link java.util.Date} temporal values. + * + * @param zoneId the zone in which all {@link java.util.Date} values are created by the database; may not be null + * @return the new adapter; never null + */ + public TimeZoneAdapter withZoneForUtilDate(ZoneId zoneId) { + if (utilDateZoneId.equals(zoneId)) return this; + return new TimeZoneAdapter(targetZoneId, zoneId, sqlDateZoneId, sqlTimeZoneId, sqlTimestampZoneId); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified + * zone for {@link java.sql.Date} temporal values. + * + * @param zoneId the zone in which all {@link java.sql.Date} values are created by the database; may not be null + * @return the new adapter; never null + */ + public TimeZoneAdapter withZoneForSqlDate(ZoneId zoneId) { + if (sqlDateZoneId.equals(zoneId)) return this; + return new TimeZoneAdapter(targetZoneId, utilDateZoneId, zoneId, sqlTimeZoneId, sqlTimestampZoneId); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified + * zone for {@link java.sql.Time} temporal values. + * + * @param zoneId the zone in which all {@link java.sql.Time} values are created by the database; may not be null + * @return the new adapter; never null + */ + public TimeZoneAdapter withZoneForSqlTime(ZoneId zoneId) { + if (sqlTimeZoneId.equals(zoneId)) return this; + return new TimeZoneAdapter(targetZoneId, utilDateZoneId, sqlDateZoneId, zoneId, sqlTimestampZoneId); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified + * zone for {@link java.sql.Timestamp} temporal values. + * + * @param zoneId the zone in which all {@link java.sql.Timestamp} values are created by the database; may not be null + * @return the new adapter; never null + */ + public TimeZoneAdapter withZoneForSqlTimestamp(ZoneId zoneId) { + if (sqlTimestampZoneId.equals(zoneId)) return this; + return new TimeZoneAdapter(targetZoneId, utilDateZoneId, sqlDateZoneId, sqlTimeZoneId, zoneId); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for the target. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withUtcTargetZone() { + return withTargetZone(UTC); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.util.Date} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withUtcZoneForUtilDate() { + return withZoneForUtilDate(UTC); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.sql.Date} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withUtcZoneForSqlDate() { + return withZoneForSqlDate(UTC); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.sql.Time} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withUtcZoneForSqlTime() { + return withZoneForSqlTime(UTC); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.sql.Timestamp} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withUtcZoneForSqlTimestamp() { + return withZoneForSqlTimestamp(UTC); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for the target. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withLocalTargetZone() { + return withTargetZone(ZoneId.systemDefault()); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.util.Date} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withLocalZoneForUtilDate() { + return withZoneForUtilDate(ZoneId.systemDefault()); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.sql.Date} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withLocalZoneForSqlDate() { + return withZoneForSqlDate(ZoneId.systemDefault()); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.sql.Time} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withLocalZoneForSqlTime() { + return withZoneForSqlTime(ZoneId.systemDefault()); + } + + /** + * Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC + * zone for {@link java.sql.Timestamp} temporal values. + * + * @return the new adapter; never null + */ + public TimeZoneAdapter withLocalZoneForSqlTimestamp() { + return withZoneForSqlTimestamp(ZoneId.systemDefault()); + } +} \ No newline at end of file diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index ade5ea3fb93..f03eb563261 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -10,11 +10,11 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; -import java.time.Instant; import java.time.LocalDate; import java.time.OffsetDateTime; import java.time.OffsetTime; import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.temporal.ChronoField; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -43,6 +43,7 @@ import io.debezium.data.IsoTimestamp; import io.debezium.data.SchemaUtil; import io.debezium.jdbc.JdbcConnection; +import io.debezium.jdbc.TimeZoneAdapter; import io.debezium.relational.mapping.ColumnMapper; import io.debezium.relational.mapping.ColumnMappers; @@ -76,10 +77,21 @@ public class TableSchemaBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class); private static final LocalDate EPOCH_DAY = LocalDate.ofEpochDay(0); + private final TimeZoneAdapter timeZoneAdapter; + /** - * Create a new instance of the builder. + * Create a new instance of the builder that uses the {@link TimeZoneAdapter#create() default time zone adapter}. */ public TableSchemaBuilder() { + this(TimeZoneAdapter.create()); + } + + /** + * Create a new instance of the builder. + * @param timeZoneAdapter the adapter for temporal objects created by the source database; may not be null + */ + public TableSchemaBuilder(TimeZoneAdapter timeZoneAdapter) { + this.timeZoneAdapter = timeZoneAdapter; } /** @@ -606,6 +618,7 @@ protected Object handleUnknownData(Column column, Field fieldDefn, Object data) protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) { if (data == null) return null; OffsetDateTime dateTime = null; + LOGGER.debug("TimestampWithZone: " + data + " , class=" + data.getClass()); if (data instanceof OffsetDateTime) { // JDBC specification indicates that this will be the canonical object for this JDBC type. dateTime = (OffsetDateTime) data; @@ -667,6 +680,7 @@ protected OffsetDateTime unexpectedTimestampWithZone(Object value, Field fieldDe protected Object convertTimeWithZone(Column column, Field fieldDefn, Object data) { if (data == null) return null; OffsetTime time = null; + LOGGER.debug("TimeWithZone: " + data + " , class=" + data.getClass()); if (data instanceof OffsetTime) { // JDBC specification indicates that this will be the canonical object for this JDBC type. time = (OffsetTime) data; @@ -721,15 +735,10 @@ protected OffsetTime unexpectedTimeWithZone(Object value, Field fieldDefn) { protected Object convertTimestamp(Column column, Field fieldDefn, Object data) { if (data == null) return null; java.util.Date date = null; - if (data instanceof java.sql.Timestamp) { - // JDBC specification indicates that this will be the canonical object for this JDBC type. - date = (java.util.Date) data; - } else if (data instanceof java.sql.Date) { - // This should still work, even though it should have just date info - date = (java.util.Date) data; - } else if (data instanceof java.util.Date) { - // Possible that some implementations might use this. - date = (java.util.Date) data; + LOGGER.debug("Timestamp: " + data + " , class=" + data.getClass()); + if (data instanceof java.util.Date) { + ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data); + date = java.util.Date.from(zdt.toInstant()); } else if (data instanceof java.time.LocalDate) { // If we get a local date (no TZ info), we need to just convert to a util.Date (no TZ info) ... java.time.LocalDate local = (java.time.LocalDate) data; @@ -774,16 +783,10 @@ protected java.util.Date unexpectedTimestamp(Object value, Field fieldDefn) { protected Object convertTime(Column column, Field fieldDefn, Object data) { if (data == null) return null; java.util.Date date = null; - if (data instanceof java.sql.Time) { - // JDBC specification indicates that this will be the canonical object for this JDBC type. - // Contains only time info, with the date set to the epoch day ... - date = (java.sql.Date) data; - } else if (data instanceof java.util.Date) { - // Possible that some implementations might use this. We ignore any date info by converting to an - // instant and changing the date to the epoch date, and finally creating a new java.util.Date ... - date = (java.util.Date) data; - Instant instant = Instant.ofEpochMilli(date.getTime()).with(ChronoField.EPOCH_DAY, 0); - date = new java.util.Date(instant.toEpochMilli()); + LOGGER.debug("Time: " + data + " , class=" + data.getClass()); + if (data instanceof java.util.Date) { + ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data); + date = java.util.Date.from(zdt.toInstant()); } else if (data instanceof java.time.LocalTime) { // If we get a local time (no TZ info), we need to just convert to a util.Date (no TZ info) ... java.time.LocalTime local = (java.time.LocalTime) data; @@ -828,21 +831,10 @@ protected java.util.Date unexpectedTime(Object value, Field fieldDefn) { protected Object convertDate(Column column, Field fieldDefn, Object data) { if (data == null) return null; java.util.Date date = null; - if (data instanceof java.sql.Date) { - // JDBC specification indicates that this will be the nominal object for this JDBC type. - // Contains only date info, with all time values set to all zeros (e.g. midnight). - // However, the java.sql.Date object *may* contain timezone information for some DBMS+Driver combinations. - // Therefore, first convert it to a local LocalDate, then to a LocalDateTime at midnight, and then to an - // instant in UTC ... - java.sql.Date sqlDate = (java.sql.Date) data; - LocalDate localDate = sqlDate.toLocalDate(); - date = java.util.Date.from(localDate.atStartOfDay().toInstant(ZoneOffset.UTC)); - } else if (data instanceof java.util.Date) { - // Possible that some implementations might use this. We should be prepared to ignore any time, - // information by truncating to days and creating a new java.util.Date ... - date = (java.util.Date) data; - Instant instant = Instant.ofEpochMilli(date.getTime()).truncatedTo(ChronoUnit.DAYS); - date = new java.util.Date(instant.toEpochMilli()); + LOGGER.debug("Date: " + data + " , class=" + data.getClass()); + if (data instanceof java.util.Date) { + ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data); + date = java.util.Date.from(zdt.toInstant()); } else if (data instanceof java.time.LocalDate) { // If we get a local date (no TZ info), we need to just convert to a util.Date (no TZ info) ... java.time.LocalDate local = (java.time.LocalDate) data; diff --git a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java index ec3d36ac237..b06cb83f206 100644 --- a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java +++ b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java @@ -207,8 +207,12 @@ public static void isValidTombstone(SourceRecord record, String pkField, int pk) */ public static void schemaMatchesStruct(SchemaAndValue value) { Object val = value.value(); - assertThat(val).isInstanceOf(Struct.class); - fieldsInSchema((Struct) val, value.schema()); + if (val == null) { + assertThat(value.schema()).isNull(); + } else { + assertThat(val).isInstanceOf(Struct.class); + fieldsInSchema((Struct) val, value.schema()); + } } /** @@ -327,9 +331,9 @@ public static void isValid(SourceRecord record) { msg = "deserializing value using JSON converter"; valueWithSchema = valueJsonConverter.toConnectData(record.topic(), valueBytes); msg = "comparing value schema to that serialized/deserialized with JSON converter"; - assertEquals(valueWithSchema.schema(),record.valueSchema()); + assertEquals(valueWithSchema.schema(), record.valueSchema()); msg = "comparing value to that serialized/deserialized with JSON converter"; - assertEquals(valueWithSchema.value(),record.value()); + assertEquals(valueWithSchema.value(), record.value()); msg = "comparing value to its schema"; schemaMatchesStruct(valueWithSchema); @@ -339,9 +343,9 @@ public static void isValid(SourceRecord record) { msg = "deserializing key using Avro converter"; avroKeyWithSchema = avroValueConverter.toConnectData(record.topic(), avroKeyBytes); msg = "comparing key schema to that serialized/deserialized with Avro converter"; - assertEquals(keyWithSchema.schema(),record.keySchema()); + assertEquals(keyWithSchema.schema(), record.keySchema()); msg = "comparing key to that serialized/deserialized with Avro converter"; - assertEquals(keyWithSchema.value(),record.key()); + assertEquals(keyWithSchema.value(), record.key()); msg = "comparing key to its schema"; schemaMatchesStruct(keyWithSchema); @@ -351,9 +355,9 @@ public static void isValid(SourceRecord record) { msg = "deserializing value using Avro converter"; avroValueWithSchema = avroValueConverter.toConnectData(record.topic(), avroValueBytes); msg = "comparing value schema to that serialized/deserialized with Avro converter"; - assertEquals(valueWithSchema.schema(),record.valueSchema()); + assertEquals(valueWithSchema.schema(), record.valueSchema()); msg = "comparing value to that serialized/deserialized with Avro converter"; - assertEquals(valueWithSchema.value(),record.value()); + assertEquals(valueWithSchema.value(), record.value()); msg = "comparing value to its schema"; schemaMatchesStruct(valueWithSchema); @@ -427,65 +431,65 @@ protected static String prettyJson(JsonNode json) { return null; } } - + // The remaining methods are needed simply because of the KAFKA-3803, so our comparisons cannot rely upon Struct.equals - - protected static void assertEquals( Object o1, Object o2 ) { + + protected static void assertEquals(Object o1, Object o2) { // assertThat(o1).isEqualTo(o2); - if ( !equals(o1,o2) ) { + if (!equals(o1, o2)) { fail(SchemaUtil.asString(o1) + " was not equal to " + SchemaUtil.asString(o2)); } } - + @SuppressWarnings("unchecked") - protected static boolean equals( Object o1, Object o2 ) { - if ( o1 == o2 ) return true; + protected static boolean equals(Object o1, Object o2) { + if (o1 == o2) return true; if (o1 == null) return o2 == null ? true : false; - if (o2 == null ) return false; - if ( o1 instanceof ByteBuffer ) { - o1 = ((ByteBuffer)o1).array(); + if (o2 == null) return false; + if (o1 instanceof ByteBuffer) { + o1 = ((ByteBuffer) o1).array(); } - if ( o2 instanceof ByteBuffer ) { - o2 = ((ByteBuffer)o2).array(); + if (o2 instanceof ByteBuffer) { + o2 = ((ByteBuffer) o2).array(); } - if ( o1 instanceof byte[] && o2 instanceof byte[] ) { - boolean result = Arrays.equals((byte[])o1,(byte[])o2); + if (o1 instanceof byte[] && o2 instanceof byte[]) { + boolean result = Arrays.equals((byte[]) o1, (byte[]) o2); return result; } - if ( o1 instanceof Object[] && o2 instanceof Object[] ) { - boolean result = deepEquals((Object[])o1,(Object[])o2); + if (o1 instanceof Object[] && o2 instanceof Object[]) { + boolean result = deepEquals((Object[]) o1, (Object[]) o2); return result; } - if ( o1 instanceof Map && o2 instanceof Map ) { - Map m1 = (Map)o1; - Map m2 = (Map)o2; - if ( !m1.keySet().equals(m2.keySet())) return false; - for ( Map.Entry entry : m1.entrySet()) { + if (o1 instanceof Map && o2 instanceof Map) { + Map m1 = (Map) o1; + Map m2 = (Map) o2; + if (!m1.keySet().equals(m2.keySet())) return false; + for (Map.Entry entry : m1.entrySet()) { Object v1 = entry.getValue(); Object v2 = m2.get(entry.getKey()); - if ( !equals(v1,v2) ) return false; + if (!equals(v1, v2)) return false; } return true; } - if ( o1 instanceof Collection && o2 instanceof Collection ) { - Collection m1 = (Collection)o1; - Collection m2 = (Collection)o2; - if ( m1.size() != m2.size() ) return false; + if (o1 instanceof Collection && o2 instanceof Collection) { + Collection m1 = (Collection) o1; + Collection m2 = (Collection) o2; + if (m1.size() != m2.size()) return false; Iterator iter1 = m1.iterator(); Iterator iter2 = m2.iterator(); - while ( iter1.hasNext() && iter2.hasNext() ) { - if ( !equals(iter1.next(),iter2.next()) ) return false; + while (iter1.hasNext() && iter2.hasNext()) { + if (!equals(iter1.next(), iter2.next())) return false; } return true; } - if ( o1 instanceof Struct && o2 instanceof Struct ) { + if (o1 instanceof Struct && o2 instanceof Struct) { // Unfortunately, the Struct.equals() method has a bug in that it is not using Arrays.deepEquals(...) to // compare values in two Struct objects. The result is that the equals only works if the values of the // first level Struct are non arrays; otherwise, the array values are compared using == and that obviously // does not work for non-primitive values. Struct struct1 = (Struct) o1; Struct struct2 = (Struct) o2; - if (! Objects.equals(struct1.schema(),struct2.schema()) ) { + if (!Objects.equals(struct1.schema(), struct2.schema())) { return false; } Object[] array1 = valuesFor(struct1); @@ -495,11 +499,11 @@ protected static boolean equals( Object o1, Object o2 ) { } return Objects.equals(o1, o2); } - - private static Object[] valuesFor( Struct struct ) { + + private static Object[] valuesFor(Struct struct) { Object[] array = new Object[struct.schema().fields().size()]; int index = 0; - for ( Field field : struct.schema().fields() ) { + for (Field field : struct.schema().fields()) { array[index] = struct.get(field); ++index; } @@ -509,7 +513,7 @@ private static Object[] valuesFor( Struct struct ) { private static boolean deepEquals(Object[] a1, Object[] a2) { if (a1 == a2) return true; - if (a1 == null || a2==null) + if (a1 == null || a2 == null) return false; int length = a1.length; if (a2.length != length) @@ -537,7 +541,7 @@ private static boolean deepEquals0(Object e1, Object e2) { assert e1 != null; boolean eq; if (e1 instanceof Object[] && e2 instanceof Object[]) - eq = deepEquals ((Object[]) e1, (Object[]) e2); + eq = deepEquals((Object[]) e1, (Object[]) e2); else if (e1 instanceof byte[] && e2 instanceof byte[]) eq = Arrays.equals((byte[]) e1, (byte[]) e2); else if (e1 instanceof short[] && e2 instanceof short[]) @@ -555,7 +559,7 @@ else if (e1 instanceof double[] && e2 instanceof double[]) else if (e1 instanceof boolean[] && e2 instanceof boolean[]) eq = Arrays.equals((boolean[]) e1, (boolean[]) e2); else - eq = equals(e1,e2); + eq = equals(e1, e2); return eq; } } diff --git a/debezium-core/src/test/java/io/debezium/jdbc/TimeZoneAdapterTest.java b/debezium-core/src/test/java/io/debezium/jdbc/TimeZoneAdapterTest.java new file mode 100644 index 00000000000..6da250b7396 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/jdbc/TimeZoneAdapterTest.java @@ -0,0 +1,208 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.jdbc; + +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.Month; +import java.time.ZonedDateTime; +import java.util.Calendar; + +import org.junit.Before; +import org.junit.Test; + +import static org.fest.assertions.Assertions.assertThat; + +/** + * @author Randall Hauch + * + */ +public class TimeZoneAdapterTest { + + private TimeZoneAdapter adapter; + + @Before + public void beforeEach() { + adapter = TimeZoneAdapter.create() + .withLocalZoneForUtilDate() + .withLocalZoneForSqlDate() + .withLocalZoneForSqlTime() + .withLocalZoneForSqlTimestamp() + .withUtcTargetZone(); + } + + @Test + public void shouldAdaptSqlDate() { + // '2014-09-08', '17:51:04.777', '2014-09-08 17:51:04.777', '2014-09-08 17:51:04.777' + java.sql.Date sqlDate = createSqlDate(2014, Month.SEPTEMBER, 8); + ZonedDateTime zdt = adapter.toZonedDateTime(sqlDate); + // The date should match ... + LocalDate date = zdt.toLocalDate(); + assertThat(date.getYear()).isEqualTo(2014); + assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(date.getDayOfMonth()).isEqualTo(8); + // There should be no time component ... + LocalTime time = zdt.toLocalTime(); + assertThat(time.getHour()).isEqualTo(0); + assertThat(time.getMinute()).isEqualTo(0); + assertThat(time.getSecond()).isEqualTo(0); + assertThat(time.getNano()).isEqualTo(0); + // The zone should be our target ... + assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId()); + } + + @Test + public void shouldAdaptSqlTime() { + // '17:51:04.777' + java.sql.Time sqlTime = createSqlTime(17, 51, 04, 777); + ZonedDateTime zdt = adapter.toZonedDateTime(sqlTime); + // The date should be at epoch ... + LocalDate date = zdt.toLocalDate(); + assertThat(date.getYear()).isEqualTo(1970); + assertThat(date.getMonth()).isEqualTo(Month.JANUARY); + assertThat(date.getDayOfMonth()).isEqualTo(1); + // The time should match exactly ... + LocalTime time = zdt.toLocalTime(); + assertThat(time.getHour()).isEqualTo(17); + assertThat(time.getMinute()).isEqualTo(51); + assertThat(time.getSecond()).isEqualTo(4); + assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000); + // The zone should be our target ... + assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId()); + } + + @Test + public void shouldAdaptSqlTimestamp() { + adapter = TimeZoneAdapter.create() + .withLocalZoneForSqlTimestamp() + .withUtcTargetZone(); + + // '2014-09-08 17:51:04.777' + // This technique creates the timestamp using the milliseconds from epoch in terms of the local zone ... + java.sql.Timestamp sqlTimestamp = createSqlTimestamp(2014, Month.SEPTEMBER, 8, 17, 51, 04, 777); + ZonedDateTime zdt = adapter.toZonedDateTime(sqlTimestamp); + // The date should match ... + LocalDate date = zdt.toLocalDate(); + assertThat(date.getYear()).isEqualTo(2014); + assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(date.getDayOfMonth()).isEqualTo(8); + // The time should match exactly ... + LocalTime time = zdt.toLocalTime(); + assertThat(time.getHour()).isEqualTo(17); + assertThat(time.getMinute()).isEqualTo(51); + assertThat(time.getSecond()).isEqualTo(4); + assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000); + // The zone should be our target ... + assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId()); + } + + @Test + public void shouldAdaptSqlTimestampViaSecondsAndMillis() { + adapter = TimeZoneAdapter.create() + .withUtcZoneForSqlTimestamp() + .withUtcTargetZone(); + + // '2014-09-08 17:51:04.777' + // This technique creates the timestamp using the milliseconds from epoch in terms of UTC ... + java.sql.Timestamp sqlTimestamp = createSqlTimestamp(1410198664L, 777); + ZonedDateTime zdt = adapter.toZonedDateTime(sqlTimestamp); + // The date should match ... + LocalDate date = zdt.toLocalDate(); + assertThat(date.getYear()).isEqualTo(2014); + assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(date.getDayOfMonth()).isEqualTo(8); + // The time should match exactly ... + LocalTime time = zdt.toLocalTime(); + assertThat(time.getHour()).isEqualTo(17); + assertThat(time.getMinute()).isEqualTo(51); + assertThat(time.getSecond()).isEqualTo(4); + assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000); + // The zone should be our target ... + assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId()); + } + + @Test + public void shouldAdaptUtilDate() { + // '2014-09-08 17:51:04.777' + java.util.Date utilDate = createUtilDate(2014, Month.SEPTEMBER, 8, 17, 51, 04, 777); + ZonedDateTime zdt = adapter.toZonedDateTime(utilDate); + // The date should match ... + LocalDate date = zdt.toLocalDate(); + assertThat(date.getYear()).isEqualTo(2014); + assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(date.getDayOfMonth()).isEqualTo(8); + // The time should match exactly ... + LocalTime time = zdt.toLocalTime(); + assertThat(time.getHour()).isEqualTo(17); + assertThat(time.getMinute()).isEqualTo(51); + assertThat(time.getSecond()).isEqualTo(4); + assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000); + // The zone should be our target ... + assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId()); + } + + protected java.sql.Date createSqlDate(int year, Month month, int dayOfMonth) { + Calendar cal = Calendar.getInstance(); + cal.clear(); + cal.set(Calendar.YEAR, year); + cal.set(Calendar.MONTH, month.getValue() - 1); + cal.set(Calendar.DATE, dayOfMonth); + return new java.sql.Date(cal.getTimeInMillis()); + } + + protected java.sql.Time createSqlTime(int hourOfDay, int minute, int second, int milliseconds) { + Calendar c = Calendar.getInstance(); + c.clear(); + c.set(Calendar.HOUR_OF_DAY, hourOfDay); + c.set(Calendar.MINUTE, minute); + c.set(Calendar.SECOND, second); + c.set(Calendar.MILLISECOND, milliseconds); + return new java.sql.Time(c.getTimeInMillis()); + } + + /** + * This sets the calendar via the milliseconds past epoch, and this behaves differently than actually setting the various + * components of the calendar (see {@link #createSqlTimestamp(int, Month, int, int, int, int, int)}). This is how the + * MySQL Binary Log client library creates timestamps (v2). + * + * @param secondsFromEpoch the number of seconds since epoch + * @param millis the number of milliseconds + * @return the SQL timestamp + */ + protected java.sql.Timestamp createSqlTimestamp(long secondsFromEpoch, int millis) { + Calendar c = Calendar.getInstance(); + c.setTimeInMillis(secondsFromEpoch * 1000); + c.set(Calendar.MILLISECOND, millis); + return new java.sql.Timestamp(c.getTimeInMillis()); + } + + protected java.sql.Timestamp createSqlTimestamp(int year, Month month, int dayOfMonth, int hourOfDay, int minute, int second, + int milliseconds) { + Calendar c = Calendar.getInstance(); + c.set(Calendar.YEAR, year); + c.set(Calendar.MONTH, month.getValue() - 1); + c.set(Calendar.DAY_OF_MONTH, dayOfMonth); + c.set(Calendar.HOUR_OF_DAY, hourOfDay); + c.set(Calendar.MINUTE, minute); + c.set(Calendar.SECOND, second); + c.set(Calendar.MILLISECOND, milliseconds); + return new java.sql.Timestamp(c.getTimeInMillis()); + } + + protected java.util.Date createUtilDate(int year, Month month, int dayOfMonth, int hourOfDay, int minute, int second, + int milliseconds) { + Calendar c = Calendar.getInstance(); + c.set(Calendar.YEAR, year); + c.set(Calendar.MONTH, month.getValue() - 1); + c.set(Calendar.DAY_OF_MONTH, dayOfMonth); + c.set(Calendar.HOUR_OF_DAY, hourOfDay); + c.set(Calendar.MINUTE, minute); + c.set(Calendar.SECOND, second); + c.set(Calendar.MILLISECOND, milliseconds); + return c.getTime(); + } + +} diff --git a/pom.xml b/pom.xml index 057df0a1fdd..f3a2ef07210 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ 9.4 5.7 5.1.39 - 0.3.1 + 0.3.2 4.12