Custom CatalogFileIndex (apache-spark-on-k8s#364)
robert3005 authored Apr 22, 2018
1 parent 5ccc040 commit f9f98dd
Showing 8 changed files with 185 additions and 95 deletions.
10 changes: 5 additions & 5 deletions .circleci/config.yml
@@ -201,11 +201,11 @@ jobs:
     <<: *defaults
     steps:
       # Saves us from recompiling every time...
-      - restore_cache:
-          keys:
-            - v1-build-sbt-{{ .Branch }}-{{ .Revision }}
-            - v1-build-sbt-{{ .Branch }}-
-            - v1-build-sbt-master-
+      #- restore_cache:
+          #keys:
+            #- v1-build-sbt-{{ .Branch }}-{{ .Revision }}
+            #- v1-build-sbt-{{ .Branch }}-
+            #- v1-build-sbt-master-
       - *checkout-code
       - run:
           name: Hard link cache contents into current build directory
@@ -33,6 +33,12 @@ object StaticSQLConf {
     .stringConf
     .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
+  val CATALOG_FILE_INDEX_IMPLEMENTATION =
+    buildStaticConf("spark.sql.catalogFileIndexImplementation")
+      .internal()
+      .stringConf
+      .createWithDefault("hive")
+
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .stringConf
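
For context on how this new static conf is consumed: it is read from the application's SparkConf rather than the runtime SQLConf, so it has to be set before the SparkContext is created (spark-defaults.conf, --conf, or the session builder). A minimal sketch, assuming a hypothetical factory class com.example.index.LoggingCatalogFileIndexFactory (not part of this commit) is on the driver classpath:

import org.apache.spark.sql.SparkSession

// The named class must extend CatalogFileIndexFactory and have a no-arg
// constructor, because it is instantiated reflectively (see the reflect helper
// introduced below). The default value "hive" keeps the existing behaviour.
val spark = SparkSession.builder()
  .config("spark.sql.catalogFileIndexImplementation",
    "com.example.index.LoggingCatalogFileIndexFactory")
  .enableHiveSupport()
  .getOrCreate()
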
@@ -17,107 +17,60 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.net.URI
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import scala.util.control.NonFatal
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
-
-
-/**
- * A [[FileIndex]] for a metastore catalog table.
- *
- * @param sparkSession a [[SparkSession]]
- * @param table the metadata of the table
- * @param sizeInBytes the table's data size in bytes
- */
-class CatalogFileIndex(
-    sparkSession: SparkSession,
-    val table: CatalogTable,
-    override val sizeInBytes: Long) extends FileIndex {
-
-  protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()
-
-  /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */
-  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-
-  assert(table.identifier.database.isDefined,
-    "The table identifier must be qualified in CatalogFileIndex")
-
-  private val baseLocation: Option[URI] = table.storage.locationUri
-
-  override def partitionSchema: StructType = table.partitionSchema
-
-  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
-
-  override def listFiles(
-      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
-  }
-
-  override def refresh(): Unit = fileStatusCache.invalidateAll()
-
-  /**
-   * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions
-   * specified by the given partition-pruning filters.
-   *
-   * @param filters partition-pruning filters
-   */
-  def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
-    if (table.partitionColumnNames.nonEmpty) {
-      val startTime = System.nanoTime()
-      val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
-        table.identifier, filters)
-      val partitions = selectedPartitions.map { p =>
-        val path = new Path(p.location)
-        val fs = path.getFileSystem(hadoopConf)
-        PartitionPath(
-          p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory))
-      }
-      val partitionSpec = PartitionSpec(partitionSchema, partitions)
-      val timeNs = System.nanoTime() - startTime
-      new PrunedInMemoryFileIndex(
-        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
-    } else {
-      new InMemoryFileIndex(
-        sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
-    }
-  }
-
-  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
-
-  // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
-  // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
-  // implement `equals` and `hashCode` here, to make it work with cache lookup.
-  override def equals(o: Any): Boolean = o match {
-    case other: CatalogFileIndex => this.table.identifier == other.table.identifier
-    case _ => false
-  }
-
-  override def hashCode(): Int = table.identifier.hashCode()
-}
-
-/**
- * An override of the standard HDFS listing based catalog, that overrides the partition spec with
- * the information from the metastore.
- *
- * @param tableBasePath The default base path of the Hive metastore table
- * @param partitionSpec The partition specifications from Hive metastore
- */
-private class PrunedInMemoryFileIndex(
-    sparkSession: SparkSession,
-    tableBasePath: Path,
-    fileStatusCache: FileStatusCache,
-    override val partitionSpec: PartitionSpec,
-    override val metadataOpsTimeNs: Option[Long])
-  extends InMemoryFileIndex(
-    sparkSession,
-    partitionSpec.partitions.map(_.path),
-    Map.empty,
-    Some(partitionSpec.partitionColumns),
-    fileStatusCache)
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_FILE_INDEX_IMPLEMENTATION
+import org.apache.spark.util.Utils
+
+trait CatalogFileIndex extends FileIndex {
+
+  /**
+   * Returns a [[FileIndex]] for this table restricted to the subset of partitions
+   * specified by the given partition-pruning filters.
+   *
+   * @param filters partition-pruning filters
+   */
+  def filterPartitions(filters: Seq[Expression]): FileIndex
+
+}
+
+trait CatalogFileIndexFactory {
+
+  /**
+   * Creates [[CatalogFileIndex]] for given table
+   */
+  def create(
+      spark: SparkSession,
+      catalogTable: CatalogTable,
+      tableSize: Long): CatalogFileIndex
+
+}
+
+object CatalogFileIndexFactory {
+
+  def reflect[T <: CatalogFileIndexFactory](conf: SparkConf): T = {
+    val className = fileIndexClassName(conf)
+    try {
+      val ctor = Utils.classForName(className).getDeclaredConstructor()
+      ctor.newInstance().asInstanceOf[T]
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
+    }
+  }
+
+  private def fileIndexClassName(conf: SparkConf): String = {
+    conf.get(CATALOG_FILE_INDEX_IMPLEMENTATION) match {
+      case "hive" => "org.apache.spark.sql.execution.datasources.HiveCatalogFileIndexFactory"
+      case name => name
+    }
+  }
+}
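
To make the new extension point concrete, here is a minimal sketch of a third-party factory. The class, its package, and its delegate-and-log behaviour are hypothetical and not part of this commit; it simply wraps the Hive-backed implementation added below.

package com.example.index

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, CatalogFileIndexFactory, HiveCatalogFileIndexFactory}

// Hypothetical factory: prints which table is being resolved, then delegates to
// the built-in Hive-backed factory. A real custom implementation would return
// its own CatalogFileIndex, e.g. one backed by a partition metadata service
// instead of the Hive metastore. A public no-arg constructor is required,
// since CatalogFileIndexFactory.reflect calls getDeclaredConstructor().
class LoggingCatalogFileIndexFactory extends CatalogFileIndexFactory {

  private val delegate = new HiveCatalogFileIndexFactory

  override def create(
      spark: SparkSession,
      catalogTable: CatalogTable,
      tableSize: Long): CatalogFileIndex = {
    println(s"Creating catalog file index for ${catalogTable.identifier} (size hint: $tableSize bytes)")
    delegate.create(spark, catalogTable, tableSize)
  }
}

This is the class name a deployment would put in spark.sql.catalogFileIndexImplementation.
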
@@ -89,6 +89,8 @@ case class DataSource(
 
   case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
 
+  lazy val fileIndexFactory: CatalogFileIndexFactory =
+    CatalogFileIndexFactory.reflect(sparkSession.sparkContext.conf)
   lazy val providingClass: Class[_] =
     DataSource.lookupDataSource(className, sparkSession.sessionState.conf)
   lazy val sourceInfo: SourceInfo = sourceSchema()
@@ -361,7 +363,7 @@ case class DataSource(
         catalogTable.get.partitionColumnNames.nonEmpty
       val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
         val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
-        val index = new CatalogFileIndex(
+        val index = fileIndexFactory.create(
          sparkSession,
          catalogTable.get,
          catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
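
The DataSource wiring above boils down to the reflect helper reading the static conf. A small sketch of the same resolution done by hand; the custom class name reuses the hypothetical factory sketched earlier:

import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.CatalogFileIndexFactory

// The default value "hive" resolves to the built-in HiveCatalogFileIndexFactory.
val defaultFactory = CatalogFileIndexFactory.reflect[CatalogFileIndexFactory](new SparkConf())

// Any other value is treated as a fully qualified class name; a missing class or
// failing constructor surfaces as IllegalArgumentException("Error while instantiating '...'").
val conf = new SparkConf().set(
  "spark.sql.catalogFileIndexImplementation",
  "com.example.index.LoggingCatalogFileIndexFactory")
val customFactory = CatalogFileIndexFactory.reflect[CatalogFileIndexFactory](conf)
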
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType


/**
* A [[FileIndex]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
* @param table the metadata of the table
* @param sizeInBytes the table's data size in bytes
*/
class HiveCatalogFileIndex(
sparkSession: SparkSession,
val table: CatalogTable,
override val sizeInBytes: Long) extends CatalogFileIndex {

protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf()

/** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */
private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)

assert(table.identifier.database.isDefined,
"The table identifier must be qualified in CatalogFileIndex")

private val baseLocation: Option[URI] = table.storage.locationUri

override def partitionSchema: StructType = table.partitionSchema

override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq

override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
}

override def refresh(): Unit = fileStatusCache.invalidateAll()

/**
* Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions
* specified by the given partition-pruning filters.
*
* @param filters partition-pruning filters
*/
def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
val startTime = System.nanoTime()
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
val path = new Path(p.location)
val fs = path.getFileSystem(hadoopConf)
PartitionPath(
p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone),
path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val timeNs = System.nanoTime() - startTime
new PrunedInMemoryFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
}
}

override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles

// `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
// of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
// implement `equals` and `hashCode` here, to make it work with cache lookup.
override def equals(o: Any): Boolean = o match {
case other: HiveCatalogFileIndex => this.table.identifier == other.table.identifier
case _ => false
}

override def hashCode(): Int = table.identifier.hashCode()
}

class HiveCatalogFileIndexFactory extends CatalogFileIndexFactory {
override def create(
spark: SparkSession, catalogTable: CatalogTable, tableSize: Long): CatalogFileIndex =
new HiveCatalogFileIndex(spark, catalogTable, tableSize)
}

/**
* An override of the standard HDFS listing based catalog, that overrides the partition spec with
* the information from the metastore.
*
* @param tableBasePath The default base path of the Hive metastore table
* @param partitionSpec The partition specifications from Hive metastore
*/
private class PrunedInMemoryFileIndex(
sparkSession: SparkSession,
tableBasePath: Path,
fileStatusCache: FileStatusCache,
override val partitionSpec: PartitionSpec,
override val metadataOpsTimeNs: Option[Long])
extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
Map.empty,
Some(partitionSpec.partitionColumns),
fileStatusCache)
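
As a usage sketch of the Hive-backed implementation, mirroring how the updated suites below construct it: this assumes the same test context (a session with Hive support and a partitioned metastore table default.test) and, like those suites, code living under an org.apache.spark.sql package so that sharedState is accessible.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.HiveCatalogFileIndex

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val index = new HiveCatalogFileIndex(spark, tableMeta, sizeInBytes = 0)

// With no partition-pruning filters every partition is listed; for an
// unpartitioned table, filterPartitions falls back to a plain InMemoryFileIndex
// over the table's root path.
val allFiles = index.listFiles(partitionFilters = Nil, dataFilters = Nil)
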
@@ -156,7 +156,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       val logicalRelation = cached.getOrElse {
         val sizeInBytes = relation.stats.sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes)
+          val index = new HiveCatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, HiveCatalogFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -320,7 +320,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     withTable("test") {
       sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
       val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-      val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
+      val catalogFileIndex = new HiveCatalogFileIndex(spark, tableMeta, 0)
 
       val dataSchema = StructType(tableMeta.schema.filterNot { f =>
         tableMeta.partitionColumnNames.contains(f.name)
@@ -338,7 +338,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
 
       assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
 
-      val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0)
+      val sameCatalog = new HiveCatalogFileIndex(spark, tableMeta, 0)
       val sameRelation = HadoopFsRelation(
         location = sameCatalog,
         partitionSchema = tableMeta.partitionSchema,
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -46,7 +46,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           |LOCATION '${dir.toURI}'""".stripMargin)
 
       val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-      val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
+      val catalogFileIndex = new HiveCatalogFileIndex(spark, tableMeta, 0)
 
       val dataSchema = StructType(tableMeta.schema.filterNot { f =>
         tableMeta.partitionColumnNames.contains(f.name)
