Fixing NPE with row writer path and with OCC
nsivabalan committed Jun 13, 2022
1 parent 264b15d commit fe1e2fe
Showing 4 changed files with 17 additions and 4 deletions.
@@ -484,7 +484,7 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
    * @param writeOperationType
    * @param metaClient
    */
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType,
+  public void preWrite(String instantTime, WriteOperationType writeOperationType,
                           HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
@@ -250,7 +250,7 @@ public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
   }
 
   @Override
-  protected void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
+  public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     // Note: the code to read the commit metadata is not thread safe for JSON deserialization,
     // remove the table metadata sync
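
Both hunks above widen preWrite from protected to public in the write clients. A hedged reading of why: the Spark datasource V2 helper in the next hunk sits outside the write-client class hierarchy, so it can only trigger the pre-write lifecycle if the method is public. An illustrative fragment, not from the commit; writeClient, instantTime, and metaClient are assumed to be in scope, built as in the next hunk:

  // Now legal from any caller, not just subclasses of the write client:
  writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient)
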
@@ -68,6 +68,7 @@ public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writ
     this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
     this.metaClient.validateTableProperties(writeConfig.getProps());
     this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
+    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient);
   }
 
   public boolean useCommitCoordinator() {
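
This one-line constructor addition is the actual fix. A hedged reading of the NPE: preWrite (first hunk) records the operation type and caches the last completed transaction via TransactionUtils.getLastCompletedTxnInstantAndMetadata, and optimistic concurrency control consults that state when the commit is finalized. The row-writer path built its write client without ever calling preWrite, so with OCC enabled the commit presumably dereferenced uninitialized state and threw a NullPointerException. A minimal Scala sketch of the call order the fix establishes, assuming a prebuilt write client and config (the helper method name here is hypothetical):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hudi.client.SparkRDDWriteClient
  import org.apache.hudi.common.model.WriteOperationType
  import org.apache.hudi.common.table.HoodieTableMetaClient
  import org.apache.hudi.config.HoodieWriteConfig

  // Hedged sketch: run the pre-write lifecycle before opening the commit so that
  // OCC state (e.g. lastCompletedTxnAndMetadata) is populated. Mirrors the hunk above.
  def prepareRowWriterCommit(writeClient: SparkRDDWriteClient[_], writeConfig: HoodieWriteConfig,
                             hadoopConf: Configuration, instantTime: String): Unit = {
    val metaClient = HoodieTableMetaClient.builder()
      .setConf(hadoopConf)
      .setBasePath(writeConfig.getBasePath)
      .build()
    // The fix: initialize OCC state first ...
    writeClient.preWrite(instantTime, WriteOperationType.BULK_INSERT, metaClient)
    // ... then open the instant for this write.
    writeClient.startCommitWithTime(instantTime)
  }
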
@@ -167,14 +167,21 @@ class TestHoodieSparkSqlWriter {
    * @param sortMode Bulk insert sort mode
    * @param populateMetaFields Flag for populating meta fields
    */
-  def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true): Unit = {
+  def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, populateMetaFields: Boolean = true, enableOCCConfigs: Boolean = false): Unit = {
     //create a new table
-    val fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+    var fooTableModifier = commonTableModifier.updated("hoodie.bulkinsert.shuffle.parallelism", "4")
       .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
       .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
       .updated(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields))
       .updated(HoodieWriteConfig.BULK_INSERT_SORT_MODE.key(), sortMode.name())
 
+    if (enableOCCConfigs) {
+      fooTableModifier = fooTableModifier
+        .updated("hoodie.write.concurrency.mode","optimistic_concurrency_control")
+        .updated("hoodie.cleaner.policy.failed.writes","LAZY")
+        .updated("hoodie.write.lock.provider","org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+    }
+
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
     val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
@@ -306,6 +313,11 @@
     testBulkInsertWithSortMode(sortMode, populateMetaFields = true)
   }
 
+  @Test
+  def testBulkInsertForSortModeWithOCC(): Unit = {
+    testBulkInsertWithSortMode(BulkInsertSortMode.GLOBAL_SORT, populateMetaFields = true, true)
+  }
+
   /**
    * Test case for Bulk insert with populating meta fields or
    * without populating meta fields.
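
The new test drives a GLOBAL_SORT bulk insert through the row-writer path with OCC enabled, which previously hit the NPE. For reference, a hedged user-facing sketch of the same scenario through the Spark datasource, using the option keys the test sets; the DataFrame, table name, and base path are placeholders, and the usual record-key/precombine options are elided:

  import org.apache.spark.sql.{DataFrame, SaveMode}

  // Hedged usage sketch mirroring the test configuration above; values marked
  // "placeholder" are not from the commit.
  def bulkInsertWithOcc(df: DataFrame, basePath: String): Unit = {
    df.write.format("hudi")
      .option("hoodie.table.name", "hudi_tbl") // placeholder
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .option("hoodie.datasource.write.row.writer.enable", "true")
      .option("hoodie.bulkinsert.sort.mode", "GLOBAL_SORT")
      // OCC settings, as in the test:
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
      .mode(SaveMode.Append)
      .save(basePath)
  }
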
