From 2e1c530d9c9d459ffb72f06525efffb489d6c1a3 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Thu, 6 Feb 2020 15:43:08 -0800 Subject: [PATCH] Partition predicate allowed (#26) * Revert "Fail explain with partition filter check (#21)" This reverts commit 528c3f9d2c740d6bc3ec9764ebd8f35b4fa94ddf. * Partition column predicate should pass partition filter check * Remove unrelated changes --- .../io/prestosql/plugin/hive/HiveConfig.java | 15 ++ .../prestosql/plugin/hive/HiveMetadata.java | 4 +- .../plugin/hive/HivePartitionManager.java | 5 +- .../plugin/hive/HiveSessionProperties.java | 11 ++ .../plugin/hive/HiveTableHandle.java | 42 +++++ .../prestosql/plugin/hive/TestHiveConfig.java | 7 +- .../hive/TestHiveIntegrationSmokeTest.java | 148 +++++++++--------- .../io/prestosql/SystemSessionProperties.java | 13 +- .../sql/analyzer/FeaturesConfig.java | 13 -- .../planner/planprinter/IoPlanPrinter.java | 2 +- .../sanity/PartitionFilterChecker.java | 46 +----- .../sql/analyzer/TestFeaturesConfig.java | 7 +- .../spi/connector/ConnectorTableHandle.java | 1 + 13 files changed, 158 insertions(+), 156 deletions(-) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java index 8fee483d7712..290484ba85c2 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java @@ -161,6 +161,8 @@ public class HiveConfig private long fileStatusCacheMaxSize = 1000 * 1000; private List fileStatusCacheTables = ImmutableList.of(); + private boolean queryPartitionFilterRequired; + public int getMaxInitialSplits() { return maxInitialSplits; @@ -1264,4 +1266,17 @@ public String getTemporaryStagingDirectoryPath() { return temporaryStagingDirectoryPath; } + + @Config("hive.query-partition-filter-required") + @ConfigDescription("Require filter on at least one partition column") + public HiveConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired) + { + this.queryPartitionFilterRequired = queryPartitionFilterRequired; + return this; + } + + public boolean isQueryPartitionFilterRequired() + { + return queryPartitionFilterRequired; + } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java index 15a10064e453..c9dfc657869c 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java @@ -358,7 +358,7 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi HiveTableHandle table = handle; return partitionValuesList .map(values -> partitionManager.getPartitions(table, values)) - .map(result -> partitionManager.applyPartitionResult(table, result)) + .map(result -> partitionManager.applyPartitionResult(table, result, Optional.empty())) .orElse(table); } @@ -1848,7 +1848,7 @@ public Optional> applyFilter(C checkArgument(!handle.getAnalyzePartitionValues().isPresent() || constraint.getSummary().isAll(), "Analyze should not have a constraint"); HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, handle, constraint); - HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult); + HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, Optional.of(constraint)); if (handle.getPartitions().equals(newHandle.getPartitions()) && handle.getCompactEffectivePredicate().equals(newHandle.getCompactEffectivePredicate()) && diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePartitionManager.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePartitionManager.java index 625451554cfe..bf4c30fc3421 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePartitionManager.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePartitionManager.java @@ -217,7 +217,7 @@ public List getPartitionsAsList(HivePartitionResult partitionResu return partitionList.build(); } - public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions) + public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Optional constraint) { return new HiveTableHandle( handle.getSchemaName(), @@ -228,7 +228,8 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio partitions.getEnforcedConstraint(), partitions.getBucketHandle(), partitions.getBucketFilter(), - handle.getAnalyzePartitionValues()); + handle.getAnalyzePartitionValues(), + constraint); } public List getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java index 6f4da60fbf5d..0d1ed50ebacd 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java @@ -78,6 +78,7 @@ public final class HiveSessionProperties private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled"; private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled"; private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path"; + private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; private final List> sessionProperties; @@ -308,6 +309,11 @@ public HiveSessionProperties(HiveConfig hiveConfig, OrcFileWriterConfig orcFileW TEMPORARY_STAGING_DIRECTORY_PATH, "Temporary staging directory location", hiveConfig.getTemporaryStagingDirectoryPath(), + false), + booleanProperty( + QUERY_PARTITION_FILTER_REQUIRED, + "Require filter on partition column", + hiveConfig.isQueryPartitionFilterRequired(), false)); } @@ -516,4 +522,9 @@ public static String getTemporaryStagingDirectoryPath(ConnectorSession session) { return session.getProperty(TEMPORARY_STAGING_DIRECTORY_PATH, String.class); } + + public static boolean isQueryPartitionFilterRequired(ConnectorSession session) + { + return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); + } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableHandle.java index b35515c53649..4b0e19d90805 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableHandle.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableHandle.java @@ -18,14 +18,19 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.prestosql.plugin.hive.HiveBucketing.HiveBucketFilter; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.StandardErrorCode; import io.prestosql.spi.connector.ColumnHandle; +import io.prestosql.spi.connector.ConnectorSession; import io.prestosql.spi.connector.ConnectorTableHandle; +import io.prestosql.spi.connector.Constraint; import io.prestosql.spi.connector.SchemaTableName; import io.prestosql.spi.predicate.TupleDomain; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -41,6 +46,7 @@ public class HiveTableHandle private final Optional bucketHandle; private final Optional bucketFilter; private final Optional>> analyzePartitionValues; + private final Optional partitionConstraint; @JsonCreator public HiveTableHandle( @@ -91,6 +97,21 @@ public HiveTableHandle( Optional bucketHandle, Optional bucketFilter, Optional>> analyzePartitionValues) + { + this(schemaName, tableName, partitionColumns, partitions, compactEffectivePredicate, enforcedConstraint, bucketHandle, bucketFilter, analyzePartitionValues, Optional.empty()); + } + + public HiveTableHandle( + String schemaName, + String tableName, + List partitionColumns, + Optional> partitions, + TupleDomain compactEffectivePredicate, + TupleDomain enforcedConstraint, + Optional bucketHandle, + Optional bucketFilter, + Optional>> analyzePartitionValues, + Optional partitionConstraint) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -101,6 +122,7 @@ public HiveTableHandle( this.bucketHandle = requireNonNull(bucketHandle, "bucketHandle is null"); this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null"); this.analyzePartitionValues = requireNonNull(analyzePartitionValues, "analyzePartitionValues is null"); + this.partitionConstraint = partitionConstraint; } public HiveTableHandle withAnalyzePartitionValues(Optional>> analyzePartitionValues) @@ -172,6 +194,26 @@ public Optional>> getAnalyzePartitionValues() return analyzePartitionValues; } + // do not serialize partition constraint as it is not needed on workers + @JsonIgnore + public Optional getPartitionConstraint() + { + return partitionConstraint; + } + + @Override + public void validateScan(ConnectorSession session) + { + if (HiveSessionProperties.isQueryPartitionFilterRequired(session) && !partitionColumns.isEmpty() + && getEnforcedConstraint().isAll() + && (!getPartitionConstraint().isPresent() || !getPartitionConstraint().get().predicate().isPresent())) { + String partitionColumnNames = partitionColumns.stream().map(n -> n.getName()).collect(Collectors.joining(",")); + throw new PrestoException( + StandardErrorCode.QUERY_REJECTED, + String.format("Filter required on %s.%s for at least one partition column: %s ", schemaName, tableName, partitionColumnNames)); + } + } + public SchemaTableName getSchemaTableName() { return new SchemaTableName(schemaName, tableName); diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java index 48e979ba55e7..ebbed821eb62 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java @@ -121,7 +121,8 @@ public void testDefaults() .setTemporaryStagingDirectoryPath("/tmp/presto-${USER}") .setFileStatusCacheExpireAfterWrite(new Duration(1, TimeUnit.MINUTES)) .setFileStatusCacheMaxSize(1000 * 1000) - .setFileStatusCacheTables("")); + .setFileStatusCacheTables("") + .setQueryPartitionFilterRequired(false)); } @Test @@ -210,6 +211,7 @@ public void testExplicitPropertyMappings() .put("hive.file-status-cache-tables", "foo.bar1, foo.bar2") .put("hive.file-status-cache-size", "1000") .put("hive.file-status-cache-expire-time", "30m") + .put("hive.query-partition-filter-required", "true") .build(); HiveConfig expected = new HiveConfig() @@ -294,7 +296,8 @@ public void testExplicitPropertyMappings() .setTemporaryStagingDirectoryPath("updated") .setFileStatusCacheTables("foo.bar1,foo.bar2") .setFileStatusCacheMaxSize(1000) - .setFileStatusCacheExpireAfterWrite(new Duration(30, TimeUnit.MINUTES)); + .setFileStatusCacheExpireAfterWrite(new Duration(30, TimeUnit.MINUTES)) + .setQueryPartitionFilterRequired(true); assertFullMapping(properties, expected); } diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java index 9468496203f0..b7fd520f9527 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java @@ -103,7 +103,6 @@ import static io.prestosql.SystemSessionProperties.DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION; import static io.prestosql.SystemSessionProperties.GROUPED_EXECUTION; import static io.prestosql.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; -import static io.prestosql.SystemSessionProperties.QUERY_PARTITION_FILTER_REQUIRED; import static io.prestosql.plugin.hive.HiveColumnHandle.BUCKET_COLUMN_NAME; import static io.prestosql.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME; import static io.prestosql.plugin.hive.HiveQueryRunner.HIVE_CATALOG; @@ -116,7 +115,6 @@ import static io.prestosql.plugin.hive.HiveTableProperties.STORAGE_FORMAT_PROPERTY; import static io.prestosql.plugin.hive.HiveTestUtils.TYPE_MANAGER; import static io.prestosql.plugin.hive.HiveUtil.columnExtraInfo; -import static io.prestosql.plugin.hive.TestEventListenerPlugin.TestingEventListenerPlugin; import static io.prestosql.spi.predicate.Marker.Bound.ABOVE; import static io.prestosql.spi.predicate.Marker.Bound.EXACTLY; import static io.prestosql.spi.security.SelectedRole.Type.ROLE; @@ -245,130 +243,136 @@ private Consumer assertPrunedLayout(String expectedType) } @Test - public void testLackOfPartitionFilterInExplain() throws Exception + public void testLackOfPartitionFilterNotAllowed() throws Exception { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); - getQueryRunner().installPlugin(new TestingEventListenerPlugin(generatedEvents)); - generatedEvents.initialize(2); assertUpdate( admin, - "create table partition_test(\n" + "create table partition_test1(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - generatedEvents.waitForEvents(10); - generatedEvents.initialize(2); - assertUpdate(admin, "insert into partition_test(id,a,ds) values(1, 'a','a')", 1); - generatedEvents.waitForEvents(10); - generatedEvents.initialize(2); - assertQueryFails(admin, "explain (type io) select id from partition_test where a = 'a'", "Filter required on tpch\\.partition_test for at least one partition column.*"); - generatedEvents.waitForEvents(10); - generatedEvents.initialize(2); - assertUpdate(admin, "DROP TABLE partition_test"); - generatedEvents.waitForEvents(10); + assertUpdate(admin, "insert into partition_test1(id,a,ds) values(1, 'a','a')", 1); + String query = "select id from partition_test1 where a = 'a'"; + String msgRegExp = "Filter required on tpch\\.partition_test1 for at least one partition column:.*"; + assertQueryFails(admin, query, msgRegExp); + assertQueryFails(admin, "explain " + query, msgRegExp); + assertUpdate(admin, "DROP TABLE partition_test1"); } @Test - public void testLackOfPartitionFilterNotAllowed() + public void testPartitionFilterRemoved() { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); assertUpdate( admin, - "create table partition_test(\n" + "create table partition_test2(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - assertUpdate(admin, "insert into partition_test(id,a,ds) values(1, 'a','a')", 1); - assertQueryFails(admin, "select id from partition_test where a = 'a'", "Filter required on tpch\\.partition_test for at least one partition column:.*"); - assertUpdate(admin, "DROP TABLE partition_test"); + assertUpdate(admin, "insert into partition_test2(id,a,ds) values(1, 'a','a')", 1); + assertQueryFails(admin, "select id from partition_test2 where ds is not null or ds is null", "Filter required on tpch\\.partition_test2 for at least one partition column:.*"); + assertUpdate(admin, "DROP TABLE partition_test2"); } @Test - public void testPartitionFilterRemoved() + public void testPartitionFilterIncluded() { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); assertUpdate( admin, - "create table partition_test(\n" + "create table partition_test3(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - assertUpdate(admin, "insert into partition_test(id,a,ds) values(1, 'a','a')", 1); - assertQueryFails(admin, "select id from partition_test where ds is not null or ds is null", "Filter required on tpch\\.partition_test for at least one partition column:.*"); - assertUpdate(admin, "DROP TABLE partition_test"); + assertUpdate(admin, "insert into partition_test3(id,a,ds) values(1, 'a','a')", 1); + String query = "select id from partition_test3 where ds = 'a'"; + assertQuery(admin, query, "select 1"); + computeActual(admin, "explain " + query); + assertUpdate(admin, "DROP TABLE partition_test3"); } @Test - public void testPartitionFilterIncluded() + public void testPartitionFilterIncluded2() { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); assertUpdate( admin, - "create table partition_test(\n" + "create table partition_test4(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - assertUpdate(admin, "insert into partition_test(id,a,ds) values(1, 'a','a')", 1); - assertQuery(admin, "select id from partition_test where ds = 'a'", "select 1"); - assertUpdate(admin, "DROP TABLE partition_test"); + assertUpdate(admin, "insert into partition_test4(id,a,ds) values(1, 'a','a')", 1); + assertQuery(admin, "select id from partition_test4 where ds is not null", "select 1"); + assertUpdate(admin, "DROP TABLE partition_test4"); } @Test - public void testPartitionFilterIncluded2() + public void testPartitionFilterInferred() { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); assertUpdate( admin, - "create table partition_test(\n" + "create table partition_test5(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - assertUpdate(admin, "insert into partition_test(id,a,ds) values(1, 'a','a')", 1); - assertQuery(admin, "select id from partition_test where ds is not null", "select 1"); - assertUpdate(admin, "DROP TABLE partition_test"); + assertUpdate( + admin, + "create table partition_test6(\n" + + "id integer,\n" + + "a varchar,\n" + + "b varchar,\n" + + "ds varchar)" + + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); + assertUpdate(admin, "insert into partition_test5(id,a,ds) values(1, 'a','a')", 1); + assertUpdate(admin, "insert into partition_test6(id,a,ds) values(1, 'a','a')", 1); + assertQuery(admin, "select a.id, b.id from partition_test5 a join partition_test6 b on (a.ds = b.ds) where a.ds = 'a'", "select 1,1"); + assertUpdate(admin, "DROP TABLE partition_test5"); + assertUpdate(admin, "DROP TABLE partition_test6"); } @Test - public void testPartitionFilterInferred() + public void testJoinPartitionedWithMissingPartitionFilter() { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); assertUpdate( admin, - "create table partition_test1(\n" + "create table partition_test7(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" @@ -376,30 +380,30 @@ public void testPartitionFilterInferred() + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); assertUpdate( admin, - "create table partition_test2(\n" + "create table partition_test8(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - assertUpdate(admin, "insert into partition_test1(id,a,ds) values(1, 'a','a')", 1); - assertUpdate(admin, "insert into partition_test2(id,a,ds) values(1, 'a','a')", 1); - assertQuery(admin, "select a.id, b.id from partition_test1 a join partition_test2 b on (a.ds = b.ds) where a.ds = 'a'", "select 1,1"); - assertUpdate(admin, "DROP TABLE partition_test1"); - assertUpdate(admin, "DROP TABLE partition_test2"); + assertUpdate(admin, "insert into partition_test7(id,a,ds) values(1, 'a','a')", 1); + assertUpdate(admin, "insert into partition_test8(id,a,ds) values(1, 'a','a')", 1); + assertQueryFails(admin, "select a.id, b.id from partition_test7 a join partition_test8 b on (a.id = b.id) where a.ds = 'a'", "Filter required on tpch\\.partition_test8 for at least one partition column:.*"); + assertUpdate(admin, "DROP TABLE partition_test7"); + assertUpdate(admin, "DROP TABLE partition_test8"); } @Test - public void testJoinPartitionedWithMissingPartitionFilter() + public void testJoinWithPartitionFilterOnPartionedTable() { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); assertUpdate( admin, - "create table partition_test1(\n" + "create table partition_test9(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" @@ -407,48 +411,40 @@ public void testJoinPartitionedWithMissingPartitionFilter() + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); assertUpdate( admin, - "create table partition_test2(\n" + "create table partition_test10(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" - + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - assertUpdate(admin, "insert into partition_test1(id,a,ds) values(1, 'a','a')", 1); - assertUpdate(admin, "insert into partition_test2(id,a,ds) values(1, 'a','a')", 1); - assertQueryFails(admin, "select a.id, b.id from partition_test1 a join partition_test2 b on (a.id = b.id) where a.ds = 'a'", "Filter required on tpch\\.partition_test2 for at least one partition column:.*"); - assertUpdate(admin, "DROP TABLE partition_test1"); - assertUpdate(admin, "DROP TABLE partition_test2"); + + "WITH (format='PARQUET')"); + assertUpdate(admin, "insert into partition_test9(id,a,ds) values(1, 'a','a')", 1); + assertUpdate(admin, "insert into partition_test10(id,a,ds) values(1, 'a','a')", 1); + assertQuery(admin, "select a.id, b.id from partition_test9 a join partition_test10 b on (a.id = b.id) where a.ds = 'a'", "SELECT 1, 1"); + assertUpdate(admin, "DROP TABLE partition_test9"); + assertUpdate(admin, "DROP TABLE partition_test10"); } @Test - public void testJoinWithPartitionFilterOnPartionedTable() + public void testPartitionPredicateAllowed() throws Exception { Session admin = Session.builder(getQueryRunner().getDefaultSession()) - .setSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, "true") .setIdentity(new Identity("hive", Optional.empty(), ImmutableMap.of("hive", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("admin"))))) + .setCatalogSessionProperty("hive", "query_partition_filter_required", "true") .build(); assertUpdate( admin, - "create table partition_test1(\n" + "create table partition_test11(\n" + "id integer,\n" + "a varchar,\n" + "b varchar,\n" + "ds varchar)" + "WITH (format='PARQUET', partitioned_by = ARRAY['ds'])"); - assertUpdate( - admin, - "create table partition_test2(\n" - + "id integer,\n" - + "a varchar,\n" - + "b varchar,\n" - + "ds varchar)" - + "WITH (format='PARQUET')"); - assertUpdate(admin, "insert into partition_test1(id,a,ds) values(1, 'a','a')", 1); - assertUpdate(admin, "insert into partition_test2(id,a,ds) values(1, 'a','a')", 1); - assertQuery(admin, "select a.id, b.id from partition_test1 a join partition_test2 b on (a.id = b.id) where a.ds = 'a'", "SELECT 1, 1"); - assertUpdate(admin, "DROP TABLE partition_test1"); - assertUpdate(admin, "DROP TABLE partition_test2"); + assertUpdate(admin, "insert into partition_test11(id,a,ds) values(1, '1','1')", 1); + String query = "select id from partition_test11 where cast(ds as integer) = 1"; + assertQuery(admin, query, "select 1"); + computeActual(admin, "explain " + query); + assertUpdate(admin, "DROP TABLE partition_test11"); } @Test diff --git a/presto-main/src/main/java/io/prestosql/SystemSessionProperties.java b/presto-main/src/main/java/io/prestosql/SystemSessionProperties.java index f2e8e5b51f8e..f054316af8b8 100644 --- a/presto-main/src/main/java/io/prestosql/SystemSessionProperties.java +++ b/presto-main/src/main/java/io/prestosql/SystemSessionProperties.java @@ -121,7 +121,6 @@ public final class SystemSessionProperties public static final String ENABLE_DYNAMIC_FILTERING = "enable_dynamic_filtering"; public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node"; public static final String QUERY_MAX_TOTAL_MEMORY_PER_NODE = "query_max_total_memory_per_node"; - public static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; private final List> sessionProperties; @@ -524,12 +523,7 @@ public SystemSessionProperties( QUERY_MAX_TOTAL_MEMORY_PER_NODE, "Maximum amount of total memory a query can use per node", nodeMemoryConfig.getMaxQueryTotalMemoryPerNode(), - true), - booleanProperty( - QUERY_PARTITION_FILTER_REQUIRED, - "Require filter on partition column", - featuresConfig.isQueryPartitionFilterRequired(), - false)); + true)); } public List> getSessionProperties() @@ -936,9 +930,4 @@ public static DataSize getQueryMaxTotalMemoryPerNode(Session session) { return session.getSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, DataSize.class); } - - public static boolean isQueryPartitionFilterRequired(Session session) - { - return session.getSystemProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); - } } diff --git a/presto-main/src/main/java/io/prestosql/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/io/prestosql/sql/analyzer/FeaturesConfig.java index 9d0245372595..ffadf9f01b90 100644 --- a/presto-main/src/main/java/io/prestosql/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/io/prestosql/sql/analyzer/FeaturesConfig.java @@ -129,7 +129,6 @@ public class FeaturesConfig private DataSize filterAndProjectMinOutputPageSize = new DataSize(500, KILOBYTE); private int filterAndProjectMinOutputPageRowCount = 256; private int maxGroupingSets = 2048; - private boolean queryPartitionFilterRequired; public enum JoinReorderingStrategy { @@ -919,16 +918,4 @@ public FeaturesConfig setSkipRedundantSort(boolean value) this.skipRedundantSort = value; return this; } - - @Config("query-partition-filter-required") - public FeaturesConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired) - { - this.queryPartitionFilterRequired = queryPartitionFilterRequired; - return this; - } - - public boolean isQueryPartitionFilterRequired() - { - return queryPartitionFilterRequired; - } } diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/planprinter/IoPlanPrinter.java b/presto-main/src/main/java/io/prestosql/sql/planner/planprinter/IoPlanPrinter.java index 1875bb2bbb9c..63c616fe5dd0 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/planprinter/IoPlanPrinter.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/planprinter/IoPlanPrinter.java @@ -31,8 +31,8 @@ import io.prestosql.spi.predicate.Marker.Bound; import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.spi.type.Type; -import io.prestosql.sql.planner.DomainTranslator; import io.prestosql.spi.type.TypeSignature; +import io.prestosql.sql.planner.DomainTranslator; import io.prestosql.sql.planner.Plan; import io.prestosql.sql.planner.plan.FilterNode; import io.prestosql.sql.planner.plan.PlanNode; diff --git a/presto-main/src/main/java/io/prestosql/sql/planner/sanity/PartitionFilterChecker.java b/presto-main/src/main/java/io/prestosql/sql/planner/sanity/PartitionFilterChecker.java index 3f260818e228..2fb6ce92f1ed 100644 --- a/presto-main/src/main/java/io/prestosql/sql/planner/sanity/PartitionFilterChecker.java +++ b/presto-main/src/main/java/io/prestosql/sql/planner/sanity/PartitionFilterChecker.java @@ -17,67 +17,27 @@ import io.prestosql.execution.warnings.WarningCollector; import io.prestosql.metadata.Metadata; import io.prestosql.metadata.TableHandle; -import io.prestosql.metadata.TableProperties; -import io.prestosql.spi.PrestoException; -import io.prestosql.spi.StandardErrorCode; -import io.prestosql.spi.connector.ColumnHandle; -import io.prestosql.spi.connector.SchemaTableName; +import io.prestosql.sql.planner.SimplePlanVisitor; import io.prestosql.sql.planner.TypeAnalyzer; import io.prestosql.sql.planner.TypeProvider; import io.prestosql.sql.planner.plan.PlanNode; -import io.prestosql.sql.planner.plan.PlanVisitor; import io.prestosql.sql.planner.plan.TableScanNode; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import static io.prestosql.SystemSessionProperties.isQueryPartitionFilterRequired; - public class PartitionFilterChecker implements PlanSanityChecker.Checker { @Override public void validate(PlanNode plan, final Session session, final Metadata metadata, TypeAnalyzer typeAnalyzer, TypeProvider types, WarningCollector warningCollector) { - if (!isQueryPartitionFilterRequired(session)) { - return; - } - plan.accept(new PlanVisitor() + plan.accept(new SimplePlanVisitor() { - @Override - protected Void visitPlan(PlanNode node, Void context) - { - for (PlanNode source : node.getSources()) { - source.accept(this, context); - } - return null; - } - @Override public Void visitTableScan(TableScanNode node, Void context) { TableHandle table = node.getTable(); - List partitionColumns = getPartitionColumns(table); - if (!partitionColumns.isEmpty() && node.getEnforcedConstraint().isAll()) { - SchemaTableName schemaTableName = metadata.getTableMetadata(session, table).getTable(); - String partitionColumnNames = partitionColumns.stream().map(n -> metadata.getColumnMetadata(session, table, n).getName()).collect(Collectors.joining(",")); - throw new PrestoException( - StandardErrorCode.QUERY_REJECTED, - String.format("Filter required on %s.%s for at least one partition column: %s ", schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionColumnNames)); - } + table.getConnectorHandle().validateScan(session.toConnectorSession(table.getCatalogName())); return null; } - - private List getPartitionColumns(TableHandle handle) - { - TableProperties properties = metadata.getTableProperties(session, handle); - if (properties.getDiscretePredicates().isPresent() - && !properties.getDiscretePredicates().get().getColumns().isEmpty()) { - return properties.getDiscretePredicates().get().getColumns(); - } - return Collections.emptyList(); - } }, null); } } diff --git a/presto-main/src/test/java/io/prestosql/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/io/prestosql/sql/analyzer/TestFeaturesConfig.java index 7ca35bb15542..83a35a561711 100644 --- a/presto-main/src/test/java/io/prestosql/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/io/prestosql/sql/analyzer/TestFeaturesConfig.java @@ -107,8 +107,7 @@ public void testDefaults() .setMaxGroupingSets(2048) .setWorkProcessorPipelines(false) .setSkipRedundantSort(true) - .setEnableDynamicFiltering(false) - .setQueryPartitionFilterRequired(false)); + .setEnableDynamicFiltering(false)); } @Test @@ -176,7 +175,6 @@ public void testExplicitPropertyMappings() .put("experimental.work-processor-pipelines", "true") .put("optimizer.skip-redundant-sort", "false") .put("experimental.enable-dynamic-filtering", "true") - .put("query-partition-filter-required", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -240,8 +238,7 @@ public void testExplicitPropertyMappings() .setDefaultFilterFactorEnabled(true) .setWorkProcessorPipelines(true) .setSkipRedundantSort(false) - .setEnableDynamicFiltering(true) - .setQueryPartitionFilterRequired(true); + .setEnableDynamicFiltering(true); assertFullMapping(properties, expected); } diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableHandle.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableHandle.java index f59957c30368..79ff00be509e 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableHandle.java +++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorTableHandle.java @@ -15,4 +15,5 @@ public interface ConnectorTableHandle { + default void validateScan(ConnectorSession session) {} }