Small cleanup in BaseHiveConnectorTest / TestHiveConnectorTest #18669

Merged · 6 commits · Aug 14, 2023
BaseHiveConnectorTest.java
@@ -211,13 +211,6 @@ protected BaseHiveConnectorTest()
         this.bucketedSession = createBucketedSession(Optional.of(new SelectedRole(ROLE, Optional.of("admin"))));
     }
 
-    @Override
-    protected QueryRunner createQueryRunner()
-            throws Exception
-    {
-        return createHiveQueryRunner(HiveQueryRunner.builder());
-    }
-
     protected static QueryRunner createHiveQueryRunner(HiveQueryRunner.Builder<?> builder)
             throws Exception
     {
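With createQueryRunner() removed from the abstract base, each concrete subclass now supplies its own runner (see TestHiveConnectorTest below). A minimal sketch of a further variant this enables — the class name is hypothetical; BaseHiveConnectorTest, QueryRunner, createHiveQueryRunner, and HiveQueryRunner.builder() are the identifiers from the diff:

    // Hypothetical variant test class enabled by this cleanup: the base class
    // keeps the protected createHiveQueryRunner(...) factory, and each
    // concrete test decides how to configure the builder.
    public class TestHiveVariantConnectorTest
            extends BaseHiveConnectorTest
    {
        @Override
        protected QueryRunner createQueryRunner()
                throws Exception
        {
            // a real variant would customize the builder before delegating
            return createHiveQueryRunner(HiveQueryRunner.builder());
        }
    }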
@@ -2136,18 +2129,17 @@ public void testCreateTableNonSupportedVarcharColumn()
     @Test
     public void testEmptyBucketedTable()
     {
-        // create empty bucket files for all storage formats and compression codecs
-        for (HiveStorageFormat storageFormat : HiveStorageFormat.values()) {
-            if (storageFormat == REGEX) {
-                // REGEX format is readonly
-                continue;
-            }
-            testEmptyBucketedTable(storageFormat, true);
-            testEmptyBucketedTable(storageFormat, false);
-        }
+        // go through all storage formats to make sure the empty buckets are correctly created
+        testWithAllStorageFormats(this::testEmptyBucketedTable);
     }
 
-    private void testEmptyBucketedTable(HiveStorageFormat storageFormat, boolean createEmpty)
+    private void testEmptyBucketedTable(Session session, HiveStorageFormat storageFormat)
+    {
+        testEmptyBucketedTable(session, storageFormat, true);
+        testEmptyBucketedTable(session, storageFormat, false);
+    }
+
+    private void testEmptyBucketedTable(Session baseSession, HiveStorageFormat storageFormat, boolean createEmpty)
     {
         String tableName = "test_empty_bucketed_table";
 
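The removed loops hint at what the new testWithAllStorageFormats helper does. A plausible sketch, assumed rather than taken from the PR — getAllTestingHiveStorageFormat() and TestingHiveStorageFormat appear elsewhere in this class, the per-format getSession() accessor is an assumption:

    // Assumed shape of the helper (needs java.util.function.BiConsumer):
    // iterate every testing storage format and hand the callback a Session
    // tailored to that format, replacing the hand-rolled loops removed above.
    private void testWithAllStorageFormats(BiConsumer<Session, HiveStorageFormat> test)
    {
        for (TestingHiveStorageFormat storageFormat : getAllTestingHiveStorageFormat()) {
            test.accept(storageFormat.getSession(), storageFormat.getFormat());
        }
    }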
@@ -2172,7 +2164,7 @@ private void testEmptyBucketedTable(HiveStorageFormat storageFormat, boolean cre
         assertEquals(computeActual("SELECT * from " + tableName).getRowCount(), 0);
 
         // make sure that we will get one file per bucket regardless of writer count configured
-        Session session = Session.builder(getSession())
+        Session session = Session.builder(baseSession)
                 .setSystemProperty("task_writer_count", "4")
                 .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty))
                 .build();
@@ -2219,7 +2211,7 @@ private void testBucketedTable(Session session, HiveStorageFormat storageFormat,
                 ") t (bucket_key, col_1, col_2)";
 
         // make sure that we will get one file per bucket regardless of writer count configured
-        Session parallelWriter = Session.builder(getParallelWriteSession())
+        Session parallelWriter = Session.builder(getParallelWriteSession(session))
                 .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty))
                 .build();
         assertUpdate(parallelWriter, createTable, 3);
@@ -2448,7 +2440,7 @@ private void testCreatePartitionedBucketedTableAsFewRows(Session session, HiveSt
 
         assertUpdate(
                 // make sure that we will get one file per bucket regardless of writer count configured
-                Session.builder(getParallelWriteSession())
+                Session.builder(getParallelWriteSession(session))
                         .setCatalogSessionProperty(catalog, "create_empty_bucket_files", String.valueOf(createEmpty))
                         .build(),
                 createTable,
@@ -2484,7 +2476,7 @@ private void testCreatePartitionedBucketedTableAs(HiveStorageFormat storageForma
 
         assertUpdate(
                 // make sure that we will get one file per bucket regardless of writer count configured
-                getParallelWriteSession(),
+                getParallelWriteSession(getSession()),
                 createTable,
                 "SELECT count(*) FROM orders");
 
@@ -2517,7 +2509,7 @@ private void testCreatePartitionedBucketedTableWithNullsAs(HiveStorageFormat sto
                 "FROM tpch.tiny.orders";
 
         assertUpdate(
-                getParallelWriteSession(),
+                getParallelWriteSession(getSession()),
                 createTable,
                 "SELECT count(*) FROM orders");
 
@@ -2609,11 +2601,12 @@ private void testInsertIntoPartitionedBucketedTableFromBucketedTable(HiveStorage
                 "SELECT custkey, comment, orderstatus " +
                 "FROM tpch.tiny.orders";
 
-        assertUpdate(getParallelWriteSession(), createSourceTable, "SELECT count(*) FROM orders");
-        assertUpdate(getParallelWriteSession(), createTargetTable, "SELECT count(*) FROM orders");
+        Session session = getParallelWriteSession(getSession());
+        assertUpdate(session, createSourceTable, "SELECT count(*) FROM orders");
+        assertUpdate(session, createTargetTable, "SELECT count(*) FROM orders");
 
         transaction(getQueryRunner().getTransactionManager(), getQueryRunner().getAccessControl()).execute(
-                getParallelWriteSession(),
+                session,
                 transactionalSession -> {
                     assertUpdate(
                             transactionalSession,
@@ -2658,7 +2651,7 @@ private void testCreatePartitionedBucketedTableAsWithUnionAll(HiveStorageFormat
 
         assertUpdate(
                 // make sure that we will get one file per bucket regardless of writer count configured
-                getParallelWriteSession(),
+                getParallelWriteSession(getSession()),
                 createTable,
                 "SELECT count(*) FROM orders");
 
@@ -2822,7 +2815,7 @@ private void testInsertPartitionedBucketedTableFewRows(Session session, HiveStor
 
         assertUpdate(
                 // make sure that we will get one file per bucket regardless of writer count configured
-                getParallelWriteSession(),
+                getParallelWriteSession(session),
                 "INSERT INTO " + tableName + " " +
                         "VALUES " +
                         " (VARCHAR 'a', VARCHAR 'b', VARCHAR 'c'), " +
@@ -2967,29 +2960,28 @@ public void testRegisterPartitionWithNullArgument()
     @Test
     public void testCreateEmptyBucketedPartition()
     {
-        for (TestingHiveStorageFormat storageFormat : getAllTestingHiveStorageFormat()) {
-            testCreateEmptyBucketedPartition(storageFormat.getFormat());
-        }
+        testWithAllStorageFormats(this::testCreateEmptyBucketedPartition);
     }
 
-    private void testCreateEmptyBucketedPartition(HiveStorageFormat storageFormat)
+    private void testCreateEmptyBucketedPartition(Session session, HiveStorageFormat storageFormat)
     {
         String tableName = "test_insert_empty_partitioned_bucketed_table";
-        createPartitionedBucketedTable(tableName, storageFormat);
+        createPartitionedBucketedTable(session, tableName, storageFormat);
 
         List<String> orderStatusList = ImmutableList.of("F", "O", "P");
         for (int i = 0; i < orderStatusList.size(); i++) {
             String sql = format("CALL system.create_empty_partition('%s', '%s', ARRAY['orderstatus'], ARRAY['%s'])", TPCH_SCHEMA, tableName, orderStatusList.get(i));
-            assertUpdate(sql);
+            assertUpdate(session, sql);
             assertQuery(
+                    session,
                     format("SELECT count(*) FROM \"%s$partitions\"", tableName),
                     "SELECT " + (i + 1));
 
-            assertQueryFails(sql, "Partition already exists.*");
+            assertQueryFails(session, sql, "Partition already exists.*");
         }
 
-        assertUpdate("DROP TABLE " + tableName);
-        assertFalse(getQueryRunner().tableExists(getSession(), tableName));
+        assertUpdate(session, "DROP TABLE " + tableName);
+        assertFalse(getQueryRunner().tableExists(session, tableName));
     }
 
     @Test
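For orientation, the first iteration of the loop above (i = 0, status 'F') issues a procedure call of roughly this shape — assuming TPCH_SCHEMA resolves to the tests' tpch schema:

    // Assumed expansion of the format(...) call for i = 0; the schema literal
    // is illustrative and depends on the TPCH_SCHEMA constant.
    String sql = "CALL system.create_empty_partition(" +
            "'tpch', " +
            "'test_insert_empty_partitioned_bucketed_table', " +
            "ARRAY['orderstatus'], " +
            "ARRAY['F'])";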
@@ -3018,14 +3010,14 @@ public void testInsertPartitionedBucketedTable()
     private void testInsertPartitionedBucketedTable(HiveStorageFormat storageFormat)
     {
         String tableName = "test_insert_partitioned_bucketed_table";
-        createPartitionedBucketedTable(tableName, storageFormat);
+        createPartitionedBucketedTable(getSession(), tableName, storageFormat);
 
         List<String> orderStatusList = ImmutableList.of("F", "O", "P");
         for (int i = 0; i < orderStatusList.size(); i++) {
             String orderStatus = orderStatusList.get(i);
             assertUpdate(
                     // make sure that we will get one file per bucket regardless of writer count configured
-                    getParallelWriteSession(),
+                    getParallelWriteSession(getSession()),
                     format(
                             "INSERT INTO " + tableName + " " +
                                     "SELECT custkey, custkey AS custkey2, comment, orderstatus " +
@@ -3041,19 +3033,20 @@ private void testInsertPartitionedBucketedTable(HiveStorageFormat storageFormat)
         assertFalse(getQueryRunner().tableExists(getSession(), tableName));
     }
 
-    private void createPartitionedBucketedTable(String tableName, HiveStorageFormat storageFormat)
+    private void createPartitionedBucketedTable(Session session, String tableName, HiveStorageFormat storageFormat)
     {
-        assertUpdate("" +
+        assertUpdate(
+                session,
                 "CREATE TABLE " + tableName + " (" +
-                " custkey bigint," +
-                " custkey2 bigint," +
-                " comment varchar," +
-                " orderstatus varchar)" +
-                "WITH (" +
-                "format = '" + storageFormat + "', " +
-                "partitioned_by = ARRAY[ 'orderstatus' ], " +
-                "bucketed_by = ARRAY[ 'custkey', 'custkey2' ], " +
-                "bucket_count = 11)");
+                        " custkey bigint," +
+                        " custkey2 bigint," +
+                        " comment varchar," +
+                        " orderstatus varchar)" +
+                        "WITH (" +
+                        "format = '" + storageFormat + "', " +
+                        "partitioned_by = ARRAY[ 'orderstatus' ], " +
+                        "bucketed_by = ARRAY[ 'custkey', 'custkey2' ], " +
+                        "bucket_count = 11)");
     }
 
     @Test
@@ -3083,7 +3076,7 @@ private void testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat st
             String orderStatus = orderStatusList.get(i);
             assertUpdate(
                     // make sure that we will get one file per bucket regardless of writer count configured
-                    getParallelWriteSession(),
+                    getParallelWriteSession(getSession()),
                     format(
                             "INSERT INTO " + tableName + " " +
                                     "SELECT custkey, custkey AS custkey2, comment, orderstatus " +
@@ -3107,7 +3100,7 @@ private void testInsertPartitionedBucketedTableWithUnionAll(HiveStorageFormat st
     public void testInsertTwiceToSamePartitionedBucket()
     {
         String tableName = "test_insert_twice_to_same_partitioned_bucket";
-        createPartitionedBucketedTable(tableName, HiveStorageFormat.RCBINARY);
+        createPartitionedBucketedTable(getSession(), tableName, HiveStorageFormat.RCBINARY);
 
         String insert = "INSERT INTO " + tableName +
                 " VALUES (1, 1, 'first_comment', 'F'), (2, 2, 'second_comment', 'G')";
@@ -5528,54 +5521,52 @@ public void testBucketFilteringByInPredicate()
     }
 
     @Test
-    public void schemaMismatchesWithDereferenceProjections()
+    public void testSchemaMismatchesWithDereferenceProjections()
     {
-        for (TestingHiveStorageFormat format : getAllTestingHiveStorageFormat()) {
-            schemaMismatchesWithDereferenceProjections(format.getFormat());
-        }
+        testWithAllStorageFormats(this::testSchemaMismatchesWithDereferenceProjections);
     }
 
-    private void schemaMismatchesWithDereferenceProjections(HiveStorageFormat format)
+    private void testSchemaMismatchesWithDereferenceProjections(Session session, HiveStorageFormat format)
     {
         // Verify reordering of subfields between a partition column and a table column is not supported
         // eg. table column: a row(c varchar, b bigint), partition column: a row(b bigint, c varchar)
         try {
-            assertUpdate("CREATE TABLE evolve_test (dummy bigint, a row(b bigint, c varchar), d bigint) with (format = '" + format + "', partitioned_by=array['d'])");
-            assertUpdate("INSERT INTO evolve_test values (10, row(1, 'abc'), 1)", 1);
-            assertUpdate("ALTER TABLE evolve_test DROP COLUMN a");
-            assertUpdate("ALTER TABLE evolve_test ADD COLUMN a row(c varchar, b bigint)");
-            assertUpdate("INSERT INTO evolve_test values (20, row('def', 2), 2)", 1);
-            assertQueryFails("SELECT a.b FROM evolve_test where d = 1", ".*There is a mismatch between the table and partition schemas.*");
+            assertUpdate(session, "CREATE TABLE evolve_test (dummy bigint, a row(b bigint, c varchar), d bigint) with (format = '" + format + "', partitioned_by=array['d'])");
+            assertUpdate(session, "INSERT INTO evolve_test values (10, row(1, 'abc'), 1)", 1);
+            assertUpdate(session, "ALTER TABLE evolve_test DROP COLUMN a");
+            assertUpdate(session, "ALTER TABLE evolve_test ADD COLUMN a row(c varchar, b bigint)");
+            assertUpdate(session, "INSERT INTO evolve_test values (20, row('def', 2), 2)", 1);
+            assertQueryFails(session, "SELECT a.b FROM evolve_test where d = 1", ".*There is a mismatch between the table and partition schemas.*");
         }
         finally {
-            assertUpdate("DROP TABLE IF EXISTS evolve_test");
+            assertUpdate(session, "DROP TABLE IF EXISTS evolve_test");
        }
 
         // Subfield absent in partition schema is reported as null
         // i.e. "a.c" produces null for rows that were inserted before type of "a" was changed
         try {
-            assertUpdate("CREATE TABLE evolve_test (dummy bigint, a row(b bigint), d bigint) with (format = '" + format + "', partitioned_by=array['d'])");
-            assertUpdate("INSERT INTO evolve_test values (10, row(1), 1)", 1);
-            assertUpdate("ALTER TABLE evolve_test DROP COLUMN a");
-            assertUpdate("ALTER TABLE evolve_test ADD COLUMN a row(b bigint, c varchar)");
-            assertUpdate("INSERT INTO evolve_test values (20, row(2, 'def'), 2)", 1);
-            assertQuery("SELECT a.c FROM evolve_test", "SELECT 'def' UNION SELECT null");
+            assertUpdate(session, "CREATE TABLE evolve_test (dummy bigint, a row(b bigint), d bigint) with (format = '" + format + "', partitioned_by=array['d'])");
+            assertUpdate(session, "INSERT INTO evolve_test values (10, row(1), 1)", 1);
+            assertUpdate(session, "ALTER TABLE evolve_test DROP COLUMN a");
+            assertUpdate(session, "ALTER TABLE evolve_test ADD COLUMN a row(b bigint, c varchar)");
+            assertUpdate(session, "INSERT INTO evolve_test values (20, row(2, 'def'), 2)", 1);
+            assertQuery(session, "SELECT a.c FROM evolve_test", "SELECT 'def' UNION SELECT null");
         }
         finally {
-            assertUpdate("DROP TABLE IF EXISTS evolve_test");
+            assertUpdate(session, "DROP TABLE IF EXISTS evolve_test");
        }
 
         // Verify field access when the row evolves without changes to field type
         try {
-            assertUpdate("CREATE TABLE evolve_test (dummy bigint, a row(b bigint, c varchar), d bigint) with (format = '" + format + "', partitioned_by=array['d'])");
-            assertUpdate("INSERT INTO evolve_test values (10, row(1, 'abc'), 1)", 1);
-            assertUpdate("ALTER TABLE evolve_test DROP COLUMN a");
-            assertUpdate("ALTER TABLE evolve_test ADD COLUMN a row(b bigint, c varchar, e int)");
-            assertUpdate("INSERT INTO evolve_test values (20, row(2, 'def', 2), 2)", 1);
-            assertQuery("SELECT a.b FROM evolve_test", "VALUES 1, 2");
+            assertUpdate(session, "CREATE TABLE evolve_test (dummy bigint, a row(b bigint, c varchar), d bigint) with (format = '" + format + "', partitioned_by=array['d'])");
+            assertUpdate(session, "INSERT INTO evolve_test values (10, row(1, 'abc'), 1)", 1);
+            assertUpdate(session, "ALTER TABLE evolve_test DROP COLUMN a");
+            assertUpdate(session, "ALTER TABLE evolve_test ADD COLUMN a row(b bigint, c varchar, e int)");
+            assertUpdate(session, "INSERT INTO evolve_test values (20, row(2, 'def', 2), 2)", 1);
+            assertQuery(session, "SELECT a.b FROM evolve_test", "VALUES 1, 2");
        }
        finally {
-            assertUpdate("DROP TABLE IF EXISTS evolve_test");
+            assertUpdate(session, "DROP TABLE IF EXISTS evolve_test");
        }
     }
 
@@ -8716,9 +8707,9 @@ public Object[][] legalUseColumnNamesProvider()
         };
     }
 
-    private Session getParallelWriteSession()
+    private Session getParallelWriteSession(Session baseSession)
    {
-        return Session.builder(getSession())
+        return Session.builder(baseSession)
                 .setSystemProperty("task_writer_count", "4")
                 .setSystemProperty("task_partitioned_writer_count", "4")
                 .setSystemProperty("task_scale_writers_enabled", "false")
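Threading the base session through is the crux of the cleanup: callers invoked via testWithAllStorageFormats now keep their per-format session properties instead of silently restarting from getSession(). A condensed sketch — the catalog session property name below is invented for illustration; catalog and the helper exist in this class:

    // Before the change, getParallelWriteSession() always began from
    // getSession(), discarding any caller overrides. Now they survive the
    // parallel-write system-property overrides.
    Session formatSession = Session.builder(getSession())
            .setCatalogSessionProperty(catalog, "hypothetical_property", "value")
            .build();
    Session writer = getParallelWriteSession(formatSession); // override preserved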
TestHiveConnectorTest.java
@@ -13,7 +13,15 @@
  */
 package io.trino.plugin.hive;
 
+import io.trino.testing.QueryRunner;
+
 public class TestHiveConnectorTest
         extends BaseHiveConnectorTest
 {
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return createHiveQueryRunner(HiveQueryRunner.builder());
+    }
 }
}