[HUDI-4353] Column stats data skipping for flink (#6026)
danny0405 authored Jul 3, 2022
1 parent bdf73b2 commit 47792a3
Showing 30 changed files with 1,930 additions and 81 deletions.
@@ -74,6 +74,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -159,10 +160,14 @@ public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecor
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(),
instantTime, table, preppedRecords.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsertPrepped(context, writeHandle, instantTime, preppedRecords);
return postWrite(result, instantTime, table);
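// group the prepped records by file id: each file group gets its own write handle and is written
// independently, and the per-group write statuses are flattened into a single result list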
Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = preppedRecords.stream().parallel()
.collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
return preppedRecordsByFileId.values().stream().parallel().map(records -> {
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
instantTime, table, records.listIterator());
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsertPrepped(context, writeHandle, instantTime, records);
return postWrite(result, instantTime, table);
}).flatMap(Collection::stream).collect(Collectors.toList());
}

@Override
@@ -236,6 +236,13 @@ private FlinkOptions() {
.noDefaultValue()
.withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'");

public static final ConfigOption<Boolean> READ_DATA_SKIPPING_ENABLED = ConfigOptions
.key("read.data.skipping.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by "
+ "skipping over files");

// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
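For context, a minimal usage sketch (not part of this commit) of the new read option above through the Flink SQL connector. The table name, schema and path are hypothetical, and 'metadata.enabled' is assumed to be the existing switch for the metadata table that backs the column stats index:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DataSkippingConfigSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
    // Register a Hudi table with data skipping enabled; column stats are read from the metadata table.
    tEnv.executeSql(
        "CREATE TABLE hudi_t1 ("
            + "  id INT,"
            + "  name STRING,"
            + "  ts TIMESTAMP(3),"
            + "  PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'file:///tmp/hudi/t1',"
            + "  'metadata.enabled' = 'true',"
            + "  'read.data.skipping.enabled' = 'true'"
            + ")");
    // A simple column predicate like this is what the pushed-down filters in FileIndex can prune with.
    tEnv.executeSql("SELECT * FROM hudi_t1 WHERE id > 100").print();
  }
}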
@@ -18,11 +18,11 @@

package org.apache.hudi.configuration;

import org.apache.hudi.util.FlinkClientUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.util.FlinkClientUtil;

import java.util.Map;

/**
@@ -18,14 +18,14 @@

package org.apache.hudi.sink.meta;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -18,7 +18,6 @@

package org.apache.hudi.sink.utils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
@@ -29,6 +28,7 @@
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.table.format.FilePathUtils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -155,6 +155,10 @@ private Supplier<String> getActionString(String actionName, Object... actionPara
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------

/**
* The exception hook.
*/
public interface ExceptionHook {
void apply(String errMsg, Throwable t);
}
@@ -21,14 +21,24 @@
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.source.stats.ColumnStatsIndices;
import org.apache.hudi.source.stats.ExpressionEvaluator;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

@@ -40,26 +50,34 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A file index which supports listing files efficiently through metadata table.
*
* <p>It caches the partition paths to avoid redundant lookups.
*/
public class FileIndex {
private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);

private final Path path;
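// row type of the query, used to transpose and project the column stats index for data skipping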
private final RowType rowType;
private final HoodieMetadataConfig metadataConfig;
private List<String> partitionPaths; // cache of partition paths
private final boolean dataSkippingEnabled;
private List<String> partitionPaths; // cache of partition paths
private List<ResolvedExpression> filters; // push down filters
private final boolean tableExists;

private FileIndex(Path path, Configuration conf) {
private FileIndex(Path path, Configuration conf, RowType rowType) {
this.path = path;
this.rowType = rowType;
this.metadataConfig = metadataConfig(conf);
this.dataSkippingEnabled = conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED);
this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
}

public static FileIndex instance(Path path, Configuration conf) {
return new FileIndex(path, conf);
public static FileIndex instance(Path path, Configuration conf, RowType rowType) {
return new FileIndex(path, conf, rowType);
}

/**
@@ -119,9 +137,17 @@ public FileStatus[] getFilesInPartitions() {
return new FileStatus[0];
}
String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
FileStatus[] allFileStatus = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
partitions, "/tmp/")
.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
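// data skipping: prune the listed files down to the candidates that may match the pushed-down filters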
Set<String> candidateFiles = candidateFilesInMetadataTable(allFileStatus);
if (candidateFiles == null) {
// no need to filter by column stats, or an error occurred.
return allFileStatus;
}
return Arrays.stream(allFileStatus).parallel()
.filter(fileStatus -> candidateFiles.contains(fileStatus.getPath().getName()))
.toArray(FileStatus[]::new);
}

/**
@@ -159,10 +185,96 @@ public void setPartitionPaths(@Nullable Set<String> partitionPaths) {
}
}

/**
* Sets up pushed down filters.
*/
public void setFilters(List<ResolvedExpression> filters) {
if (filters.size() > 0) {
this.filters = new ArrayList<>(filters);
}
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

/**
* Computes the pruned list of candidate base-file names based on the provided data filter
* conditions, by leveraging the Metadata Table's Column Statistics index (hereon referred to as ColStats for brevity)
* bearing "min", "max" and "num_nulls" statistics for all columns.
*
* <p>NOTE: This method has to return a complete set of candidate files, since only the provided candidates will
* ultimately be scanned as part of query execution. Hence, this method has to maintain the
* invariant of conservatively including every base-file name that is NOT referenced in the index.
*
* <p>The {@code filters} must all be simple.
*
* @return the set of pruned (data-skipped) candidate base-file names
*/
@Nullable
private Set<String> candidateFilesInMetadataTable(FileStatus[] allFileStatus) {
// NOTE: Data Skipping is only effective when it references columns that are indexed w/in
// the Column Stats Index (CSI). The following cases cannot be handled effectively by Data Skipping:
// - Expressions on a top-level column's nested fields (e.g., filters like "struct.field > 0", since
// CSI only contains stats for top-level columns, in this case for "struct")
// - Any expression not directly referencing a top-level column (e.g., sub-queries), since there is
// nothing in CSI that could be applied to them
if (!metadataConfig.enabled() || !dataSkippingEnabled) {
validateConfig();
return null;
}
if (this.filters == null || this.filters.size() == 0) {
return null;
}
String[] referencedCols = ExpressionUtils.referencedColumns(filters);
if (referencedCols.length == 0) {
return null;
}
try {
final List<RowData> colStats = ColumnStatsIndices.readColumnStatsIndex(path.toString(), metadataConfig, referencedCols);
final Pair<List<RowData>, String[]> colStatsTable = ColumnStatsIndices.transposeColumnStatsIndex(colStats, referencedCols, rowType);
List<RowData> transposedColStats = colStatsTable.getLeft();
String[] queryCols = colStatsTable.getRight();
if (queryCols.length == 0) {
// the indexed columns have no intersection with the referenced columns, return early
return null;
}
RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(rowType, queryCols);

Set<String> allIndexedFileNames = transposedColStats.stream().parallel()
.map(row -> row.getString(0).toString())
.collect(Collectors.toSet());
Set<String> candidateFileNames = transposedColStats.stream().parallel()
.filter(row -> ExpressionEvaluator.filterExprs(filters, row, queryFields))
.map(row -> row.getString(0).toString())
.collect(Collectors.toSet());

// NOTE: the Col-Stats Index isn't guaranteed to have a complete set of statistics for every
// base-file, since it's bound to clustering, which could occur asynchronously
// at an arbitrary point in time and is not likely to touch all the base files.
//
// To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
// files and all outstanding base-files, and make sure that all base files not
// represented w/in the index are included in the output of this method.
Set<String> nonIndexedFileNames = Arrays.stream(allFileStatus)
.map(fileStatus -> fileStatus.getPath().getName()).collect(Collectors.toSet());
nonIndexedFileNames.removeAll(allIndexedFileNames);

candidateFileNames.addAll(nonIndexedFileNames);
return candidateFileNames;
} catch (Throwable throwable) {
LOG.warn("Read column stats for data skipping error", throwable);
return null;
}
}

private void validateConfig() {
if (dataSkippingEnabled && !metadataConfig.enabled()) {
LOG.warn("Data skipping requires Metadata Table to be enabled! "
+ "isMetadataTableEnabled = {}", metadataConfig.enabled());
}
}

/**
* Returns all the relative partition paths.
*
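To make the pruning above concrete, here is a small self-contained illustration (not code from this commit; all names are hypothetical) of the two ideas candidateFilesInMetadataTable relies on: a per-file min/max range check for a simple predicate, and the conservative union with files that have no column stats entry:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DataSkippingSketch {

  /** Hypothetical per-file column statistics: min/max of a single indexed column. */
  static final class ColStats {
    final long min;
    final long max;
    ColStats(long min, long max) {
      this.min = min;
      this.max = max;
    }
  }

  /**
   * Returns the candidate file names for the predicate "col > threshold".
   * A file can be skipped only when its max value proves no row can match;
   * files missing from the index must be kept (conservative inclusion).
   */
  static Set<String> candidateFiles(List<String> allFiles, Map<String, ColStats> index, long threshold) {
    Set<String> candidates = new HashSet<>();
    for (String file : allFiles) {
      ColStats stats = index.get(file);
      if (stats == null || stats.max > threshold) {
        candidates.add(file);
      }
    }
    return candidates;
  }

  public static void main(String[] args) {
    List<String> allFiles = Arrays.asList("f1.parquet", "f2.parquet", "f3.parquet");
    Map<String, ColStats> index = new HashMap<>();
    index.put("f1.parquet", new ColStats(0, 10));   // max <= 15 -> provably no match, can be skipped
    index.put("f2.parquet", new ColStats(5, 100));  // may contain matching rows -> kept
    // f3.parquet has no stats entry -> kept conservatively

    // predicate: col > 15  =>  candidates contain f2.parquet and f3.parquet
    System.out.println(candidateFiles(allFiles, index, 15));
  }
}

In FileIndex this union is what the nonIndexedFileNames handling implements, while the range check is generalized by ExpressionEvaluator over the transposed column stats rows.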
@@ -35,6 +35,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,7 @@ public class IncrementalInputSplits implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class);
private final Configuration conf;
private final Path path;
private final RowType rowType;
private final long maxCompactionMemoryInBytes;
// for partition pruning
private final Set<String> requiredPartitions;
@@ -86,11 +88,13 @@ public class IncrementalInputSplits implements Serializable {
private IncrementalInputSplits(
Configuration conf,
Path path,
RowType rowType,
long maxCompactionMemoryInBytes,
@Nullable Set<String> requiredPartitions,
boolean skipCompaction) {
this.conf = conf;
this.path = path;
this.rowType = rowType;
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.requiredPartitions = requiredPartitions;
this.skipCompaction = skipCompaction;
@@ -167,7 +171,7 @@ public Result inputSplits(

if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf);
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
if (this.requiredPartitions != null) {
// apply partition push down
fileIndex.setPartitionPaths(this.requiredPartitions);
@@ -349,6 +353,7 @@ public static Result instance(List<MergeOnReadInputSplit> inputSplits, String en
public static class Builder {
private Configuration conf;
private Path path;
private RowType rowType;
private long maxCompactionMemoryInBytes;
// for partition pruning
private Set<String> requiredPartitions;
@@ -368,6 +373,11 @@ public Builder path(Path path) {
return this;
}

public Builder rowType(RowType rowType) {
this.rowType = rowType;
return this;
}

public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
return this;
@@ -384,7 +394,8 @@ public Builder skipCompaction(boolean skipCompaction) {
}

public IncrementalInputSplits build() {
return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path),
return new IncrementalInputSplits(
Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType),
this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction);
}
}
@@ -36,6 +36,7 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -99,6 +100,7 @@ public class StreamReadMonitoringFunction
public StreamReadMonitoringFunction(
Configuration conf,
Path path,
RowType rowType,
long maxCompactionMemoryInBytes,
@Nullable Set<String> requiredPartitionPaths) {
this.conf = conf;
Expand All @@ -107,6 +109,7 @@ public StreamReadMonitoringFunction(
this.incrementalInputSplits = IncrementalInputSplits.builder()
.conf(conf)
.path(path)
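// the query row type is forwarded so that the file index can evaluate pushed-down filters against the column stats index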
.rowType(rowType)
.maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
.requiredPartitions(requiredPartitionPaths)
.skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
(Diffs for the remaining changed files are not shown here.)
