Skip to content

Commit

Permalink
[HUDI-7050] Flink HoodieHiveCatalog supports hadoop parameters (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
waywtdcc authored Nov 15, 2023
1 parent 19b3e7f commit 424e0ce
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Utilities for fetching hadoop configurations.
*/
public class HadoopConfigurations {
private static final String HADOOP_PREFIX = "hadoop.";
public static final String HADOOP_PREFIX = "hadoop.";
private static final String PARQUET_PREFIX = "parquet.";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieCatalogException;

import org.apache.flink.configuration.ConfigOption;
Expand Down Expand Up @@ -50,7 +51,7 @@ public String factoryIdentifier() {
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
helper.validateExcept(HadoopConfigurations.HADOOP_PREFIX);
String mode = helper.getOptions().get(CatalogOptions.MODE);
switch (mode.toLowerCase(Locale.ROOT)) {
case "hms":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public class HoodieCatalogUtil {
* @param hiveConfDir Hive conf directory path.
* @return A HiveConf instance.
*/
public static HiveConf createHiveConf(@Nullable String hiveConfDir) {
public static HiveConf createHiveConf(@Nullable String hiveConfDir, org.apache.flink.configuration.Configuration flinkConf) {
// create HiveConf from hadoop configuration with hadoop conf directory configured.
Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new org.apache.flink.configuration.Configuration());
Configuration hadoopConf = HadoopConfigurations.getHadoopConf(flinkConf);

// ignore all the static conf file URLs that HiveConf may have set
HiveConf.setHiveSiteLocation(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
private final boolean external;

// Convenience constructor: builds the HiveConf from the catalog options
// (hive conf dir taken from CatalogOptions.HIVE_CONF_DIR) and delegates to
// the full constructor with external = false.
//
// NOTE(review): this span is a scraped diff hunk — the next two lines are the
// REMOVED (old) and ADDED (new) versions of the same statement, not two calls.
// Only the second line is the current code; it additionally forwards `options`
// so that "hadoop.*"-prefixed catalog options reach the created HiveConf
// (the point of HUDI-7050, per this commit's title).
public HoodieHiveCatalog(String catalogName, Configuration options) {
// old (removed): createHiveConf ignored the Flink options entirely
this(catalogName, options, HoodieCatalogUtil.createHiveConf(options.getString(CatalogOptions.HIVE_CONF_DIR)), false);
// new (added): pass options through so hadoop parameters are applied
this(catalogName, options, HoodieCatalogUtil.createHiveConf(options.getString(CatalogOptions.HIVE_CONF_DIR), options), false);
}

public HoodieHiveCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static HoodieHiveCatalog createHiveCatalog(String name) {

public static HoodieHiveCatalog createHiveCatalog(String name, boolean external) {
Configuration options = new Configuration();
options.setString("hadoop.dfs.client.block.write.replace-datanode-on-failure.enable", "true");
options.setBoolean(CatalogOptions.TABLE_EXTERNAL, external);
return new HoodieHiveCatalog(
name,
Expand Down

0 comments on commit 424e0ce

Please sign in to comment.