Skip to content

Commit

Permalink
[HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed Sep 21, 2022
1 parent 61ed292 commit 3ac0808
Show file tree
Hide file tree
Showing 6 changed files with 362 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,11 @@ public HoodieCDCLogger(
this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));

if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING;
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING;
} else {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING;
}
this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
cdcSupplementalLoggingMode,
dataSchema
);
this.cdcSchemaString = this.cdcSchema.toString();

this.cdcData = new ExternalSpillableMap<>(
maxInMemorySizeInBytes,
Expand Down Expand Up @@ -158,18 +153,21 @@ private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
GenericRecord newRecord) {
GenericData.Record record;
if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime,
removeCommitMetadata(oldRecord), newRecord);
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey,
removeCommitMetadata(oldRecord));
} else {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
}
return record;
}

private GenericRecord removeCommitMetadata(GenericRecord record) {
if (record == null) {
return null;
}
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
}

Expand Down Expand Up @@ -221,18 +219,6 @@ public void close() {
}
}

public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
long recordsWritten,
long insertRecordsWritten) {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the data are new-coming,
return Option.empty();
}
return cdcLogger.writeCDCData();
}

public static void setCDCStatIfNeeded(HoodieWriteStat stat,
Option<AppendResult> cdcResult,
String partitionPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,16 @@ protected void writeIncomingRecords() throws IOException {
}
}

private Option<AppendResult> writeCDCDataIfNeeded() {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the data are new-coming,
return Option.empty();
}
return cdcLogger.writeCDCData();
}

@Override
public List<WriteStatus> close() {
try {
Expand All @@ -445,8 +455,7 @@ public List<WriteStatus> close() {
}

// if there are cdc data written, set the CDC-related information.
Option<AppendResult> cdcResult =
HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
Option<AppendResult> cdcResult = writeCDCDataIfNeeded();
HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public static Schema createNullableSchema(Schema.Type avroType) {
return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
}

public static Schema createNullableSchema(Schema schema) {
checkState(schema.getType() != Schema.Type.NULL);
return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
}

/**
* Returns true in case when schema contains the field w/ provided name
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

package org.apache.hudi.common.table.cdc;

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.exception.HoodieException;

import java.util.Arrays;
import java.util.List;

public class HoodieCDCUtils {

public static final String CDC_LOGFILE_SUFFIX = "-cdc";
Expand All @@ -50,33 +56,6 @@ public class HoodieCDCUtils {
CDC_AFTER_IMAGE
};

/**
* This is the standard CDC output format.
* Also, this is the schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
*/
public static final String CDC_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Record\","
+ "\"fields\":["
+ "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"ts_ms\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"before\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}"
+ "]}";

public static final Schema CDC_SCHEMA = new Schema.Parser().parse(CDC_SCHEMA_STRING);

/**
* The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
*/
public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = "{\"type\":\"record\",\"name\":\"Record\","
+ "\"fields\":["
+ "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}"
+ "]}";

public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE =
new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);

/**
* The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
*/
Expand All @@ -89,32 +68,50 @@ public class HoodieCDCUtils {
public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY =
new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING);

public static final Schema schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode supplementalLoggingMode) {
switch (supplementalLoggingMode) {
case WITH_BEFORE_AFTER:
return CDC_SCHEMA;
case WITH_BEFORE:
return CDC_SCHEMA_OP_RECORDKEY_BEFORE;
case OP_KEY:
return CDC_SCHEMA_OP_AND_RECORDKEY;
default:
throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
public static Schema schemaBySupplementalLoggingMode(
HoodieCDCSupplementalLoggingMode supplementalLoggingMode,
Schema tableSchema) {
if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) {
return CDC_SCHEMA_OP_AND_RECORDKEY;
} else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) {
return createCDCSchema(tableSchema, false);
} else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) {
return createCDCSchema(tableSchema, true);
} else {
throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
}
}

