diff --git a/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java b/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java index 76e06a0..8c87252 100644 --- a/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java +++ b/src/main/java/oryanmoshe/kafka/connect/util/TimestampConverter.java @@ -36,7 +36,7 @@ public class TimestampConverter implements CustomConverter s.equalsIgnoreCase(column.typeName()))) { boolean isTime = "time".equalsIgnoreCase(column.typeName()); - registration.register(datetimeSchema, rawValue -> { + // Use a new SchemaBuilder every time in order to avoid changing "Already set" options + // in the schema builder between tables. + registration.register(SchemaBuilder.string().optional(), rawValue -> { if (rawValue == null) { + // DEBUG + if (this.debug) { + System.out.printf("[TimestampConverter.converterFor] rawValue of %s is null.%n", column.name()); + } + if (column.isOptional()) { return null; } diff --git a/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java b/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java index 6ca02ad..5a01f37 100644 --- a/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java +++ b/src/test/java/oryanmoshe/kafka/connect/util/TimestampConverterTests.java @@ -139,7 +139,7 @@ public String typeName() { @Override public String name() { - return null; + return "datecolumn"; } @Override @@ -199,7 +199,7 @@ public String typeName() { @Override public String name() { - return null; + return "timecolumn"; } @Override @@ -259,7 +259,7 @@ public String typeName() { @Override public String name() { - return null; + return "datetimecolumn"; } @Override @@ -319,7 +319,7 @@ public String typeName() { @Override public String name() { - return null; + return "datetime2column"; } @Override @@ -379,7 +379,7 @@ public String typeName() { @Override public String name() { - return null; + return "timestampcolumn"; } @Override