Skip to content

Commit

Permalink
[HUDI-5107] Fix hadoop config in DirectWriteMarkers, HoodieFlinkEngin…
Browse files Browse the repository at this point in the history
…eContext and StreamerUtil are not consistent issue (#7094)

Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>
  • Loading branch information
TJX2014 and xiaoxingstack authored Nov 1, 2022
1 parent 79ad357 commit 2f53be3
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
dataFiles.addAll(context.flatMap(subDirectories, directory -> {
Path path = new Path(directory);
FileSystem fileSystem = path.getFileSystem(serializedConf.get());
FileSystem fileSystem = FSUtils.getFs(path, serializedConf.get());
RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
List<String> result = new ArrayList<>();
while (itr.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ private HoodieFlinkEngineContext() {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new DefaultTaskContextSupplier());
}

public HoodieFlinkEngineContext(org.apache.hadoop.conf.Configuration hadoopConf) {
this(new SerializableConfiguration(hadoopConf), new DefaultTaskContextSupplier());
}

public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
// build the write client to start the embedded timeline server
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), writeConfig);
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
// create the filesystem view storage properties for client
final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
Expand Down

0 comments on commit 2f53be3

Please sign in to comment.