Skip to content

Commit

Permalink
[HUDI-4864] Fix AWSDmsAvroPayload#combineAndGetUpdateValue when using…
Browse files Browse the repository at this point in the history
… MOR snapshot query after delete operations with test (apache#6688)

Co-authored-by: Rahil Chertara <rchertar@amazon.com>
(cherry picked from commit 21cbfce)
  • Loading branch information
rahil-c authored and neverdizzy committed Dec 1, 2022
1 parent 44df5d4 commit 2f0acae
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
}

public AWSDmsAvroPayload(Option<GenericRecord> record) {
this(record.get(), 0); // natural order
this(record.isPresent() ? record.get() : null, 0); // natural order
}

/**
Expand Down Expand Up @@ -87,7 +87,10 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
throws IOException {
IndexedRecord insertValue = super.getInsertValue(schema).get();
return handleDeleteOperation(insertValue);
Option<IndexedRecord> insertValue = super.getInsertValue(schema);
if (!insertValue.isPresent()) {
return Option.empty();
}
return handleDeleteOperation(insertValue.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,27 @@ public void testDelete() {

}

@Test
public void testDeleteWithEmptyPayLoad() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
Properties properties = new Properties();

GenericRecord oldRecord = new GenericData.Record(avroSchema);
oldRecord.put("field1", 2);
oldRecord.put("Op", "U");

AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty());

try {
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties);
// expect nothing to be committed to table
assertFalse(outputPayload.isPresent());
} catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
}

@Test
public void testPreCombineWithDelete() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
Expand Down

0 comments on commit 2f0acae

Please sign in to comment.