reuse schemaResolver and remove workaround when last insertion may get lost
trushev committed Jul 1, 2022
1 parent 33182d1 commit 88ce744
Showing 2 changed files with 4 additions and 14 deletions.
@@ -198,7 +198,8 @@ protected void loadRecords(String partitionPath) throws Exception {
 
     if (latestCommitTime.isPresent()) {
       BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
-      Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(this.hoodieTable.getMetaClient());
+      Schema schema = schemaResolver.getTableAvroSchema();
 
       List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
           .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
@@ -230,9 +231,8 @@ protected void loadRecords(String partitionPath) throws Exception {
               .filter(logFile -> isValidFile(logFile.getFileStatus()))
               .map(logFile -> logFile.getPath().toString())
               .collect(toList());
-          InternalSchema internalSchema = new TableSchemaResolver(this.hoodieTable.getMetaClient())
-              .getTableInternalSchemaFromCommitMetadata()
-              .orElse(InternalSchema.getEmptyInternalSchema());
+          InternalSchema internalSchema = schemaResolver.getTableInternalSchemaFromCommitMetadata()
+              .orElse(InternalSchema.getEmptyInternalSchema());
           HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, internalSchema, latestCommitTime.get().getTimestamp(),
               writeConfig, hadoopConf);
 
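Taken together, the two hunks above make loadRecords build a single TableSchemaResolver and reuse it for both the Avro schema and the internal schema lookup. The sketch below shows that reuse pattern in isolation; the resolveSchemas helper, its class name, and the import paths are illustrative additions rather than part of this commit, and only the TableSchemaResolver calls visible in the diff are relied on.

import org.apache.avro.Schema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.internal.schema.InternalSchema;

public class SchemaResolverReuseSketch {

  // Hypothetical helper mirroring the change: construct the resolver once,
  // then serve both schema lookups from the same instance.
  static void resolveSchemas(HoodieTableMetaClient metaClient) throws Exception {
    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);

    // Avro schema of the table, as used for reading base files.
    Schema schema = schemaResolver.getTableAvroSchema();

    // Internal schema from commit metadata, falling back to the empty schema
    // when no schema-evolution metadata is present.
    InternalSchema internalSchema = schemaResolver.getTableInternalSchemaFromCommitMetadata()
        .orElse(InternalSchema.getEmptyInternalSchema());

    System.out.println("Resolved Avro schema " + schema.getName()
        + " and internal schema " + internalSchema);
  }
}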
@@ -296,16 +296,6 @@ private void checkAnswer(TableResult actualResult, String... expectedResult) {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-
-    // The last insertion may get lost due to org.apache.hudi.sink.StreamWriteOperatorCoordinator#handleEventFromOperator
-    // can handle EndInputEvent before WriteMetaEvent
-    if (!actual.contains("+I[Julian, 30000.3, 53]")
-        && actual.contains("+I[Julian, null, 53]")
-        && expected.equals(new HashSet<>(Arrays.asList(expectedMergedResult)))) {
-      actual.remove("+I[Julian, null, 53]");
-      actual.add("+I[Julian, 30000.3, 53]");
-    }
-
     assertEquals(expected, actual);
   }

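With the workaround deleted, checkAnswer asserts the collected rows against the expected rows as-is instead of patching the "+I[Julian, null, 53]" row. A minimal stand-alone sketch of that direct comparison follows; the class, the main method, and the sample rows are illustrative only and are not taken from the test.

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CheckAnswerSketch {

  public static void main(String[] args) {
    // Illustrative rows; in the real test they come from the Flink TableResult.
    Set<String> expected = new HashSet<>(Arrays.asList("+I[Julian, 30000.3, 53]"));
    Set<String> actual = new HashSet<>(Arrays.asList("+I[Julian, 30000.3, 53]"));

    // The deleted branch used to rewrite the lost-insertion row before comparing;
    // after this commit the two sets are compared directly.
    assertEquals(expected, actual);
  }
}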
