Skip to content

Commit

Permalink
[HUDI-4831] Fix AWSDmsAvroPayload#getInsertValue,combineAndGetUpdateV…
Browse files Browse the repository at this point in the history
…alue to invoke correct api (apache#6637)

Co-authored-by: Rahil Chertara <rchertar@amazon.com>
(cherry picked from commit 8dea9cf)
  • Loading branch information
rahil-c authored and neverdizzy committed Dec 1, 2022
1 parent c0c1b6c commit 647d881
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertValue) t

@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
IndexedRecord insertValue = super.getInsertValue(schema, properties).get();
return handleDeleteOperation(insertValue);
return getInsertValue(schema);
}

@Override
Expand All @@ -82,8 +81,7 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties)
throws IOException {
IndexedRecord insertValue = super.getInsertValue(schema, properties).get();
return handleDeleteOperation(insertValue);
return combineAndGetUpdateValue(currentValue, schema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand All @@ -42,13 +44,14 @@ public void testInsert() {

Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
GenericRecord record = new GenericData.Record(avroSchema);
Properties properties = new Properties();
record.put("field1", 0);
record.put("Op", "I");

AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record));

try {
Option<IndexedRecord> outputPayload = payload.getInsertValue(avroSchema);
Option<IndexedRecord> outputPayload = payload.getInsertValue(avroSchema, properties);
assertTrue((int) outputPayload.get().get(0) == 0);
assertTrue(outputPayload.get().get(1).toString().equals("I"));
} catch (Exception e) {
Expand All @@ -61,6 +64,7 @@ public void testInsert() {
public void testUpdate() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
GenericRecord newRecord = new GenericData.Record(avroSchema);
Properties properties = new Properties();
newRecord.put("field1", 1);
newRecord.put("Op", "U");

Expand All @@ -71,7 +75,7 @@ public void testUpdate() {
AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord));

try {
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema);
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties);
assertTrue((int) outputPayload.get().get(0) == 1);
assertTrue(outputPayload.get().get(1).toString().equals("U"));
} catch (Exception e) {
Expand All @@ -84,6 +88,7 @@ public void testUpdate() {
public void testDelete() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
GenericRecord deleteRecord = new GenericData.Record(avroSchema);
Properties properties = new Properties();
deleteRecord.put("field1", 2);
deleteRecord.put("Op", "D");

Expand All @@ -94,7 +99,7 @@ public void testDelete() {
AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord));

try {
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema);
Option<IndexedRecord> outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties);
// expect nothing to be committed to table
assertFalse(outputPayload.isPresent());
} catch (Exception e) {
Expand All @@ -107,6 +112,7 @@ public void testDelete() {
public void testPreCombineWithDelete() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
GenericRecord deleteRecord = new GenericData.Record(avroSchema);
Properties properties = new Properties();
deleteRecord.put("field1", 4);
deleteRecord.put("Op", "D");

Expand All @@ -119,7 +125,7 @@ public void testPreCombineWithDelete() {

try {
OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload);
Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema);
Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema, properties);
// expect nothing to be committed to table
assertFalse(outputPayload.isPresent());
} catch (Exception e) {
Expand Down

0 comments on commit 647d881

Please sign in to comment.