Revert "[HUDI-4437] Fix CI issue with TestHiveSyncTool" #6192

Merged 1 commit on Jul 22, 2022
4 changes: 0 additions & 4 deletions azure-pipelines.yml
@@ -88,7 +88,6 @@ stages:
   - stage: test
     jobs:
       - job: UT_FT_1
-        condition: false
         displayName: UT FT common & flink & UT client/spark-client
         timeoutInMinutes: '120'
         steps:
@@ -119,7 +118,6 @@ stages:
             jdkVersionOption: '1.8'
             mavenOptions: '-Xmx4g'
       - job: UT_FT_2
-        condition: false
         displayName: FT client/spark-client
         timeoutInMinutes: '120'
         steps:
@@ -171,7 +169,6 @@ stages:
             jdkVersionOption: '1.8'
             mavenOptions: '-Xmx4g'
       - job: UT_FT_4
-        condition: false
         displayName: UT FT other modules
         timeoutInMinutes: '120'
         steps:
@@ -202,7 +199,6 @@ stages:
             jdkVersionOption: '1.8'
             mavenOptions: '-Xmx4g'
       - job: IT
-        condition: false
         displayName: IT modules
         timeoutInMinutes: '120'
         steps:
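Note: the four deleted `condition: false` lines are what the original HUDI-4437 commit used to disable the UT_FT_1, UT_FT_2, UT_FT_4, and IT jobs; removing them re-enables those CI stages.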
@@ -55,7 +55,6 @@
 import java.util.stream.Stream;
 
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
-import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
@@ -181,7 +180,7 @@ private static TypedProperties getProperties() {
     // Make path selection test suite specific
     props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
     // Hive Configs
-    props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
+    props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
     props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
     props.setProperty(META_SYNC_TABLE_NAME.key(), "table1");
     props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
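Note: the revert pins the Hive JDBC URL to HiveTestService's restored default endpoint (127.0.0.1:9999) instead of referencing the removed HS2_JDBC_URL constant. A minimal stand-alone sketch of that wiring — the `hoodie.datasource.hive_sync.jdbcurl` key shown for `HIVE_URL` is an assumption from Hudi's hive-sync config naming:

import org.apache.hudi.common.config.TypedProperties;

public class HiveUrlPropsSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Pin the Hive JDBC URL to the test service's fixed default endpoint
    // rather than a shared constant (key name assumed, not from this diff).
    props.setProperty("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://127.0.0.1:9999/");
    System.out.println(props.getString("hoodie.datasource.hive_sync.jdbcurl"));
  }
}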
@@ -63,8 +63,7 @@ public HiveSyncConfig(Properties props) {
 
   public HiveSyncConfig(Properties props, Configuration hadoopConf) {
     super(props, hadoopConf);
-    HiveConf hiveConf = hadoopConf instanceof HiveConf
-        ? (HiveConf) hadoopConf : new HiveConf(hadoopConf, HiveConf.class);
+    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
     // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
     hiveConf.addResource(getHadoopFileSystem().getConf());
     setHadoopConf(hiveConf);
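The behavioral change here: post-revert, HiveSyncConfig always copies the incoming configuration into a brand-new `HiveConf`, even when the caller already hands in a `HiveConf`, so instance identity is dropped and only the conf entries survive. A minimal sketch of that constructor behavior, assuming Hadoop and Hive on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public class HiveConfCopySketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("hive.metastore.uris", "thrift://127.0.0.1:9083"); // illustrative value
    // HiveConf(Configuration, Class) copies entries from the source conf into a
    // fresh HiveConf; the original object is never reused as-is.
    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
    System.out.println(hiveConf.get("hive.metastore.uris"));
  }
}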
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.hive.replication;
 
-import org.apache.hudi.hive.testutils.HiveTestCluster;
+import org.apache.hudi.hive.testutils.TestCluster;
 
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
@@ -53,9 +53,9 @@
 public class TestHiveSyncGlobalCommitTool {
 
   @RegisterExtension
-  public static HiveTestCluster localCluster = new HiveTestCluster();
+  public static TestCluster localCluster = new TestCluster();
   @RegisterExtension
-  public static HiveTestCluster remoteCluster = new HiveTestCluster();
+  public static TestCluster remoteCluster = new TestCluster();
 
   private static final String DB_NAME = "foo";
   private static final String TBL_NAME = "bar";
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.hive.testutils;
 
-import org.apache.hudi.common.testutils.NetworkTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -62,40 +62,71 @@
 public class HiveTestService {
 
   private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
-  private static final int CONNECTION_TIMEOUT_MS = 30000;
-  private static final String BIND_HOST = "127.0.0.1";
-  private static final int HS2_THRIFT_PORT = 9999;
-  public static final String HS2_JDBC_URL = String.format("jdbc:hive2://%s:%s/", BIND_HOST, HS2_THRIFT_PORT);
 
-  private final Configuration hadoopConf;
-  private final String workDir;
-  private final Map<String, String> sysProps = new HashMap<>();
+  private static final int CONNECTION_TIMEOUT = 30000;
+
+  /**
+   * Configuration settings.
+   */
+  private Configuration hadoopConf;
+  private String workDir;
+  private String bindIP = "127.0.0.1";
+  private int metastorePort = 9083;
+  private int serverPort = 9999;
+  private boolean clean = true;
 
+  private Map<String, String> sysProps = new HashMap<>();
   private ExecutorService executorService;
   private TServer tServer;
   private HiveServer2 hiveServer;
-  private HiveConf hiveConf;
+  private HiveConf serverConf;
 
   public HiveTestService(Configuration hadoopConf) throws IOException {
     this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
     this.hadoopConf = hadoopConf;
   }
 
   public Configuration getHadoopConf() {
     return hadoopConf;
   }
 
+  public TServer getHiveMetaStore() {
+    return tServer;
+  }
+
+  public HiveConf getServerConf() {
+    return serverConf;
+  }
+
   public HiveServer2 start() throws IOException {
     Objects.requireNonNull(workDir, "The work dir must be set before starting cluster.");
 
+    if (hadoopConf == null) {
+      hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    }
+
     String localHiveLocation = getHiveLocation(workDir);
-    LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
-    File file = new File(localHiveLocation);
-    FileIOUtils.deleteDirectory(file);
+    if (clean) {
+      LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+      File file = new File(localHiveLocation);
+      FileIOUtils.deleteDirectory(file);
+    }
 
-    hiveConf = configureHive(hadoopConf, localHiveLocation);
+    serverConf = configureHive(hadoopConf, localHiveLocation);
 
     executorService = Executors.newSingleThreadExecutor();
-    tServer = startMetaStore(hiveConf);
+    tServer = startMetaStore(bindIP, serverConf);
 
-    hiveServer = startHiveServer(hiveConf);
+    serverConf.set("hive.in.test", "true");
+    hiveServer = startHiveServer(serverConf);
 
-    if (!waitForServerUp(hiveConf)) {
+    String serverHostname;
+    if (bindIP.equals("0.0.0.0")) {
+      serverHostname = "localhost";
+    } else {
+      serverHostname = bindIP;
+    }
+    if (!waitForServerUp(serverConf, serverHostname, CONNECTION_TIMEOUT)) {
       throw new IOException("Waiting for startup of standalone server");
     }
 
@@ -125,69 +156,76 @@ public void stop() {
     LOG.info("Hive Minicluster service shut down.");
     tServer = null;
     hiveServer = null;
+    hadoopConf = null;
   }
 
   public HiveServer2 getHiveServer() {
     return hiveServer;
   }
 
-  public HiveConf getHiveConf() {
-    return hiveConf;
-  }
-
   public int getHiveServerPort() {
-    return hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+    return serverPort;
   }
 
   public String getJdbcHive2Url() {
-    return String.format("jdbc:hive2://%s:%s/",
-        hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST), hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT));
+    return String.format("jdbc:hive2://%s:%s/default", bindIP, serverPort);
   }
 
-  public HiveConf configureHive(Configuration hadoopConf, String localHiveLocation) throws IOException {
-    hadoopConf.set("hive.metastore.local", "false");
-    hadoopConf.set("datanucleus.schema.autoCreateTables", "true");
-    hadoopConf.set("datanucleus.autoCreateSchema", "true");
-    hadoopConf.set("datanucleus.fixedDatastore", "false");
-    HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
-    conf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
-    conf.setBoolVar(ConfVars.METASTORE_SCHEMA_VERIFICATION, false);
-    conf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, HS2_THRIFT_PORT);
-    conf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, BIND_HOST);
-    final int metastoreServerPort = NetworkTestUtils.nextFreePort();
-    conf.setIntVar(ConfVars.METASTORE_SERVER_PORT, metastoreServerPort);
-    conf.setVar(ConfVars.METASTOREURIS, "thrift://" + BIND_HOST + ":" + metastoreServerPort);
+  public HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException {
+    conf.set("hive.metastore.local", "false");
+    int port = metastorePort;
+    if (conf.get(HiveConf.ConfVars.METASTORE_SERVER_PORT.varname, null) == null) {
+      conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
+    } else {
+      port = conf.getInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
+    }
+    if (conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, null) == null) {
+      conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
+    }
+    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + port);
+    conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
     // The following line to turn of SASL has no effect since HiveAuthFactory calls
     // 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
     // in Hive 0.14.
     // As a workaround, the property is set in hive-site.xml in this module.
     // conf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "NOSASL");
     File localHiveDir = new File(localHiveLocation);
     localHiveDir.mkdirs();
     File metastoreDbDir = new File(localHiveDir, "metastore_db");
-    conf.setVar(ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
+    conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
+        "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
     File derbyLogFile = new File(localHiveDir, "derby.log");
     derbyLogFile.createNewFile();
-    setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
-    setSystemProperty("derby.system.home", localHiveDir.getAbsolutePath());
-    File metastoreWarehouseDir = new File(localHiveDir, "warehouse");
-    metastoreWarehouseDir.mkdir();
-    conf.setVar(ConfVars.METASTOREWAREHOUSE, metastoreWarehouseDir.getAbsolutePath());
+    conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+        Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath());
+    conf.set("datanucleus.schema.autoCreateTables", "true");
+    conf.set("hive.metastore.schema.verification", "false");
+    conf.set("datanucleus.autoCreateSchema", "true");
+    conf.set("datanucleus.fixedDatastore", "false");
+    setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
 
-    return conf;
+    return new HiveConf(conf, this.getClass());
   }
 
-  private boolean waitForServerUp(HiveConf serverConf) {
-    LOG.info("waiting for " + serverConf.getVar(ConfVars.METASTOREURIS));
-    final long start = System.currentTimeMillis();
+  private boolean waitForServerUp(HiveConf serverConf, String hostname, int timeout) {
+    long start = System.currentTimeMillis();
+    int port = serverConf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
     while (true) {
       try {
         new HiveMetaStoreClient(serverConf);
         return true;
       } catch (MetaException e) {
-        // ignore as this is expected
+        LOG.info("server " + hostname + ":" + port + " not up " + e);
       }
 
-      if (System.currentTimeMillis() > start + CONNECTION_TIMEOUT_MS) {
+      if (System.currentTimeMillis() > start + timeout) {
         break;
       }
       try {
-        Thread.sleep(CONNECTION_TIMEOUT_MS / 10);
+        Thread.sleep(250);
       } catch (InterruptedException e) {
         // ignore
       }
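Note: the restored `configureHive` only installs the default metastore port when the caller has not already set one, which is presumably what lets two `TestCluster` instances run side by side on different ports. A minimal sketch of that fallback pattern using a plain Hadoop `Configuration` — the literal `hive.metastore.port` key is an assumption about `ConfVars.METASTORE_SERVER_PORT.varname`:

import org.apache.hadoop.conf.Configuration;

public class PortFallbackSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int defaultPort = 9083;
    int port = defaultPort;
    // Respect a pre-configured metastore port; otherwise install the default.
    if (conf.get("hive.metastore.port", null) == null) {
      conf.setInt("hive.metastore.port", defaultPort);
    } else {
      port = conf.getInt("hive.metastore.port", defaultPort);
    }
    System.out.println("metastore port: " + port);
  }
}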
@@ -269,31 +307,36 @@ protected TSocket acceptImpl() throws TTransportException {
     }
   }
 
-  private TServer startMetaStore(HiveConf conf) throws IOException {
+  public TServer startMetaStore(String forceBindIP, HiveConf conf) throws IOException {
     try {
       // Server will create new threads up to max as necessary. After an idle
       // period, it will destory threads to keep the number of threads in the
       // pool to min.
-      String host = conf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
-      int port = conf.getIntVar(ConfVars.METASTORE_SERVER_PORT);
-      int minWorkerThreads = conf.getIntVar(ConfVars.METASTORESERVERMINTHREADS);
-      int maxWorkerThreads = conf.getIntVar(ConfVars.METASTORESERVERMAXTHREADS);
-      boolean tcpKeepAlive = conf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE);
-      boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
+      int port = conf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
+      int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
+      int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
+      boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
+      boolean useFramedTransport = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
 
       // don't support SASL yet
-      // boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
+      // boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
 
-      InetSocketAddress address = new InetSocketAddress(host, port);
-      TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
+      TServerTransport serverTransport;
+      if (forceBindIP != null) {
+        InetSocketAddress address = new InetSocketAddress(forceBindIP, port);
+        serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
+      } else {
+        serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
+      }
 
       TProcessor processor;
       TTransportFactory transFactory;
 
       HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf, false);
       IHMSHandler handler = RetryingHMSHandler.getProxy(conf, baseHandler, true);
 
-      if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) {
+      if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
         transFactory = useFramedTransport
             ? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory())
             : new TUGIContainingTransport.Factory();
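Taken together, the restored `HiveTestService` API reads roughly like the sketch below; the method names come from this diff, while the surrounding test wiring is assumed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.service.server.HiveServer2;
import org.apache.hudi.hive.testutils.HiveTestService;

public class HiveTestServiceUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HiveTestService service = new HiveTestService(conf);
    HiveServer2 server = service.start();    // boots an embedded metastore plus HiveServer2
    // With the restored defaults this is "jdbc:hive2://127.0.0.1:9999/default".
    String jdbcUrl = service.getJdbcHive2Url();
    System.out.println("HS2 on port " + service.getHiveServerPort() + ": " + jdbcUrl);
    service.stop();                          // shuts both services down
  }
}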
@@ -125,6 +125,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
       hiveTestService = new HiveTestService(configuration);
       hiveServer = hiveTestService.start();
     }
+    fileSystem = FileSystem.get(configuration);
 
     basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
 
@@ -140,8 +141,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
     hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
     hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
 
-    hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, hiveTestService.getHiveConf());
-    fileSystem = hiveSyncConfig.getHadoopFileSystem();
+    hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, configuration);
 
     dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
     ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig);
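Note: the test setup now resolves the `FileSystem` directly from the Hadoop configuration rather than going through `HiveSyncConfig`. A minimal sketch of that call, under default (local) filesystem settings:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FileSystemFromConfSketch {
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    // FileSystem.get(conf) returns the filesystem named by fs.defaultFS
    // (the local filesystem unless overridden).
    FileSystem fileSystem = FileSystem.get(configuration);
    System.out.println(fileSystem.getUri());
  }
}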