From 542efdd9e7b8d51f9e71ea75d2a571a1c842bc92 Mon Sep 17 00:00:00 2001 From: Lanyuanxiaoyao Date: Wed, 11 May 2022 06:45:53 +0800 Subject: [PATCH] [HUDI-4003] Try to read all the log file to parse schema (#5473) --- .../common/table/TableSchemaResolver.java | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index f178a23eeec7..b76f71161d32 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -61,6 +61,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema; @@ -98,8 +99,8 @@ private MessageType getTableParquetSchemaFromDataFile() { // For COW table, the file has data written must be in parquet or orc format currently. if (instantAndCommitMetadata.isPresent()) { HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); - String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get(); - return readSchemaFromBaseFile(filePath); + Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator(); + return fetchSchemaFromFiles(filePaths); } else { throw new IllegalArgumentException("Could not find any data file written for commit, " + "so could not get schema for table " + metaClient.getBasePath()); @@ -109,13 +110,8 @@ private MessageType getTableParquetSchemaFromDataFile() { // Determine the file format based on the file name, and then extract schema from it. 
if (instantAndCommitMetadata.isPresent()) { HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); - String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get(); - if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) { - // this is a log file - return readSchemaFromLogFile(new Path(filePath)); - } else { - return readSchemaFromBaseFile(filePath); - } + Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator(); + return fetchSchemaFromFiles(filePaths); } else { throw new IllegalArgumentException("Could not find any data file written for commit, " + "so could not get schema for table " + metaClient.getBasePath()); @@ -129,6 +125,20 @@ private MessageType getTableParquetSchemaFromDataFile() { } } + private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException { + MessageType type = null; + while (filePaths.hasNext() && type == null) { + String filePath = filePaths.next(); + if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) { + // this is a log file + type = readSchemaFromLogFile(new Path(filePath)); + } else { + type = readSchemaFromBaseFile(filePath); + } + } + return type; + } + private MessageType readSchemaFromBaseFile(String filePath) throws IOException { if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) { // this is a parquet file