DBZ-85 Added test case and made correction to temporal values
Added an integration test case to diagnose the loss of fractional seconds from MySQL temporal values. The problem appears to be a bug in the MySQL Binary Log Connector library that we use; it was reported as shyiko/mysql-binlog-connector-java#103 and was fixed in version 0.3.2 of the library, which Stanley was kind enough to release for us.

During testing, though, several issues were discovered in how temporal values are handled and converted from MySQL events, through the MySQL Binary Log client library, and through the Debezium MySQL connector to conform with Kafka Connect's various temporal logical schema types. Most of the issues involved converting the temporal values from the local time zone (in which the MySQL Binary Log client creates them) into UTC (which is what Kafka Connect expects). Strictly speaking, java.util.Date carries no time zone information and merely tracks the number of milliseconds past the epoch, but converting wall-clock date and time fields into milliseconds past the epoch in UTC depends on the time zone in which that conversion is performed.
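The zone dependence is easy to see with a few lines of plain Java (a standalone illustration, not code from this change), using the same wall-clock value as the new dbz_85_fractest row:

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Illustration only: the same wall-clock value maps to different
// milliseconds-past-epoch values depending on the conversion zone.
public class EpochMillisDemo {
    public static void main(String[] args) {
        LocalDateTime wallClock = LocalDateTime.of(2014, 9, 8, 17, 51, 4, 780_000_000);

        // Interpreted as UTC: 1410198664780, the value the new integration test expects.
        long asUtc = wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();

        // Interpreted in the JVM's default zone: shifted by that zone's offset from UTC.
        long asLocal = wallClock.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

        // Kafka Connect's temporal logical types expect the UTC interpretation, so values
        // the binlog client builds in the local zone must be adjusted before being emitted.
        System.out.println("UTC millis:   " + asUtc);
        System.out.println("local millis: " + asLocal);
    }
}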
rhauch committed Jul 25, 2016
1 parent 31c4e26 commit b4b20d1
Showing 9 changed files with 743 additions and 95 deletions.
@@ -11,14 +11,20 @@
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TimeZoneAdapter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@@ -68,6 +74,18 @@ public class MySqlSchema {

/**
* Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}.
* <p>
* This component sets up a {@link TimeZoneAdapter} that is specific to how the MySQL Binary Log client library
* works. The {@link AbstractRowsEventDataDeserializer} class has various methods to instantiate the
* {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, and {@link java.sql.Timestamp} temporal values,
* where the values for {@link java.util.Date}, {@link java.sql.Date}, and {@link java.sql.Time} are all in terms of
* the <em>local time zone</em> (since it uses {@link java.util.Calendar#getInstance()}), but where the
* {@link java.sql.Timestamp} values are created differently using the milliseconds past epoch and therefore in terms of
* the <em>UTC time zone</em>.
* <p>
* Because the Kafka Connect {@link Time}, {@link Date}, and {@link Timestamp} logical
* schema types all expect the {@link java.util.Date} to be in terms of the <em>UTC time zone</em>, the
* {@link TimeZoneAdapter} must also produce {@link java.util.Date} values that are correct in UTC.
*
* @param config the connector configuration, which is presumed to be valid
* @param serverName the name of the server
@@ -78,10 +96,18 @@ public MySqlSchema(Configuration config, String serverName) {
this.tables = new Tables();
this.ddlChanges = new DdlChanges(this.ddlParser.terminator());
this.ddlParser.addListener(ddlChanges);
this.schemaBuilder = new TableSchemaBuilder();
if ( serverName != null ) serverName = serverName.trim();

// Specific to how the MySQL Binary Log client library creates temporal values ...
TimeZoneAdapter tzAdapter = TimeZoneAdapter.create()
.withLocalZoneForUtilDate()
.withLocalZoneForSqlDate()
.withLocalZoneForSqlTime()
.withUtcZoneForSqlTimestamp()
.withUtcTargetZone();
this.schemaBuilder = new TableSchemaBuilder(tzAdapter);
if (serverName != null) serverName = serverName.trim();
this.serverName = serverName;
if ( this.serverName == null || serverName.isEmpty() ) {
if (this.serverName == null || serverName.isEmpty()) {
this.schemaPrefix = "";
} else {
this.schemaPrefix = serverName.endsWith(".") ? serverName : serverName + ".";
Expand All @@ -95,7 +121,7 @@ public MySqlSchema(Configuration config, String serverName) {
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false);
this.dbHistory.configure(dbHistoryConfig,HISTORY_COMPARATOR); // validates
this.dbHistory.configure(dbHistoryConfig, HISTORY_COMPARATOR); // validates
}
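As a rough sketch of the asymmetry the javadoc above describes (illustration only, not part of this commit, and not the TimeZoneAdapter implementation itself): values built field by field through Calendar.getInstance() are relative to the local zone, while a java.sql.Timestamp built from milliseconds past epoch is already UTC-relative, so only the former need an offset correction before Kafka Connect sees them.

import java.util.Calendar;
import java.util.TimeZone;

// Illustration only -- not part of this commit and not the TimeZoneAdapter itself.
public class LocalVersusUtcSketch {
    public static void main(String[] args) {
        // DATE/TIME-style values are built field by field in the local zone ...
        Calendar local = Calendar.getInstance(); // local time zone
        local.clear();
        local.set(1970, Calendar.JANUARY, 1, 17, 51, 4);
        local.set(Calendar.MILLISECOND, 780);
        long localMillis = local.getTimeInMillis();

        // ... but the same fields interpreted in UTC give what Kafka Connect expects.
        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        utc.clear();
        utc.set(1970, Calendar.JANUARY, 1, 17, 51, 4);
        utc.set(Calendar.MILLISECOND, 780);
        long utcMillis = utc.getTimeInMillis(); // 64264780, the TIME value asserted in the test

        // The difference is exactly the local zone's UTC offset at that instant.
        long offset = TimeZone.getDefault().getOffset(localMillis);
        System.out.println((localMillis + offset) == utcMillis); // true (barring DST edge cases)

        // A TIMESTAMP arrives as milliseconds past epoch and is already UTC-relative.
        java.sql.Timestamp ts = new java.sql.Timestamp(1410198664780L);
        System.out.println(ts.getTime()); // 1410198664780
    }
}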

/**
@@ -210,8 +236,8 @@ protected void changeTablesAndRecordInHistory(SourceInfo source, Callable<Void>
changeFunction.call();
} catch (Exception e) {
this.tables = copy;
if ( e instanceof SQLException) throw (SQLException)e;
this.logger.error("Unexpected error whle changing model of MySQL schemas: {}",e.getMessage(),e);
if (e instanceof SQLException) throw (SQLException) e;
this.logger.error("Unexpected error whle changing model of MySQL schemas: {}", e.getMessage(), e);
}

// At least one table has changed or was removed, so first refresh the Kafka Connect schemas ...
@@ -307,12 +333,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
// to the same _affected_ database...
ddlChanges.groupStatementStringsByDatabase((dbName, ddl) -> {
if (filters.databaseFilter().test(dbName)) {
if ( dbName == null ) dbName = "";
if (dbName == null) dbName = "";
statementConsumer.consume(dbName, ddlStatements);
}
});
} else if (filters.databaseFilter().test(databaseName)) {
if ( databaseName == null ) databaseName = "";
if (databaseName == null) databaseName = "";
statementConsumer.consume(databaseName, ddlStatements);
}
}
9 changes: 9 additions & 0 deletions debezium-connector-mysql/src/test/docker/init/setup.sql
@@ -220,3 +220,12 @@ CREATE TABLE dbz84_integer_types_table (
);
INSERT INTO dbz84_integer_types_table
VALUES(127,-128,128,255, default,201,202,203, default,301,302,303, default,401,402,403, default,501,502,503);

-- DBZ-85 handle fractional part of seconds
CREATE TABLE dbz_85_fractest (
c1 DATE,
c2 TIME(2),
c3 DATETIME(2),
c4 TIMESTAMP(2)
);
INSERT INTO dbz_85_fractest VALUES ('2014-09-08', '17:51:04.777', '2014-09-08 17:51:04.777', '2014-09-08 17:51:04.777');
@@ -7,7 +7,13 @@

import java.nio.file.Path;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Month;
import java.time.ZoneId;

import org.apache.kafka.connect.data.Struct;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -16,6 +22,7 @@

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
@@ -70,21 +77,74 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(3 + 2); // 3 schema change record, 2 inserts
SourceRecords records = consumeRecordsByTopic(4 + 3); // 4 schema change records, 3 inserts
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(3);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(4);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(3);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(4);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(3);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(4);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);

// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
records.forEach(record->{
Struct value = (Struct)record.value();
if ( record.topic().endsWith("dbz_85_fractest")) {
// The fractional seconds of c2, c3, and c4 should be exactly 780 milliseconds
Struct after = value.getStruct(Envelope.FieldName.AFTER);
java.util.Date c1 = (java.util.Date)after.get("c1");
java.util.Date c2 = (java.util.Date)after.get("c2");
java.util.Date c3 = (java.util.Date)after.get("c3");
java.util.Date c4 = (java.util.Date)after.get("c4");
Testing.debug("c1 = " + c1.getTime());
Testing.debug("c2 = " + c2.getTime());
Testing.debug("c3 = " + c3.getTime());
Testing.debug("c4 = " + c4.getTime());
assertThat(c1.getTime() % 1000).isEqualTo(0); // date only, no time
assertThat(c2.getTime() % 1000).isEqualTo(780);
assertThat(c3.getTime() % 1000).isEqualTo(780);
assertThat(c4.getTime() % 1000).isEqualTo(780);
assertThat(c1.getTime()).isEqualTo(1410134400000L);
assertThat(c2.getTime()).isEqualTo(64264780L);
assertThat(c3.getTime()).isEqualTo(1410198664780L);
assertThat(c4.getTime()).isEqualTo(1410198664780L);
// None of these Dates carry time zone information; their millis are UTC-based, so convert them using the UTC zone ...
ZoneId utc = ZoneId.of("UTC");
LocalDate localC1 = c1.toInstant().atZone(utc).toLocalDate();
LocalTime localC2 = c2.toInstant().atZone(utc).toLocalTime();
LocalDateTime localC3 = c3.toInstant().atZone(utc).toLocalDateTime();
LocalDateTime localC4 = c4.toInstant().atZone(utc).toLocalDateTime();
// row is ('2014-09-08', '17:51:04.78', '2014-09-08 17:51:04.78', '2014-09-08 17:51:04.78')
final int expectedNanos = 780 * 1000 * 1000;
assertThat(localC1.getYear()).isEqualTo(2014);
assertThat(localC1.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(localC1.getDayOfMonth()).isEqualTo(8);
assertThat(localC2.getHour()).isEqualTo(17);
assertThat(localC2.getMinute()).isEqualTo(51);
assertThat(localC2.getSecond()).isEqualTo(4);
assertThat(localC2.getNano()).isEqualTo(expectedNanos);
assertThat(localC3.getYear()).isEqualTo(2014);
assertThat(localC3.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(localC3.getDayOfMonth()).isEqualTo(8);
assertThat(localC3.getHour()).isEqualTo(17);
assertThat(localC3.getMinute()).isEqualTo(51);
assertThat(localC3.getSecond()).isEqualTo(4);
assertThat(localC3.getNano()).isEqualTo(expectedNanos);
assertThat(localC4.getYear()).isEqualTo(2014);
assertThat(localC4.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(localC4.getDayOfMonth()).isEqualTo(8);
assertThat(localC4.getHour()).isEqualTo(17);
assertThat(localC4.getMinute()).isEqualTo(51);
assertThat(localC4.getSecond()).isEqualTo(4);
assertThat(localC4.getNano()).isEqualTo(expectedNanos);
}
});
}

}
23 changes: 21 additions & 2 deletions debezium-core/src/main/java/io/debezium/data/SchemaUtil.java
@@ -6,6 +6,9 @@
package io.debezium.data;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@@ -22,7 +25,7 @@
* @author Randall Hauch
*/
public class SchemaUtil {

private SchemaUtil() {
}

@@ -228,8 +231,24 @@ public RecordWriter append(Object obj) {
}
appendAdditional("value", record.value());
sb.append('}');
} else if ( obj instanceof java.sql.Time ){
java.sql.Time time = (java.sql.Time)obj;
append(DateTimeFormatter.ISO_LOCAL_TIME.format(time.toLocalTime()));
} else if ( obj instanceof java.sql.Date ){
java.sql.Date date = (java.sql.Date)obj;
append(DateTimeFormatter.ISO_DATE.format(date.toLocalDate()));
} else if ( obj instanceof java.sql.Timestamp ){
java.sql.Timestamp ts = (java.sql.Timestamp)obj;
Instant instant = ts.toInstant();
append(DateTimeFormatter.ISO_INSTANT.format(instant));
} else if ( obj instanceof java.util.Date ){
java.util.Date date = (java.util.Date)obj;
append(DateTimeFormatter.ISO_INSTANT.format(date.toInstant()));
} else if ( obj instanceof TemporalAccessor ){
TemporalAccessor temporal = (TemporalAccessor)obj;
append(DateTimeFormatter.ISO_INSTANT.format(temporal));
} else {
sb.append(obj.toString());
append(obj.toString());
}
return this;
}