Test getting Hive partitions by a filter
Including nullable values.
findepi authored and ebyhr committed Jul 20, 2022
1 parent effccfc commit f01510d
Showing 4 changed files with 112 additions and 3 deletions.
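
The new test drives partition pruning through Trino's Domain algebra, including domains that admit the null partition (__HIVE_DEFAULT_PARTITION__). As a rough guide to how the domains built in the test map to SQL predicates (a minimal sketch, not part of the commit; class and variable names are illustrative):

import io.trino.spi.predicate.Domain;

import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.spi.type.VarcharType.VARCHAR;

class DomainPredicateSketch
{
    static void examples()
    {
        Domain isAbc = Domain.singleValue(VARCHAR, utf8Slice("abc")); // p_string = 'abc'
        Domain isNull = Domain.onlyNull(VARCHAR);                     // p_string IS NULL
        Domain notNull = Domain.notNull(VARCHAR);                     // p_string IS NOT NULL

        // Composed domains, matching the cases exercised in the test below:
        Domain nullOrAbc = isNull.union(isAbc);                       // p_string IS NULL OR p_string = 'abc'
        Domain notNullNotAbc = isAbc.complement().intersect(notNull); // p_string IS NOT NULL AND p_string <> 'abc'
        Domain nullOrNotAbc = isAbc.complement();                     // p_string IS NULL OR p_string <> 'abc'
    }
}

Note that Domain.complement() flips the null flag as well as the value set, which is why varcharSomeValue.complement() in the test matches both the null partition and p_string=def.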
@@ -48,6 +48,7 @@
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastoreConfig;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig;
import io.trino.plugin.hive.orc.OrcPageSource;
import io.trino.plugin.hive.parquet.ParquetPageSource;
import io.trino.plugin.hive.rcfile.RcFilePageSource;
@@ -615,6 +616,7 @@ private static RowType toRowType(List<ColumnMetadata> columns)
protected String database;
protected SchemaTableName tablePartitionFormat;
protected SchemaTableName tableUnpartitioned;
protected SchemaTableName tablePartitionedWithNull;
protected SchemaTableName tableOffline;
protected SchemaTableName tableOfflinePartition;
protected SchemaTableName tableNotReadable;
@@ -634,6 +636,8 @@ private static RowType toRowType(List<ColumnMetadata> columns)
protected ColumnHandle dummyColumn;
protected ColumnHandle intColumn;
protected ColumnHandle invalidColumnHandle;
protected ColumnHandle pStringColumn;
protected ColumnHandle pIntegerColumn;

protected ConnectorTableProperties tablePartitionFormatProperties;
protected ConnectorTableProperties tableUnpartitionedProperties;
@@ -692,6 +696,7 @@ protected void setupHive(String databaseName)
database = databaseName;
tablePartitionFormat = new SchemaTableName(database, "trino_test_partition_format");
tableUnpartitioned = new SchemaTableName(database, "trino_test_unpartitioned");
tablePartitionedWithNull = new SchemaTableName(database, "trino_test_partitioned_with_null");
tableOffline = new SchemaTableName(database, "trino_test_offline");
tableOfflinePartition = new SchemaTableName(database, "trino_test_offline_partition");
tableNotReadable = new SchemaTableName(database, "trino_test_not_readable");
@@ -711,6 +716,8 @@ protected void setupHive(String databaseName)
dummyColumn = createBaseColumn("dummy", -1, HIVE_INT, INTEGER, PARTITION_KEY, Optional.empty());
intColumn = createBaseColumn("t_int", -1, HIVE_INT, INTEGER, PARTITION_KEY, Optional.empty());
invalidColumnHandle = createBaseColumn(INVALID_COLUMN, 0, HIVE_STRING, VARCHAR, REGULAR, Optional.empty());
pStringColumn = createBaseColumn("p_string", -1, HIVE_STRING, VARCHAR, PARTITION_KEY, Optional.empty());
pIntegerColumn = createBaseColumn("p_integer", -1, HIVE_INT, INTEGER, PARTITION_KEY, Optional.empty());

List<ColumnHandle> partitionColumns = ImmutableList.of(dsColumn, fileFormatColumn, dummyColumn);
tablePartitionFormatPartitions = ImmutableList.<HivePartition>builder()
@@ -783,6 +790,8 @@ protected final void setup(String host, int port, String databaseName, String ti
new BridgingHiveMetastore(testingThriftHiveMetastoreBuilder()
.metastoreClient(HostAndPort.fromParts(host, port))
.hiveConfig(hiveConfig)
.thriftMetastoreConfig(new ThriftMetastoreConfig()
.setAssumeCanonicalPartitionKeys(true))
.hdfsEnvironment(hdfsEnvironment)
.build()),
executor,
@@ -1114,6 +1123,80 @@ public void testGetPartitionsWithBindings()
}
}

@Test
public void testGetPartitionsWithFilter()
{
try (Transaction transaction = newTransaction()) {
ConnectorMetadata metadata = transaction.getMetadata();
ConnectorTableHandle tableHandle = getTableHandle(metadata, tablePartitionedWithNull);

Domain varcharSomeValue = Domain.singleValue(VARCHAR, utf8Slice("abc"));
Domain varcharOnlyNull = Domain.onlyNull(VARCHAR);
Domain varcharNotNull = Domain.notNull(VARCHAR);

Domain integerSomeValue = Domain.singleValue(INTEGER, 123L);
Domain integerOnlyNull = Domain.onlyNull(INTEGER);
Domain integerNotNull = Domain.notNull(INTEGER);

// all
assertThat(getPartitionNamesByFilter(metadata, tableHandle, new Constraint(TupleDomain.all())))
.containsOnly(
"p_string=__HIVE_DEFAULT_PARTITION__/p_integer=__HIVE_DEFAULT_PARTITION__",
"p_string=abc/p_integer=123",
"p_string=def/p_integer=456");

// is some value
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pStringColumn, varcharSomeValue))
.containsOnly("p_string=abc/p_integer=123");
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pIntegerColumn, integerSomeValue))
.containsOnly("p_string=abc/p_integer=123");

// IS NULL
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pStringColumn, varcharOnlyNull))
.containsOnly("p_string=__HIVE_DEFAULT_PARTITION__/p_integer=__HIVE_DEFAULT_PARTITION__");
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pIntegerColumn, integerOnlyNull))
.containsOnly("p_string=__HIVE_DEFAULT_PARTITION__/p_integer=__HIVE_DEFAULT_PARTITION__");

// IS NOT NULL
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pStringColumn, varcharNotNull))
.containsOnly("p_string=abc/p_integer=123", "p_string=def/p_integer=456");
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pIntegerColumn, integerNotNull))
.containsOnly("p_string=abc/p_integer=123", "p_string=def/p_integer=456");

// IS NULL OR is some value
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pStringColumn, varcharOnlyNull.union(varcharSomeValue)))
.containsOnly("p_string=__HIVE_DEFAULT_PARTITION__/p_integer=__HIVE_DEFAULT_PARTITION__", "p_string=abc/p_integer=123");
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pIntegerColumn, integerOnlyNull.union(integerSomeValue)))
.containsOnly("p_string=__HIVE_DEFAULT_PARTITION__/p_integer=__HIVE_DEFAULT_PARTITION__", "p_string=abc/p_integer=123");

// IS NOT NULL AND is NOT some value
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pStringColumn, varcharSomeValue.complement().intersect(varcharNotNull)))
.containsOnly("p_string=def/p_integer=456");
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pIntegerColumn, integerSomeValue.complement().intersect(integerNotNull)))
.containsOnly("p_string=def/p_integer=456");

// IS NULL OR is NOT some value
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pStringColumn, varcharSomeValue.complement()))
.containsOnly("p_string=__HIVE_DEFAULT_PARTITION__/p_integer=__HIVE_DEFAULT_PARTITION__", "p_string=def/p_integer=456");
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pIntegerColumn, integerSomeValue.complement()))
.containsOnly("p_string=__HIVE_DEFAULT_PARTITION__/p_integer=__HIVE_DEFAULT_PARTITION__", "p_string=def/p_integer=456");
}
}

private Set<String> getPartitionNamesByFilter(ConnectorMetadata metadata, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, Domain domain)
{
return getPartitionNamesByFilter(metadata, tableHandle, new Constraint(TupleDomain.withColumnDomains(ImmutableMap.of(columnHandle, domain))));
}

private Set<String> getPartitionNamesByFilter(ConnectorMetadata metadata, ConnectorTableHandle tableHandle, Constraint constraint)
{
return applyFilter(metadata, tableHandle, constraint)
.getPartitions().orElseThrow(() -> new IllegalStateException("No partitions"))
.stream()
.map(HivePartition::getPartitionId)
.collect(toImmutableSet());
}
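
The helpers above turn a Constraint into the set of surviving partition names via applyFilter. If the test were extended with a range predicate, a fragment reusing the variables in scope in testGetPartitionsWithFilter might look like the following; this is a hypothetical sketch, not part of the commit (Range and ValueSet would need to be imported), and the expected result assumes range domains are pruned the same way as the point and null cases above:

// Hypothetical: prune by a range on the p_integer partition key.
// With p_integer values 123, 456 and NULL, "> 200" should leave only 456.
assertThat(getPartitionNamesByFilter(metadata, tableHandle, pIntegerColumn,
        Domain.create(ValueSet.ofRanges(Range.greaterThan(INTEGER, 200L)), false)))
        .containsOnly("p_string=def/p_integer=456");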

@Test
public void testMismatchSchemaTable()
throws Exception
@@ -5090,10 +5173,11 @@ protected ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, Schema
return handle;
}

private ConnectorTableHandle applyFilter(ConnectorMetadata metadata, ConnectorTableHandle tableHandle, Constraint constraint)
private HiveTableHandle applyFilter(ConnectorMetadata metadata, ConnectorTableHandle tableHandle, Constraint constraint)
{
return metadata.applyFilter(newSession(), tableHandle, constraint)
.map(ConstraintApplicationResult::getHandle)
.map(HiveTableHandle.class::cast)
.orElseThrow(AssertionError::new);
}

@@ -54,6 +54,7 @@ public final class TestingThriftHiveMetastoreBuilder

private MetastoreLocator metastoreLocator;
private HiveConfig hiveConfig = new HiveConfig();
private ThriftMetastoreConfig thriftMetastoreConfig = new ThriftMetastoreConfig();
private HdfsEnvironment hdfsEnvironment = HDFS_ENVIRONMENT;

public static TestingThriftHiveMetastoreBuilder testingThriftHiveMetastoreBuilder()
@@ -85,6 +86,12 @@ public TestingThriftHiveMetastoreBuilder hiveConfig(HiveConfig hiveConfig)
return this;
}

public TestingThriftHiveMetastoreBuilder thriftMetastoreConfig(ThriftMetastoreConfig thriftMetastoreConfig)
{
this.thriftMetastoreConfig = requireNonNull(thriftMetastoreConfig, "thriftMetastoreConfig is null");
return this;
}

public TestingThriftHiveMetastoreBuilder hdfsEnvironment(HdfsEnvironment hdfsEnvironment)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -97,12 +104,12 @@ public ThriftMetastore build()
ThriftHiveMetastoreFactory metastoreFactory = new ThriftHiveMetastoreFactory(
new TokenDelegationThriftMetastoreFactory(
metastoreLocator,
new ThriftMetastoreConfig(),
thriftMetastoreConfig,
new ThriftMetastoreAuthenticationConfig(),
hdfsEnvironment),
new HiveMetastoreConfig().isHideDeltaLakeTables(),
hiveConfig.isTranslateHiveViews(),
new ThriftMetastoreConfig(),
thriftMetastoreConfig,
hdfsEnvironment);
return metastoreFactory.createMetastore(Optional.empty());
}
@@ -53,6 +53,7 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.statistics.ColumnStatisticMetadata;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.statistics.TableStatisticType;
17 changes: 17 additions & 0 deletions plugin/trino-hive/src/test/sql/create-test.sql
@@ -1,3 +1,7 @@
set hive.exec.dynamic.partition.mode=nonstrict;

CREATE TABLE dummy (dummy varchar(1));

CREATE TABLE trino_test_sequence (
n INT
)
@@ -26,6 +30,13 @@ COMMENT 'Presto test data'
STORED AS TEXTFILE
;

CREATE TABLE trino_test_partitioned_with_null (
a_value STRING
)
PARTITIONED BY (p_string STRING, p_integer int)
STORED AS TEXTFILE
;

CREATE TABLE trino_test_offline (
t_string STRING
)
@@ -124,6 +135,8 @@ LOAD DATA LOCAL INPATH '/docker/files/words'
INTO TABLE tmp_trino_test_load
;

INSERT INTO dummy VALUES ('x');

INSERT OVERWRITE TABLE trino_test_sequence
SELECT TRANSFORM(word)
USING 'awk "BEGIN { n = 0 } { print ++n }"' AS n
@@ -193,6 +206,10 @@ SELECT
, 1 + n
FROM trino_test_sequence LIMIT 100;

INSERT INTO TABLE trino_test_partitioned_with_null PARTITION (p_string, p_integer) SELECT 'NULL row', NULL, NULL FROM dummy;
INSERT INTO TABLE trino_test_partitioned_with_null PARTITION (p_string, p_integer) SELECT 'value row', 'abc', 123 FROM dummy;
INSERT INTO TABLE trino_test_partitioned_with_null PARTITION (p_string, p_integer) SELECT 'another value row', 'def', 456 FROM dummy;

INSERT INTO TABLE trino_test_offline_partition PARTITION (ds='2012-12-29')
SELECT 'test' FROM trino_test_sequence LIMIT 100;
