[HUDI-3336][HUDI-FLINK]Support custom hadoop config for flink (#5528)
* [HUDI-3336][HUDI-FLINK]Support custom hadoop config for flink
cuibo01 authored May 13, 2022
1 parent 0cec955 commit 701f8c0
Showing 26 changed files with 126 additions and 87 deletions.

----- changed file -----
@@ -709,25 +709,17 @@ private FlinkOptions() {
// Prefix for Hoodie specific properties.
private static final String PROPERTIES_PREFIX = "properties.";

- /**
- * Collects the config options that start with 'properties.' into a 'key'='value' list.
- */
- public static Map<String, String> getHoodieProperties(Map<String, String> options) {
- return getHoodiePropertiesWithPrefix(options, PROPERTIES_PREFIX);
- }

/**
* Collects the config options that start with specified prefix {@code prefix} into a 'key'='value' list.
*/
- public static Map<String, String> getHoodiePropertiesWithPrefix(Map<String, String> options, String prefix) {
+ public static Map<String, String> getPropertiesWithPrefix(Map<String, String> options, String prefix) {
final Map<String, String> hoodieProperties = new HashMap<>();

- if (hasPropertyOptions(options)) {
+ if (hasPropertyOptions(options, prefix)) {
options.keySet().stream()
- .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .filter(key -> key.startsWith(prefix))
.forEach(key -> {
final String value = options.get(key);
- final String subKey = key.substring((prefix).length());
+ final String subKey = key.substring(prefix.length());
hoodieProperties.put(subKey, value);
});
}
@@ -749,8 +741,8 @@ public static Configuration flatOptions(Configuration conf) {
return fromMap(propsMap);
}

- private static boolean hasPropertyOptions(Map<String, String> options) {
- return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
+ private static boolean hasPropertyOptions(Map<String, String> options, String prefix) {
+ return options.keySet().stream().anyMatch(k -> k.startsWith(prefix));
}

/**
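Read together, the two hunks above drop the properties-only helper and generalize the remaining one, so callers can collect options under any prefix and get them back with that prefix stripped. A minimal sketch of how the renamed method behaves (the option keys below are illustrative examples, not options required by Hudi):

import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class PrefixStrippingSketch {
  public static void main(String[] args) {
    // Raw Flink table options as key/value strings.
    Map<String, String> options = new HashMap<>();
    options.put("hadoop.dfs.replication", "2");
    options.put("properties.hoodie.datasource.write.recordkey.field", "uuid");

    // Keep only the "hadoop."-prefixed entries and strip the prefix from each key.
    Map<String, String> hadoopProps = FlinkOptions.getPropertiesWithPrefix(options, "hadoop.");
    System.out.println(hadoopProps); // prints {dfs.replication=2}
  }
}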

----- new file -----
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.configuration;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.util.FlinkClientUtil;

import java.util.Map;

public class HadoopConfigurations {
private static final String HADOOP_PREFIX = "hadoop.";
private static final String PARQUET_PREFIX = "parquet.";

public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.flink.configuration.Configuration options,
org.apache.hadoop.conf.Configuration hadoopConf) {
org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
Map<String, String> parquetOptions = FlinkOptions.getPropertiesWithPrefix(options.toMap(), PARQUET_PREFIX);
parquetOptions.forEach((k, v) -> copy.set(PARQUET_PREFIX + k, v));
return copy;
}

/**
* Create a new hadoop configuration that is initialized with the given flink configuration.
*/
public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
options.forEach((k, v) -> hadoopConf.set(k, v));
return hadoopConf;
}
}
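Combined with the FlinkOptions change above, this new class is the piece that forwards "hadoop."-prefixed (and "parquet."-prefixed) Flink options into the Hadoop configuration used by readers and writers. A rough usage sketch, assuming the hudi-flink module is on the classpath; the option values are placeholders:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.HadoopConfigurations;

public class CustomHadoopConfSketch {
  public static void main(String[] args) {
    // Flink-side options, e.g. collected from a table's WITH (...) clause.
    Configuration conf = new Configuration();
    conf.setString("hadoop.dfs.client.use.datanode.hostname", "true");
    conf.setString("parquet.compression", "GZIP");

    // "hadoop." options land on the Hadoop conf with the prefix stripped.
    org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
    System.out.println(hadoopConf.get("dfs.client.use.datanode.hostname")); // true

    // "parquet." options keep their prefix when copied onto the Parquet reader conf.
    org.apache.hadoop.conf.Configuration parquetConf = HadoopConfigurations.getParquetConf(conf, hadoopConf);
    System.out.println(parquetConf.get("parquet.compression")); // GZIP
  }
}

In SQL, the same values would presumably be declared as table options, e.g. 'hadoop.dfs.client.use.datanode.hostname' = 'true', since the table options are what end up in this Configuration.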

----- changed file -----
@@ -21,6 +21,7 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.StreamerUtil;

@@ -49,9 +50,10 @@ public static class Config {

private Schema targetSchema;

+ @Deprecated
public FilebasedSchemaProvider(TypedProperties props) {
StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
- FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), StreamerUtil.getHadoopConf());
+ FileSystem fs = FSUtils.getFs(props.getString(Config.SOURCE_SCHEMA_FILE_PROP), HadoopConfigurations.getHadoopConf(new Configuration()));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.SOURCE_SCHEMA_FILE_PROP))));
if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
@@ -65,7 +67,7 @@ public FilebasedSchemaProvider(TypedProperties props) {

public FilebasedSchemaProvider(Configuration conf) {
final String sourceSchemaPath = conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
- final FileSystem fs = FSUtils.getFs(sourceSchemaPath, StreamerUtil.getHadoopConf());
+ final FileSystem fs = FSUtils.getFs(sourceSchemaPath, HadoopConfigurations.getHadoopConf(conf));
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
} catch (IOException ioe) {

----- changed file -----
@@ -34,6 +34,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.sink.meta.CkpMetadata;
@@ -122,7 +123,7 @@ public void initializeState(StateInitializationContext context) throws Exception
}
}

- this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());

----- changed file -----
@@ -113,7 +113,7 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) {
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
- this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH));
+ this.ckpMetadata = CkpMetadata.getInstance(config);
this.initInstant = lastPendingInstant();
sendBootstrapEvent();
initWriterHelper();

----- changed file -----
@@ -18,11 +18,13 @@

package org.apache.hudi.sink.meta;

+ import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.util.StreamerUtil;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -70,8 +72,8 @@ public class CkpMetadata implements Serializable {
private List<CkpMessage> messages;
private List<String> instantCache;

- private CkpMetadata(String basePath) {
- this(FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()), basePath);
+ private CkpMetadata(Configuration config) {
+ this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
}

private CkpMetadata(FileSystem fs, String basePath) {
@@ -196,8 +198,8 @@ public boolean isAborted(String instant) {
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
- public static CkpMetadata getInstance(String basePath) {
- return new CkpMetadata(basePath);
+ public static CkpMetadata getInstance(Configuration config) {
+ return new CkpMetadata(config);
}

public static CkpMetadata getInstance(FileSystem fs, String basePath) {
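Note the signature change: the checkpoint metadata helper now receives the full Flink Configuration instead of just the base path, so the FileSystem it creates also picks up any "hadoop."-prefixed overrides. A sketch of the new call, with a made-up path and option:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.meta.CkpMetadata;

public class CkpMetadataSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, "file:///tmp/hoodie_sandbox_table"); // table base path
    conf.setString("hadoop.dfs.replication", "2");                         // custom Hadoop override

    // Previously: CkpMetadata.getInstance(conf.getString(FlinkOptions.PATH));
    CkpMetadata ckpMetadata = CkpMetadata.getInstance(conf);
  }
}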

----- changed file -----
@@ -31,6 +31,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.utils.PayloadCreation;
import org.apache.hudi.table.action.commit.BucketInfo;
@@ -116,7 +117,7 @@ public void open(Configuration parameters) throws Exception {
super.open(parameters);
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
- new SerializableConfiguration(StreamerUtil.getHadoopConf()),
+ new SerializableConfiguration(HadoopConfigurations.getHadoopConf(this.conf)),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create(
getRuntimeContext().getIndexOfThisSubtask(),

----- changed file -----
@@ -22,11 +22,11 @@
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.table.format.FilePathUtils;
- import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -60,7 +60,7 @@ public HiveSyncTool hiveSyncTool() {

public static HiveSyncContext create(Configuration conf) {
HiveSyncConfig syncConfig = buildSyncConfig(conf);
- org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
+ org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
String path = conf.getString(FlinkOptions.PATH);
FileSystem fs = FSUtils.getFs(path, hadoopConf);
HiveConf hiveConf = new HiveConf();

----- changed file -----
@@ -22,6 +22,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
@@ -54,7 +55,7 @@ public class FileIndex {
private FileIndex(Path path, Configuration conf) {
this.path = path;
this.metadataConfig = metadataConfig(conf);
- this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf());
+ this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
}

public static FileIndex instance(Path path, Configuration conf) {

----- changed file -----
@@ -21,6 +21,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;

@@ -157,7 +158,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.hadoopConf = HadoopConfigurations.getHadoopConf(parameters);
}

@Override

----- changed file -----
@@ -27,6 +27,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
@@ -92,7 +93,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

- import static org.apache.hudi.table.format.FormatUtils.getParquetConf;
+ import static org.apache.hudi.configuration.HadoopConfigurations.getParquetConf;

/**
* Hoodie batch table source that always read the latest snapshot of the underneath table.
@@ -155,7 +156,7 @@ public HoodieTableSource(
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.filters = filters == null ? Collections.emptyList() : filters;
- this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
}

----- changed file -----
@@ -22,6 +22,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

@@ -93,7 +94,7 @@ public class HoodieCatalog extends AbstractCatalog {
public HoodieCatalog(String name, Configuration options) {
super(name, options.get(DEFAULT_DATABASE));
this.catalogPathStr = options.get(CATALOG_PATH);
- this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.hadoopConf = HadoopConfigurations.getHadoopConf(options);
this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
}


----- changed file -----
@@ -30,7 +30,6 @@
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
- import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -49,7 +48,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
- import java.util.Map;
import java.util.function.Function;

/**
@@ -253,14 +251,4 @@ public static HoodieMergedLogRecordScanner logScanner(
private static Boolean string2Boolean(String s) {
return "true".equals(s.toLowerCase(Locale.ROOT));
}

- public static org.apache.hadoop.conf.Configuration getParquetConf(
- org.apache.flink.configuration.Configuration options,
- org.apache.hadoop.conf.Configuration hadoopConf) {
- final String prefix = "parquet.";
- org.apache.hadoop.conf.Configuration copy = new org.apache.hadoop.conf.Configuration(hadoopConf);
- Map<String, String> parquetOptions = FlinkOptions.getHoodiePropertiesWithPrefix(options.toMap(), prefix);
- parquetOptions.forEach((k, v) -> copy.set(prefix + k, v));
- return copy;
- }
}

----- changed file -----
@@ -26,6 +26,7 @@
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenUtils;
@@ -36,7 +37,6 @@
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
- import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.StringToRowDataConverter;

import org.apache.avro.Schema;
@@ -167,7 +167,7 @@ public static Builder builder() {
public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L;
this.closed = false;
- this.hadoopConf = StreamerUtil.getHadoopConf();
+ this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
if (split.getInstantRange() != null) {
// base file only with commit time filtering
@@ -306,7 +306,7 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
return ParquetSplitReaderUtil.genPartColumnarRowReader(
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
true,
- FormatUtils.getParquetConf(this.conf, hadoopConf),
+ HadoopConfigurations.getParquetConf(this.conf, hadoopConf),
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new DataType[0]),
partObjects,

----- changed file -----
@@ -27,7 +27,7 @@
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;

- import static org.apache.hudi.util.StreamerUtil.getHadoopConf;
+ import static org.apache.hudi.configuration.HadoopConfigurations.getHadoopConf;
import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig;

/**
@@ -44,7 +44,7 @@ private FlinkTables() {
*/
public static HoodieFlinkTable<?> createTable(Configuration conf, RuntimeContext runtimeContext) {
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
- new SerializableConfiguration(getHadoopConf()),
+ new SerializableConfiguration(getHadoopConf(conf)),
new FlinkTaskContextSupplier(runtimeContext));
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
return HoodieFlinkTable.create(writeConfig, context);
(remaining changed files not shown)
