From bec05ff4c54f22d4e143c008ce38e5a16205fc9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= <408317717@qq.com> Date: Fri, 14 Oct 2022 09:57:41 +0800 Subject: [PATCH] [HUDI-5010] Fix flink hive catalog external config not work (#6923) * fix flink catalog external config not work --- .../table/catalog/HoodieCatalogFactory.java | 4 +-- .../hudi/table/catalog/HoodieHiveCatalog.java | 16 +++++----- .../table/catalog/HoodieCatalogTestUtils.java | 10 ++++-- .../catalog/TestHoodieCatalogFactory.java | 1 + .../table/catalog/TestHoodieHiveCatalog.java | 31 +++++++------------ 5 files changed, 30 insertions(+), 32 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java index ef33b1c991d5..436b836eff46 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java @@ -56,9 +56,7 @@ public Catalog createCatalog(Context context) { case "hms": return new HoodieHiveCatalog( context.getName(), - helper.getOptions().get(CatalogOptions.CATALOG_PATH), - helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE), - helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR)); + (Configuration) helper.getOptions()); case "dfs": return new HoodieCatalog( context.getName(), diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index 89172f22f5d9..01b73b8605a3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -101,7 +101,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; import static org.apache.hudi.configuration.FlinkOptions.PATH; -import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB; import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT; import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME; import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER; @@ -117,21 +116,22 @@ public class HoodieHiveCatalog extends AbstractCatalog { // optional catalog base path: used for db/table path inference. private final String catalogPath; + private final boolean external; - public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) { - this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false); + public HoodieHiveCatalog(String catalogName, Configuration options) { + this(catalogName, options, HoodieCatalogUtil.createHiveConf(options.getString(CatalogOptions.HIVE_CONF_DIR)), false); } public HoodieHiveCatalog( String catalogName, - String catalogPath, - String defaultDatabase, + Configuration options, HiveConf hiveConf, boolean allowEmbedded) { - super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase); + super(catalogName, options.getString(CatalogOptions.DEFAULT_DATABASE)); // fallback to hive.metastore.warehouse.dir if catalog path is not specified - this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath; this.hiveConf = hiveConf; + this.catalogPath = options.getString(CatalogOptions.CATALOG_PATH, hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)); + this.external = options.getBoolean(CatalogOptions.TABLE_EXTERNAL); if (!allowEmbedded) { checkArgument( !HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf), @@ -512,7 +512,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, Map properties = new HashMap<>(table.getOptions()); - if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) { + if (external) { hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString()); properties.put("EXTERNAL", "TRUE"); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java index 04bae7376817..c98b4ac0da29 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.catalog; +import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,10 +43,15 @@ public static HoodieHiveCatalog createHiveCatalog() { } public static HoodieHiveCatalog createHiveCatalog(String name) { + return createHiveCatalog(name, false); + } + + public static HoodieHiveCatalog createHiveCatalog(String name, boolean external) { + Configuration options = new Configuration(); + options.setBoolean(CatalogOptions.TABLE_EXTERNAL, external); return new HoodieHiveCatalog( name, - null, - null, + options, createHiveConf(), true); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java index 961a340e1963..6e7ee2e8f84b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java @@ -78,6 +78,7 @@ void testCreateHMSCatalog() { options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER); options.put(CatalogOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath()); options.put(CatalogOptions.MODE.key(), "hms"); + options.put(CatalogOptions.TABLE_EXTERNAL.key(), "false"); final Catalog actualCatalog = FactoryUtil.createCatalog( diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java index 014933f39e1b..ffae71d6b249 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.catalog; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieCatalogException; @@ -35,7 +36,6 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.factories.FactoryUtil; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.jupiter.api.AfterAll; @@ -44,7 +44,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.Collections; @@ -55,7 +54,6 @@ import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -149,28 +147,23 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except assertEquals("id", table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key())); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testCreateExternalTable(boolean isExternal) throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException { + @Test + public void testCreateExternalTable() throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException { + HoodieHiveCatalog catalog = HoodieCatalogTestUtils.createHiveCatalog("myCatalog", true); + catalog.open(); Map originOptions = new HashMap<>(); originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi"); - originOptions.put(CatalogOptions.TABLE_EXTERNAL.key(), String.valueOf(isExternal)); CatalogTable table = new CatalogTableImpl(schema, originOptions, "hudi table"); - hoodieCatalog.createTable(tablePath, table, false); - Table table1 = hoodieCatalog.getHiveTable(tablePath); - if (isExternal) { - assertTrue(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key()))); - assertEquals("EXTERNAL_TABLE", table1.getTableType()); - } else { - assertFalse(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key()))); - assertEquals("MANAGED_TABLE", table1.getTableType()); - } + catalog.createTable(tablePath, table, false); + Table table1 = catalog.getHiveTable(tablePath); + assertTrue(Boolean.parseBoolean(table1.getParameters().get("EXTERNAL"))); + assertEquals("EXTERNAL_TABLE", table1.getTableType()); - hoodieCatalog.dropTable(tablePath, false); + catalog.dropTable(tablePath, false); Path path = new Path(table1.getParameters().get(FlinkOptions.PATH.key())); - boolean exists = StreamerUtil.fileExists(FileSystem.getLocal(new Configuration()), path); - assertTrue(isExternal && exists || !isExternal && !exists); + boolean created = StreamerUtil.fileExists(FSUtils.getFs(path, new Configuration()), path); + assertTrue(created, "Table should have been created"); } @Test