From f9f98ddc9590f934ce83de6a83a3d5836a4640c2 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Sun, 22 Apr 2018 15:19:02 +0100 Subject: [PATCH] Custom CatalogFileIndex (#364) --- .circleci/config.yml | 10 +- .../spark/sql/internal/StaticSQLConf.scala | 6 + .../datasources/CatalogFileIndex.scala | 119 +++++----------- .../execution/datasources/DataSource.scala | 4 +- .../datasources/HiveCatalogFileIndex.scala | 129 ++++++++++++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/CachedTableSuite.scala | 6 +- .../PruneFileSourcePartitionsSuite.scala | 4 +- 8 files changed, 185 insertions(+), 95 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HiveCatalogFileIndex.scala diff --git a/.circleci/config.yml b/.circleci/config.yml index 4e78897454b33..e261070c9814b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -201,11 +201,11 @@ jobs: <<: *defaults steps: # Saves us from recompiling every time... - - restore_cache: - keys: - - v1-build-sbt-{{ .Branch }}-{{ .Revision }} - - v1-build-sbt-{{ .Branch }}- - - v1-build-sbt-master- + #- restore_cache: + #keys: + #- v1-build-sbt-{{ .Branch }}-{{ .Revision }} + #- v1-build-sbt-{{ .Branch }}- + #- v1-build-sbt-master- - *checkout-code - run: name: Hard link cache contents into current build directory diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index a7f594837d3cf..d6eaf72c23b4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -33,6 +33,12 @@ object StaticSQLConf { .stringConf .createWithDefault(Utils.resolveURI("spark-warehouse").toString) + val CATALOG_FILE_INDEX_IMPLEMENTATION = + buildStaticConf("spark.sql.catalogFileIndexImplementation") + .internal() + .stringConf + .createWithDefault("hive") + val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation") .internal() .stringConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index a66a07673e25f..290571bf01565 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -17,107 +17,60 @@ package org.apache.spark.sql.execution.datasources -import java.net.URI - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import scala.util.control.NonFatal +import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.StructType - +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_FILE_INDEX_IMPLEMENTATION +import org.apache.spark.util.Utils /** * A [[FileIndex]] for a metastore catalog table. 
- * - * @param sparkSession a [[SparkSession]] - * @param table the metadata of the table - * @param sizeInBytes the table's data size in bytes */ -class CatalogFileIndex( - sparkSession: SparkSession, - val table: CatalogTable, - override val sizeInBytes: Long) extends FileIndex { - - protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf() +trait CatalogFileIndex extends FileIndex { - /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */ - private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - - assert(table.identifier.database.isDefined, - "The table identifier must be qualified in CatalogFileIndex") + /** + * Returns a [[FileIndex]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): FileIndex - private val baseLocation: Option[URI] = table.storage.locationUri +} - override def partitionSchema: StructType = table.partitionSchema +trait CatalogFileIndexFactory { - override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + /** + * Creates [[CatalogFileIndex]] for given table + */ + def create( + spark: SparkSession, + catalogTable: CatalogTable, + tableSize: Long): CatalogFileIndex - override def listFiles( - partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - filterPartitions(partitionFilters).listFiles(Nil, dataFilters) - } +} - override def refresh(): Unit = fileStatusCache.invalidateAll() +object CatalogFileIndexFactory { - /** - * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions - * specified by the given partition-pruning filters. - * - * @param filters partition-pruning filters - */ - def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = { - if (table.partitionColumnNames.nonEmpty) { - val startTime = System.nanoTime() - val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( - table.identifier, filters) - val partitions = selectedPartitions.map { p => - val path = new Path(p.location) - val fs = path.getFileSystem(hadoopConf) - PartitionPath( - p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), - path.makeQualified(fs.getUri, fs.getWorkingDirectory)) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) - val timeNs = System.nanoTime() - startTime - new PrunedInMemoryFileIndex( - sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs)) - } else { - new InMemoryFileIndex( - sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None) + def reflect[T <: CatalogFileIndexFactory](conf: SparkConf): T = { + val className = fileIndexClassName(conf) + try { + val ctor = Utils.classForName(className).getDeclaredConstructor() + ctor.newInstance().asInstanceOf[T] + } catch { + case NonFatal(e) => + throw new IllegalArgumentException(s"Error while instantiating '$className':", e) } } - override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles - - // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member - // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to - // implement `equals` and `hashCode` here, to make it work with cache lookup. 
- override def equals(o: Any): Boolean = o match { - case other: CatalogFileIndex => this.table.identifier == other.table.identifier - case _ => false + private def fileIndexClassName(conf: SparkConf): String = { + conf.get(CATALOG_FILE_INDEX_IMPLEMENTATION) match { + case "hive" => "org.apache.spark.sql.execution.datasources.HiveCatalogFileIndexFactory" + case name => name + } } - override def hashCode(): Int = table.identifier.hashCode() } - -/** - * An override of the standard HDFS listing based catalog, that overrides the partition spec with - * the information from the metastore. - * - * @param tableBasePath The default base path of the Hive metastore table - * @param partitionSpec The partition specifications from Hive metastore - */ -private class PrunedInMemoryFileIndex( - sparkSession: SparkSession, - tableBasePath: Path, - fileStatusCache: FileStatusCache, - override val partitionSpec: PartitionSpec, - override val metadataOpsTimeNs: Option[Long]) - extends InMemoryFileIndex( - sparkSession, - partitionSpec.partitions.map(_.path), - Map.empty, - Some(partitionSpec.partitionColumns), - fileStatusCache) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index f16d824201e77..5319b828f5760 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -89,6 +89,8 @@ case class DataSource( case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) + lazy val fileIndexFactory: CatalogFileIndexFactory = + CatalogFileIndexFactory.reflect(sparkSession.sparkContext.conf) lazy val providingClass: Class[_] = DataSource.lookupDataSource(className, sparkSession.sessionState.conf) lazy val sourceInfo: SourceInfo = sourceSchema() @@ -361,7 +363,7 @@ case class DataSource( catalogTable.get.partitionColumnNames.nonEmpty val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) { val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes - val index = new CatalogFileIndex( + val index = fileIndexFactory.create( sparkSession, catalogTable.get, catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HiveCatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HiveCatalogFileIndex.scala new file mode 100644 index 0000000000000..ba91c4167b528 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HiveCatalogFileIndex.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.net.URI + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.StructType + + +/** + * A [[FileIndex]] for a metastore catalog table. + * + * @param sparkSession a [[SparkSession]] + * @param table the metadata of the table + * @param sizeInBytes the table's data size in bytes + */ +class HiveCatalogFileIndex( + sparkSession: SparkSession, + val table: CatalogTable, + override val sizeInBytes: Long) extends CatalogFileIndex { + + protected val hadoopConf: Configuration = sparkSession.sessionState.newHadoopConf() + + /** Globally shared (not exclusive to this table) cache for file statuses to speed up listing. */ + private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + + assert(table.identifier.database.isDefined, + "The table identifier must be qualified in CatalogFileIndex") + + private val baseLocation: Option[URI] = table.storage.locationUri + + override def partitionSchema: StructType = table.partitionSchema + + override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq + + override def listFiles( + partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + filterPartitions(partitionFilters).listFiles(Nil, dataFilters) + } + + override def refresh(): Unit = fileStatusCache.invalidateAll() + + /** + * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions + * specified by the given partition-pruning filters. + * + * @param filters partition-pruning filters + */ + def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = { + if (table.partitionColumnNames.nonEmpty) { + val startTime = System.nanoTime() + val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( + table.identifier, filters) + val partitions = selectedPartitions.map { p => + val path = new Path(p.location) + val fs = path.getFileSystem(hadoopConf) + PartitionPath( + p.toRow(partitionSchema, sparkSession.sessionState.conf.sessionLocalTimeZone), + path.makeQualified(fs.getUri, fs.getWorkingDirectory)) + } + val partitionSpec = PartitionSpec(partitionSchema, partitions) + val timeNs = System.nanoTime() - startTime + new PrunedInMemoryFileIndex( + sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs)) + } else { + new InMemoryFileIndex( + sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None) + } + } + + override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles + + // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member + // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to + // implement `equals` and `hashCode` here, to make it work with cache lookup. 
+ override def equals(o: Any): Boolean = o match { + case other: HiveCatalogFileIndex => this.table.identifier == other.table.identifier + case _ => false + } + + override def hashCode(): Int = table.identifier.hashCode() +} + +class HiveCatalogFileIndexFactory extends CatalogFileIndexFactory { + override def create( + spark: SparkSession, catalogTable: CatalogTable, tableSize: Long): CatalogFileIndex = + new HiveCatalogFileIndex(spark, catalogTable, tableSize) +} + +/** + * An override of the standard HDFS listing based catalog, that overrides the partition spec with + * the information from the metastore. + * + * @param tableBasePath The default base path of the Hive metastore table + * @param partitionSpec The partition specifications from Hive metastore + */ +private class PrunedInMemoryFileIndex( + sparkSession: SparkSession, + tableBasePath: Path, + fileStatusCache: FileStatusCache, + override val partitionSpec: PartitionSpec, + override val metadataOpsTimeNs: Option[Long]) + extends InMemoryFileIndex( + sparkSession, + partitionSpec.partitions.map(_.path), + Map.empty, + Some(partitionSpec.partitionColumns), + fileStatusCache) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8adfda07d29d5..9a94e7c021a0a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -156,7 +156,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val logicalRelation = cached.getOrElse { val sizeInBytes = relation.stats.sizeInBytes.toLong val fileIndex = { - val index = new CatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes) + val index = new HiveCatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes) if (lazyPruningEnabled) { index } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 48ab4eb9a6178..a96b54ce032af 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, HiveCatalogFileIndex, LogicalRelation} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -320,7 +320,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto withTable("test") { sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet") val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) + val catalogFileIndex = new HiveCatalogFileIndex(spark, tableMeta, 0) val dataSchema = StructType(tableMeta.schema.filterNot { f => tableMeta.partitionColumnNames.contains(f.name) @@ -338,7 +338,7 @@ 
class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined) - val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0) + val sameCatalog = new HiveCatalogFileIndex(spark, tableMeta, 0) val sameRelation = HadoopFsRelation( location = sameCatalog, partitionSchema = tableMeta.partitionSchema, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 94384185d190a..3788430092973 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -46,7 +46,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te |LOCATION '${dir.toURI}'""".stripMargin) val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) + val catalogFileIndex = new HiveCatalogFileIndex(spark, tableMeta, 0) val dataSchema = StructType(tableMeta.schema.filterNot { f => tableMeta.partitionColumnNames.contains(f.name)
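
For reference, a minimal sketch of how a downstream build could plug into the factory hook this patch introduces. The package and class names below (com.example.index.DelegatingCatalogFileIndexFactory) are hypothetical; the CatalogFileIndexFactory trait, the HiveCatalogFileIndex default, and the spark.sql.catalogFileIndexImplementation static conf are the pieces added by this patch.

    package com.example.index  // hypothetical package, not part of this patch

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, CatalogFileIndexFactory, HiveCatalogFileIndex}

    // Needs a no-arg constructor: CatalogFileIndexFactory.reflect instantiates the
    // configured class via getDeclaredConstructor().newInstance().
    class DelegatingCatalogFileIndexFactory extends CatalogFileIndexFactory {
      override def create(
          spark: SparkSession,
          catalogTable: CatalogTable,
          tableSize: Long): CatalogFileIndex = {
        // A custom implementation would return its own CatalogFileIndex here;
        // this sketch simply delegates to the Hive-backed default added by this patch.
        new HiveCatalogFileIndex(spark, catalogTable, tableSize)
      }
    }

Because the key is a static SQL conf read from SparkConf, it has to be set before the SparkSession is created, for example:

    --conf spark.sql.catalogFileIndexImplementation=com.example.index.DelegatingCatalogFileIndexFactory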