Skip to content

Commit

Permalink
[HUDI-5010] Fix flink hive catalog external config not work (apache#6923
Browse files Browse the repository at this point in the history
)

* fix flink catalog external config not work
  • Loading branch information
wxplovecc authored and satishkotha committed Dec 11, 2022
1 parent 8b1d5c2 commit a972325
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ public Catalog createCatalog(Context context) {
case "hms":
return new HoodieHiveCatalog(
context.getName(),
helper.getOptions().get(CatalogOptions.CATALOG_PATH),
helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
(Configuration) helper.getOptions());
case "dfs":
return new HoodieCatalog(
context.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
Expand All @@ -117,21 +116,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {

// optional catalog base path: used for db/table path inference.
private final String catalogPath;
private final boolean external;

public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
public HoodieHiveCatalog(String catalogName, Configuration options) {
this(catalogName, options, HoodieCatalogUtil.createHiveConf(options.getString(CatalogOptions.HIVE_CONF_DIR)), false);
}

public HoodieHiveCatalog(
String catalogName,
String catalogPath,
String defaultDatabase,
Configuration options,
HiveConf hiveConf,
boolean allowEmbedded) {
super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
super(catalogName, options.getString(CatalogOptions.DEFAULT_DATABASE));
// fallback to hive.metastore.warehouse.dir if catalog path is not specified
this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
this.hiveConf = hiveConf;
this.catalogPath = options.getString(CatalogOptions.CATALOG_PATH, hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE));
this.external = options.getBoolean(CatalogOptions.TABLE_EXTERNAL);
if (!allowEmbedded) {
checkArgument(
!HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf),
Expand Down Expand Up @@ -512,7 +512,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,

Map<String, String> properties = new HashMap<>(table.getOptions());

if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
if (external) {
hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
properties.put("EXTERNAL", "TRUE");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.hadoop.hive.conf.HiveConf;

Expand All @@ -42,10 +43,15 @@ public static HoodieHiveCatalog createHiveCatalog() {
}

public static HoodieHiveCatalog createHiveCatalog(String name) {
return createHiveCatalog(name, false);
}

public static HoodieHiveCatalog createHiveCatalog(String name, boolean external) {
Configuration options = new Configuration();
options.setBoolean(CatalogOptions.TABLE_EXTERNAL, external);
return new HoodieHiveCatalog(
name,
null,
null,
options,
createHiveConf(),
true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ void testCreateHMSCatalog() {
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER);
options.put(CatalogOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath());
options.put(CatalogOptions.MODE.key(), "hms");
options.put(CatalogOptions.TABLE_EXTERNAL.key(), "false");

final Catalog actualCatalog =
FactoryUtil.createCatalog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieCatalogException;
Expand All @@ -35,7 +36,6 @@
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -44,7 +44,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -55,7 +54,6 @@

import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -149,28 +147,23 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except
assertEquals("id", table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateExternalTable(boolean isExternal) throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
@Test
public void testCreateExternalTable() throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
HoodieHiveCatalog catalog = HoodieCatalogTestUtils.createHiveCatalog("myCatalog", true);
catalog.open();
Map<String, String> originOptions = new HashMap<>();
originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
originOptions.put(CatalogOptions.TABLE_EXTERNAL.key(), String.valueOf(isExternal));
CatalogTable table =
new CatalogTableImpl(schema, originOptions, "hudi table");
hoodieCatalog.createTable(tablePath, table, false);
Table table1 = hoodieCatalog.getHiveTable(tablePath);
if (isExternal) {
assertTrue(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
assertEquals("EXTERNAL_TABLE", table1.getTableType());
} else {
assertFalse(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
assertEquals("MANAGED_TABLE", table1.getTableType());
}
catalog.createTable(tablePath, table, false);
Table table1 = catalog.getHiveTable(tablePath);
assertTrue(Boolean.parseBoolean(table1.getParameters().get("EXTERNAL")));
assertEquals("EXTERNAL_TABLE", table1.getTableType());

hoodieCatalog.dropTable(tablePath, false);
catalog.dropTable(tablePath, false);
Path path = new Path(table1.getParameters().get(FlinkOptions.PATH.key()));
boolean exists = StreamerUtil.fileExists(FileSystem.getLocal(new Configuration()), path);
assertTrue(isExternal && exists || !isExternal && !exists);
boolean created = StreamerUtil.fileExists(FSUtils.getFs(path, new Configuration()), path);
assertTrue(created, "Table should have been created");
}

@Test
Expand Down

0 comments on commit a972325

Please sign in to comment.