
Commit

[HUDI-5147] Flink data skipping doesn't work when HepPlanner calls copy() on HoodieTableSource (apache#7113)

* [HUDI-5147] Flink data skipping doesn't work when HepPlanner calls copy() on HoodieTableSource
trushev authored and Alexey Kudinkin committed Dec 14, 2022
1 parent db395f4 commit 15249f2
Showing 3 changed files with 53 additions and 18 deletions.
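
To make the change easier to follow: Flink's HepPlanner can call copy() on the table source after filter push-down, and before this patch the copy rebuilt its FileIndex from scratch, so the pushed-down data-skipping filters were lost. The sketch below uses simplified, hypothetical stand-in classes (SketchFileIndex, SketchTableSource; not the real Hudi/Flink types) to illustrate why copy() has to reuse the existing FileIndex:

```java
// Minimal sketch with hypothetical stand-in classes (not the actual Hudi/Flink APIs):
// a copy of the source must reuse the index that already holds the pushed-down
// filters, otherwise data skipping silently stops working after the planner copies it.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyKeepsFiltersSketch {

  static class SketchFileIndex {
    private List<String> filters = new ArrayList<>();   // stands in for List<ResolvedExpression>

    void setFilters(List<String> filters) {
      this.filters = filters;
    }

    List<String> getFilters() {
      return filters;
    }
  }

  static class SketchTableSource {
    private final SketchFileIndex fileIndex;

    SketchTableSource(SketchFileIndex fileIndex) {
      // After the fix: reuse the supplied index (and its filters) instead of rebuilding it.
      this.fileIndex = fileIndex == null ? new SketchFileIndex() : fileIndex;
    }

    void applyFilters(List<String> filters) {
      fileIndex.setFilters(filters);
    }

    // The planner calls copy() after push-down; forwarding the existing
    // fileIndex keeps the data-skipping filters alive in the copy.
    SketchTableSource copy() {
      return new SketchTableSource(fileIndex);
    }

    SketchFileIndex getFileIndex() {
      return fileIndex;
    }
  }

  public static void main(String[] args) {
    SketchTableSource source = new SketchTableSource(null);
    source.applyFilters(Arrays.asList("age > 20"));
    SketchTableSource copied = source.copy();
    // Prints [age > 20]; before the fix the copy would have started with an empty filter list.
    System.out.println(copied.getFileIndex().getFilters());
  }
}
```

The actual patch applies the same idea to HoodieTableSource and FileIndex, as the diffs below show.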
FileIndex.java
@@ -298,4 +298,9 @@ private static HoodieMetadataConfig metadataConfig(org.apache.flink.configuratio
 
     return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
   }
+
+  @VisibleForTesting
+  public List<ResolvedExpression> getFilters() {
+    return filters;
+  }
 }
HoodieTableSource.java
@@ -126,7 +126,6 @@ public class HoodieTableSource implements
 
   private int[] requiredPos;
   private long limit;
-  private List<ResolvedExpression> filters;
 
   private List<Map<String, String>> requiredPartitions;
 
@@ -145,25 +144,26 @@ public HoodieTableSource(
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
+      @Nullable FileIndex fileIndex,
       @Nullable List<Map<String, String>> requiredPartitions,
       @Nullable int[] requiredPos,
-      @Nullable Long limit,
-      @Nullable List<ResolvedExpression> filters) {
+      @Nullable Long limit) {
     this.schema = schema;
     this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
     this.path = path;
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
     this.conf = conf;
+    this.fileIndex = fileIndex == null
+        ? FileIndex.instance(this.path, this.conf, this.tableRowType)
+        : fileIndex;
     this.requiredPartitions = requiredPartitions;
     this.requiredPos = requiredPos == null
         ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray()
         : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
-    this.filters = filters == null ? Collections.emptyList() : filters;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-    this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
   }
 
@@ -214,7 +214,7 @@ public ChangelogMode getChangelogMode() {
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, requiredPartitions, requiredPos, limit, filters);
+        conf, fileIndex, requiredPartitions, requiredPos, limit);
   }
 
   @Override
@@ -224,8 +224,10 @@ public String asSummaryString() {
 
   @Override
   public Result applyFilters(List<ResolvedExpression> filters) {
-    this.filters = filters.stream().filter(ExpressionUtils::isSimpleCallExpression).collect(Collectors.toList());
-    this.fileIndex.setFilters(this.filters);
+    List<ResolvedExpression> callExpressionFilters = filters.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+    this.fileIndex.setFilters(callExpressionFilters);
     // refuse all the filters now
     return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters));
   }
@@ -513,4 +515,9 @@ public FileStatus[] getReadFiles() {
     }
     return fileIndex.getFilesInPartitions();
   }
+
+  @VisibleForTesting
+  FileIndex getFileIndex() {
+    return fileIndex;
+  }
 }
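
With this change the pushed-down filters live only on the FileIndex: applyFilters forwards the simple call expressions to fileIndex.setFilters, the constructor accepts an optional FileIndex, and copy() hands the current instance to the new source, so a planner-created copy keeps the same data-skipping filters. The @VisibleForTesting accessors (getFilters on FileIndex, getFileIndex on HoodieTableSource) exist so the test below can assert exactly that.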
TestHoodieTableSource.java
@@ -28,6 +28,9 @@
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
@@ -40,13 +43,15 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
@@ -115,16 +120,7 @@ void testGetInputFormat() throws Exception {
 
   @Test
   void testGetTableAvroSchema() {
-    final String path = tempFile.getAbsolutePath();
-    conf = TestConfigurations.getDefaultConf(path);
-    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
-
-    HoodieTableSource tableSource = new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(tempFile.getPath()),
-        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
-        "default-par",
-        conf);
+    HoodieTableSource tableSource = getEmptyStreamingSource();
     assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed");
     final String schemaFields = tableSource.getTableAvroSchema().getFields().stream()
         .map(Schema.Field::name)
@@ -137,4 +133,31 @@ void testGetTableAvroSchema() {
         + "uuid,name,age,ts,partition";
     assertThat(schemaFields, is(expected));
   }
+
+  @Test
+  void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
+    HoodieTableSource tableSource = getEmptyStreamingSource();
+    ResolvedExpression mockExpression = new CallExpression(
+        BuiltInFunctionDefinitions.IN,
+        Collections.emptyList(),
+        TestConfigurations.ROW_DATA_TYPE);
+    List<ResolvedExpression> expectedFilters = Collections.singletonList(mockExpression);
+    tableSource.applyFilters(expectedFilters);
+    HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
+    List<ResolvedExpression> actualFilters = copiedSource.getFileIndex().getFilters();
+    assertEquals(expectedFilters, actualFilters);
+  }
+
+  private HoodieTableSource getEmptyStreamingSource() {
+    final String path = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(path);
+    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
+
+    return new HoodieTableSource(
+        TestConfigurations.TABLE_SCHEMA,
+        new Path(tempFile.getPath()),
+        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+        "default-par",
+        conf);
+  }
 }
