From 8f03fb165023a7c80b7516263161c54f477d6b9e Mon Sep 17 00:00:00 2001 From: Daniel Osvath Date: Mon, 22 Mar 2021 19:48:14 -0400 Subject: [PATCH] KAFKA-12522: Cast SMT should allow null value records to pass through --- .../apache/kafka/connect/transforms/Cast.java | 4 +++ .../kafka/connect/transforms/CastTest.java | 36 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java index 0a763cc24f85b..cf31a0082ed1f 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java @@ -120,6 +120,10 @@ public void configure(Map props) { @Override public R apply(R record) { + if (operatingValue(record) == null) { + return record; + } + if (operatingSchema(record) == null) { return applySchemaless(record); } else { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java index 764b904ea3e24..60744b275e42f 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java @@ -89,6 +89,42 @@ public void testConfigMixWholeAndFieldTransformation() { assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"))); } + @Test + public void castNullValueRecordWithSchema() { + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null); + SourceRecord transformed = xformValue.apply(original); + assertEquals(original, transformed); + } + + @Test + public void castNullValueRecordSchemaless() { + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + Schema.STRING_SCHEMA, "key", null, null); + SourceRecord transformed = xformValue.apply(original); + assertEquals(original, transformed); + } + + @Test + public void castNullKeyRecordWithSchema() { + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value"); + SourceRecord transformed = xformKey.apply(original); + assertEquals(original, transformed); + } + + @Test + public void castNullKeyRecordSchemaless() { + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + null, null, Schema.STRING_SCHEMA, "value"); + SourceRecord transformed = xformKey.apply(original); + assertEquals(original, transformed); + } + @Test public void castWholeRecordKeyWithSchema() { xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));