Skip to content

Commit

Permalink
Fixing non serializable path used in engineContext with metadata tabl…
Browse files Browse the repository at this point in the history
…e intialization
  • Loading branch information
nsivabalan committed Oct 22, 2022
1 parent cd67964 commit 59bfa92
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.SerializablePath;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -616,23 +618,24 @@ private HoodieTableMetaClient initializeMetaClient(boolean populateMetaFields) t
* @return Map of partition names to a list of FileStatus for all the files in the partition
*/
private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaClient) {
List<Path> pathsToList = new LinkedList<>();
pathsToList.add(new Path(dataWriteConfig.getBasePath()));
List<SerializablePath> pathsToList = new LinkedList<>();
pathsToList.add(new SerializablePath(new CachingPath(dataWriteConfig.getBasePath())));

List<DirectoryInfo> partitionsToBootstrap = new LinkedList<>();
final int fileListingParallelism = metadataWriteConfig.getFileListingParallelism();
SerializableConfiguration conf = new SerializableConfiguration(datasetMetaClient.getHadoopConf());
final String dirFilterRegex = dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
final String datasetBasePath = datasetMetaClient.getBasePath();
SerializablePath serializableBasePath = new SerializablePath(new CachingPath(datasetBasePath));

while (!pathsToList.isEmpty()) {
// In each round we will list a section of directories
int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.getFileSystem(conf.get());
String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
FileSystem fs = path.get().getFileSystem(conf.get());
String relativeDirPath = FSUtils.getRelativePartitionPath(serializableBasePath.get(), path.get());
return new DirectoryInfo(relativeDirPath, fs.listStatus(path.get()));
}, numDirsToList);

pathsToList = new LinkedList<>(pathsToList.subList(numDirsToList, pathsToList.size()));
Expand All @@ -656,7 +659,9 @@ private List<DirectoryInfo> listAllPartitions(HoodieTableMetaClient datasetMetaC
partitionsToBootstrap.add(dirInfo);
} else {
// Add sub-dirs to the queue
pathsToList.addAll(dirInfo.getSubDirectories());
pathsToList.addAll(dirInfo.getSubDirectories().stream()
.map(path -> new SerializablePath(new CachingPath(path.toUri())))
.collect(Collectors.toList()));
}
}
}
Expand Down

0 comments on commit 59bfa92

Please sign in to comment.