From b5071b8cf7fee4ebfe7ce8c866f7bacd42e26444 Mon Sep 17 00:00:00 2001
From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
Date: Fri, 22 Jul 2022 12:19:04 -0700
Subject: [PATCH] Revert "[MINOR] Fix CI issue with TestHiveSyncTool (#6110)"

This reverts commit d5c904e10e04980d360129a2ed6b73432b1d2206.
---
 azure-pipelines.yml                           |   4 -
 .../testsuite/job/TestHoodieTestSuiteJob.java |   3 +-
 .../org/apache/hudi/hive/HiveSyncConfig.java  |   3 +-
 .../TestHiveSyncGlobalCommitTool.java         |   6 +-
 .../hudi/hive/testutils/HiveTestService.java  | 159 +++++++++++-------
 .../hudi/hive/testutils/HiveTestUtil.java     |   4 +-
 ...{HiveTestCluster.java => TestCluster.java} |  61 ++++---
 .../HoodieDeltaStreamerTestBase.java          |   5 +-
 .../functional/TestHoodieDeltaStreamer.java   |   2 +-
 .../testutils/UtilitiesTestBase.java          |   7 +-
 10 files changed, 152 insertions(+), 102 deletions(-)
 rename hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/{HiveTestCluster.java => TestCluster.java} (86%)

diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 056f97edf3ce7..dee3e326a9659 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -88,7 +88,6 @@ stages:
   - stage: test
     jobs:
       - job: UT_FT_1
-        condition: false
         displayName: UT FT common & flink & UT client/spark-client
         timeoutInMinutes: '120'
         steps:
@@ -119,7 +118,6 @@ stages:
              jdkVersionOption: '1.8'
              mavenOptions: '-Xmx4g'
       - job: UT_FT_2
-        condition: false
         displayName: FT client/spark-client
         timeoutInMinutes: '120'
         steps:
@@ -171,7 +169,6 @@ stages:
              jdkVersionOption: '1.8'
              mavenOptions: '-Xmx4g'
       - job: UT_FT_4
-        condition: false
         displayName: UT FT other modules
         timeoutInMinutes: '120'
         steps:
@@ -202,7 +199,6 @@ stages:
              jdkVersionOption: '1.8'
              mavenOptions: '-Xmx4g'
       - job: IT
