diff --git a/presto-hudi/pom.xml b/presto-hudi/pom.xml index cbd81df8097a..5cb9de91bcf9 100644 --- a/presto-hudi/pom.xml +++ b/presto-hudi/pom.xml @@ -222,5 +222,12 @@ units provided + + + + org.lz4 + lz4-java + 1.8.0 + diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiFileSkippingManager.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiFileSkippingManager.java new file mode 100644 index 000000000000..c00f0800a045 --- /dev/null +++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiFileSkippingManager.java @@ -0,0 +1,330 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.hudi; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.common.predicate.Domain; +import com.facebook.presto.common.predicate.Range; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.predicate.ValueSet; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.parquet.predicate.TupleDomainParquetPredicate; +import com.facebook.presto.spi.ColumnHandle; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.apache.hudi.avro.model.HoodieMetadataColumnStats; +import org.apache.hudi.common.config.HoodieCommonConfig; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieTableQueryType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.hash.ColumnIndexID; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.common.type.DateType.DATE; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; +import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static com.facebook.presto.common.type.RealType.REAL; +import static com.facebook.presto.common.type.SmallintType.SMALLINT; +import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.common.type.Varchars.isVarcharType; +import static com.facebook.presto.parquet.predicate.PredicateUtils.isStatisticsOverflow; +import static java.lang.Float.floatToRawIntBits; +import static java.util.Objects.requireNonNull; + +public class HudiFileSkippingManager +{ + private static final Logger log = Logger.get(HudiFileSkippingManager.class); + + private final HoodieTableQueryType queryType; + private final Optional specifiedQueryInstant; + private final HoodieTableMetaClient metaClient; + private final HoodieTableMetadata metadataTable; + + private final Map> allInputFileSlices; + + public HudiFileSkippingManager( + List partitions, + String spillableDir, + HoodieEngineContext engineContext, + HoodieTableMetaClient metaClient, + HoodieTableQueryType queryType, + Optional specifiedQueryInstant) + { + requireNonNull(partitions, "partitions is null"); + requireNonNull(spillableDir, "spillableDir is null"); + requireNonNull(engineContext, "engineContext is null"); + this.queryType = requireNonNull(queryType, "queryType is null"); + this.specifiedQueryInstant = requireNonNull(specifiedQueryInstant, "specifiedQueryInstant is null"); + this.metaClient = requireNonNull(metaClient, "metaClient is null"); + + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build(); + this.metadataTable = HoodieTableMetadata + .create(engineContext, metadataConfig, metaClient.getBasePathV2().toString(), spillableDir, true); + this.allInputFileSlices = prepareAllInputFileSlices(partitions, engineContext, metadataConfig, spillableDir); + } + + private Map> prepareAllInputFileSlices( + List partitions, + HoodieEngineContext engineContext, + HoodieMetadataConfig metadataConfig, + String spillableDir) + { + long startTime = System.currentTimeMillis(); + HoodieTimeline activeTimeline = metaClient.reloadActiveTimeline(); + Optional latestInstant = activeTimeline.lastInstant().toJavaOptional(); + // build system view. + SyncableFileSystemView fileSystemView = FileSystemViewManager + .createViewManager(engineContext, + metadataConfig, + FileSystemViewStorageConfig.newBuilder().withBaseStoreDir(spillableDir).build(), + HoodieCommonConfig.newBuilder().build(), + () -> metadataTable) + .getFileSystemView(metaClient); + Optional queryInstant = specifiedQueryInstant.isPresent() ? + specifiedQueryInstant : latestInstant.map(HoodieInstant::getTimestamp); + + Map> allInputFileSlices = engineContext + .mapToPair( + partitions, + partitionPath -> Pair.of( + partitionPath, + getLatestFileSlices(partitionPath, fileSystemView, queryInstant)), + partitions.size()); + + long duration = System.currentTimeMillis() - startTime; + log.debug("prepare query files for table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration); + return allInputFileSlices; + } + + private List getLatestFileSlices( + String partitionPath, + SyncableFileSystemView fileSystemView, + Optional queryInstant) + { + return queryInstant + .map(instant -> + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, queryInstant.get())) + .orElse(fileSystemView.getLatestFileSlices(partitionPath)) + .collect(Collectors.toList()); + } + + public Map> listQueryFiles(TupleDomain tupleDomain) + { + // do file skipping by MetadataTable + Map> candidateFileSlices = allInputFileSlices; + try { + if (!tupleDomain.isAll()) { + candidateFileSlices = lookupCandidateFilesInMetadataTable(candidateFileSlices, tupleDomain); + } + } + catch (Exception e) { + // Should not throw exception, just log this Exception. + log.warn(e, "failed to do data skipping for table: %s, fallback to all files scan", metaClient.getBasePathV2()); + candidateFileSlices = allInputFileSlices; + } + if (log.isDebugEnabled()) { + int candidateFileSize = candidateFileSlices.values().stream().mapToInt(List::size).sum(); + int totalFiles = allInputFileSlices.values().stream().mapToInt(List::size).sum(); + double skippingPercent = totalFiles == 0 ? 0.0d : (totalFiles - candidateFileSize) / (totalFiles + 0.0d); + log.debug("Total files: %s; candidate files after data skipping: %s; skipping percent %s", + totalFiles, + candidateFileSize, + skippingPercent); + } + return candidateFileSlices; + } + + private Map> lookupCandidateFilesInMetadataTable( + Map> inputFileSlices, + TupleDomain tupleDomain) + { + // split regular column predicates + TupleDomain regularTupleDomain = HudiPredicates.from(tupleDomain).getRegularColumnPredicates(); + TupleDomain regularColumnPredicates = regularTupleDomain.transform(HudiColumnHandle::getName); + if (regularColumnPredicates.isAll() || !regularColumnPredicates.getDomains().isPresent()) { + return inputFileSlices; + } + List regularColumns = regularColumnPredicates + .getDomains().get().entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList()); + // get filter columns + List encodedTargetColumnNames = regularColumns + .stream() + .map(col -> new ColumnIndexID(col).asBase64EncodedString()).collect(Collectors.toList()); + Map> statsByFileName = metadataTable.getRecordsByKeyPrefixes( + encodedTargetColumnNames, + HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, true) + .collectAsList() + .stream() + .filter(f -> f.getData().getColumnStatMetadata().isPresent()) + .map(f -> f.getData().getColumnStatMetadata().get()) + .collect(Collectors.groupingBy(HoodieMetadataColumnStats::getFileName)); + + // prune files. + return inputFileSlices + .entrySet() + .stream() + .collect(Collectors + .toMap(entry -> entry.getKey(), entry -> entry + .getValue() + .stream() + .filter(fileSlice -> pruneFiles(fileSlice, statsByFileName, regularColumnPredicates, regularColumns)) + .collect(Collectors.toList()))); + } + + private boolean pruneFiles( + FileSlice fileSlice, + Map> statsByFileName, + TupleDomain regularColumnPredicates, + List regularColumns) + { + String fileSliceName = fileSlice.getBaseFile().map(BaseFile::getFileName).orElse(""); + // no stats found + if (!statsByFileName.containsKey(fileSliceName)) { + return true; + } + List stats = statsByFileName.get(fileSliceName); + return evaluateStatisticPredicate(regularColumnPredicates, stats, regularColumns); + } + + private boolean evaluateStatisticPredicate( + TupleDomain regularColumnPredicates, + List stats, + List regularColumns) + { + if (regularColumnPredicates.isNone() || !regularColumnPredicates.getDomains().isPresent()) { + return true; + } + for (String regularColumn : regularColumns) { + Domain columnPredicate = regularColumnPredicates.getDomains().get().get(regularColumn); + Optional currentColumnStats = stats + .stream().filter(s -> s.getColumnName().equals(regularColumn)).findFirst(); + if (!currentColumnStats.isPresent()) { + // no stats for column + } + else { + Domain domain = getDomain(regularColumn, columnPredicate.getType(), currentColumnStats.get()); + if (columnPredicate.intersect(domain).isNone()) { + return false; + } + } + } + return true; + } + + private static Domain getDomain(String colName, Type type, HoodieMetadataColumnStats statistics) + { + if (statistics == null) { + return Domain.all(type); + } + boolean hasNullValue = statistics.getNullCount() != 0L; + boolean hasNonNullValue = statistics.getValueCount() - statistics.getNullCount() > 0; + if (!hasNonNullValue || statistics.getMaxValue() == null || statistics.getMinValue() == null) { + return Domain.create(ValueSet.all(type), hasNullValue); + } + if (!(statistics.getMinValue() instanceof org.apache.hudi.org.apache.avro.generic.GenericRecord) || + !(statistics.getMaxValue() instanceof org.apache.hudi.org.apache.avro.generic.GenericRecord)) { + return Domain.all(type); + } + return getDomain(colName, type, ((org.apache.hudi.org.apache.avro.generic.GenericRecord) statistics.getMinValue()).get(0), + ((org.apache.hudi.org.apache.avro.generic.GenericRecord) statistics.getMaxValue()).get(0), hasNullValue); + } + + /** + * Get a domain for the ranges defined by each pair of elements from {@code minimums} and {@code maximums}. + * Both arrays must have the same length. + */ + private static Domain getDomain(String colName, Type type, Object minimum, Object maximum, boolean hasNullValue) + { + try { + if (type.equals(BOOLEAN)) { + boolean hasTrueValue = (boolean) minimum || (boolean) maximum; + boolean hasFalseValue = !(boolean) minimum || !(boolean) maximum; + if (hasTrueValue && hasFalseValue) { + return Domain.all(type); + } + if (hasTrueValue) { + return Domain.create(ValueSet.of(type, true), hasNullValue); + } + if (hasFalseValue) { + return Domain.create(ValueSet.of(type, false), hasNullValue); + } + // No other case, since all null case is handled earlier. + } + + if ((type.equals(BIGINT) || type.equals(TINYINT) || type.equals(SMALLINT) || type.equals(INTEGER) || type.equals(DATE))) { + long minValue = TupleDomainParquetPredicate.asLong(minimum); + long maxValue = TupleDomainParquetPredicate.asLong(maximum); + if (isStatisticsOverflow(type, minValue, maxValue)) { + return Domain.create(ValueSet.all(type), hasNullValue); + } + return ofMinMax(type, minValue, maxValue, hasNullValue); + } + + if (type.equals(REAL)) { + Float minValue = (Float) minimum; + Float maxValue = (Float) maximum; + if (minValue.isNaN() || maxValue.isNaN()) { + return Domain.create(ValueSet.all(type), hasNullValue); + } + return ofMinMax(type, (long) floatToRawIntBits(minValue), (long) floatToRawIntBits(maxValue), hasNullValue); + } + + if (type.equals(DOUBLE)) { + Double minValue = (Double) minimum; + Double maxValue = (Double) maximum; + if (minValue.isNaN() || maxValue.isNaN()) { + return Domain.create(ValueSet.all(type), hasNullValue); + } + return ofMinMax(type, minValue, maxValue, hasNullValue); + } + + if (isVarcharType(type)) { + Slice min = Slices.utf8Slice((String) minimum); + Slice max = Slices.utf8Slice((String) maximum); + return ofMinMax(type, min, max, hasNullValue); + } + return Domain.create(ValueSet.all(type), hasNullValue); + } + catch (Exception e) { + log.warn("failed to create Domain for column: %s which type is: %s", colName, type.toString()); + return Domain.create(ValueSet.all(type), hasNullValue); + } + } + + private static Domain ofMinMax(Type type, Object min, Object max, boolean hasNullValue) + { + Range range = Range.range(type, min, true, max, true); + ValueSet vs = ValueSet.ofRanges(ImmutableList.of(range)); + return Domain.create(vs, hasNullValue); + } +} diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java index b512575978e8..8ec5ed341a83 100644 --- a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java +++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPageSourceProvider.java @@ -14,7 +14,6 @@ package com.facebook.presto.hudi; -import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.FileFormatDataSourceStats; @@ -104,7 +103,7 @@ public ConnectorPageSource createPageSource( baseFile.getStart(), baseFile.getLength(), dataColumns, - TupleDomain.all(), // TODO: predicates + HudiPredicates.from(layout.getTupleDomain()).getRegularColumnPredicates(), fileFormatDataSourceStats); } else if (tableType == HudiTableType.MOR) { diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java index f66b47d0d2f8..c4b03540d683 100644 --- a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java +++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPartitionManager.java @@ -14,34 +14,67 @@ package com.facebook.presto.hudi; +import com.facebook.airlift.log.Logger; import com.facebook.presto.common.predicate.Domain; import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.predicate.ValueSet; +import com.facebook.presto.common.type.StandardTypes; +import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.hive.metastore.Column; import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.hive.metastore.MetastoreContext; +import com.facebook.presto.hive.metastore.MetastoreUtil; import com.facebook.presto.hive.metastore.Table; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.PrestoException; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; import javax.inject.Inject; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import static com.facebook.presto.hudi.HudiErrorCode.HUDI_INVALID_PARTITION_VALUE; import static com.facebook.presto.hudi.HudiMetadata.fromPartitionColumns; import static com.facebook.presto.hudi.HudiMetadata.toMetastoreContext; +import static com.facebook.presto.hudi.HudiSessionProperties.isHudiMetadataTableEnabled; +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.Slices.utf8Slice; +import static java.lang.Double.doubleToRawLongBits; +import static java.lang.Double.parseDouble; +import static java.lang.Float.floatToRawIntBits; +import static java.lang.Float.parseFloat; +import static java.lang.Long.parseLong; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class HudiPartitionManager { + private static final Logger log = Logger.get(HudiPartitionManager.class); + private final TypeManager typeManager; + private static final Pattern HIVE_PARTITION_NAME_PATTERN = Pattern.compile("([^/]+)=([^/]+)"); + @Inject public HudiPartitionManager(TypeManager typeManager) { @@ -51,6 +84,7 @@ public HudiPartitionManager(TypeManager typeManager) public List getEffectivePartitions( ConnectorSession connectorSession, ExtendedHiveMetastore metastore, + HoodieTableMetaClient metaClient, String schemaName, String tableName, TupleDomain tupleDomain) @@ -59,10 +93,233 @@ public List getEffectivePartitions( Optional table = metastore.getTable(metastoreContext, schemaName, tableName); Verify.verify(table.isPresent()); List partitionColumns = table.get().getPartitionColumns(); + + if (partitionColumns.isEmpty()) { + return ImmutableList.of(""); + } + + boolean metaTableEnabled = isHudiMetadataTableEnabled(connectorSession); + + return metaTableEnabled ? + prunePartitionByMetaDataTable(metaClient, partitionColumns, tupleDomain) : + prunePartitionByMetaStore(metastore, metastoreContext, schemaName, tableName, partitionColumns, tupleDomain); + } + + private List prunePartitionByMetaDataTable( + HoodieTableMetaClient metaClient, + List partitionColumns, + TupleDomain tupleDomain) + { + // non-partition table if (partitionColumns.isEmpty()) { return ImmutableList.of(""); } + Configuration conf = metaClient.getHadoopConf(); + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf); + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build(); + + // Load all the partition path from the basePath + List allPartitions = FSUtils.getAllPartitionPaths( + engineContext, + metadataConfig, + metaClient.getBasePathV2().toString()); + + // Extract partition columns predicate + TupleDomain partitionPredicate = tupleDomain.transform(hudiColumnHandle -> { + if (((HudiColumnHandle) hudiColumnHandle).getColumnType() != HudiColumnHandle.ColumnType.PARTITION_KEY) { + return null; + } + return ((HudiColumnHandle) hudiColumnHandle).getName(); + }); + + if (partitionPredicate.isAll()) { + return allPartitions; + } + + if (partitionPredicate.isNone()) { + return ImmutableList.of(""); + } + + List partitionColumnHandles = fromPartitionColumns(partitionColumns); + + List matchedPartitionPaths = prunePartitions( + partitionPredicate, + partitionColumnHandles, + getPartitions( + partitionColumns.stream().map(f -> f.getName()).collect(Collectors.toList()), + allPartitions)); + log.debug(format("Total partition size is %s, after partition prune size is %s.", + allPartitions.size(), matchedPartitionPaths.size())); + return matchedPartitionPaths; + } + + /** + * Returns the partition path key and values as a list of map. + * For example: + * partition keys: [p1, p2, p3], + * partition paths: + * p1=val1/p2=val2/p3=val3 (hive style partition) + * p1=val4/p2=val5/p3=val6 (hive style partition) + * return values {p1=val1/p2=val2/p3=val3 -> {p1 -> val1, p2 -> value2, p3 -> value3}}, + * {p1=val4/p2=val5/p3=val6 -> {p1 -> val4, p2 -> value5, p3 -> value6}} + * + * @param partitionKey The partition key list + * @param partitionPaths partition path list + */ + public static Map> getPartitions(List partitionKey, List partitionPaths) + { + Map> result = new HashMap<>(); + if (partitionPaths.isEmpty() || partitionKey.isEmpty()) { + return result; + } + // try to infer hive style + boolean hiveStylePartition = HIVE_PARTITION_NAME_PATTERN.matcher(partitionPaths.get(0).split(Path.SEPARATOR)[0]).matches(); + for (String partitionPath : partitionPaths) { + String[] pathParts = partitionPath.split(Path.SEPARATOR); + Map partitionMapping = new LinkedHashMap<>(); + if (hiveStylePartition) { + Arrays.stream(pathParts).forEach(p -> { + String[] keyValue = p.split("="); + if (keyValue.length == 2) { + partitionMapping.put(keyValue[0], keyValue[1]); + } + }); + } + else { + for (int i = 0; i < partitionKey.size(); i++) { + partitionMapping.put(partitionKey.get(i), pathParts[i]); + } + } + result.put(partitionPath, partitionMapping); + } + return result; + } + public static List extractPartitionValues(String partitionName, Optional> partitionColumnNames) + { + boolean hiveStylePartition = HIVE_PARTITION_NAME_PATTERN.matcher(partitionName).matches(); + if (!hiveStylePartition) { + if (!partitionColumnNames.isPresent() || partitionColumnNames.get().size() == 1) { + return ImmutableList.of(partitionName); + } + else { + String[] partitionValues = partitionName.split(Path.SEPARATOR); + checkArgument( + partitionValues.length == partitionColumnNames.get().size(), + "Invalid partition spec: {partitionName: %s, partitionColumnNames: %s}", + partitionName, + partitionColumnNames.get()); + return Arrays.asList(partitionValues); + } + } + + return MetastoreUtil.extractPartitionValues(partitionName, partitionColumnNames); + } + + private List prunePartitions( + TupleDomain partitionPredicate, + List partitionColumnHandles, + Map> candidatePartitionPaths) + { + return candidatePartitionPaths.entrySet().stream().filter(f -> { + Map partitionMapping = f.getValue(); + return partitionMapping + .entrySet() + .stream() + .allMatch(p -> evaluatePartitionPredicate(partitionPredicate, partitionColumnHandles, p.getValue(), p.getKey())); + }).map(entry -> entry.getKey()).collect(Collectors.toList()); + } + + private boolean evaluatePartitionPredicate( + TupleDomain partitionPredicate, + List partitionColumnHandles, + String partitionPathValue, + String partitionName) + { + Optional columnHandleOpt = + partitionColumnHandles.stream().filter(f -> f.getName().equals(partitionName)).findFirst(); + if (columnHandleOpt.isPresent()) { + Domain domain = getDomain(columnHandleOpt.get(), partitionPathValue); + if (!partitionPredicate.getDomains().isPresent()) { + return true; + } + Domain columnPredicate = partitionPredicate.getDomains().get().get(partitionName); + // no predicate on current partitionName + if (columnPredicate == null) { + return true; + } + + // For null partition, hive will produce a default value for current partition. + if (partitionPathValue.equals("default")) { + return true; + } + return !columnPredicate.intersect(domain).isNone(); + } + else { + // Should not happen + throw new IllegalArgumentException(format("Mismatched partition information found," + + " partition: %s from Hudi metadataTable is not included by the partitions from HMS: %s", + partitionName, partitionColumnHandles.stream().map(f -> f.getName()).collect(Collectors.joining(",")))); + } + } + + private Domain getDomain(HudiColumnHandle columnHandle, String partitionValue) + { + Type type = columnHandle.getHiveType().getType(typeManager); + if (partitionValue == null) { + return Domain.onlyNull(type); + } + try { + switch (columnHandle.getHiveType().getTypeSignature().getBase()) { + case StandardTypes.TINYINT: + case StandardTypes.SMALLINT: + case StandardTypes.INTEGER: + case StandardTypes.BIGINT: + Long intValue = parseLong(partitionValue); + return Domain.create(ValueSet.of(type, intValue), false); + case StandardTypes.REAL: + Long realValue = (long) floatToRawIntBits(parseFloat(partitionValue)); + return Domain.create(ValueSet.of(type, realValue), false); + case StandardTypes.DOUBLE: + Long doubleValue = doubleToRawLongBits(parseDouble(partitionValue)); + return Domain.create(ValueSet.of(type, doubleValue), false); + case StandardTypes.VARCHAR: + case StandardTypes.VARBINARY: + Slice sliceValue = utf8Slice(partitionValue); + return Domain.create(ValueSet.of(type, sliceValue), false); + case StandardTypes.DATE: + Long dateValue = LocalDate.parse(partitionValue, java.time.format.DateTimeFormatter.ISO_LOCAL_DATE).toEpochDay(); + return Domain.create(ValueSet.of(type, dateValue), false); + case StandardTypes.TIMESTAMP: + Long timestampValue = Timestamp.valueOf(partitionValue).getTime(); + return Domain.create(ValueSet.of(type, timestampValue), false); + case StandardTypes.BOOLEAN: + Boolean booleanValue = Boolean.valueOf(partitionValue); + return Domain.create(ValueSet.of(type, booleanValue), false); + default: + throw new PrestoException(HUDI_INVALID_PARTITION_VALUE, format( + "partition data type '%s' is unsupported for partition key: %s", + columnHandle.getHiveType(), + columnHandle.getName())); + } + } + catch (IllegalArgumentException e) { + throw new PrestoException(HUDI_INVALID_PARTITION_VALUE, format( + "Invalid partition value '%s' for %s partition key: %s", + partitionValue, + type.getDisplayName(), + columnHandle.getName())); + } + } + + private List prunePartitionByMetaStore( + ExtendedHiveMetastore metastore, + MetastoreContext metastoreContext, + String schemaName, + String tableName, + List partitionColumns, + TupleDomain tupleDomain) + { Map partitionPredicate = new HashMap<>(); Map domains = tupleDomain.getDomains().orElse(ImmutableMap.of()); diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPredicates.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPredicates.java new file mode 100644 index 000000000000..5c80422fff70 --- /dev/null +++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiPredicates.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.hudi; + +import com.facebook.presto.common.predicate.Domain; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.spi.ColumnHandle; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class HudiPredicates +{ + private final TupleDomain regularColumnPredicates; + + public static HudiPredicates from(TupleDomain predicate) + { + Map partitionColumnPredicates = new HashMap<>(); + Map regularColumnPredicates = new HashMap<>(); + + Optional> domains = predicate.getDomains(); + domains.ifPresent(columnHandleDomainMap -> columnHandleDomainMap.forEach((key, value) -> { + HudiColumnHandle columnHandle = (HudiColumnHandle) key; + if (columnHandle.getColumnType() == HudiColumnHandle.ColumnType.PARTITION_KEY) { + partitionColumnPredicates.put(columnHandle, value); + } + else { + regularColumnPredicates.put(columnHandle, value); + } + })); + + return new HudiPredicates(TupleDomain.withColumnDomains(regularColumnPredicates)); + } + + private HudiPredicates(TupleDomain regularColumnPredicates) + { + this.regularColumnPredicates = regularColumnPredicates; + } + + public TupleDomain getRegularColumnPredicates() + { + return regularColumnPredicates; + } +} diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSessionProperties.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSessionProperties.java index a598f19c1c6e..21aae189defc 100644 --- a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSessionProperties.java +++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSessionProperties.java @@ -31,6 +31,7 @@ import static com.facebook.presto.spi.StandardErrorCode.INVALID_SESSION_PROPERTY; import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty; import static com.facebook.presto.spi.session.PropertyMetadata.dataSizeProperty; +import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; import static java.lang.String.format; public class HudiSessionProperties @@ -47,6 +48,7 @@ public class HudiSessionProperties private static final String STANDARD_SPLIT_WEIGHT_SIZE = "standard_split_weight_size"; private static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; public static final String READ_MASKED_VALUE_ENABLED = "read_null_masked_parquet_encrypted_value_enabled"; + private static final String HOODIE_FILESYSTEM_VIEW_SPILLABLE_DIR = "hoodie_filesystem_view_spillable_dir"; @Inject public HudiSessionProperties(HiveClientConfig hiveClientConfig, HudiConfig hudiConfig) @@ -107,6 +109,11 @@ public HudiSessionProperties(HiveClientConfig hiveClientConfig, HudiConfig hudiC READ_MASKED_VALUE_ENABLED, "Return null when access is denied for an encrypted parquet column", hiveClientConfig.getReadNullMaskedParquetEncryptedValue(), + false), + stringProperty( + HOODIE_FILESYSTEM_VIEW_SPILLABLE_DIR, + "Path on local storage to use, when file system view is held in a spillable map.", + "/tmp/", false)); } @@ -154,4 +161,9 @@ public static boolean getReadNullMaskedParquetEncryptedValue(ConnectorSession se { return session.getProperty(READ_MASKED_VALUE_ENABLED, Boolean.class); } + + public static String getHoodieFilesystemViewSpillableDir(ConnectorSession session) + { + return session.getProperty(HOODIE_FILESYSTEM_VIEW_SPILLABLE_DIR, String.class); + } } diff --git a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSplitManager.java b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSplitManager.java index aa7b80eaa703..39679e9883f3 100644 --- a/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSplitManager.java +++ b/presto-hudi/src/main/java/com/facebook/presto/hudi/HudiSplitManager.java @@ -31,6 +31,7 @@ import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; @@ -41,6 +42,7 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieTableQueryType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -52,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues; import static com.facebook.presto.hudi.HudiErrorCode.HUDI_FILESYSTEM_ERROR; @@ -98,24 +101,57 @@ public ConnectorSplitSource getSplits( HudiTableLayoutHandle layout = (HudiTableLayoutHandle) layoutHandle; HudiTableHandle table = layout.getTable(); + // Load Hudi metadata + ExtendedFileSystem fs = getFileSystem(session, table); + boolean hudiMetadataTableEnabled = isHudiMetadataTableEnabled(session); + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(hudiMetadataTableEnabled).build(); + Configuration conf = fs.getConf(); + HoodieTableMetaClient metaClient = HoodieTableMetaClient + .builder() + .setConf(conf) + .setBasePath(table.getPath()) + .build(); + // Retrieve and prune partitions - List partitions = hudiPartitionManager.getEffectivePartitions(session, metastore, table.getSchemaName(), table.getTableName(), layout.getTupleDomain()); + List partitions = hudiPartitionManager.getEffectivePartitions(session, metastore, metaClient, table.getSchemaName(), table.getTableName(), layout.getTupleDomain()); if (partitions.isEmpty()) { return new FixedSplitSource(ImmutableList.of()); } - // Load Hudi metadata - ExtendedFileSystem fs = getFileSystem(session, table); - HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(isHudiMetadataTableEnabled(session)).build(); - Configuration conf = fs.getConf(); - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(table.getPath()).build(); + // load timeline HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); String timestamp = timeline.lastInstant().map(HoodieInstant::getTimestamp).orElse(null); if (timestamp == null) { // no completed instant for current table return new FixedSplitSource(ImmutableList.of()); } + // prepare splits HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf); + // if metadata table enabled, support dataskipping + if (hudiMetadataTableEnabled) { + MetastoreContext metastoreContext = toMetastoreContext(session); + Optional
hiveTableOpt = metastore.getTable(metastoreContext, table.getSchemaName(), table.getTableName()); + Verify.verify(hiveTableOpt.isPresent()); + HudiFileSkippingManager hudiFileSkippingManager = new HudiFileSkippingManager( + partitions, + HudiSessionProperties.getHoodieFilesystemViewSpillableDir(session), + engineContext, + metaClient, + getQueryType(hiveTableOpt.get().getStorage().getStorageFormat().getInputFormat()), + Optional.empty()); + ImmutableList.Builder splitsBuilder = ImmutableList.builder(); + Map hudiPartitionMap = getHudiPartitions(hiveTableOpt.get(), layout, partitions); + hudiFileSkippingManager.listQueryFiles(layout.getTupleDomain()) + .entrySet() + .stream() + .flatMap(entry -> entry.getValue().stream().map(fileSlice -> createHudiSplit(table, fileSlice, timestamp, hudiPartitionMap.get(entry.getKey()), splitWeightProvider))) + .filter(Optional::isPresent) + .map(Optional::get) + .forEach(splitsBuilder::add); + List splitsList = splitsBuilder.build(); + return splitsList.isEmpty() ? new FixedSplitSource(ImmutableList.of()) : new FixedSplitSource(splitsList); + } + HoodieTableFileSystemView fsView = createInMemoryFileSystemViewWithTimeline(engineContext, metaClient, metadataConfig, timeline); // Construct Presto splits @@ -179,6 +215,24 @@ private Optional createHudiSplit( splitWeightProvider.calculateSplitWeight(sizeInBytes))); } + private Map getHudiPartitions(Table table, HudiTableLayoutHandle tableLayout, List partitions) + { + List partitionColumnNames = table.getPartitionColumns().stream().map(f -> f.getName()).collect(Collectors.toList()); + + Map> partitionMap = HudiPartitionManager + .getPartitions(partitionColumnNames, partitions); + if (partitions.size() == 1 && partitions.get(0).isEmpty()) { + // non-partitioned + return ImmutableMap.of(partitions.get(0), new HudiPartition(partitions.get(0), ImmutableList.of(), ImmutableMap.of(), table.getStorage(), tableLayout.getDataColumns())); + } + ImmutableMap.Builder builder = ImmutableMap.builder(); + partitionMap.entrySet().stream().map(entry -> { + List partitionValues = HudiPartitionManager.extractPartitionValues(entry.getKey(), Optional.of(partitionColumnNames)); + return new HudiPartition(entry.getKey(), partitionValues, entry.getValue(), table.getStorage(), fromDataColumns(table.getDataColumns())); + }).forEach(p -> builder.put(p.getName(), p)); + return builder.build(); + } + private static HudiPartition getHudiPartition(ExtendedHiveMetastore metastore, MetastoreContext context, HudiTableLayoutHandle tableLayout, String partitionName) { String databaseName = tableLayout.getTable().getSchemaName(); @@ -220,4 +274,21 @@ private static HudiSplitWeightProvider createSplitWeightProvider(ConnectorSessio } return HudiSplitWeightProvider.uniformStandardWeightProvider(); } + + public static HoodieTableQueryType getQueryType(String inputFormat) + { + // TODO support incremental query + switch (inputFormat) { + case "org.apache.hudi.hadoop.HoodieParquetInputFormat": + case "com.uber.hoodie.hadoop.HoodieInputFormat": + // cow table/ mor ro table + return HoodieTableQueryType.READ_OPTIMIZED; + case "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat": + case "com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat": + // mor rt table + return HoodieTableQueryType.SNAPSHOT; + default: + throw new IllegalArgumentException(String.format("failed to infer query type for current inputFormat: %s", inputFormat)); + } + } } diff --git a/presto-hudi/src/test/java/com/facebook/presto/hudi/AbstractHudiDistributedQueryTestBase.java b/presto-hudi/src/test/java/com/facebook/presto/hudi/AbstractHudiDistributedQueryTestBase.java new file mode 100644 index 000000000000..52659e78e3ed --- /dev/null +++ b/presto-hudi/src/test/java/com/facebook/presto/hudi/AbstractHudiDistributedQueryTestBase.java @@ -0,0 +1,296 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.hudi; + +import com.facebook.presto.Session; +import com.facebook.presto.hive.HdfsConfiguration; +import com.facebook.presto.hive.HdfsConfigurationInitializer; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.HiveColumnConverterProvider; +import com.facebook.presto.hive.HiveHdfsConfiguration; +import com.facebook.presto.hive.HiveType; +import com.facebook.presto.hive.MetastoreClientConfig; +import com.facebook.presto.hive.authentication.NoHdfsAuthentication; +import com.facebook.presto.hive.metastore.Column; +import com.facebook.presto.hive.metastore.Database; +import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; +import com.facebook.presto.hive.metastore.MetastoreContext; +import com.facebook.presto.hive.metastore.PrestoTableType; +import com.facebook.presto.hive.metastore.PrincipalPrivileges; +import com.facebook.presto.hive.metastore.Storage; +import com.facebook.presto.hive.metastore.StorageFormat; +import com.facebook.presto.hive.metastore.Table; +import com.facebook.presto.hive.metastore.file.FileHiveMetastore; +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.security.PrincipalType; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Streams; +import com.google.common.io.Resources; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.testng.annotations.AfterClass; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import static com.facebook.presto.hive.HiveType.HIVE_BINARY; +import static com.facebook.presto.hive.HiveType.HIVE_BOOLEAN; +import static com.facebook.presto.hive.HiveType.HIVE_DATE; +import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE; +import static com.facebook.presto.hive.HiveType.HIVE_FLOAT; +import static com.facebook.presto.hive.HiveType.HIVE_INT; +import static com.facebook.presto.hive.HiveType.HIVE_LONG; +import static com.facebook.presto.hive.HiveType.HIVE_STRING; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static java.nio.file.Files.createDirectories; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; + +public abstract class AbstractHudiDistributedQueryTestBase + extends AbstractTestQueryFramework +{ + public static final String HUDI_CATALOG = "hudi"; + + public static final String HUDI_SCHEMA = "testing"; // Schema in Hive which has test hudi tables + + protected static ExtendedHiveMetastore metastore; + + private static final String OWNER_PUBLIC = "public"; + protected static final MetastoreContext METASTORE_CONTEXT = new MetastoreContext("test_user", "test_queryId", Optional.empty(), Optional.empty(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER); + private static final PrincipalPrivileges PRINCIPAL_PRIVILEGES = new PrincipalPrivileges(ImmutableMultimap.of(), ImmutableMultimap.of()); + + private static final StorageFormat STORAGE_FORMAT_COPY_ON_WRITE = StorageFormat.create( + ParquetHiveSerDe.class.getName(), + HoodieParquetInputFormat.class.getName(), + MapredParquetOutputFormat.class.getName()); + private static final StorageFormat STORAGE_FORMAT_MERGE_ON_READ = StorageFormat.create( + ParquetHiveSerDe.class.getName(), + HoodieParquetRealtimeInputFormat.class.getName(), + MapredParquetOutputFormat.class.getName()); + + // spark.sql( + // """create table data_partition_prune + // |(id int, comb int, col0 int, col1 bigint, col2 float, col3 double, + // | col4 string, col5 date, col6 boolean, col7 binary, year int, month int, day int) + // | using hudi + // | partitioned by (year,month,day) + // | options( + // | type='cow', primaryKey='id', preCombineField='comb', + // | hoodie.metadata.index.column.stats.enable = "true", + // | hoodie.metadata.index.column.stats.file.group.count = "1", + // | hoodie.metadata.index.column.stats.column.list = 'col0,col3,col4,col5', + // | 'hoodie.metadata.enable'='true')""".stripMargin) + // + // spark.sql( + // s""" + // | insert into data_partition_prune values + // | (1,1,99,1111111,101.01,1001.0001,'x000001','2021-12-25',true,'a01',2022, 11, 12), + // | (2,2,99,1111111,102.02,1002.0002,'x000002','2021-12-25',true,'a02',2022, 10, 30), + // | (3,3,99,1111111,103.03,1003.0003,'x000003','2021-12-25',false,'a03',2021, 10, 11), + // | (4,4,99,1111111,104.04,1004.0004,'x000004','2021-12-26',true,'a04',2021, 11, 12) + // |""".stripMargin) + public static final List DATA_COLUMNS = ImmutableList.of( + column("id", HIVE_INT), + column("comb", HIVE_INT), + column("col0", HIVE_INT), + column("col1", HIVE_LONG), + column("col2", HIVE_FLOAT), + column("col3", HIVE_DOUBLE), + column("col4", HIVE_STRING), + column("col5", HIVE_DATE), + column("col6", HIVE_BOOLEAN), + column("col7", HIVE_BINARY)); + + public static final List PARTITION_COLUMNS = ImmutableList.of(column("year", HIVE_INT), column("month", HIVE_INT), column("day", HIVE_INT)); + public static final List HUDI_META_COLUMNS = ImmutableList.of( + column("_hoodie_commit_time", HiveType.HIVE_STRING), + column("_hoodie_commit_seqno", HiveType.HIVE_STRING), + column("_hoodie_record_key", HiveType.HIVE_STRING), + column("_hoodie_partition_path", HiveType.HIVE_STRING), + column("_hoodie_file_name", HiveType.HIVE_STRING)); + + /** + * List of tables present in the test resources directory. + * used to test dataskipping/partition prune. + */ + protected static final String HUDI_SKIPPING_TABLE = "data_partition_prune"; + protected static final String HUDI_SKIPPING_TABLE_NON_HIVE_STYLE = "data_partition_prune_non_hive_style_partition"; + protected static ConnectorSession connectorSession; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createHudiQueryRunner(ImmutableMap.of()); + } + + @AfterClass + public void deleteTestHudiTables() + { + QueryRunner queryRunner = getQueryRunner(); + if (queryRunner != null) { + // Remove the test hudi tables from HMS + metastore.dropTable(METASTORE_CONTEXT, HUDI_SCHEMA, HUDI_SKIPPING_TABLE, false); + metastore.dropTable(METASTORE_CONTEXT, HUDI_SCHEMA, HUDI_SKIPPING_TABLE_NON_HIVE_STYLE, false); + } + } + + protected static String getTablePath(String tableName, Path dataDir) + { + return "file://" + dataDir.resolve(tableName); + } + + private static DistributedQueryRunner createHudiQueryRunner(Map extraProperties) + throws Exception + { + Session session = testSessionBuilder() + .setCatalog(HUDI_CATALOG) + .setSchema(HUDI_SCHEMA) + .setCatalogSessionProperty(HUDI_CATALOG, "hudi_metadata_table_enabled", "true") + .setConnectionProperty(new ConnectorId("hudi"), "hudi_metadata_table_enabled", "true") + .build(); + + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session) + .setExtraProperties(extraProperties) + .build(); + + // setup file metastore + Path catalogDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("catalog"); + metastore = createFileHiveMetastore(catalogDirectory.toString()); + // create database + Database database = Database.builder() + .setDatabaseName(HUDI_SCHEMA) + .setOwnerName(OWNER_PUBLIC) + .setOwnerType(PrincipalType.ROLE) + .build(); + metastore.createDatabase(METASTORE_CONTEXT, database); + + Path testingDataDir = queryRunner.getCoordinator().getDataDirectory().resolve("data"); + + try (InputStream stream = Resources.getResource("hudi-skipping-schema-data.zip").openStream()) { + unzip(stream, testingDataDir); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + + // Create the test hudi tables for dataSkipping/partition prune in HMS + registerHudiTableInHMS(HoodieTableType.COPY_ON_WRITE, HUDI_SKIPPING_TABLE, testingDataDir, Streams.concat(HUDI_META_COLUMNS.stream(), DATA_COLUMNS.stream()).collect(Collectors.toList())); + + // Create the test hudi tables with non_hive_style_partitions for dataSkipping/partition prune in HMS + registerHudiTableInHMS(HoodieTableType.COPY_ON_WRITE, HUDI_SKIPPING_TABLE_NON_HIVE_STYLE, testingDataDir, Streams.concat(HUDI_META_COLUMNS.stream(), DATA_COLUMNS.stream()).collect(Collectors.toList())); + + // Install a hudi connector catalog + queryRunner.installPlugin(new HudiPlugin("hudi", Optional.of(metastore))); + Map hudiProperties = ImmutableMap.builder().build(); + queryRunner.createCatalog(HUDI_CATALOG, "hudi", hudiProperties); + + connectorSession = queryRunner.getDefaultSession().toConnectorSession(new ConnectorId(session.getCatalog().get())); + + return queryRunner; + } + + private static ExtendedHiveMetastore createFileHiveMetastore(String catalogDir) + { + HiveClientConfig hiveClientConfig = new HiveClientConfig(); + MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig(); + HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration( + new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), + ImmutableSet.of(), hiveClientConfig); + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication()); + return new FileHiveMetastore(hdfsEnvironment, catalogDir, "test"); + } + + private static Column column(String name, HiveType type) + { + return new Column(name, type, Optional.empty(), Optional.empty()); + } + + private static void registerHudiTableInHMS(HoodieTableType type, String name, Path dataDir, List dataColumns) + { + // ref: org.apache.hudi.hive.ddl.HMSDDLExecutor#createTable + Table table = Table.builder() + .setDatabaseName(HUDI_SCHEMA) + .setTableName(name) + .setTableType(PrestoTableType.EXTERNAL_TABLE) + .setOwner(OWNER_PUBLIC) + .setDataColumns(dataColumns) + .setPartitionColumns(PARTITION_COLUMNS) + .setParameters(ImmutableMap.of("serialization.format", "1", "EXTERNAL", "TRUE")) + .withStorage(buildingStorage(type, getTablePath(name, dataDir))) + .build(); + metastore.createTable(METASTORE_CONTEXT, table, PRINCIPAL_PRIVILEGES); + } + + private static Consumer buildingStorage(HoodieTableType tableType, String location) + { + return storageBuilder -> storageBuilder.setStorageFormat(getStorageFormat(tableType)).setLocation(location); + } + + private static StorageFormat getStorageFormat(HoodieTableType tableType) + { + if (tableType == HoodieTableType.COPY_ON_WRITE) { + return STORAGE_FORMAT_COPY_ON_WRITE; + } + if (tableType == HoodieTableType.MERGE_ON_READ) { + return STORAGE_FORMAT_MERGE_ON_READ; + } + throw new IllegalArgumentException("Unsupported table type " + tableType); + } + + private static void unzip(InputStream inputStream, Path destination) + throws IOException + { + createDirectories(destination); + try (ZipInputStream zipStream = new ZipInputStream(inputStream)) { + while (true) { + ZipEntry zipEntry = zipStream.getNextEntry(); + if (zipEntry == null) { + break; + } + + Path entryPath = destination.resolve(zipEntry.getName()); + if (zipEntry.isDirectory()) { + createDirectories(entryPath); + } + else { + createDirectories(entryPath.getParent()); + Files.copy(zipStream, entryPath, REPLACE_EXISTING); + } + zipStream.closeEntry(); + } + } + } +} diff --git a/presto-hudi/src/test/java/com/facebook/presto/hudi/TestHudiSkipping.java b/presto-hudi/src/test/java/com/facebook/presto/hudi/TestHudiSkipping.java new file mode 100644 index 000000000000..ecc7467d9412 --- /dev/null +++ b/presto-hudi/src/test/java/com/facebook/presto/hudi/TestHudiSkipping.java @@ -0,0 +1,124 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.hudi; + +import com.facebook.presto.common.predicate.Domain; +import com.facebook.presto.common.predicate.Range; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.predicate.ValueSet; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.hive.metastore.Table; +import com.facebook.presto.spi.ColumnHandle; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import net.jpountz.xxhash.XXHashFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; + +/** + * Integration tests for reading hudi tables. + */ +public class TestHudiSkipping + extends AbstractHudiDistributedQueryTestBase +{ + @Test + public void testPartitionPruneAndFileSkipping() + { + Arrays.stream(new String[] {HUDI_SKIPPING_TABLE, HUDI_SKIPPING_TABLE_NON_HIVE_STYLE}).forEach(f -> { + Optional
table = metastore.getTable(METASTORE_CONTEXT, HUDI_SCHEMA, f); + HudiPartitionManager hudiPartitionManager = new HudiPartitionManager(getQueryRunner().getMetadata().getFunctionAndTypeManager()); + + HoodieTableMetaClient metaClient = HoodieTableMetaClient + .builder() + .setConf(new Configuration()) + .setBasePath(table.get().getStorage().getLocation()) + .build(); + // test partition prune by mdt + // create domain + List partitionColumns = HudiMetadata.fromPartitionColumns(table.get().getPartitionColumns()); + // year=2022 and month=11 and day=12 + TupleDomain predicate = TupleDomain.withColumnDomains(ImmutableMap.of( + partitionColumns.get(0), Domain.create(ValueSet.of(IntegerType.INTEGER, 2022L), false), + partitionColumns.get(1), Domain.create(ValueSet.of(IntegerType.INTEGER, 11L), false), + partitionColumns.get(2), Domain.create(ValueSet.of(IntegerType.INTEGER, 12L), false))); + + List parts = hudiPartitionManager.getEffectivePartitions(connectorSession, metastore, metaClient, table.get().getDatabaseName(), table.get().getTableName(), predicate); + + assertEquals(parts.size(), 1); + + // month = 11 + TupleDomain predicate1 = TupleDomain.withColumnDomains(ImmutableMap.of( + partitionColumns.get(1), Domain.create(ValueSet.of(IntegerType.INTEGER, 11L), false))); + + List parts1 = hudiPartitionManager.getEffectivePartitions(connectorSession, metastore, metaClient, table.get().getDatabaseName(), table.get().getTableName(), predicate1); + + assertEquals(parts1.size(), 2); + + // test file skipping + List dataColumns = HudiMetadata.fromDataColumns(table.get().getDataColumns()); + List partitions = hudiPartitionManager.getEffectivePartitions(connectorSession, metastore, metaClient, table.get().getDatabaseName(), table.get().getTableName(), TupleDomain.all()); + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + HudiFileSkippingManager hudiFileSkippingManager = new HudiFileSkippingManager( + partitions, + HudiSessionProperties.getHoodieFilesystemViewSpillableDir(connectorSession), + engineContext, + metaClient, + HudiSplitManager.getQueryType(table.get().getStorage().getStorageFormat().getInputFormat()), + Optional.empty()); + // case1: no filter + assertEquals(hudiFileSkippingManager.listQueryFiles(TupleDomain.all()).entrySet().stream().map(entry -> entry.getValue().size()).reduce(0, Integer::sum), 4); + // case2: where col0 > 99, should skip all files + assertEquals(hudiFileSkippingManager + .listQueryFiles(TupleDomain.withColumnDomains(ImmutableMap.of(dataColumns.get(7), Domain.create(ValueSet.ofRanges(Range.greaterThan(IntegerType.INTEGER, 99L)), false)))) + .entrySet().stream().map(entry -> entry.getValue().size()).reduce(0, Integer::sum), 0); + // case3: where col0<=99 and col3 > 1001.0002 + assertEquals(hudiFileSkippingManager + .listQueryFiles(TupleDomain.withColumnDomains(ImmutableMap.of( + dataColumns.get(7), Domain.create(ValueSet.ofRanges(Range.lessThanOrEqual(IntegerType.INTEGER, 99L)), false), + dataColumns.get(10), Domain.create(ValueSet.ofRanges(Range.greaterThan(DoubleType.DOUBLE, 1002.0002d)), false)))) + .entrySet().stream().map(entry -> entry.getValue().size()).reduce(0, Integer::sum), 2); + }); + } + + @Test + public void testSkippingResult() + { + Arrays.stream(new String[] {HUDI_SKIPPING_TABLE, HUDI_SKIPPING_TABLE_NON_HIVE_STYLE}).forEach(f -> { + String testQuery = format("SELECT col0,col3,col4 FROM %s where year=2022 and month=11", f); + List expRows = new ArrayList<>(); + expRows.add("SELECT 99,cast(1001.0001 as double),'x000001'"); + String expResultsQuery = Joiner.on(" UNION ").join(expRows); + assertQuery(testQuery, expResultsQuery); + }); + } + + // should remove this function, once we bump hudi to 0.13.0. + // old hudi-presto-bundle has not include lz4 which is used by data-skipping. + private void shouldRemoved() + { + XXHashFactory.fastestInstance(); + } +} diff --git a/presto-hudi/src/test/resources/hudi-skipping-schema-data.zip b/presto-hudi/src/test/resources/hudi-skipping-schema-data.zip new file mode 100644 index 000000000000..70b7d9230a2f Binary files /dev/null and b/presto-hudi/src/test/resources/hudi-skipping-schema-data.zip differ