Partition predicate allowed (#26)
* Revert "Fail explain with partition filter check (#21)"

This reverts commit 528c3f9.

* Partition column predicate should pass partition filter check

* Remove unrelated changes
James Taylor authored Feb 6, 2020
1 parent 3fda4bc commit 2e1c530
Showing 13 changed files with 158 additions and 156 deletions.
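
The net effect shown in the diffs below: a new opt-in check, hive.query-partition-filter-required, rejects scans of partitioned tables that carry no filter, and a predicate on a partition column now satisfies that check. As a hedged illustration (the table is hypothetical, not from the commit): with the flag enabled and a table orders partitioned on ds, SELECT * FROM orders WHERE ds = '2020-02-06' is accepted, since the partition predicate is enforced during partition pruning, while a bare SELECT * FROM orders fails with QUERY_REJECTED.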
15 changes: 15 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/HiveConfig.java
@@ -161,6 +161,8 @@ public class HiveConfig
private long fileStatusCacheMaxSize = 1000 * 1000;
private List<String> fileStatusCacheTables = ImmutableList.of();

private boolean queryPartitionFilterRequired;

public int getMaxInitialSplits()
{
return maxInitialSplits;
@@ -1264,4 +1266,17 @@ public String getTemporaryStagingDirectoryPath()
{
return temporaryStagingDirectoryPath;
}

@Config("hive.query-partition-filter-required")
@ConfigDescription("Require filter on at least one partition column")
public HiveConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
{
this.queryPartitionFilterRequired = queryPartitionFilterRequired;
return this;
}

public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}
}
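
The flag follows the standard Airlift config pattern seen above: @Config binds the hive.query-partition-filter-required key to the setter, the field initializer makes the default false, and the setter is fluent. A minimal sketch of the round trip (my example, not part of the commit; assumes presto-hive on the classpath):

import io.prestosql.plugin.hive.HiveConfig;

public class QueryPartitionFilterConfigCheck
{
    public static void main(String[] args)
    {
        HiveConfig config = new HiveConfig();
        // default is false, matching the field initializer in the diff
        System.out.println(config.isQueryPartitionFilterRequired()); // false

        // the setter returns this, so it chains like the other HiveConfig setters
        config.setQueryPartitionFilterRequired(true);
        System.out.println(config.isQueryPartitionFilterRequired()); // true
    }
}

In a deployment the key would go in the catalog properties file, conventionally etc/catalog/hive.properties for a catalog named hive: hive.query-partition-filter-required=true.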
presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java
@@ -358,7 +358,7 @@ public ConnectorTableHandle getTableHandleForStatisticsCollection(ConnectorSessi
HiveTableHandle table = handle;
return partitionValuesList
.map(values -> partitionManager.getPartitions(table, values))
.map(result -> partitionManager.applyPartitionResult(table, result))
.map(result -> partitionManager.applyPartitionResult(table, result, Optional.empty()))
.orElse(table);
}

@@ -1848,7 +1848,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
checkArgument(!handle.getAnalyzePartitionValues().isPresent() || constraint.getSummary().isAll(), "Analyze should not have a constraint");

HivePartitionResult partitionResult = partitionManager.getPartitions(metastore, handle, constraint);
HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult);
HiveTableHandle newHandle = partitionManager.applyPartitionResult(handle, partitionResult, Optional.of(constraint));

if (handle.getPartitions().equals(newHandle.getPartitions()) &&
handle.getCompactEffectivePredicate().equals(newHandle.getCompactEffectivePredicate()) &&
presto-hive/src/main/java/io/prestosql/plugin/hive/HivePartitionManager.java
@@ -217,7 +217,7 @@ public List<HivePartition> getPartitionsAsList(HivePartitionResult partitionResu
return partitionList.build();
}

public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions)
public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitionResult partitions, Optional<Constraint> constraint)
{
return new HiveTableHandle(
handle.getSchemaName(),
@@ -228,7 +228,8 @@ public HiveTableHandle applyPartitionResult(HiveTableHandle handle, HivePartitio
partitions.getEnforcedConstraint(),
partitions.getBucketHandle(),
partitions.getBucketFilter(),
handle.getAnalyzePartitionValues());
handle.getAnalyzePartitionValues(),
constraint);
}

public List<HivePartition> getOrLoadPartitions(SemiTransactionalHiveMetastore metastore, HiveTableHandle table)
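
Threading the Constraint through applyPartitionResult is the heart of the fix: the handle produced by partition pruning now remembers the constraint that drove it. Of the two call sites in the HiveMetadata diff above, only applyFilter has a real constraint to record; the statistics-collection path passes Optional.empty(). A self-contained sketch of the pattern (Handle and Constraint below are stand-ins, not the Presto SPI types):

import java.util.Optional;

public class ConstraintThreadingSketch
{
    record Constraint(boolean hasPredicate) {}

    record Handle(String table, Optional<Constraint> partitionConstraint)
    {
        // mirrors applyPartitionResult: copy the handle, attaching the constraint
        Handle withPartitionResult(Optional<Constraint> constraint)
        {
            return new Handle(table, constraint);
        }
    }

    public static void main(String[] args)
    {
        Handle base = new Handle("orders", Optional.empty());

        // applyFilter path: the planner's constraint is recorded on the handle
        Handle filtered = base.withPartitionResult(Optional.of(new Constraint(true)));

        // statistics-collection path: there is no constraint to record
        Handle stats = base.withPartitionResult(Optional.empty());

        System.out.println(filtered.partitionConstraint().isPresent()); // true
        System.out.println(stats.partitionConstraint().isPresent());    // false
    }
}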
presto-hive/src/main/java/io/prestosql/plugin/hive/HiveSessionProperties.java
@@ -78,6 +78,7 @@ public final class HiveSessionProperties
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -308,6 +309,11 @@ public HiveSessionProperties(HiveConfig hiveConfig, OrcFileWriterConfig orcFileW
TEMPORARY_STAGING_DIRECTORY_PATH,
"Temporary staging directory location",
hiveConfig.getTemporaryStagingDirectoryPath(),
false),
booleanProperty(
QUERY_PARTITION_FILTER_REQUIRED,
"Require filter on partition column",
hiveConfig.isQueryPartitionFilterRequired(),
false));
}

@@ -516,4 +522,9 @@ public static String getTemporaryStagingDirectoryPath(ConnectorSession session)
{
return session.getProperty(TEMPORARY_STAGING_DIRECTORY_PATH, String.class);
}

public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}
}
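
The session property mirrors the config property and uses the config value as its default, so an administrator sets the policy cluster-wide and a user can still override it per session. A hedged sketch of the registration helper used above (booleanProperty is the factory on io.prestosql.spi.session.PropertyMetadata):

import io.prestosql.spi.session.PropertyMetadata;

import static io.prestosql.spi.session.PropertyMetadata.booleanProperty;

public class SessionPropertySketch
{
    public static void main(String[] args)
    {
        PropertyMetadata<Boolean> property = booleanProperty(
                "query_partition_filter_required",
                "Require filter on partition column",
                false,   // stands in for hiveConfig.isQueryPartitionFilterRequired()
                false);  // not hidden, so it shows up in SHOW SESSION

        System.out.println(property.getName());         // query_partition_filter_required
        System.out.println(property.getDefaultValue()); // false
    }
}

A user would toggle it with SET SESSION hive.query_partition_filter_required = true, assuming the catalog is named hive.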
presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableHandle.java
@@ -18,14 +18,19 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.prestosql.plugin.hive.HiveBucketing.HiveBucketFilter;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.StandardErrorCode;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

@@ -41,6 +46,7 @@ public class HiveTableHandle
private final Optional<HiveBucketHandle> bucketHandle;
private final Optional<HiveBucketFilter> bucketFilter;
private final Optional<List<List<String>>> analyzePartitionValues;
private final Optional<Constraint> partitionConstraint;

@JsonCreator
public HiveTableHandle(
@@ -91,6 +97,21 @@ public HiveTableHandle(
Optional<HiveBucketHandle> bucketHandle,
Optional<HiveBucketFilter> bucketFilter,
Optional<List<List<String>>> analyzePartitionValues)
{
this(schemaName, tableName, partitionColumns, partitions, compactEffectivePredicate, enforcedConstraint, bucketHandle, bucketFilter, analyzePartitionValues, Optional.empty());
}

public HiveTableHandle(
String schemaName,
String tableName,
List<HiveColumnHandle> partitionColumns,
Optional<List<HivePartition>> partitions,
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
TupleDomain<ColumnHandle> enforcedConstraint,
Optional<HiveBucketHandle> bucketHandle,
Optional<HiveBucketFilter> bucketFilter,
Optional<List<List<String>>> analyzePartitionValues,
Optional<Constraint> partitionConstraint)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
@@ -101,6 +122,7 @@ public HiveTableHandle(
this.bucketHandle = requireNonNull(bucketHandle, "bucketHandle is null");
this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null");
this.analyzePartitionValues = requireNonNull(analyzePartitionValues, "analyzePartitionValues is null");
this.partitionConstraint = partitionConstraint;
}

public HiveTableHandle withAnalyzePartitionValues(Optional<List<List<String>>> analyzePartitionValues)
@@ -172,6 +194,26 @@ public Optional<List<List<String>>> getAnalyzePartitionValues()
return analyzePartitionValues;
}

// do not serialize partition constraint as it is not needed on workers
@JsonIgnore
public Optional<Constraint> getPartitionConstraint()
{
return partitionConstraint;
}

@Override
public void validateScan(ConnectorSession session)
{
if (HiveSessionProperties.isQueryPartitionFilterRequired(session) && !partitionColumns.isEmpty()
&& getEnforcedConstraint().isAll()
&& (!getPartitionConstraint().isPresent() || !getPartitionConstraint().get().predicate().isPresent())) {
String partitionColumnNames = partitionColumns.stream().map(n -> n.getName()).collect(Collectors.joining(","));
throw new PrestoException(
StandardErrorCode.QUERY_REJECTED,
String.format("Filter required on %s.%s for at least one partition column: %s ", schemaName, tableName, partitionColumnNames));
}
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
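
validateScan rejects a scan only when every condition holds: the session flag is on, the table has partition columns, partition pruning enforced nothing (getEnforcedConstraint().isAll()), and the recorded Constraint carries no predicate either. A predicate on a partition column narrows the enforced constraint or surfaces as a constraint predicate, which is exactly why such predicates now pass the check; the constraint itself is @JsonIgnore'd because the check runs during planning on the coordinator and never needs to reach workers. A distilled, self-contained sketch of the decision:

public class PartitionFilterCheckSketch
{
    // the four booleans stand in for the session flag and the
    // HiveTableHandle state that validateScan inspects
    static boolean rejectScan(
            boolean filterRequired,             // query_partition_filter_required
            boolean hasPartitionColumns,        // !partitionColumns.isEmpty()
            boolean enforcedConstraintIsAll,    // no partition pruning was applied
            boolean constraintPredicatePresent) // recorded Constraint has a predicate
    {
        return filterRequired
                && hasPartitionColumns
                && enforcedConstraintIsAll
                && !constraintPredicatePresent;
    }

    public static void main(String[] args)
    {
        // a partition predicate pruned partitions, so the enforced constraint is not "all"
        System.out.println(rejectScan(true, true, false, false)); // false: query runs

        // no filter at all on a partitioned table
        System.out.println(rejectScan(true, true, true, false));  // true: QUERY_REJECTED

        // unpartitioned tables are never rejected
        System.out.println(rejectScan(true, false, true, false)); // false
    }
}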
presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveConfig.java
@@ -121,7 +121,8 @@ public void testDefaults()
.setTemporaryStagingDirectoryPath("/tmp/presto-${USER}")
.setFileStatusCacheExpireAfterWrite(new Duration(1, TimeUnit.MINUTES))
.setFileStatusCacheMaxSize(1000 * 1000)
.setFileStatusCacheTables(""));
.setFileStatusCacheTables("")
.setQueryPartitionFilterRequired(false));
}

@Test
@@ -210,6 +211,7 @@ public void testExplicitPropertyMappings()
.put("hive.file-status-cache-tables", "foo.bar1, foo.bar2")
.put("hive.file-status-cache-size", "1000")
.put("hive.file-status-cache-expire-time", "30m")
.put("hive.query-partition-filter-required", "true")
.build();

HiveConfig expected = new HiveConfig()
@@ -294,7 +296,8 @@ public void testExplicitPropertyMappings()
.setTemporaryStagingDirectoryPath("updated")
.setFileStatusCacheTables("foo.bar1,foo.bar2")
.setFileStatusCacheMaxSize(1000)
.setFileStatusCacheExpireAfterWrite(new Duration(30, TimeUnit.MINUTES));
.setFileStatusCacheExpireAfterWrite(new Duration(30, TimeUnit.MINUTES))
.setQueryPartitionFilterRequired(true);

assertFullMapping(properties, expected);
}
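
Airlift's config tests require every @Config property to appear in both hunks touched here: testDefaults records the default of each setter, and testExplicitPropertyMappings verifies that each properties key reaches its setter with a non-default value. A self-contained sketch of that pattern with a hypothetical one-property config class (ExampleConfig is mine, not from the commit; assumes Airlift's ConfigAssertions as used by these tests):

import com.google.common.collect.ImmutableMap;
import io.airlift.configuration.Config;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;

public class ExampleConfigTest
{
    public static class ExampleConfig
    {
        private boolean queryPartitionFilterRequired;

        public boolean isQueryPartitionFilterRequired()
        {
            return queryPartitionFilterRequired;
        }

        @Config("example.query-partition-filter-required")
        public ExampleConfig setQueryPartitionFilterRequired(boolean value)
        {
            this.queryPartitionFilterRequired = value;
            return this;
        }
    }

    public static void main(String[] args)
    {
        // defaults test: every @Config setter must be recorded with its default value
        assertRecordedDefaults(recordDefaults(ExampleConfig.class)
                .setQueryPartitionFilterRequired(false));

        // mapping test: each properties key must reach its setter with a non-default value
        assertFullMapping(
                ImmutableMap.of("example.query-partition-filter-required", "true"),
                new ExampleConfig().setQueryPartitionFilterRequired(true));
    }
}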
(The remaining 7 changed files are not shown.)
