diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index a382954fd21f..9ee93719813a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -298,4 +298,9 @@ private static HoodieMetadataConfig metadataConfig(org.apache.flink.configuratio
     return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
   }
+
+  @VisibleForTesting
+  public List<ResolvedExpression> getFilters() {
+    return filters;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index f4c301892e58..8571390f5309 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -127,7 +127,6 @@ public class HoodieTableSource implements
   private int[] requiredPos;
   private long limit;
-  private List<ResolvedExpression> filters;
 
   private List<Map<String, String>> requiredPartitions;
@@ -146,25 +145,26 @@ public HoodieTableSource(
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf,
+      @Nullable FileIndex fileIndex,
       @Nullable List<Map<String, String>> requiredPartitions,
       @Nullable int[] requiredPos,
-      @Nullable Long limit,
-      @Nullable List<ResolvedExpression> filters) {
     this.schema = schema;
+      @Nullable Long limit) {
     this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
     this.path = path;
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
     this.conf = conf;
+    this.fileIndex = fileIndex == null
+        ? FileIndex.instance(this.path, this.conf, this.tableRowType)
+        : fileIndex;
     this.requiredPartitions = requiredPartitions;
     this.requiredPos = requiredPos == null
        ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray()
        : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
-    this.filters = filters == null ? Collections.emptyList() : filters;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
-    this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType);
     this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
   }
@@ -215,7 +215,7 @@ public ChangelogMode getChangelogMode() {
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, requiredPartitions, requiredPos, limit, filters);
+        conf, fileIndex, requiredPartitions, requiredPos, limit);
   }
 
   @Override
@@ -225,8 +225,10 @@ public String asSummaryString() {
 
   @Override
   public Result applyFilters(List<ResolvedExpression> filters) {
-    this.filters = filters.stream().filter(ExpressionUtils::isSimpleCallExpression).collect(Collectors.toList());
-    this.fileIndex.setFilters(this.filters);
+    List<ResolvedExpression> callExpressionFilters = filters.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+    this.fileIndex.setFilters(callExpressionFilters);
     // refuse all the filters now
     return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters));
   }
@@ -543,4 +545,9 @@ public FileStatus[] getReadFiles() {
     }
     return fileIndex.getFilesInPartitions();
   }
+
+  @VisibleForTesting
+  FileIndex getFileIndex() {
+    return fileIndex;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 924ce929b030..d8093793fccc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -28,6 +28,9 @@
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
@@ -40,6 +43,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -47,6 +51,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -115,16 +120,7 @@ void testGetInputFormat() throws Exception {
 
   @Test
   void testGetTableAvroSchema() {
-    final String path = tempFile.getAbsolutePath();
-    conf = TestConfigurations.getDefaultConf(path);
-    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
-
-    HoodieTableSource tableSource = new HoodieTableSource(
-        TestConfigurations.TABLE_SCHEMA,
-        new Path(tempFile.getPath()),
-        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
-        "default-par",
-        conf);
+    HoodieTableSource tableSource = getEmptyStreamingSource();
     assertNull(tableSource.getMetaClient(), "Streaming source with empty table path is allowed");
     final String schemaFields = tableSource.getTableAvroSchema().getFields().stream()
         .map(Schema.Field::name)
@@ -137,4 +133,31 @@ void testGetTableAvroSchema() {
         + "uuid,name,age,ts,partition";
     assertThat(schemaFields, is(expected));
   }
+
+  @Test
+  void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() {
+    HoodieTableSource tableSource = getEmptyStreamingSource();
+    ResolvedExpression mockExpression = new CallExpression(
+        BuiltInFunctionDefinitions.IN,
+        Collections.emptyList(),
+        TestConfigurations.ROW_DATA_TYPE);
+    List<ResolvedExpression> expectedFilters = Collections.singletonList(mockExpression);
+    tableSource.applyFilters(expectedFilters);
+    HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
+    List<ResolvedExpression> actualFilters = copiedSource.getFileIndex().getFilters();
+    assertEquals(expectedFilters, actualFilters);
+  }
+
+  private HoodieTableSource getEmptyStreamingSource() {
+    final String path = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(path);
+    conf.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
+
+    return new HoodieTableSource(
+        TestConfigurations.TABLE_SCHEMA,
+        new Path(tempFile.getPath()),
+        Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
+        "default-par",
+        conf);
+  }
 }
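
The net effect of the patch: the pushed-down filters now live only in the `FileIndex`, and `copy()` hands the existing `FileIndex` to the new source instead of rebuilding it, so filters applied before the planner copies the source are no longer lost. Below is a minimal, self-contained Java sketch of that ownership pattern; `FilterHolder` and `CopyPreservesFilters` are hypothetical stand-ins, not Hudi's API.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for FileIndex: the single owner of the pushed-down filters.
class FilterHolder {
  private List<String> filters = new ArrayList<>();

  void setFilters(List<String> filters) {
    this.filters = filters;
  }

  List<String> getFilters() {
    return filters;
  }
}

public class CopyPreservesFilters {
  private final FilterHolder fileIndex;

  CopyPreservesFilters(FilterHolder fileIndex) {
    // Mirror of the patched constructor: reuse the caller's helper when
    // present, only build a fresh one when none is supplied.
    this.fileIndex = fileIndex == null ? new FilterHolder() : fileIndex;
  }

  void applyFilters(List<String> filters) {
    fileIndex.setFilters(filters);
  }

  CopyPreservesFilters copy() {
    // Before the fix the copy rebuilt a fresh helper, dropping any
    // filters already pushed down; now the helper travels with the copy.
    return new CopyPreservesFilters(fileIndex);
  }

  public static void main(String[] args) {
    CopyPreservesFilters source = new CopyPreservesFilters(null);
    source.applyFilters(List.of("age > 20"));
    // Prints [age > 20]: the copy shares the same holder, so filters survive.
    System.out.println(source.copy().fileIndex.getFilters());
  }
}
```

Sharing the `FileIndex` instance also removes the duplicated filter state the old code kept in both the source and the index, which is what made the two drift apart on `copy()`.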