From 20c1e7f1a017db7f78d3335331558775128bbaae Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Thu, 17 Jun 2021 09:00:14 +1000 Subject: [PATCH] fix: fixes npe in timestamp encoding (#12) * fix: fix npe in timestamp encoding Handle the case when the timestamp written / read is null. * refactor: remove read/write null from ts encoding Removes the calls to read/write null from timestamp encoding. --- .../io/gcp/spanner/cdc/TimestampEncoding.java | 23 ++++++++++++++----- .../spanner/cdc/TimestampEncodingTest.java | 16 +++++++++++++ 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncoding.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncoding.java index e6eed28ca4d8a..16d6ce96ef6ff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncoding.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncoding.java @@ -36,6 +36,7 @@ public class TimestampEncoding extends CustomEncoding { .builder() .record("timestamp") .fields() + .requiredBoolean("isNull") .requiredLong("seconds") .requiredInt("nanos") .endRecord(); @@ -45,15 +46,25 @@ public class TimestampEncoding extends CustomEncoding { @Override protected void write(Object datum, Encoder out) throws IOException { final Timestamp timestamp = (Timestamp) datum; - out.writeLong(timestamp.getSeconds()); - out.writeInt(timestamp.getNanos()); + if (timestamp == null) { + out.writeBoolean(true); + } else { + out.writeBoolean(false); + out.writeLong(timestamp.getSeconds()); + out.writeInt(timestamp.getNanos()); + } } @Override protected Timestamp read(Object reuse, Decoder in) throws IOException { - return Timestamp.ofTimeSecondsAndNanos( - in.readLong(), - in.readInt() - ); + final boolean isNull = in.readBoolean(); + if (isNull) { + return null; + } else { + return Timestamp.ofTimeSecondsAndNanos( + in.readLong(), + in.readInt() + ); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncodingTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncodingTest.java index ffbe5022602ef..9c6f445d2a459 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncodingTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/TimestampEncodingTest.java @@ -1,6 +1,7 @@ package org.apache.beam.sdk.io.gcp.spanner.cdc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import com.google.cloud.Timestamp; import java.io.ByteArrayInputStream; @@ -57,6 +58,21 @@ public void testWriteAndReadReuseTimestamp() throws IOException { assertEquals(expectedTimestamp, actualTimestamp); } + @Test + public void testWriteAndReadNullTimestamp() throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(outputStream, null); + + encoding.write(null, encoder); + + final byte[] bytes = outputStream.toByteArray(); + final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + final Timestamp actualTimestamp = encoding.read(null, decoder); + + assertNull(actualTimestamp); + } + @Test(expected = ClassCastException.class) public void testThrowsExceptionWhenWritingNonTimestamp() throws IOException { encoding.write(1L, null);