Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Hive connector test with HMS #14743

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.hive.s3.HiveS3Config;
import io.trino.plugin.hive.s3.S3HiveQueryRunner;
import io.trino.testing.QueryRunner;
import org.testng.SkipException;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.OptionalInt;

import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Similar to {@link TestHiveConnectorTest} but uses Hive metastore (HMS) and MinIO
* (S3-compatible storage) instead of file metastore and local file system.
*/
public class TestHiveHmsMetastoreMinioConnectorTest
extends BaseHiveConnectorTest
{
private final String bucketName = "test-hive-metastore-minio-" + randomTableSuffix();

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName));
hiveMinioDataLake.start();
return S3HiveQueryRunner.builder(hiveMinioDataLake)
.setHiveProperties(Map.of(
// Reduce memory pressure when writing large files
"hive.s3.streaming.part-size", new HiveS3Config().getS3MultipartMinPartSize().toString()))
.setInitialTables(REQUIRED_TPCH_TABLES)
.build();
}

@Test
public void verifyTestDataSetup()
{
assertThat(computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*') FROM nation"))
.isEqualTo(format("s3a://%s/tpch/nation", bucketName));
}

@Override
protected String createSchemaSql(String schemaName)
{
return format("CREATE SCHEMA %1$s WITH (location='s3a://%2$s/%1$s')", schemaName, bucketName);
}

@Override
public void testAddColumnConcurrently()
{
// TODO (https://github.com/trinodb/trino/issues/14745) adding columns may overwrite concurrent addition of columns (or some other operations)
// because adding columns currently consist of "read table, derive new table object, persist" with no locking (pessimistic nor optimistic)
throw new SkipException("The test may or may not fail, because there is a concurrency issue");
Comment on lines +69 to +71
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

@Override
protected OptionalInt maxTableNameLength()
{
return OptionalInt.of(128);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testDropNonEmptySchemaWithTable()
}

try {
assertUpdate("CREATE SCHEMA " + schemaName);
assertUpdate(createSchemaSql(schemaName));
assertUpdate("CREATE TABLE " + schemaName + ".t(x int)");
assertQueryFails("DROP SCHEMA " + schemaName, ".*Cannot drop non-empty schema '\\Q" + schemaName + "\\E'");
}
Expand All @@ -239,7 +239,7 @@ public void testDropNonEmptySchemaWithView()
String schemaName = "test_drop_non_empty_schema_view_" + randomTableSuffix();

try {
assertUpdate("CREATE SCHEMA " + schemaName);
assertUpdate(createSchemaSql(schemaName));
assertUpdate("CREATE VIEW " + schemaName + ".v_t AS SELECT 123 x");

assertQueryFails("DROP SCHEMA " + schemaName, ".*Cannot drop non-empty schema '\\Q" + schemaName + "\\E'");
Expand All @@ -263,7 +263,7 @@ public void testDropNonEmptySchemaWithMaterializedView()
String schemaName = "test_drop_non_empty_schema_mv_" + randomTableSuffix();

try {
assertUpdate("CREATE SCHEMA " + schemaName);
assertUpdate(createSchemaSql(schemaName));
assertUpdate("CREATE MATERIALIZED VIEW " + schemaName + ".mv_t AS SELECT 123 x");

assertQueryFails("DROP SCHEMA " + schemaName, ".*Cannot drop non-empty schema '\\Q" + schemaName + "\\E'");
Expand Down Expand Up @@ -1281,10 +1281,11 @@ public void testRenameMaterializedView()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW));

String schema = "rename_mv_test";
String schema = "rename_mv_test_" + randomTableSuffix();
Session session = Session.builder(getSession())
.setSchema(schema)
.build();
assertUpdate(createSchemaSql(schema));

QualifiedObjectName originalMaterializedView = new QualifiedObjectName(
session.getCatalog().orElseThrow(),
Expand Down Expand Up @@ -1318,8 +1319,8 @@ public void testRenameMaterializedView()
assertUpdate(session, "ALTER MATERIALIZED VIEW " + testExistsMaterializedViewName + " RENAME TO " + uppercaseName);
assertTestingMaterializedViewQuery(schema, uppercaseName.toLowerCase(ENGLISH)); // Ensure select allows for lower-case, not delimited identifier

String otherSchema = "rename_mv_other_schema";
assertUpdate(format("CREATE SCHEMA IF NOT EXISTS %s", otherSchema));
String otherSchema = "rename_mv_other_schema_" + randomTableSuffix();
assertUpdate(createSchemaSql(otherSchema));
if (hasBehavior(SUPPORTS_RENAME_MATERIALIZED_VIEW_ACROSS_SCHEMAS)) {
assertUpdate(session, "ALTER MATERIALIZED VIEW " + uppercaseName + " RENAME TO " + otherSchema + "." + originalMaterializedView.getObjectName());
assertTestingMaterializedViewQuery(otherSchema, originalMaterializedView.getObjectName());
Expand Down Expand Up @@ -1353,7 +1354,6 @@ private void assertTestingMaterializedViewQuery(String schema, String materializ

private void createTestingMaterializedView(QualifiedObjectName view, Optional<String> comment)
{
assertUpdate(format("CREATE SCHEMA IF NOT EXISTS %s", view.getSchemaName()));
assertUpdate(format(
"CREATE MATERIALIZED VIEW %s %s AS SELECT * FROM nation",
view,
Expand Down Expand Up @@ -1867,7 +1867,7 @@ public void testRenameSchema()

String schemaName = "test_rename_schema_" + randomTableSuffix();
try {
assertUpdate("CREATE SCHEMA " + schemaName);
assertUpdate(createSchemaSql(schemaName));
assertUpdate("ALTER SCHEMA " + schemaName + " RENAME TO " + schemaName + "_renamed");
assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(schemaName + "_renamed");
}
Expand Down Expand Up @@ -2127,7 +2127,7 @@ public void testCreateSchemaWithLongName()
.orElse(65536 + 5);

String validSchemaName = baseSchemaName + "z".repeat(maxLength - baseSchemaName.length());
assertUpdate("CREATE SCHEMA " + validSchemaName);
assertUpdate(createSchemaSql(validSchemaName));
assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(validSchemaName);
assertUpdate("DROP SCHEMA " + validSchemaName);

Expand All @@ -2136,7 +2136,7 @@ public void testCreateSchemaWithLongName()
}

String invalidSchemaName = validSchemaName + "z";
assertThatThrownBy(() -> assertUpdate("CREATE SCHEMA " + invalidSchemaName))
assertThatThrownBy(() -> assertUpdate(createSchemaSql(invalidSchemaName)))
.satisfies(this::verifySchemaNameLengthFailurePermissible);
assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).doesNotContain(invalidSchemaName);
}
Expand All @@ -2146,8 +2146,8 @@ public void testRenameSchemaToLongName()
{
skipTestUnless(hasBehavior(SUPPORTS_RENAME_SCHEMA));

String sourceTableName = "test_rename_source_" + randomTableSuffix();
assertUpdate("CREATE SCHEMA " + sourceTableName);
String sourceSchemaName = "test_rename_source_" + randomTableSuffix();
assertUpdate(createSchemaSql(sourceSchemaName));

String baseSchemaName = "test_rename_target_" + randomTableSuffix();

Expand All @@ -2156,17 +2156,17 @@ public void testRenameSchemaToLongName()
.orElse(65536 + 5);

String validTargetSchemaName = baseSchemaName + "z".repeat(maxLength - baseSchemaName.length());
assertUpdate("ALTER SCHEMA " + sourceTableName + " RENAME TO " + validTargetSchemaName);
assertUpdate("ALTER SCHEMA " + sourceSchemaName + " RENAME TO " + validTargetSchemaName);
assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(validTargetSchemaName);
assertUpdate("DROP SCHEMA " + validTargetSchemaName);

if (maxSchemaNameLength().isEmpty()) {
return;
}

assertUpdate("CREATE SCHEMA " + sourceTableName);
assertUpdate(createSchemaSql(sourceSchemaName));
String invalidTargetSchemaName = validTargetSchemaName + "z";
assertThatThrownBy(() -> assertUpdate("ALTER SCHEMA " + sourceTableName + " RENAME TO " + invalidTargetSchemaName))
assertThatThrownBy(() -> assertUpdate("ALTER SCHEMA " + sourceSchemaName + " RENAME TO " + invalidTargetSchemaName))
.satisfies(this::verifySchemaNameLengthFailurePermissible);
assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).doesNotContain(invalidTargetSchemaName);
}
Expand Down Expand Up @@ -2395,14 +2395,18 @@ public void testCreateTableWithColumnComment()
@Test
public void testCreateTableSchemaNotFound()
{
skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE));

String schemaName = "test_schema_" + randomTableSuffix();
String tableName = "test_create_no_schema_" + randomTableSuffix();
try {
assertQueryFails(
format("CREATE TABLE %s.%s (a bigint)", schemaName, tableName),
format("Schema %s not found", schemaName));
String createSql = format("CREATE TABLE %s.%s (a bigint)", schemaName, tableName);
if (!hasBehavior(SUPPORTS_CREATE_TABLE)) {
assertQueryFails(createSql, "This connector does not support creating tables");
}
else {
assertQueryFails(createSql, format("Schema %s not found", schemaName));
}
// Validate that table, nor schema, was not created
assertQueryFails(format("TABLE %s.%s", schemaName, tableName), "line 1:1: Schema '" + schemaName + "' does not exist");
}
finally {
assertUpdate(format("DROP TABLE IF EXISTS %s.%s", schemaName, tableName));
Expand Down Expand Up @@ -2647,7 +2651,7 @@ public void testRenameTableAcrossSchema()
assertUpdate("CREATE TABLE " + tableName + " AS SELECT 123 x", 1);

String schemaName = "test_schema_" + randomTableSuffix();
assertUpdate("CREATE SCHEMA " + schemaName);
assertUpdate(createSchemaSql(schemaName));

String renamedTable = "test_rename_new_" + randomTableSuffix();
try {
Expand Down