Fixing schemas used for bootstrap reader
Alexey Kudinkin committed Dec 14, 2022
1 parent 292630b commit ee8c9df
Showing 3 changed files with 24 additions and 4 deletions.
@@ -77,7 +77,6 @@ public static HoodieMergeHelper newInstance() {
public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
- Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

Configuration hadoopConf = new Configuration(table.getHadoopConf());
@@ -134,9 +133,16 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+ // NOTE: It's important for us to rely on the writer's schema here
+ // - Otherwise, the schema would be decoded from the Parquet file itself by taking its
+ // Parquet schema and converting it to Avro. That is problematic for schema validation
+ // of the records, since Avro schemas also validate the fully-qualified names of the
+ // structs, which cannot be reconstructed when converting from Parquet to Avro
+ // (because Parquet doesn't carry them)
+ Schema bootstrapSchema = externalSchemaTransformation ? bootstrapFileReader.getSchema() : mergeHandle.getWriterSchema();
readerIterator = new MergingIterator<>(
baseFileReader.getRecordIterator(readSchema),
- bootstrapFileReader.getRecordIterator(),
+ bootstrapFileReader.getRecordIterator(bootstrapSchema),
(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
if (needToReWriteRecord) {
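The comment added above is the crux of this change: an Avro schema recovered by converting a Parquet file's own schema back to Avro loses the fully-qualified names of nested structs, so validating records against the writer's Avro schema fails on name mismatches. As a rough, self-contained illustration (not Hudi code; the record and field names below are made up), Avro's own compatibility check flags two structurally identical schemas as incompatible once the struct names stop lining up:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

public class StructNameMismatchSketch {
  public static void main(String[] args) {
    // Writer's schema: the nested struct carries its fully-qualified name.
    Schema location = SchemaBuilder.record("Location").namespace("hoodie.example")
        .fields().requiredDouble("lat").requiredDouble("lon").endRecord();
    Schema writerSchema = SchemaBuilder.record("TripRecord").namespace("hoodie.example")
        .fields().requiredString("uuid").name("location").type(location).noDefault().endRecord();

    // Schema as it might look after a Parquet -> Avro conversion: the field layout is the
    // same, but the nested struct's name is synthesized from the field name and the
    // namespace is gone, because Parquet doesn't store Avro record names.
    Schema convertedLocation = SchemaBuilder.record("location")
        .fields().requiredDouble("lat").requiredDouble("lon").endRecord();
    Schema fileSchema = SchemaBuilder.record("TripRecord")
        .fields().requiredString("uuid").name("location").type(convertedLocation).noDefault().endRecord();

    // Avro treats the mismatching struct names as an incompatibility even though every
    // field matches, which is why validating records decoded with the converted schema
    // against the writer's schema breaks.
    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(writerSchema, fileSchema);
    System.out.println(result.getType()); // INCOMPATIBLE
  }
}

Deriving bootstrapSchema from mergeHandle.getWriterSchema() sidesteps this by never relying on the name-less schema recovered from the bootstrap Parquet file.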
@@ -93,10 +93,17 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
if (baseFile.getBootstrapBaseFile().isPresent()) {
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+ // NOTE: It's important for us to rely on the writer's schema here
+ // - Otherwise, the schema would be decoded from the Parquet file itself by taking its
+ // Parquet schema and converting it to Avro. That is problematic for schema validation
+ // of the records, since Avro schemas also validate the fully-qualified names of the
+ // structs, which cannot be reconstructed when converting from Parquet to Avro
+ // (because Parquet doesn't carry them)
+ Schema bootstrapSchema = externalSchemaTransformation ? bootstrapFileReader.getSchema() : mergeHandle.getWriterSchema();
readerIterator = new MergingIterator<>(
baseFileReader.getRecordIterator(readSchema),
- bootstrapFileReader.getRecordIterator(),
+ bootstrapFileReader.getRecordIterator(bootstrapSchema),
(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
readerIterator = baseFileReader.getRecordIterator(readSchema);
@@ -93,9 +93,16 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+ // NOTE: It's important for us to rely on the writer's schema here
+ // - Otherwise, the schema would be decoded from the Parquet file itself by taking its
+ // Parquet schema and converting it to Avro. That is problematic for schema validation
+ // of the records, since Avro schemas also validate the fully-qualified names of the
+ // structs, which cannot be reconstructed when converting from Parquet to Avro
+ // (because Parquet doesn't carry them)
+ Schema bootstrapSchema = externalSchemaTransformation ? bootstrapFileReader.getSchema() : mergeHandle.getWriterSchema();
readerIterator = new MergingIterator<>(
baseFileReader.getRecordIterator(readSchema),
- bootstrapFileReader.getRecordIterator(),
+ bootstrapFileReader.getRecordIterator(bootstrapSchema),
(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
readerIterator = baseFileReader.getRecordIterator(readSchema);
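Passing bootstrapSchema into getRecordIterator(...) above means the bootstrap file is decoded against a caller-supplied Avro schema rather than one reverse-engineered from the Parquet file. A minimal sketch of that mechanism using plain parquet-avro follows; it is not Hudi's reader implementation, and the class and method names are illustrative only:

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

public class ForcedReadSchemaSketch {
  // Reads a Parquet file while forcing the supplied Avro schema onto the reader, so the
  // materialized records keep the caller's fully-qualified struct names instead of names
  // recovered from the Parquet schema.
  public static void readWithSchema(Path parquetFile, Schema readSchema) throws IOException {
    Configuration conf = new Configuration();
    AvroReadSupport.setAvroReadSchema(conf, readSchema);
    try (ParquetReader<GenericRecord> reader =
             AvroParquetReader.<GenericRecord>builder(parquetFile).withConf(conf).build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        // Each record conforms to readSchema at this point.
        System.out.println(record);
      }
    }
  }
}

Here AvroReadSupport.setAvroReadSchema is the standard parquet-avro hook for imposing a read schema on the Avro record materialization.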