private static Schema createCDCSchema(Schema tableSchema, boolean withAfterImage) {
Schema imageSchema = AvroSchemaUtils.createNullableSchema(tableSchema);
Schema.Field opField = new Schema.Field(CDC_OPERATION_TYPE,
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
Schema.Field beforeField = new Schema.Field(
CDC_BEFORE_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
List<Schema.Field> fields;
if (withAfterImage) {
Schema.Field tsField = new Schema.Field(CDC_COMMIT_TIMESTAMP,
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
Schema.Field afterField = new Schema.Field(
CDC_AFTER_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
fields = Arrays.asList(opField, tsField, beforeField, afterField);
} else {
Schema.Field keyField = new Schema.Field(CDC_RECORD_KEY,
AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
fields = Arrays.asList(opField, keyField, beforeField);
}

Schema mergedSchema = Schema.createRecord("CDC", null, tableSchema.getNamespace(), false);
mergedSchema.setFields(fields);
return mergedSchema;
}

/**
* Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
*/
public static GenericData.Record cdcRecord(
String op, String commitTime, GenericRecord before, GenericRecord after) {
String beforeJsonStr = recordToJson(before);
String afterJsonStr = recordToJson(after);
return cdcRecord(op, commitTime, beforeJsonStr, afterJsonStr);
}

public static GenericData.Record cdcRecord(
String op, String commitTime, String before, String after) {
GenericData.Record record = new GenericData.Record(CDC_SCHEMA);
public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime,
GenericRecord before, GenericRecord after) {
GenericData.Record record = new GenericData.Record(cdcSchema);
record.put(CDC_OPERATION_TYPE, op);
record.put(CDC_COMMIT_TIMESTAMP, commitTime);
record.put(CDC_BEFORE_IMAGE, before);
Expand All @@ -125,20 +122,20 @@ public static GenericData.Record cdcRecord(
/**
* Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
*/
public static GenericData.Record cdcRecord(String op, String recordKey, GenericRecord before) {
GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_RECORDKEY_BEFORE);
public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
String recordKey, GenericRecord before) {
GenericData.Record record = new GenericData.Record(cdcSchema);
record.put(CDC_OPERATION_TYPE, op);
record.put(CDC_RECORD_KEY, recordKey);
String beforeJsonStr = recordToJson(before);
record.put(CDC_BEFORE_IMAGE, beforeJsonStr);
record.put(CDC_BEFORE_IMAGE, before);
return record;
}

/**
* Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
*/
public static GenericData.Record cdcRecord(String op, String recordKey) {
GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_AND_RECORDKEY);
public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) {
GenericData.Record record = new GenericData.Record(cdcSchema);
record.put(CDC_OPERATION_TYPE, op);
record.put(CDC_RECORD_KEY, recordKey);
return record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.functional;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

Expand All @@ -37,6 +38,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFileReader;
Expand Down Expand Up @@ -561,46 +563,79 @@ public void testCDCBlock() throws IOException, InterruptedException {
.withFs(fs)
.build();

GenericRecord record1 = HoodieCDCUtils.cdcRecord("i", "100",
null, "{\"uuid\": 1, \"name\": \"apple\"}, \"ts\": 1100}");
GenericRecord record2 = HoodieCDCUtils.cdcRecord("u", "100",
"{\"uuid\": 2, \"name\": \"banana\"}, \"ts\": 1000}",
"{\"uuid\": 2, \"name\": \"blueberry\"}, \"ts\": 1100}");
GenericRecord record3 = HoodieCDCUtils.cdcRecord("d", "100",
"{\"uuid\": 3, \"name\": \"cherry\"}, \"ts\": 1000}", null);
String dataSchameString = "{\"type\":\"record\",\"name\":\"Record\","
+ "\"fields\":["
+ "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
+ "{\"name\":\"name\",\"type\":[\"string\",\"null\"]},"
+ "{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}"
+ "]}";
Schema dataSchema = new Schema.Parser().parse(dataSchameString);
Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);
GenericRecord insertedRecord = new GenericData.Record(dataSchema);
insertedRecord.put("uuid", 1);
insertedRecord.put("name", "apple");
insertedRecord.put("ts", 1100L);

GenericRecord updateBeforeImageRecord = new GenericData.Record(dataSchema);
updateBeforeImageRecord.put("uuid", 2);
updateBeforeImageRecord.put("name", "banana");
updateBeforeImageRecord.put("ts", 1000L);
GenericRecord updateAfterImageRecord = new GenericData.Record(dataSchema);
updateAfterImageRecord.put("uuid", 2);
updateAfterImageRecord.put("name", "blueberry");
updateAfterImageRecord.put("ts", 1100L);

GenericRecord deletedRecord = new GenericData.Record(dataSchema);
deletedRecord.put("uuid", 3);
deletedRecord.put("name", "cherry");
deletedRecord.put("ts", 1000L);

GenericRecord record1 = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100",
null, insertedRecord);
GenericRecord record2 = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100",
updateBeforeImageRecord, updateAfterImageRecord);
GenericRecord record3 = HoodieCDCUtils.cdcRecord(cdcSchema, "d", "100",
deletedRecord, null);
List<IndexedRecord> records = new ArrayList<>(Arrays.asList(record1, record2, record3));
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, HoodieCDCUtils.CDC_SCHEMA_STRING);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());
HoodieDataBlock dataBlock = getDataBlock(HoodieLogBlockType.CDC_DATA_BLOCK, records, header);
writer.appendBlock(dataBlock);
writer.close();

Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), HoodieCDCUtils.CDC_SCHEMA);
Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), cdcSchema);
assertTrue(reader.hasNext());
HoodieLogBlock block = reader.next();
HoodieDataBlock dataBlockRead = (HoodieDataBlock) block;
List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
assertEquals(3, recordsRead.size(),
"Read records size should be equal to the written records size");
assertEquals(dataBlockRead.getSchema(), HoodieCDCUtils.CDC_SCHEMA);
assertEquals(dataBlockRead.getSchema(), cdcSchema);

GenericRecord insert = (GenericRecord) recordsRead.stream()
.filter(record -> record.get(0).toString().equals("i")).findFirst().get();
assertNull(insert.get("before"));
assertNotNull(insert.get("after"));
assertEquals(((GenericRecord) insert.get("after")).get("name").toString(), "apple");

GenericRecord update = (GenericRecord) recordsRead.stream()
.filter(record -> record.get(0).toString().equals("u")).findFirst().get();
assertNotNull(update.get("before"));
assertNotNull(update.get("after"));
assertTrue(update.get("before").toString().contains("banana"));
assertTrue(update.get("after").toString().contains("blueberry"));
GenericRecord uBefore = (GenericRecord) update.get("before");
GenericRecord uAfter = (GenericRecord) update.get("after");
assertEquals(String.valueOf(uBefore.get("name")), "banana");
assertEquals(Long.valueOf(uBefore.get("ts").toString()), 1000L);
assertEquals(String.valueOf(uAfter.get("name")), "blueberry");
assertEquals(Long.valueOf(uAfter.get("ts").toString()), 1100L);

GenericRecord delete = (GenericRecord) recordsRead.stream()
.filter(record -> record.get(0).toString().equals("d")).findFirst().get();
assertNotNull(delete.get("before"));
assertNull(delete.get("after"));
assertEquals(((GenericRecord) delete.get("before")).get("name").toString(), "cherry");

reader.close();
}
Expand Down
Loading

0 comments on commit 3ac0808

Please sign in to comment.