From 0adc99b00d2e11215e39a8eeee3f3baa40680f5e Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Sun, 26 Jul 2015 19:19:12 +0300 Subject: [PATCH 01/12] Add hooks for selecting the set of files for a table scan --- .../apache/spark/sql/hive/HiveContext.scala | 12 +++++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 11 ++++++++-- .../apache/spark/sql/hive/TableReader.scala | 21 +++++++++++++++++-- .../sql/hive/client/ClientInterface.scala | 2 ++ 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index d43afb70619e8..8ef7dfe5f7109 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -150,6 +150,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient protected[sql] lazy val substitutor = new VariableSubstitution() + @transient + protected[sql] var hadoopFileSelector: Option[HadoopFileSelector] = None + /** * The copy of the hive client that is used for execution. Currently this must always be * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the @@ -514,6 +517,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { case _ => super.simpleString } } + + def setTableNamePreprocessor(tableNamePreprocessor: (String) => String): Unit = { + catalog.setTableNamePreprocessor(tableNamePreprocessor) + } + + def setHadoopFileSelector(hadoopFileSelector: Option[HadoopFileSelector]): Unit = { + this.hadoopFileSelector = hadoopFileSelector + } + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index ca1f49b546bd7..2d9afb0aa158a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -217,14 +217,21 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive client.getTableOption(databaseName, tblName).isDefined } + private[this] var tableNamePreprocessor: (String) => String = identity + + def setTableNamePreprocessor(newTableNamePreprocessor: (String) => String): Unit = { + tableNamePreprocessor = newTableNamePreprocessor + } + def lookupRelation( tableIdentifier: Seq[String], alias: Option[String]): LogicalPlan = { val tableIdent = processTableIdentifier(tableIdentifier) val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse( client.currentDatabase) - val tblName = tableIdent.last - val table = client.getTable(databaseName, tblName) + val rawTableName = tableIdent.last + val tblName = tableNamePreprocessor(rawTableName) + val table = client.getTable(databaseName, tblName).withTableName(rawTableName) if (table.properties.get("spark.sql.sources.provider").isDefined) { val dataSourceTable = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 294fc3bd7d5e9..e29d5ca6a53a2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, PathFilter} +import org.apache.hadoop.fs.{FileSystem, Path, 
PathFilter} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._ import org.apache.hadoop.hive.ql.exec.Utilities @@ -106,7 +106,11 @@ class HadoopTableReader( val broadcastedHiveConf = _broadcastedHiveConf val tablePath = hiveTable.getPath - val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt) + val fs = tablePath.getFileSystem(sc.hiveconf) + val inputPathStr = + sc.hadoopFileSelector.flatMap( + _.selectFiles(relation.tableName, fs, tablePath)).map(_.mkString(",")).getOrElse( + applyFilterIfNeeded(tablePath, filterOpt)) // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass @@ -396,3 +400,16 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { } } } + +abstract class HadoopFileSelector { + /** + * Select files constituting a table from the given base path according to the client's custom + * algorithm. This is only applied to non-partitioned tables. + * @param tableName table name to select files for + * @param fs the filesystem containing the table + * @param basePath base path of the table in the filesystem + * @return a set of files, or [[None]] if the custom file selection algorithm does not apply + * to this table. + */ + def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]] +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala index 0a1d761a52f88..5f9fe3530b50a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala @@ -67,6 +67,8 @@ private[hive] case class HiveTable( this } + def withTableName(newName: String): HiveTable = copy(name = newName).withClient(client) + def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved")) def isPartitioned: Boolean = partitionColumns.nonEmpty From 065618880e0c4bd96f6c08b9f78917ef2e051a65 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Mon, 27 Jul 2015 14:02:09 +0300 Subject: [PATCH 02/12] Use hiveTable.getTableName instead of relation.tableName --- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index e29d5ca6a53a2..5f0e30fd65c70 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -109,7 +109,7 @@ class HadoopTableReader( val fs = tablePath.getFileSystem(sc.hiveconf) val inputPathStr = sc.hadoopFileSelector.flatMap( - _.selectFiles(relation.tableName, fs, tablePath)).map(_.mkString(",")).getOrElse( + _.selectFiles(hiveTable.getTableName, fs, tablePath)).map(_.mkString(",")).getOrElse( applyFilterIfNeeded(tablePath, filterOpt)) // logDebug("Table input: %s".format(tablePath)) From 70f41e80793235e9ca5cc6068ecc0d55ac4430ce Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Mon, 27 Jul 2015 14:25:09 +0300 Subject: [PATCH 03/12] Do not use Path.toString directly; use toUri.getPath instead to avoid including a filesystem prefix. 
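For context: Path.toString renders the full URI, including the filesystem scheme and
authority, whereas Path#toUri.getPath yields only the path component. A minimal
illustration (the HDFS URI is a made-up example), e.g. in a spark-shell session:

    import org.apache.hadoop.fs.Path

    val p = new Path("hdfs://namenode:8020/warehouse/mytable")
    p.toString      // "hdfs://namenode:8020/warehouse/mytable" -- scheme and authority included
    p.toUri.getPath // "/warehouse/mytable"                     -- path component only
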
--- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 5f0e30fd65c70..bb3fc1a197348 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -109,8 +109,9 @@ class HadoopTableReader( val fs = tablePath.getFileSystem(sc.hiveconf) val inputPathStr = sc.hadoopFileSelector.flatMap( - _.selectFiles(hiveTable.getTableName, fs, tablePath)).map(_.mkString(",")).getOrElse( - applyFilterIfNeeded(tablePath, filterOpt)) + _.selectFiles(hiveTable.getTableName, fs, tablePath)).map { fileOrDirList => + fileOrDirList.map(_.toUri.getPath).mkString(",") + }.getOrElse(applyFilterIfNeeded(tablePath, filterOpt)) // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass From a3a90150cad26f2fe187d34444259e49e691d619 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Mon, 27 Jul 2015 18:31:17 +0300 Subject: [PATCH 04/12] Specify multiple input paths correctly --- .../apache/spark/sql/hive/TableReader.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index bb3fc1a197348..a4a8fa0b4bc4d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -107,16 +107,15 @@ class HadoopTableReader( val tablePath = hiveTable.getPath val fs = tablePath.getFileSystem(sc.hiveconf) - val inputPathStr = + val inputPaths: Seq[Path] = sc.hadoopFileSelector.flatMap( - _.selectFiles(hiveTable.getTableName, fs, tablePath)).map { fileOrDirList => - fileOrDirList.map(_.toUri.getPath).mkString(",") - }.getOrElse(applyFilterIfNeeded(tablePath, filterOpt)) + _.selectFiles(hiveTable.getTableName, fs, tablePath) + ).getOrElse(applyFilterIfNeeded(tablePath, filterOpt)) // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + val hadoopRDD = createHadoopRdd(tableDesc, inputPaths, ifc) val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) @@ -193,7 +192,7 @@ class HadoopTableReader( .map { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDesc(partition) val partPath = HiveShim.getDataLocationPath(partition) - val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) + val inputPaths = applyFilterIfNeeded(partPath, filterOpt) val ifc = partDesc.getInputFileFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] // Get partition field info @@ -233,7 +232,7 @@ class HadoopTableReader( // Fill all partition keys to the given MutableRow object fillPartitionKeys(partValues, mutableRow) - createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter => + createHadoopRdd(tableDesc, inputPaths, ifc).mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val deserializer = localDeserializer.newInstance() deserializer.initialize(hconf, partProps) @@ -259,13 +258,12 @@ class HadoopTableReader( * If `filterOpt` is defined, then it will be used to filter 
files from `path`. These files are * returned in a single, comma-separated string. */ - private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = { + private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Seq[Path] = { filterOpt match { case Some(filter) => val fs = path.getFileSystem(sc.hiveconf) - val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString) - filteredFiles.mkString(",") - case None => path.toString + fs.listStatus(path, filter).map(_.getPath) + case None => Seq(path) } } @@ -275,10 +273,10 @@ class HadoopTableReader( */ private def createHadoopRdd( tableDesc: TableDesc, - path: String, + paths: Seq[Path], inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { - val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ + val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(paths, tableDesc) _ val rdd = new HadoopRDD( sc.sparkContext, @@ -299,8 +297,8 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to * instantiate a HadoopRDD. */ - def initializeLocalJobConfFunc(path: String, tableDesc: TableDesc)(jobConf: JobConf) { - FileInputFormat.setInputPaths(jobConf, Seq[Path](new Path(path)): _*) + def initializeLocalJobConfFunc(paths: Seq[Path], tableDesc: TableDesc)(jobConf: JobConf) { + FileInputFormat.setInputPaths(jobConf, paths: _*) if (tableDesc != null) { PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc) Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf) From 3958ff9a9618c496c7e1a830c34cc6cf721c3d39 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Mon, 27 Jul 2015 18:57:25 +0300 Subject: [PATCH 05/12] Use string representations of paths because Path does not seem to be serializable --- .../org/apache/spark/sql/hive/TableReader.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index a4a8fa0b4bc4d..2feb9bd0b7cc6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -109,8 +109,8 @@ class HadoopTableReader( val fs = tablePath.getFileSystem(sc.hiveconf) val inputPaths: Seq[Path] = sc.hadoopFileSelector.flatMap( - _.selectFiles(hiveTable.getTableName, fs, tablePath) - ).getOrElse(applyFilterIfNeeded(tablePath, filterOpt)) + _.selectFiles(hiveTable.getTableName, fs, tablePath). + ).map(_.map(_.toString)).getOrElse(applyFilterIfNeeded(tablePath, filterOpt)) // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass @@ -258,11 +258,11 @@ class HadoopTableReader( * If `filterOpt` is defined, then it will be used to filter files from `path`. These files are * returned in a single, comma-separated string. 
*/ - private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Seq[Path] = { + private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): Seq[String] = { filterOpt match { case Some(filter) => val fs = path.getFileSystem(sc.hiveconf) - fs.listStatus(path, filter).map(_.getPath) + fs.listStatus(path, filter).map(_.getPath.toString) case None => Seq(path) } } @@ -273,7 +273,7 @@ class HadoopTableReader( */ private def createHadoopRdd( tableDesc: TableDesc, - paths: Seq[Path], + paths: Seq[String], inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = { val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(paths, tableDesc) _ @@ -297,8 +297,8 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to * instantiate a HadoopRDD. */ - def initializeLocalJobConfFunc(paths: Seq[Path], tableDesc: TableDesc)(jobConf: JobConf) { - FileInputFormat.setInputPaths(jobConf, paths: _*) + def initializeLocalJobConfFunc(paths: Seq[String], tableDesc: TableDesc)(jobConf: JobConf) { + FileInputFormat.setInputPaths(jobConf, paths.map { pathStr => new Path(pathStr) }: _*) if (tableDesc != null) { PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc) Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf) From dcbe6839c9dd58bf55409d242b07779abbfbbdea Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Mon, 27 Jul 2015 20:25:29 +0300 Subject: [PATCH 06/12] Add docstrings to hooks used for selecting a custom set of files --- .../apache/spark/sql/hive/HiveContext.scala | 18 ++++++++++++++++++ .../apache/spark/sql/hive/TableReader.scala | 10 ++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 8ef7dfe5f7109..bf014530dc074 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -518,10 +518,28 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } + /** + * Allows the user to pre-process table names before the Hive metastore is looked up. This can + * be used to encode additional information into the table name, such as a version number + * (e.g. `mytable_v1`, `mytable_v2`, etc.) + * @param tableNamePreprocessor a function to be applied to Hive table name before we look up the + * table in the Hive metastore. + */ def setTableNamePreprocessor(tableNamePreprocessor: (String) => String): Unit = { catalog.setTableNamePreprocessor(tableNamePreprocessor) } + /** + * Allows to register a custom way to select files/directories to be included in a table scan + * based on the table name. This can be used together with [[setTableNamePreprocessor]] to + * customize table scan results based on the specified table name, e.g. `mytable_v1` could have a + * different set of files than `mytable_v2`, and both of these "virtual tables" would be backed + * by a real Hive table `mytable`. Note that the table name passed to the user-provided file + * selection method is the name specified in the query, not the table name in the Hive metastore + * that is generated by applying the user-specified "table name preprocessor" method. 
+ * @param hadoopFileSelector + * @see [[setTableNamePreprocessor]] + */ def setHadoopFileSelector(hadoopFileSelector: Option[HadoopFileSelector]): Unit = { this.hadoopFileSelector = hadoopFileSelector } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 2feb9bd0b7cc6..81e0eae0a17e1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -107,9 +107,9 @@ class HadoopTableReader( val tablePath = hiveTable.getPath val fs = tablePath.getFileSystem(sc.hiveconf) - val inputPaths: Seq[Path] = + val inputPaths: Seq[String] = sc.hadoopFileSelector.flatMap( - _.selectFiles(hiveTable.getTableName, fs, tablePath). + _.selectFiles(hiveTable.getTableName, fs, tablePath) ).map(_.map(_.toString)).getOrElse(applyFilterIfNeeded(tablePath, filterOpt)) // logDebug("Table input: %s".format(tablePath)) @@ -263,7 +263,7 @@ class HadoopTableReader( case Some(filter) => val fs = path.getFileSystem(sc.hiveconf) fs.listStatus(path, filter).map(_.getPath.toString) - case None => Seq(path) + case None => Seq(path.toString) } } @@ -404,7 +404,9 @@ abstract class HadoopFileSelector { /** * Select files constituting a table from the given base path according to the client's custom * algorithm. This is only applied to non-partitioned tables. - * @param tableName table name to select files for + * @param tableName table name to select files for. This is the exact table name specified + * in the query, not a "preprocessed" file name returned by the user-defined + * function registered via [[HiveContext.setTableNamePreprocessor]]. * @param fs the filesystem containing the table * @param basePath base path of the table in the filesystem * @return a set of files, or [[None]] if the custom file selection algorithm does not apply From f1a93c8fe0f9155c5327d512c87d51fed6e63f84 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Tue, 28 Jul 2015 02:25:33 +0300 Subject: [PATCH 07/12] Create two separate [un]setHadoopFileSelector methods --- .../org/apache/spark/sql/hive/HiveContext.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index bf014530dc074..8e84cf9b25569 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -532,16 +532,24 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { /** * Allows to register a custom way to select files/directories to be included in a table scan * based on the table name. This can be used together with [[setTableNamePreprocessor]] to - * customize table scan results based on the specified table name, e.g. `mytable_v1` could have a + * customize table scan results based on the specified table name. E.g. `mytable_v1` could have a * different set of files than `mytable_v2`, and both of these "virtual tables" would be backed * by a real Hive table `mytable`. Note that the table name passed to the user-provided file * selection method is the name specified in the query, not the table name in the Hive metastore * that is generated by applying the user-specified "table name preprocessor" method. 
- * @param hadoopFileSelector + * @param hadoopFileSelector the user Hadoop file selection strategy * @see [[setTableNamePreprocessor]] */ - def setHadoopFileSelector(hadoopFileSelector: Option[HadoopFileSelector]): Unit = { - this.hadoopFileSelector = hadoopFileSelector + def setHadoopFileSelector(hadoopFileSelector: HadoopFileSelector): Unit = { + this.hadoopFileSelector = Some(hadoopFileSelector) + } + + /** + * Removes the "Hadoop file selector" strategy that was installed using the + * [[setHadoopFileSelector]] method. + */ + def unsetHadoopFileSelector(): Unit = { + hadoopFileSelector = None } } From 78b6d09f4c5e385107209fefb4c79ffcd5829766 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Wed, 29 Jul 2015 16:01:28 +0300 Subject: [PATCH 08/12] Add an option to treat empty strings as nulls during Hive table scans --- .../apache/spark/sql/hive/TableReader.scala | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 81e0eae0a17e1..ee1081953c082 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -60,6 +60,9 @@ class HadoopTableReader( @transient hiveExtraConf: HiveConf) extends TableReader with Logging { + private val emptyStringsAsNulls = + sc.hiveconf.getBoolean("spark.sql.emptyStringsAsNulls", false) + // Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local". // https://hadoop.apache.org/docs/r1.0.4/mapred-default.html // @@ -124,7 +127,8 @@ class HadoopTableReader( val hconf = broadcastedHiveConf.value.value val deserializer = deserializerClass.newInstance() deserializer.initialize(hconf, tableDesc.getProperties) - HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) + HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer, + emptyStringsAsNulls) } deserializedHadoopRDD @@ -242,7 +246,7 @@ class HadoopTableReader( // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, - mutableRow, tableSerDe) + mutableRow, tableSerDe, emptyStringsAsNulls) } }.toSeq @@ -316,6 +320,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { * positions in the output schema * @param mutableRow A reusable `MutableRow` that should be filled * @param tableDeser Table Deserializer + * @param emptyStringsAsNulls whether to treat empty strings as nulls * @return An `Iterator[Row]` transformed from `iterator` */ def fillObject( @@ -323,7 +328,8 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { rawDeser: Deserializer, nonPartitionKeyAttrs: Seq[(Attribute, Int)], mutableRow: MutableRow, - tableDeser: Deserializer): Iterator[Row] = { + tableDeser: Deserializer, + emptyStringsAsNulls: Boolean): Iterator[Row] = { val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) { rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector] @@ -360,8 +366,18 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { case oi: DoubleObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value)) case oi: HiveVarcharObjectInspector => - (value: Any, row: MutableRow, ordinal: Int) => - row.setString(ordinal, 
oi.getPrimitiveJavaObject(value).getValue) + if (emptyStringsAsNulls) { + (value: Any, row: MutableRow, ordinal: Int) => + val colValue = oi.getPrimitiveJavaObject(value).getValue + if (colValue.isInstanceOf[String] && colValue.asInstanceOf[String].isEmpty) { + row.setString(ordinal, null) + } else { + row.setString(ordinal, colValue) + } + } else { + (value: Any, row: MutableRow, ordinal: Int) => + row.setString(ordinal, oi.getPrimitiveJavaObject(value).getValue) + } case oi: HiveDecimalObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.update(ordinal, HiveShim.toCatalystDecimal(oi, value)) From d65ac2e9c4e654a1a23cfa7e74d847b44155bd67 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Wed, 29 Jul 2015 16:25:23 +0300 Subject: [PATCH 09/12] Use SparkSQL configuration instead of Hive configuration --- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index ee1081953c082..558d1930e4f0b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -61,7 +61,7 @@ class HadoopTableReader( extends TableReader with Logging { private val emptyStringsAsNulls = - sc.hiveconf.getBoolean("spark.sql.emptyStringsAsNulls", false) + sc.getConf("spark.sql.emptyStringsAsNulls", "false").toBoolean // Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local". // https://hadoop.apache.org/docs/r1.0.4/mapred-default.html From 31ab636a04a589a515a8e4159e01b6aa554f2773 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Wed, 29 Jul 2015 18:22:39 +0300 Subject: [PATCH 10/12] Fix the non-serializability of HadoopTableReader --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 558d1930e4f0b..21e64cbaea26b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -123,12 +123,13 @@ class HadoopTableReader( val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + val localEmptyStringsAsNulls = emptyStringsAsNulls // for serializability val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val deserializer = deserializerClass.newInstance() deserializer.initialize(hconf, tableDesc.getProperties) HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer, - emptyStringsAsNulls) + localEmptyStringsAsNulls) } deserializedHadoopRDD @@ -236,6 +237,7 @@ class HadoopTableReader( // Fill all partition keys to the given MutableRow object fillPartitionKeys(partValues, mutableRow) + val localEmptyStringsAsNulls = emptyStringsAsNulls // for serializability createHadoopRdd(tableDesc, inputPaths, ifc).mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val deserializer = localDeserializer.newInstance() @@ -246,7 +248,7 @@ class HadoopTableReader( // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, - mutableRow, tableSerDe, 
emptyStringsAsNulls) + mutableRow, tableSerDe, localEmptyStringsAsNulls) } }.toSeq From 0d0ba2cfc36230cc72a31d2e82ef013ee1f0056a Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Wed, 29 Jul 2015 18:55:53 +0300 Subject: [PATCH 11/12] Special handling for StringObjectInspector in HiveTableReader --- .../apache/spark/sql/hive/TableReader.scala | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 21e64cbaea26b..50f405dddb4c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -369,17 +369,32 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value)) case oi: HiveVarcharObjectInspector => if (emptyStringsAsNulls) { - (value: Any, row: MutableRow, ordinal: Int) => - val colValue = oi.getPrimitiveJavaObject(value).getValue - if (colValue.isInstanceOf[String] && colValue.asInstanceOf[String].isEmpty) { + (value: Any, row: MutableRow, ordinal: Int) => { + val strValue = oi.getPrimitiveJavaObject(value).getValue + if (strValue.isEmpty) { row.setString(ordinal, null) } else { - row.setString(ordinal, colValue) + row.setString(ordinal, strValue) } + } } else { (value: Any, row: MutableRow, ordinal: Int) => row.setString(ordinal, oi.getPrimitiveJavaObject(value).getValue) } + case oi: StringObjectInspector => + if (emptyStringsAsNulls) { + (value: Any, row: MutableRow, ordinal: Int) => { + val strValue = oi.getPrimitiveJavaObject(value) + if (strValue.isEmpty) { + row.setString(ordinal, null) + } else { + row.setString(ordinal, strValue) + } + } + } else { + (value: Any, row: MutableRow, ordinal: Int) => + row.setString(ordinal, oi.getPrimitiveJavaObject(value)) + } case oi: HiveDecimalObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.update(ordinal, HiveShim.toCatalystDecimal(oi, value)) From 08c5d0367b0111274de6a9b90bc287c1f0cd0427 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Wed, 29 Jul 2015 21:13:58 +0300 Subject: [PATCH 12/12] Refactor emptyStringsAsNulls handling to be more case-friendly --- .../apache/spark/sql/hive/TableReader.scala | 44 +++++++++---------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 50f405dddb4c7..3885486564c40 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -367,34 +367,30 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value)) case oi: DoubleObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value)) - case oi: HiveVarcharObjectInspector => - if (emptyStringsAsNulls) { - (value: Any, row: MutableRow, ordinal: Int) => { - val strValue = oi.getPrimitiveJavaObject(value).getValue - if (strValue.isEmpty) { - row.setString(ordinal, null) - } else { - row.setString(ordinal, strValue) - } + case oi: HiveVarcharObjectInspector if emptyStringsAsNulls => + (value: Any, row: MutableRow, ordinal: Int) => { + val strValue = 
oi.getPrimitiveJavaObject(value).getValue + if (strValue.isEmpty) { + row.setString(ordinal, null) + } else { + row.setString(ordinal, strValue) } - } else { - (value: Any, row: MutableRow, ordinal: Int) => - row.setString(ordinal, oi.getPrimitiveJavaObject(value).getValue) } - case oi: StringObjectInspector => - if (emptyStringsAsNulls) { - (value: Any, row: MutableRow, ordinal: Int) => { - val strValue = oi.getPrimitiveJavaObject(value) - if (strValue.isEmpty) { - row.setString(ordinal, null) - } else { - row.setString(ordinal, strValue) - } + case oi: HiveVarcharObjectInspector => + (value: Any, row: MutableRow, ordinal: Int) => + row.setString(ordinal, oi.getPrimitiveJavaObject(value).getValue) + case oi: StringObjectInspector if emptyStringsAsNulls => + (value: Any, row: MutableRow, ordinal: Int) => { + val strValue = oi.getPrimitiveJavaObject(value) + if (strValue.isEmpty) { + row.setString(ordinal, null) + } else { + row.setString(ordinal, strValue) } - } else { - (value: Any, row: MutableRow, ordinal: Int) => - row.setString(ordinal, oi.getPrimitiveJavaObject(value)) } + case oi: StringObjectInspector => + (value: Any, row: MutableRow, ordinal: Int) => + row.setString(ordinal, oi.getPrimitiveJavaObject(value)) case oi: HiveDecimalObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.update(ordinal, HiveShim.toCatalystDecimal(oi, value))
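
Taken together, the series exposes three user-facing hooks: HiveContext.setTableNamePreprocessor,
HiveContext.setHadoopFileSelector / unsetHadoopFileSelector (backed by the HadoopFileSelector.selectFiles
contract added in TableReader.scala), and the spark.sql.emptyStringsAsNulls setting read by
HadoopTableReader. The sketch below shows how they might be wired together; it is not part of the
patches, and the "_v<N>" table-name convention, the "version=<N>" directory layout, and all class and
object names are invented for illustration.

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.{HadoopFileSelector, HiveContext}

    // Hypothetical selector: a table named "<base>_v<N>" reads only the
    // "version=<N>" subdirectory under the real table's base path.
    class VersionedFileSelector extends HadoopFileSelector {
      private val Versioned = "(.*)_v(\\d+)".r

      override def selectFiles(tableName: String, fs: FileSystem, basePath: Path): Option[Seq[Path]] =
        tableName match {
          case Versioned(_, version) =>
            Some(fs.listStatus(new Path(basePath, s"version=$version")).map(_.getPath).toSeq)
          case _ => None // not a versioned name: fall back to the default file listing
        }
    }

    object HadoopFileSelectorExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "hadoop-file-selector-example")
        val hiveContext = new HiveContext(sc)

        // Strip the hypothetical "_v<N>" suffix before the metastore lookup,
        // so "mytable_v2" resolves to the real Hive table "mytable"...
        hiveContext.setTableNamePreprocessor(_.replaceAll("_v\\d+$", ""))
        // ...and select the matching subset of files during the table scan.
        hiveContext.setHadoopFileSelector(new VersionedFileSelector)

        // Opt into treating empty strings as nulls during Hive table scans.
        hiveContext.setConf("spark.sql.emptyStringsAsNulls", "true")

        hiveContext.sql("SELECT COUNT(*) FROM mytable_v2").show()

        // Remove the selector to restore the default scan behaviour.
        hiveContext.unsetHadoopFileSelector()
        sc.stop()
      }
    }

Because selectFiles returns None for tables it does not recognise, the default file listing still
applies to every other table, so the hook can be installed globally without affecting existing queries.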