spark filescan filter pushdown
Signed-off-by: zenghua <huazeng@dmetasoul.com>
zenghua committed Sep 26, 2023
1 parent 534b590 commit 8bc7afe
Showing 1 changed file with 18 additions and 15 deletions.
@@ -58,21 +58,24 @@ case class LakeSoulScanBuilder(sparkSession: SparkSession,
     parquetFilters.convertibleFilters(pushedDataFilters).toArray
   }
 
-//  override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
-//    val (partitionFilters, dataFilters) =
-//      DataSourceUtils.getPartitionFiltersAndDataFilters(fileIndex.partitionSchema, filters)
-//    this.partitionFilters = partitionFilters
-//    this.dataFilters = dataFilters
-//    val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
-//    for (filterExpr <- dataFilters) {
-//      val translated = DataSourceStrategy.translateFilter(filterExpr, true)
-//      if (translated.nonEmpty) {
-//        translatedFilters += translated.get
-//      }
-//    }
-//    pushedDataFilters = pushDataFilters(translatedFilters.toArray)
-//    Seq.empty
-//  }
+  override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
+    val (partitionFilters, dataFilters) =
+      DataSourceUtils.getPartitionFiltersAndDataFilters(fileIndex.partitionSchema, filters)
+    this.partitionFilters = partitionFilters
+    this.dataFilters = dataFilters
+    val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
+    val remainingExpressions = mutable.ArrayBuffer.empty[Expression]
+    for (filterExpr <- dataFilters) {
+      val translated = DataSourceStrategy.translateFilter(filterExpr, true)
+      if (translated.nonEmpty) {
+        translatedFilters += translated.get
+      } else {
+        remainingExpressions += filterExpr
+      }
+    }
+    pushedDataFilters = pushDataFilters(translatedFilters.toArray)
+    remainingExpressions
+  }
 
   override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = dataFilters

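For context: in Spark's catalyst filter pushdown contract, the expressions returned from pushFilters are the ones Spark still evaluates after the scan, so the reworked override returns the predicates translateFilter could not convert instead of Seq.empty. The sketch below is a minimal, standalone illustration of that split, assuming Spark 3.x on the classpath; the object name PushFiltersSketch and the id/name columns are invented for the example and are not part of LakeSoulScanBuilder.

// Minimal sketch (assumes Spark 3.x; the object name and the id/name columns
// are illustrative, not part of LakeSoul). It reproduces the split done by the
// loop in the commit: translatable predicates become sources.Filter instances
// that would be pushed to the scan, the rest stay as remaining expressions.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.functions.{col, length}
import org.apache.spark.sql.sources.Filter

object PushFiltersSketch {
  def main(args: Array[String]): Unit = {
    // Two candidate data filters: a plain column comparison (translatable)
    // and one that wraps the column in a function call (not translatable).
    val candidates: Seq[Expression] = Seq(
      (col("id") > 1).expr,
      (length(col("name")) > 1).expr
    )

    val (pushable, remaining) = candidates.partition { e =>
      DataSourceStrategy.translateFilter(e, true).isDefined
    }
    val pushedFilters: Array[Filter] =
      pushable.flatMap(e => DataSourceStrategy.translateFilter(e, true)).toArray

    println(s"pushed to the data source: ${pushedFilters.mkString(", ")}") // the id > 1 comparison, as a sources.Filter
    println(s"evaluated after the scan:  ${remaining.mkString(", ")}")     // the length(name) predicate, untranslated
  }
}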