-        condition: false
         displayName: IT modules
         timeoutInMinutes: '120'
         steps:
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index ddf5b07247c0d..485c43d4ebfe5 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -55,7 +55,6 @@
 import java.util.stream.Stream;
 
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
-import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
@@ -181,7 +180,7 @@ private static TypedProperties getProperties() {
     // Make path selection test suite specific
     props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
     // Hive Configs
-    props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
+    props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
     props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
     props.setProperty(META_SYNC_TABLE_NAME.key(), "table1");
     props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 3dc0e4496c0ec..cdb192f9fedd5 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -63,8 +63,7 @@ public HiveSyncConfig(Properties props) {
 
   public HiveSyncConfig(Properties props, Configuration hadoopConf) {
     super(props, hadoopConf);
-    HiveConf hiveConf = hadoopConf instanceof HiveConf
-        ? (HiveConf) hadoopConf : new HiveConf(hadoopConf, HiveConf.class);
+    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
     // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
     hiveConf.addResource(getHadoopFileSystem().getConf());
     setHadoopConf(hiveConf);
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java
index 02c44f586f8d4..9dffdd0444d94 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.hive.replication;
 
-import org.apache.hudi.hive.testutils.HiveTestCluster;
+import org.apache.hudi.hive.testutils.TestCluster;
 
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
@@ -53,9 +53,9 @@ public class TestHiveSyncGlobalCommitTool {
 
   @RegisterExtension
-  public static HiveTestCluster localCluster = new HiveTestCluster();
+  public static TestCluster localCluster = new TestCluster();
   @RegisterExtension
-  public static HiveTestCluster remoteCluster = new HiveTestCluster();
+  public static TestCluster remoteCluster = new TestCluster();
 
   private static final String DB_NAME = "foo";
   private static final String TBL_NAME = "bar";
 
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
index 16f6bfe53dbba..66343bfd19de1 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.hive.testutils;
 
-import org.apache.hudi.common.testutils.NetworkTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -62,40 +62,71 @@ public class HiveTestService {
 
   private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
 
-  private static final int CONNECTION_TIMEOUT_MS = 30000;
-  private static final String BIND_HOST = "127.0.0.1";
-  private static final int HS2_THRIFT_PORT = 9999;
-  public static final String HS2_JDBC_URL = String.format("jdbc:hive2://%s:%s/", BIND_HOST, HS2_THRIFT_PORT);
-
-  private final Configuration hadoopConf;
-  private final String workDir;
-  private final Map<String, String> sysProps = new HashMap<>();
+
+  private static final int CONNECTION_TIMEOUT = 30000;
+
+  /**
+   * Configuration settings.
+   */
+  private Configuration hadoopConf;
+  private String workDir;
+  private String bindIP = "127.0.0.1";
+  private int metastorePort = 9083;
+  private int serverPort = 9999;
+  private boolean clean = true;
+
+  private Map<String, String> sysProps = new HashMap<>();
   private ExecutorService executorService;
   private TServer tServer;
   private HiveServer2 hiveServer;
-  private HiveConf hiveConf;
+  private HiveConf serverConf;
 
   public HiveTestService(Configuration hadoopConf) throws IOException {
     this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
     this.hadoopConf = hadoopConf;
   }
 
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public TServer getHiveMetaStore() {
+    return tServer;
+  }
+
+  public HiveConf getServerConf() {
+    return serverConf;
+  }
+
   public HiveServer2 start() throws IOException {
     Objects.requireNonNull(workDir, "The work dir must be set before starting cluster.");
 
+    if (hadoopConf == null) {
+      hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    }
+
     String localHiveLocation = getHiveLocation(workDir);
-    LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
-    File file = new File(localHiveLocation);
-    FileIOUtils.deleteDirectory(file);
+    if (clean) {
+      LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+      File file = new File(localHiveLocation);
+      FileIOUtils.deleteDirectory(file);
+    }
 
-    hiveConf = configureHive(hadoopConf, localHiveLocation);
+    serverConf = configureHive(hadoopConf, localHiveLocation);
 
     executorService = Executors.newSingleThreadExecutor();
-    tServer = startMetaStore(hiveConf);
+    tServer = startMetaStore(bindIP, serverConf);
 
-    hiveServer = startHiveServer(hiveConf);
+    serverConf.set("hive.in.test", "true");
+    hiveServer = startHiveServer(serverConf);
 
-    if (!waitForServerUp(hiveConf)) {
+    String serverHostname;
+    if (bindIP.equals("0.0.0.0")) {
+      serverHostname = "localhost";
+    } else {
+      serverHostname = bindIP;
+    }
+    if (!waitForServerUp(serverConf, serverHostname, CONNECTION_TIMEOUT)) {
       throw new IOException("Waiting for startup of standalone server");
     }
 
@@ -125,69 +156,76 @@ public void stop() {
     LOG.info("Hive Minicluster service shut down.");
     tServer = null;
     hiveServer = null;
+    hadoopConf = null;
   }
 
   public HiveServer2 getHiveServer() {
     return hiveServer;
   }
 
-  public HiveConf getHiveConf() {
-    return hiveConf;
-  }
-
   public int getHiveServerPort() {
-    return hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+    return serverPort;
   }
 
   public String getJdbcHive2Url() {
-    return String.format("jdbc:hive2://%s:%s/",
-        hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST), hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT));
+    return String.format("jdbc:hive2://%s:%s/default", bindIP, serverPort);
   }
 
-  public HiveConf configureHive(Configuration hadoopConf, String localHiveLocation) throws IOException {
-    hadoopConf.set("hive.metastore.local", "false");
-    hadoopConf.set("datanucleus.schema.autoCreateTables", "true");
-    hadoopConf.set("datanucleus.autoCreateSchema", "true");
-    hadoopConf.set("datanucleus.fixedDatastore", "false");
-    HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
-    conf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
-    conf.setBoolVar(ConfVars.METASTORE_SCHEMA_VERIFICATION, false);
-    conf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, HS2_THRIFT_PORT);
-    conf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, BIND_HOST);
-    final int metastoreServerPort = NetworkTestUtils.nextFreePort();
-    conf.setIntVar(ConfVars.METASTORE_SERVER_PORT, metastoreServerPort);
-    conf.setVar(ConfVars.METASTOREURIS, "thrift://" + BIND_HOST + ":" + metastoreServerPort);
+  public HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException {
+    conf.set("hive.metastore.local", "false");
+    int port = metastorePort;
+    if (conf.get(HiveConf.ConfVars.METASTORE_SERVER_PORT.varname, null) == null) {
+      conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
+    } else {
+      port = conf.getInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
+    }
+    if (conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, null) == null) {
+      conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
+    }
+    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + port);
+    conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
+    // The following line to turn of SASL has no effect since HiveAuthFactory calls
+    // 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
+    // in Hive 0.14.
+    // As a workaround, the property is set in hive-site.xml in this module.
+    // conf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "NOSASL");
     File localHiveDir = new File(localHiveLocation);
     localHiveDir.mkdirs();
     File metastoreDbDir = new File(localHiveDir, "metastore_db");
-    conf.setVar(ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
+    conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
+        "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
     File derbyLogFile = new File(localHiveDir, "derby.log");
     derbyLogFile.createNewFile();
     setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
     setSystemProperty("derby.system.home", localHiveDir.getAbsolutePath());
-    File metastoreWarehouseDir = new File(localHiveDir, "warehouse");
-    metastoreWarehouseDir.mkdir();
-    conf.setVar(ConfVars.METASTOREWAREHOUSE, metastoreWarehouseDir.getAbsolutePath());
+    conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+        Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath());
+    conf.set("datanucleus.schema.autoCreateTables", "true");
+    conf.set("hive.metastore.schema.verification", "false");
+    conf.set("datanucleus.autoCreateSchema", "true");
+    conf.set("datanucleus.fixedDatastore", "false");
+    setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
 
-    return conf;
+    return new HiveConf(conf, this.getClass());
   }
 
-  private boolean waitForServerUp(HiveConf serverConf) {
-    LOG.info("waiting for " + serverConf.getVar(ConfVars.METASTOREURIS));
-    final long start = System.currentTimeMillis();
+  private boolean waitForServerUp(HiveConf serverConf, String hostname, int timeout) {
+    long start = System.currentTimeMillis();
+    int port = serverConf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
     while (true) {
       try {
         new HiveMetaStoreClient(serverConf);
         return true;
       } catch (MetaException e) {
         // ignore as this is expected
+        LOG.info("server " + hostname + ":" + port + " not up " + e);
       }
-      if (System.currentTimeMillis() > start + CONNECTION_TIMEOUT_MS) {
+      if (System.currentTimeMillis() > start + timeout) {
         break;
       }
       try {
-        Thread.sleep(CONNECTION_TIMEOUT_MS / 10);
+        Thread.sleep(250);
       } catch (InterruptedException e) {
         // ignore
       }
@@ -269,23 +307,28 @@ protected TSocket acceptImpl() throws TTransportException {
     }
   }
 
-  private TServer startMetaStore(HiveConf conf) throws IOException {
+  public TServer startMetaStore(String forceBindIP, HiveConf conf) throws IOException {
     try {
       // Server will create new threads up to max as necessary. After an idle
       // period, it will destory threads to keep the number of threads in the
       // pool to min.
-      String host = conf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
-      int port = conf.getIntVar(ConfVars.METASTORE_SERVER_PORT);
-      int minWorkerThreads = conf.getIntVar(ConfVars.METASTORESERVERMINTHREADS);
-      int maxWorkerThreads = conf.getIntVar(ConfVars.METASTORESERVERMAXTHREADS);
-      boolean tcpKeepAlive = conf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE);
-      boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
+      int port = conf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
+      int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
+      int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+      boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
+      boolean useFramedTransport = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
       // don't support SASL yet
-      // boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
+      // boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
+
+      TServerTransport serverTransport;
+      if (forceBindIP != null) {
+        InetSocketAddress address = new InetSocketAddress(forceBindIP, port);
+        serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
 
-      InetSocketAddress address = new InetSocketAddress(host, port);
-      TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
+      } else {
+        serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
+      }
 
       TProcessor processor;
       TTransportFactory transFactory;
@@ -293,7 +336,7 @@ private TServer startMetaStore(HiveConf conf) throws IOException {
       HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf, false);
       IHMSHandler handler = RetryingHMSHandler.getProxy(conf, baseHandler, true);
 
-      if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) {
+      if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
         transFactory = useFramedTransport
             ? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory())
             : new TUGIContainingTransport.Factory();
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 6cae616e601ff..9687e557928bd 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -125,6 +125,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
       hiveTestService = new HiveTestService(configuration);
       hiveServer = hiveTestService.start();
     }
+    fileSystem = FileSystem.get(configuration);
 
     basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
 
@@ -140,8 +141,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
     hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
     hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
 
-    hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, hiveTestService.getHiveConf());
-    fileSystem = hiveSyncConfig.getHadoopFileSystem();
+    hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, configuration);
     dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
     ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig);
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
similarity index 86%
rename from hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
rename to hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
index 39813394d2cf7..c1f891fce8431 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -56,6 +57,7 @@
 import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.runners.model.InitializationError;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -63,7 +65,6 @@
 import java.io.OutputStream;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
@@ -74,15 +75,16 @@
 
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
-    BeforeEachCallback, AfterEachCallback {
-  public MiniDFSCluster dfsCluster;
+public class TestCluster implements BeforeAllCallback, AfterAllCallback,
+    BeforeEachCallback, AfterEachCallback {
   private HdfsTestService hdfsTestService;
-  private HiveTestService hiveTestService;
-  private HiveConf conf;
-  private HiveServer2 server2;
-  private DateTimeFormatter dtfOut;
-  private File hiveSiteXml;
+  public HiveTestService hiveTestService;
+  private Configuration conf;
+  public HiveServer2 server2;
+  private static volatile int port = 9083;
+  public MiniDFSCluster dfsCluster;
+  DateTimeFormatter dtfOut;
+  public File hiveSiteXml;
   private IMetaStoreClient client;
 
   @Override
@@ -107,18 +109,24 @@ public void setup() throws Exception {
     hdfsTestService = new HdfsTestService();
     dfsCluster = hdfsTestService.start(true);
 
-    Configuration hadoopConf = hdfsTestService.getHadoopConf();
-    hiveTestService = new HiveTestService(hadoopConf);
+    conf = hdfsTestService.getHadoopConf();
+    conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, port++);
+    conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port++);
+    conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++);
+    hiveTestService = new HiveTestService(conf);
     server2 = hiveTestService.start();
     dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
     hiveSiteXml = File.createTempFile("hive-site", ".xml");
     hiveSiteXml.deleteOnExit();
-    conf = hiveTestService.getHiveConf();
     try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
-      conf.writeXml(os);
+      hiveTestService.getServerConf().writeXml(os);
     }
     client = HiveMetaStoreClient.newSynchronizedClient(
-        RetryingMetaStoreClient.getProxy(conf, true));
+        RetryingMetaStoreClient.getProxy(hiveTestService.getServerConf(), true));
+  }
+
+  public Configuration getConf() {
+    return this.conf;
   }
 
   public String getHiveSiteXmlLocation() {
@@ -130,7 +138,7 @@ public IMetaStoreClient getHMSClient() {
   }
 
   public String getHiveJdBcUrl() {
-    return hiveTestService.getJdbcHive2Url();
+    return "jdbc:hive2://127.0.0.1:" + conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + "";
   }
 
   public String tablePath(String dbName, String tableName) throws Exception {
@@ -143,12 +151,12 @@ private String dbPath(String dbName) throws Exception {
 
   public void forceCreateDb(String dbName) throws Exception {
     try {
-      client.dropDatabase(dbName);
-    } catch (NoSuchObjectException ignored) {
-      // expected
+      getHMSClient().dropDatabase(dbName);
+    } catch (NoSuchObjectException e) {
+      System.out.println("db does not exist but its ok " + dbName);
     }
     Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>());
-    client.createDatabase(db);
+    getHMSClient().createDatabase(db);
   }
 
   public void createCOWTable(String commitTime, int numberOfPartitions, String dbName, String tableName)
@@ -161,7 +169,10 @@ public void createCOWTable(String commitTime, int numberOfPartitions, String dbN
         .setTableName(tableName)
         .setPayloadClass(HoodieAvroPayload.class)
         .initTable(conf, path.toString());
-    dfsCluster.getFileSystem().mkdirs(path);
+    boolean result = dfsCluster.getFileSystem().mkdirs(path);
+    if (!result) {
+      throw new InitializationError("cannot initialize table");
+    }
     ZonedDateTime dateTime = ZonedDateTime.now();
     HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime, path.toString());
     createCommitFile(commitMetadata, commitTime, path.toString());
@@ -228,7 +239,7 @@ private void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
       try {
         writer.write(s);
       } catch (IOException e) {
-        fail("IOException while writing test records as parquet", e);
+        fail("IOException while writing test records as parquet" + e.toString());
       }
     });
     writer.close();
