Skip to content

Commit

Permalink
[HUDI-4572] Fix 'Not a valid schema field: ts' error in HoodieFlinkCo…
Browse files Browse the repository at this point in the history
…mpactor if precombine field is not ts (#6331)

Co-authored-by: jian.feng <jian.feng@shopee.com>
  • Loading branch information
2 people authored and codope committed Aug 9, 2022
1 parent 449ba61 commit 528aaf7
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, Str
// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);

CompactionUtil.setPreCombineField(conf, metaClient);

// infer changelog mode
CompactionUtil.inferChangelogMode(conf, metaClient);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaC
writeConfig.setSchema(tableAvroSchema.toString());
}

/**
* Sets up the preCombine field into the given configuration {@code conf}
* through reading from the hoodie table metadata.
*
* This value is non-null as compaction can only be performed on MOR tables.
* Of which, MOR tables will have non-null precombine fields.
*
* @param conf The configuration
*/
public static void setPreCombineField(Configuration conf, HoodieTableMetaClient metaClient) {
String preCombineField = metaClient.getTableConfig().getPreCombineField();
conf.setString(FlinkOptions.PRECOMBINE_FIELD, preCombineField);
}

/**
* Infers the changelog mode based on the data file schema(including metadata fields).
*
Expand Down

0 comments on commit 528aaf7

Please sign in to comment.