Skip to content

Commit

Permalink
[HUDI-5107] Fix hadoop config in DirectWriteMarkers, HoodieFlinkEngin…
Browse files Browse the repository at this point in the history
…eContext and StreamerUtil are not consistent issue (apache#7094)

Co-authored-by: xiaoxingstack <xiaoxingstack@didiglobal.com>
  • Loading branch information
2 people authored and satishkotha committed Dec 11, 2022
1 parent a2228f7 commit eb86d5d
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker files for all created, merged paths");
dataFiles.addAll(context.flatMap(subDirectories, directory -> {
Path path = new Path(directory);
FileSystem fileSystem = path.getFileSystem(serializedConf.get());
FileSystem fileSystem = FSUtils.getFs(path, serializedConf.get());
RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
List<String> result = new ArrayList<>();
while (itr.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ private HoodieFlinkEngineContext() {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new DefaultTaskContextSupplier());
}

public HoodieFlinkEngineContext(org.apache.hadoop.conf.Configuration hadoopConf) {
this(new SerializableConfiguration(hadoopConf), new DefaultTaskContextSupplier());
}

public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
Expand Down Expand Up @@ -419,7 +420,8 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
// build the write client to start the embedded timeline server
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), writeConfig);
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
// create the filesystem view storage properties for client
final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
// rebuild the view storage config with simplified options.
Expand Down

0 comments on commit eb86d5d

Please sign in to comment.