Parquet partition pruning (#52)
pwoody authored Nov 14, 2016
1 parent 06cbbf6 commit a3b7ad5
Showing 8 changed files with 263 additions and 129 deletions.
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
@@ -155,26 +155,7 @@ case class FileSourceScanExec(
false
}

@transient private lazy val selectedPartitions = {
val originalPartitions = relation.location.listFiles(partitionFilters)
val filteredPartitions = if (relation.location.rootPaths.isEmpty) {
originalPartitions
} else {
relation.fileFormat.filterPartitions(
dataFilters,
outputSchema,
relation.sparkSession.sparkContext.hadoopConfiguration,
originalPartitions.flatMap(_.files),
relation.location.rootPaths.head,
originalPartitions)
}
val totalFilesRaw = originalPartitions.map(_.files.size).sum
val totalFilesFiltered = filteredPartitions.map(_.files.size).sum
logInfo(s"Filtered down total number of partitions to ${filteredPartitions.size}"
+ s" from ${originalPartitions.size}, "
+ s"total number of files to ${totalFilesFiltered} from ${totalFilesRaw}")
filteredPartitions
}
@transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)

override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -453,23 +434,36 @@ case class FileSourceScanExec(
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val bucketed =
selectedPartitions.flatMap { p =>
p.files.map { f =>
val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
val session = fsRelation.sparkSession
val partitionFiles = selectedPartitions.flatMap { partition =>
partition.files.map((_, partition.values))
}
val bucketed = partitionFiles.flatMap { case (file, values) =>
val blockLocations = getBlockLocations(file)
val filePath = file.getPath.toUri.toString
val format = fsRelation.fileFormat

if (format.isSplitable(session, fsRelation.options, file.getPath)) {
val validSplits = format.getSplits(session, fsRelation.location, file,
dataFilters, schema, session.sessionState.newHadoopConf())
validSplits.map { split =>
val hosts = getBlockHosts(blockLocations, split.getStart, split.getLength)
PartitionedFile(values, filePath, split.getStart, split.getLength, hosts)
}
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(values, filePath, 0, file.getLen, hosts))
}

}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
.getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
}
val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
}

new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
new FileScanRDD(session, readFile, filePartitions)
}

/**
@@ -484,34 +478,43 @@ case class FileSourceScanExec(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Seq[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
val defaultMaxSplitBytes =
fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
val session = fsRelation.sparkSession
val defaultMaxSplitBytes = session.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = session.sessionState.conf.filesOpenCostInBytes
val defaultParallelism = session.sparkContext.defaultParallelism
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / defaultParallelism

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val partitionFiles = selectedPartitions.flatMap { partition =>
partition.files.map((_, partition.values))
}
val splitFiles = partitionFiles.flatMap { case (file, values) =>
val blockLocations = getBlockLocations(file)
val filePath = file.getPath.toUri.toString
val format = fsRelation.fileFormat

// If the format is splittable, attempt to split and filter the file.
if (format.isSplitable(session, fsRelation.options, file.getPath)) {
val validSplits = format.getSplits(session, fsRelation.location, file,
dataFilters, schema, session.sessionState.newHadoopConf())
validSplits.flatMap { split =>
val splitOffset = split.getStart
val end = splitOffset + split.getLength
(splitOffset until end by maxSplitBytes).map { offset =>
val remaining = end - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(blockLocations, offset, size)
PartitionedFile(
partition.values, file.getPath.toUri.toString, offset, size, hosts)
PartitionedFile(values, filePath, offset, size, hosts)
}
} else {
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(
partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
}
} else {
// Take the entire file as one partition.
val hosts = getBlockHosts(blockLocations, 0, file.getLen)
Seq(PartitionedFile(values, filePath, 0, file.getLen, hosts))
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

@@ -544,7 +547,7 @@ case class FileSourceScanExec(
}
closePartition()

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
new FileScanRDD(session, readFile, partitions)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
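As a reading aid for the scan changes above, here is a minimal standalone sketch of the split-carving logic in createNonBucketedReadRDD. The object name and the Split/Chunk case classes are hypothetical stand-ins for Hadoop's FileSplit and Spark's PartitionedFile; the maxSplitBytes formula matches the one computed above.

// Minimal sketch of the split planning above; Split and Chunk are simplified
// stand-ins for Hadoop's FileSplit and Spark's PartitionedFile.
object SplitPlanningSketch {
  final case class Split(start: Long, length: Long)
  final case class Chunk(path: String, start: Long, length: Long)

  // maxSplitBytes as computed in createNonBucketedReadRDD.
  def maxSplitBytes(
      defaultMaxSplitBytes: Long,
      openCostInBytes: Long,
      totalBytes: Long,
      defaultParallelism: Int): Long = {
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  // Carve each surviving split into chunks of at most maxSplitBytes,
  // mirroring the (splitOffset until end by maxSplitBytes) loop above.
  def carve(path: String, validSplits: Seq[Split], maxSplitBytes: Long): Seq[Chunk] =
    validSplits.flatMap { split =>
      val end = split.start + split.length
      (split.start until end by maxSplitBytes).map { offset =>
        val size = math.min(maxSplitBytes, end - offset)
        Chunk(path, offset, size)
      }
    }

  def main(args: Array[String]): Unit = {
    // Example: a 512 MB file where only one 128 MB row group survives pruning.
    val max = maxSplitBytes(
      defaultMaxSplitBytes = 128L * 1024 * 1024,
      openCostInBytes = 4L * 1024 * 1024,
      totalBytes = 512L * 1024 * 1024,
      defaultParallelism = 8)
    println(carve("part-00000.parquet", Seq(Split(256L * 1024 * 1024, 128L * 1024 * 1024)), max))
  }
}

Carving the surviving splits rather than the whole file is what lets the Parquet row-group pruning added below shrink the byte ranges that the scan actually reads.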
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileSplit

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -78,21 +79,6 @@ trait FileFormat {
false
}

/**
* Allow FileFormats to have a pluggable way to utilize pushed filters to eliminate partitions
* before execution. By default no pruning is performed and the original partitioning is
* preserved.
*/
def filterPartitions(
filters: Seq[Filter],
schema: StructType,
conf: Configuration,
allFiles: Seq[FileStatus],
root: Path,
partitions: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
partitions
}

/**
* Returns whether a file with `path` can be split or not.
*/
@@ -103,6 +89,19 @@
false
}

/**
* For a file, return the valid splits that may pass the given data filters.
*/
def getSplits(
sparkSession: SparkSession,
fileIndex: FileIndex,
fileStatus: FileStatus,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): Seq[FileSplit] = {
Seq(new FileSplit(fileStatus.getPath, 0, fileStatus.getLen, Array.empty))
}

/**
* Returns a function that can be used to read a single file in as an Iterator of InternalRow.
*
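The new getSplits hook above defaults to a single whole-file split; a format that understands its own layout can return a narrower set of byte ranges instead. Below is a hypothetical sketch of that contract using Hadoop's FileStatus and FileSplit directly; the helper names are illustrative, not part of the API.

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.lib.input.FileSplit

// Illustrative sketch of the getSplits contract; helper names are hypothetical.
object GetSplitsContractSketch {
  // Default behaviour: the whole file is one candidate split.
  def wholeFile(status: FileStatus): Seq[FileSplit] =
    Seq(new FileSplit(status.getPath, 0, status.getLen, Array.empty[String]))

  // A pruning-aware format instead returns only the (start, length) ranges
  // that may satisfy the pushed filters.
  def prunedRanges(path: Path, ranges: Seq[(Long, Long)]): Seq[FileSplit] =
    ranges.map { case (start, length) => new FileSplit(path, start, length, Array.empty[String]) }

  def main(args: Array[String]): Unit = {
    val status = new FileStatus(16L * 1024 * 1024, false, 1, 128L * 1024 * 1024, 0L,
      new Path("file:///tmp/example/part-00000.parquet"))
    println(wholeFile(status))                                                    // one split: bytes [0, 16 MB)
    println(prunedRanges(status.getPath, Seq((4L * 1024 * 1024, 1024L * 1024))))  // only a 1 MB range survives
  }
}

The scan code in FileSourceScanExec only consults this hook when isSplitable returns true, so non-splittable formats keep the previous whole-file behaviour.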
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.FileNotFoundException
import java.net.URI
import java.util.logging.{Logger => JLogger}

@@ -59,8 +60,8 @@ class ParquetFileFormat
with Logging
with Serializable {

// Attempt to cache parquet metadata
@transient @volatile private var cachedMetadata: ParquetMetadata = _
@transient private val cachedMetadata: mutable.LinkedHashMap[Path, ParquetMetadata] =
new mutable.LinkedHashMap[Path, ParquetMetadata]

override def shortName(): String = "parquet"

@@ -277,6 +278,94 @@ class ParquetFileFormat
true
}

override def getSplits(
sparkSession: SparkSession,
fileIndex: FileIndex,
fileStatus: FileStatus,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): Seq[FileSplit] = {
if (filters.isEmpty || !sparkSession.sessionState.conf.parquetPartitionPruningEnabled) {
// Return immediately to save FileSystem overhead
super.getSplits(sparkSession, fileIndex, fileStatus, filters, schema, hadoopConf)
} else {
val filePath = fileStatus.getPath
val rootOption: Option[Path] = fileIndex.rootPaths
.find(root => filePath.toString.startsWith(root.toString))
val metadataOption = rootOption.flatMap { root =>
cachedMetadata.get(root).orElse(getMetadataForPath(filePath, root, hadoopConf))
.map { metadata =>
cachedMetadata.put(root, metadata)
metadata
}
}
// If the metadata exists, filter the splits.
// Otherwise, fall back to the default implementation.
metadataOption
.map(filterToSplits(fileStatus, _, rootOption.get, filters, schema, hadoopConf))
.getOrElse(super.getSplits(sparkSession, fileIndex, fileStatus,
filters, schema, hadoopConf))
}
}

private def filterToSplits(
fileStatus: FileStatus,
metadata: ParquetMetadata,
metadataRoot: Path,
filters: Seq[Filter],
schema: StructType,
hadoopConf: Configuration): Seq[FileSplit] = {
val metadataBlocks = metadata.getBlocks

// Ensure that the metadata has an entry for the file.
// If it does not, do not filter at this stage.
val metadataContainsPath = metadataBlocks.asScala.exists { bmd =>
new Path(metadataRoot, bmd.getPath) == fileStatus.getPath
}
if (!metadataContainsPath) {
log.warn(s"Found _metadata file for $metadataRoot," +
s" but no entries for blocks in ${fileStatus.getPath}. Retaining whole file.")
return Seq(new FileSplit(fileStatus.getPath, 0, fileStatus.getLen, Array.empty))
}

val parquetSchema = metadata.getFileMetaData.getSchema
val filter = FilterCompat.get(filters
.flatMap(ParquetFilters.createFilter(schema, _))
.reduce(FilterApi.and))
val filteredMetadata =
RowGroupFilter.filterRowGroups(filter, metadataBlocks, parquetSchema).asScala
filteredMetadata.flatMap { bmd =>
val bmdPath = new Path(metadataRoot, bmd.getPath)
val fsPath = fileStatus.getPath
if (bmdPath == fsPath) {
Some(new FileSplit(bmdPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty))
} else {
None
}
}
}

private def getMetadataForPath(
filePath: Path,
rootPath: Path,
conf: Configuration): Option[ParquetMetadata] = {
val fs = rootPath.getFileSystem(conf)
try {
val stat = fs.getFileStatus(rootPath)
// Mimic Parquet behavior. If given a directory, find the underlying _metadata file
// If given a single file, check the parent directory for a _metadata file
val directory = if (stat.isDirectory) stat.getPath else stat.getPath.getParent
val metadataFile = new Path(directory, ParquetFileWriter.PARQUET_METADATA_FILE)
val metadata =
ParquetFileReader.readFooter(conf, metadataFile, ParquetMetadataConverter.NO_FILTER)
Option(metadata)
} catch {
case notFound: FileNotFoundException =>
log.debug(s"No _metadata file found in root $rootPath")
None
}
}

override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
@@ -431,59 +520,6 @@ class ParquetFileFormat
sqlContext.sessionState.newHadoopConf(),
options)
}

override def filterPartitions(
filters: Seq[Filter],
schema: StructType,
conf: Configuration,
allFiles: Seq[FileStatus],
root: Path,
partitions: Seq[PartitionDirectory]): Seq[PartitionDirectory] = {
// Read the "_metadata" file if available, contains all block headers. On S3 better to grab
// all of the footers in a batch rather than having to read every single file just to get its
// footer.
allFiles.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
.map { stat =>
val metadata = getOrReadMetadata(conf, stat)
partitions.map { partition =>
filterByMetadata(filters, schema, conf, root, metadata, partition)
}.filterNot(_.files.isEmpty)
}.getOrElse(partitions)
}

private def filterByMetadata(
filters: Seq[Filter],
schema: StructType,
conf: Configuration,
root: Path,
metadata: ParquetMetadata,
partition: PartitionDirectory): PartitionDirectory = {
val blockMetadatas = metadata.getBlocks.asScala
val parquetSchema = metadata.getFileMetaData.getSchema
val conjunctiveFilter = filters
.flatMap(ParquetFilters.createFilter(schema, _))
.reduceOption(FilterApi.and)
conjunctiveFilter.map { conjunction =>
val filteredBlocks = RowGroupFilter.filterRowGroups(
FilterCompat.get(conjunction), blockMetadatas.asJava, parquetSchema).asScala.map { bmd =>
new Path(root, bmd.getPath).toString
}
PartitionDirectory(partition.values, partition.files.filter { f =>
filteredBlocks.contains(f.getPath.toString)
})
}.getOrElse(partition)
}

private def getOrReadMetadata(conf: Configuration, stat: FileStatus): ParquetMetadata = {
if (cachedMetadata == null) {
logInfo("Reading summary metadata into cache in ParquetFileFormat")
cachedMetadata = ParquetFileReader.readFooter(conf, stat, ParquetMetadataConverter.NO_FILTER)
} else {
logInfo("Using cached summary metadata")
}
cachedMetadata
}

}

object ParquetFileFormat extends Logging {
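ParquetFileFormat.getSplits above drives the pruning from the _metadata summary file: it reads all row-group footers in one pass (cached per root path), runs RowGroupFilter against the pushed filters, and keeps only the byte ranges of the surviving row groups. The following is a conceptual sketch of that idea; the case classes and min/max fields are illustrative stand-ins for Parquet's BlockMetaData and column statistics, not the parquet-mr API.

// Conceptual sketch of row-group pruning via summary metadata.
// RowGroup is an illustrative stand-in for Parquet's BlockMetaData plus stats.
object RowGroupPruningSketch {
  final case class RowGroup(file: String, startingPos: Long, totalByteSize: Long,
      min: Int, max: Int)

  // Keep only row groups whose [min, max] range may contain rows with value == v;
  // each survivor becomes one byte-range split of its file.
  def prune(groups: Seq[RowGroup], v: Int): Seq[(String, Long, Long)] =
    groups.collect {
      case g if g.min <= v && v <= g.max => (g.file, g.startingPos, g.totalByteSize)
    }

  def main(args: Array[String]): Unit = {
    val groups = Seq(
      RowGroup("part-00000.parquet", 4L, 1000L, min = 0, max = 9),
      RowGroup("part-00000.parquet", 1004L, 1000L, min = 10, max = 19),
      RowGroup("part-00001.parquet", 4L, 1000L, min = 20, max = 29))
    // An equality filter "col = 12" can only match the second row group,
    // so only its byte range would be scanned.
    println(prune(groups, 12))
  }
}

Because this happens at planning time, row groups that cannot match the filters never become PartitionedFiles, which is where the scan savings come from.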