diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java index e81338207963..b5c9b3004f1c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java @@ -106,7 +106,7 @@ public Set createdAndMergedDataPaths(HoodieEngineContext context, int pa context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths"); dataFiles.addAll(context.flatMap(subDirectories, directory -> { Path path = new Path(directory); - FileSystem fileSystem = path.getFileSystem(serializedConf.get()); + FileSystem fileSystem = FSUtils.getFs(path, serializedConf.get()); RemoteIterator itr = fileSystem.listFiles(path, true); List result = new ArrayList<>(); while (itr.hasNext()) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 5e38c24d3091..c9136da6bb45 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -67,6 +67,10 @@ private HoodieFlinkEngineContext() { this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new DefaultTaskContextSupplier()); } + public HoodieFlinkEngineContext(org.apache.hadoop.conf.Configuration hadoopConf) { + this(new SerializableConfiguration(hadoopConf), new DefaultTaskContextSupplier()); + } + public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) { this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index cdacefbf173e..35446ce76862 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -423,7 +423,7 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); // build the write client to start the embedded timeline server - final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); + final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), writeConfig); writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); // create the filesystem view storage properties for client final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();