[SPARK-3091] [SQL] Add support for caching metadata on Parquet files
For larger Parquet files, reading the file footers (which is done in parallel on up to 5 threads) and HDFS block locations (which is serial) can take multiple seconds. We can add an option to cache this data within FilteringParquetRowInputFormat. Unfortunately, ParquetInputFormat only caches footers within each instance of ParquetInputFormat, not across them.
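
As a rough illustration of the caching approach described above (not the commit's exact code: the object name below is made up, and the real change places these caches in the FilteringParquetRowInputFormat companion object shown later in the diff), the idea is to hold Guava caches in a shared object so every input format instance reuses them:

import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder
import org.apache.hadoop.fs.{BlockLocation, FileStatus}
import parquet.hadoop.Footer

// Illustrative holder object; the actual change uses the
// FilteringParquetRowInputFormat companion object (see the diff below).
object SharedParquetMetadataCaches {
  // Footers are keyed by FileStatus, which carries a modification time,
  // so a rewritten file simply misses the cache and is re-read.
  val footerCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .build[FileStatus, Footer]()

  // HDFS block locations can move, so expire cached locations periodically.
  val blockLocationCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .expireAfterWrite(15, TimeUnit.MINUTES)
    .build[FileStatus, Array[BlockLocation]]()
}

Because the caches live in an object rather than in each input format instance, footers read by one job remain available to later jobs in the same JVM.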

Note: this PR leaves the feature turned off by default for 1.1, but I believe it's safe to turn it on afterward. The keys in the hash maps are FileStatus objects that include a modification time, so this will work fine if files are modified. The location cache could become invalid if files have moved within HDFS, but that's rare, so I just made it invalidate entries every 15 minutes.
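
For context, a minimal usage sketch for enabling the option (assuming Spark 1.1-era APIs, an existing SparkContext named sc, and a made-up HDFS path and table name):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext (assumed)

// Opt in to Parquet metadata caching; it is off by default in 1.1.
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")

// Repeated scans of the same Parquet files can now reuse cached footers
// and HDFS block locations instead of re-reading them each time.
val events = sqlContext.parquetFile("hdfs:///path/to/events")  // path is illustrative
events.registerTempTable("events")
sqlContext.sql("SELECT COUNT(*) FROM events").collect()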

Author: Matei Zaharia <matei@databricks.com>

Closes apache#2005 from mateiz/parquet-cache and squashes the following commits:

dae8efe [Matei Zaharia] Bug fix
c71e9ed [Matei Zaharia] Handle empty statuses directly
22072b0 [Matei Zaharia] Use Guava caches and add a config option for caching metadata
8fb56ce [Matei Zaharia] Cache file block locations too
453bd21 [Matei Zaharia] Bug fix
4094df6 [Matei Zaharia] First attempt at caching Parquet footers
mateiz authored and conviva-zz committed Sep 4, 2014
1 parent c85d920 commit c07fbe0
Showing 2 changed files with 72 additions and 13 deletions.
1 change: 1 addition & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -32,6 +32,7 @@ private[spark] object SQLConf {
val CODEGEN_ENABLED = "spark.sql.codegen"
val DIALECT = "spark.sql.dialect"
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"

// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
84 changes: 71 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -17,22 +17,23 @@

package org.apache.spark.sql.parquet

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try

import java.io.IOException
import java.lang.{Long => JLong}
import java.text.SimpleDateFormat
import java.util.{Date, List => JList}
import java.util.concurrent.{Callable, TimeUnit}
import java.util.{ArrayList, Collections, Date, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try

import com.google.common.cache.CacheBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
@@ -41,7 +42,7 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -96,6 +97,11 @@ case class ParquetTableScan(
ParquetFilters.serializeFilterExpressions(columnPruningPred, conf)
}

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.set(
SQLConf.PARQUET_CACHE_METADATA,
sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))

sc.newAPIHadoopRDD(
conf,
classOf[FilteringParquetRowInputFormat],
@@ -323,10 +329,40 @@ private[parquet] class FilteringParquetRowInputFormat
}

override def getFooters(jobContext: JobContext): JList[Footer] = {
import FilteringParquetRowInputFormat.footerCache

if (footers eq null) {
val conf = ContextUtil.getConfiguration(jobContext)
val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
val statuses = listStatus(jobContext)
fileStatuses = statuses.map(file => file.getPath -> file).toMap
footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
if (statuses.isEmpty) {
footers = Collections.emptyList[Footer]
} else if (!cacheMetadata) {
// Read the footers from HDFS
footers = getFooters(conf, statuses)
} else {
// Read only the footers that are not in the footerCache
val foundFooters = footerCache.getAllPresent(statuses)
val toFetch = new ArrayList[FileStatus]
for (s <- statuses) {
if (!foundFooters.containsKey(s)) {
toFetch.add(s)
}
}
val newFooters = new mutable.HashMap[FileStatus, Footer]
if (toFetch.size > 0) {
val fetched = getFooters(conf, toFetch)
for ((status, i) <- toFetch.zipWithIndex) {
newFooters(status) = fetched.get(i)
}
footerCache.putAll(newFooters)
}
footers = new ArrayList[Footer](statuses.size)
for (status <- statuses) {
footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
}
}
}

footers
@@ -339,6 +375,10 @@ private[parquet] class FilteringParquetRowInputFormat
configuration: Configuration,
footers: JList[Footer]): JList[ParquetInputSplit] = {

import FilteringParquetRowInputFormat.blockLocationCache

val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)

val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
val minSplitSize: JLong =
Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
@@ -366,16 +406,23 @@
for (footer <- footers) {
val fs = footer.getFile.getFileSystem(configuration)
val file = footer.getFile
val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
val status = fileStatuses.getOrElse(file, fs.getFileStatus(file))
val parquetMetaData = footer.getParquetMetadata
val blocks = parquetMetaData.getBlocks
val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
var blockLocations: Array[BlockLocation] = null
if (!cacheMetadata) {
blockLocations = fs.getFileBlockLocations(status, 0, status.getLen)
} else {
blockLocations = blockLocationCache.get(status, new Callable[Array[BlockLocation]] {
def call(): Array[BlockLocation] = fs.getFileBlockLocations(status, 0, status.getLen)
})
}
splits.addAll(
generateSplits.invoke(
null,
blocks,
fileBlockLocations,
fileStatus,
blockLocations,
status,
parquetMetaData.getFileMetaData,
readContext.getRequestedSchema.toString,
readContext.getReadSupportMetadata,
@@ -387,6 +434,17 @@
}
}

private[parquet] object FilteringParquetRowInputFormat {
private val footerCache = CacheBuilder.newBuilder()
.maximumSize(20000)
.build[FileStatus, Footer]()

private val blockLocationCache = CacheBuilder.newBuilder()
.maximumSize(20000)
.expireAfterWrite(15, TimeUnit.MINUTES) // Expire locations since HDFS files might move
.build[FileStatus, Array[BlockLocation]]()
}

private[parquet] object FileSystemHelper {
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
