From 2f0acae450e368f1c9ac1f34198f86a3cf94006b Mon Sep 17 00:00:00 2001 From: Rahil C <32500120+rahil-c@users.noreply.github.com> Date: Fri, 16 Sep 2022 18:47:29 -0700 Subject: [PATCH] [HUDI-4864] Fix AWSDmsAvroPayload#combineAndGetUpdateValue when using MOR snapshot query after delete operations with test (#6688) Co-authored-by: Rahil Chertara (cherry picked from commit 21cbfce617689a1eff4d405bf5b19639d16e1c68) --- .../hudi/common/model/AWSDmsAvroPayload.java | 9 +++++--- .../common/model/TestAWSDmsAvroPayload.java | 21 +++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java index 7153ea069d8d9..fe044e0b431f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java @@ -49,7 +49,7 @@ public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) { } public AWSDmsAvroPayload(Option record) { - this(record.get(), 0); // natural order + this(record.isPresent() ? record.get() : null, 0); // natural order } /** @@ -87,7 +87,10 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue @Override public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { - IndexedRecord insertValue = super.getInsertValue(schema).get(); - return handleDeleteOperation(insertValue); + Option insertValue = super.getInsertValue(schema); + if (!insertValue.isPresent()) { + return Option.empty(); + } + return handleDeleteOperation(insertValue.get()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java index a60f4ff6a763b..07bc1d6f43e1f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java @@ -108,6 +108,27 @@ public void testDelete() { } + @Test + public void testDeleteWithEmptyPayLoad() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + Properties properties = new Properties(); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 2); + oldRecord.put("Op", "U"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty()); + + try { + Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties); + // expect nothing to be committed to table + assertFalse(outputPayload.isPresent()); + } catch (Exception e) { + e.printStackTrace(); + fail("Unexpected exception"); + } + } + @Test public void testPreCombineWithDelete() { Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);