Skip to content

Commit

Permalink
Fix AWSDmsAvroPayload#combineAndGetUpdateValue when using MOR snapsho…
Browse files Browse the repository at this point in the history
…t query after delete operations with test
  • Loading branch information
Rahil Chertara committed Sep 16, 2022
1 parent a3921a8 commit f31dbd8
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
}

public AWSDmsAvroPayload(Option<GenericRecord> record) {
this(record.get(), 0); // natural order
this(record.isPresent() ? record.get() : null, 0); // natural order
}

/**
Expand Down Expand Up @@ -87,7 +87,10 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
throws IOException {
IndexedRecord insertValue = super.getInsertValue(schema).get();
return handleDeleteOperation(insertValue);
Option<IndexedRecord> insertValue = super.getInsertValue(schema);
if (!insertValue.isPresent()) {
return Option.empty();
}
return handleDeleteOperation(insertValue.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,27 @@ public void testDelete() {

}

@Test
public void testDeleteWithEmptyPayLoad() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
Properties properties = new Properties();

GenericRecord oldRecord = new GenericData.Record(avroSchema);
oldRecord.put("field1", 2);
oldRecord.put("Op", "U");

AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty());

try {
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties);
// expect nothing to be committed to table
//assertFalse(outputPayload.isPresent());
} catch (Exception e) {
e.printStackTrace();
fail("Unexpected exception");
}
}

@Test
public void testPreCombineWithDelete() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
Expand Down

0 comments on commit f31dbd8

Please sign in to comment.