Skip to content

Commit

Permalink
Fix UT resource cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan committed Jun 7, 2022
1 parent 69894b3 commit b2e05af
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.util.ConfigUtils;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
Expand Down Expand Up @@ -60,7 +61,6 @@
import java.util.stream.Collectors;

import static org.apache.hudi.hive.testutils.HiveTestUtil.basePath;
import static org.apache.hudi.hive.testutils.HiveTestUtil.ddlExecutor;
import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem;
import static org.apache.hudi.hive.testutils.HiveTestUtil.getHiveConf;
import static org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncProps;
Expand Down Expand Up @@ -94,9 +94,10 @@ private static Iterable<Object[]> syncModeAndSchemaFromCommitMetadata() {

private HiveSyncTool hiveSyncTool;
private HoodieHiveClient hiveClient;
private QueryBasedDDLExecutor ddlExecutor;

@AfterAll
public static void cleanUpClass() {
public static void cleanUpClass() throws Exception {
HiveTestUtil.shutdown();
}

Expand All @@ -122,11 +123,18 @@ private static Iterable<Object[]> syncDataSourceTableParams() {
@BeforeEach
public void setUp() throws Exception {
// Bring up the shared Hive/ZK test fixture, then build a fresh sync tool +
// client for this test. Order matters: reinitHiveSyncClient() reads config
// (hiveSyncProps, fileSystem) that HiveTestUtil.setUp() initializes.
HiveTestUtil.setUp();
reinitHiveSyncClient();
}

@AfterEach
public void teardown() throws Exception {
  // Clear shared test state, then release per-test resources. try/finally
  // guarantees both close() calls run even if clear() or the first close()
  // throws — otherwise a failing ddlExecutor.close() would leak hiveClient.
  try {
    HiveTestUtil.clear();
  } finally {
    try {
      if (ddlExecutor != null) {
        ddlExecutor.close();
      }
    } finally {
      if (hiveClient != null) {
        hiveClient.close();
      }
    }
  }
}

@ParameterizedTest
Expand Down Expand Up @@ -526,7 +534,7 @@ public void testUpdateTableComments(String syncMode) throws Exception {
}
}

ddlExecutor.updateTableComments(HiveTestUtil.TABLE_NAME, alterCommentSchema);
hiveClient.ddlExecutor.updateTableComments(HiveTestUtil.TABLE_NAME, alterCommentSchema);

List<FieldSchema> fieldSchemas = hiveClient.getTableCommentUsingMetastoreClient(HiveTestUtil.TABLE_NAME);
int commentCnt = 0;
Expand Down Expand Up @@ -1129,16 +1137,17 @@ public void testSyncWithoutDiffs(String syncMode) throws Exception {
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
}

private void reSyncHiveTable() {
private void reSyncHiveTable() throws HiveException, MetaException {
hiveSyncTool.syncHoodieTable();
// We need to renew the hive client after tool.syncHoodieTable(), because it closes the hive
// session, which then leads to a connection retry; an exception is visible in the log.
reinitHiveSyncClient();
}

private void reinitHiveSyncClient() {
private void reinitHiveSyncClient() throws HiveException, MetaException {
hiveSyncTool = new HiveSyncTool(hiveSyncProps, HiveTestUtil.getHiveConf(), fileSystem);
hiveClient = (HoodieHiveClient) hiveSyncTool.hoodieHiveClient;
ddlExecutor = HiveTestUtil.getQueryBasedDDLExecutor();
}

private int getPartitionFieldSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public class HiveTestUtil {
public static TypedProperties hiveSyncProps;
public static HiveTestService hiveTestService;
public static FileSystem fileSystem;
public static QueryBasedDDLExecutor ddlExecutor;

private static ZooKeeperServer zkServer;
private static HiveServer2 hiveServer;
Expand All @@ -105,7 +104,7 @@ public class HiveTestUtil {
private static DateTimeFormatter dtfOut;
private static Set<String> createdTablesSet = new HashSet<>();

public static void setUp() throws IOException, InterruptedException, HiveException, MetaException {
public static void setUp() throws Exception {
configuration = new Configuration();
if (zkServer == null) {
zkService = new ZookeeperTestService(configuration);
Expand All @@ -115,7 +114,9 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
hiveTestService = new HiveTestService(configuration);
hiveServer = hiveTestService.start();
}
fileSystem = FileSystem.get(configuration);
if (fileSystem == null) {
fileSystem = FileSystem.get(configuration);
}

basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();

Expand All @@ -134,39 +135,46 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
hiveSyncConfig = new HiveSyncConfig(hiveSyncProps);

dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig, fileSystem, getHiveConf());

clear();
}

public static void clearIncrementalPullSetup(String path1, String path2) throws IOException, HiveException, MetaException {
public static void clearIncrementalPullSetup(String path1, String path2) throws Exception {
fileSystem.delete(new Path(path1), true);
if (path2 != null) {
fileSystem.delete(new Path(path2), true);
}
clear();
}

public static void clear() throws IOException, HiveException, MetaException {
public static void clear() throws Exception {
fileSystem.delete(new Path(basePath), true);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(TABLE_NAME)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, basePath);

for (String tableName : createdTablesSet) {
ddlExecutor.runSQL("drop table if exists " + tableName);
try (QueryBasedDDLExecutor ddlExecutor = getQueryBasedDDLExecutor()) {
for (String tableName : createdTablesSet) {
ddlExecutor.runSQL("drop table if exists " + tableName);
}
createdTablesSet.clear();
ddlExecutor.runSQL("drop database if exists " + DB_NAME + " cascade");
}
createdTablesSet.clear();
ddlExecutor.runSQL("drop database if exists " + DB_NAME + " cascade");
}

/**
 * Builds a fresh {@link QueryBasedDDLExecutor} backed by a HiveConf derived from the
 * test configuration plus the test file system's settings. Callers own the executor
 * and are responsible for closing it (e.g. via try-with-resources).
 */
public static QueryBasedDDLExecutor getQueryBasedDDLExecutor() throws HiveException, MetaException {
  HiveConf conf = new HiveConf(configuration, HiveConf.class);
  conf.addResource(fileSystem.getConf());
  return new HiveQueryDDLExecutor(hiveSyncConfig, fileSystem, conf);
}

// Returns the HiveConf of the embedded HiveServer2 started by setUp().
// NOTE(review): assumes hiveServer is non-null, i.e. setUp() ran first — confirm callers.
public static HiveConf getHiveConf() {
return hiveServer.getHiveConf();
}

public static void shutdown() {
public static void shutdown() throws Exception {
if (hiveServer != null) {
hiveServer.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@

package org.apache.hudi.utilities;

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -52,7 +50,7 @@ public class TestHiveIncrementalPuller {
private String targetBasePath = null;

@BeforeEach
public void setup() throws HiveException, IOException, InterruptedException, MetaException {
public void setup() throws Exception {
config = new HiveIncrementalPuller.Config();
HiveTestUtil.setUp();
}
Expand Down

0 comments on commit b2e05af

Please sign in to comment.