@@ -248,15 +259,15 @@ public void stopHiveServer2() {
   public void startHiveServer2() {
     if (server2 == null) {
       server2 = new HiveServer2();
-      server2.init(hiveTestService.getHiveConf());
+      server2.init(hiveTestService.getServerConf());
       server2.start();
     }
   }
 
-  public void shutDown() throws IOException {
-    Files.deleteIfExists(hiveSiteXml.toPath());
+  public void shutDown() {
+    stopHiveServer2();
     Hive.closeCurrent();
-    hiveTestService.stop();
+    hiveTestService.getHiveMetaStore().stop();
     hdfsTestService.stop();
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index ad74235ae0c03..b4497289fd34a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -48,7 +48,6 @@
 import java.util.Random;
 
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
-import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
@@ -187,7 +186,7 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
     props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 
     // Hive Configs
-    props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
+    props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
     props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
     props.setProperty(META_SYNC_TABLE_NAME.key(), "hive_trips");
     props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
@@ -247,7 +246,7 @@ protected static void populateCommonKafkaProps(TypedProperties props, String bro
 
   protected static void populateCommonHiveProps(TypedProperties props) {
     // Hive Configs
-    props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
+    props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
     props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb2");
     props.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "false");
     props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 850b0d1d609e3..dde0e5f73fc4d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1359,7 +1359,7 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
     // Test Hive integration
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
     hiveSyncConfig.setValue(META_SYNC_PARTITION_FIELDS, "year,month,day");
-    hiveSyncConfig.setHadoopConf(hiveTestService.getHiveConf());
+    hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
     HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
     final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
     assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index ff7d6cc2ed2db..67a002c3bac79 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -196,7 +197,7 @@ public void teardown() throws Exception {
    */
   protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
     Properties props = new Properties();
-    props.setProperty(HIVE_URL.key(), hiveTestService.getJdbcHive2Url());
+    props.setProperty(HIVE_URL.key(),"jdbc:hive2://127.0.0.1:9999/");
     props.setProperty(HIVE_USER.key(), "");
     props.setProperty(HIVE_PASS.key(), "");
     props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
@@ -214,9 +215,11 @@ protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableN
    * @throws IOException
    */
   private static void clearHiveDb() throws Exception {
+    HiveConf hiveConf = new HiveConf();
     // Create Dummy hive sync config
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
-    hiveSyncConfig.setHadoopConf(hiveTestService.getHiveConf());
+    hiveConf.addResource(hiveServer.getHiveConf());
+    hiveSyncConfig.setHadoopConf(hiveConf);
     HoodieTableMetaClient.withPropertyBuilder()
       .setTableType(HoodieTableType.COPY_ON_WRITE)
       .setTableName(hiveSyncConfig.getString(META_SYNC_TABLE_NAME))