diff --git a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java b/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java deleted file mode 100644 index c51a3ed7dc6b2..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/internal/WriteSpec.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.internal; - -import java.io.Serializable; - -/** - * Write spec is a input parameter of - * {@link org.apache.spark.sql.execution.SparkPlan#executeWrite}. - * - *
<p>
- * This is an empty interface, the concrete class which implements - * {@link org.apache.spark.sql.execution.SparkPlan#doExecuteWrite} - * should define its own class and use it. - * - * @since 3.4.0 - */ -public interface WriteSpec extends Serializable {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 401302e5bdea2..5ca36a8a216af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag, UnaryLike} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources.WriteFilesSpec import org.apache.spark.sql.execution.metric.SQLMetric -import org.apache.spark.sql.internal.{SQLConf, WriteSpec} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.NextIterator import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} @@ -230,11 +231,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ * * Concrete implementations of SparkPlan should override `doExecuteWrite`. */ - def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery { + def executeWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = executeQuery { if (isCanonicalizedPlan) { throw SparkException.internalError("A canonicalized plan is not supposed to be executed.") } - doExecuteWrite(writeSpec) + doExecuteWrite(writeFilesSpec) } /** @@ -343,7 +344,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ * * Overridden by concrete implementations of SparkPlan. 
*/ - protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = { + protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { throw SparkException.internalError(s"Internal Error ${this.getClass} has write support" + s" mismatch:\n${this}") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 51a0c837c3e94..73c034cf2868b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -895,8 +895,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE") case logical.CollectMetrics(name, metrics, child) => execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil - case WriteFiles(child) => - WriteFilesExec(planLater(child)) :: Nil + case WriteFiles(child, fileFormat, partitionColumns, bucket, options, requiredOrdering) => + WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket, options, + requiredOrdering) :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index 3ed04e5bd6da4..f2e8b050d0687 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -31,11 +31,26 @@ import org.apache.spark.sql.types.StringType import org.apache.spark.unsafe.types.UTF8String trait V1WriteCommand extends DataWritingCommand { + /** + * Specify the [[FileFormat]] of the provider of V1 write command. + */ + def fileFormat: FileFormat + /** * Specify the partition columns of the V1 write command. */ def partitionColumns: Seq[Attribute] + /** + * Specify the bucket spec of the V1 write command. + */ + def bucketSpec: Option[BucketSpec] + + /** + * Specify the storage options of the V1 write command. + */ + def options: Map[String, String] + /** * Specify the required ordering for the V1 write command. `FileFormatWriter` will * add SortExec if necessary when the requiredOrdering is empty. 
@@ -56,7 +71,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] => val newQuery = prepareQuery(write, write.query) val attrMap = AttributeMap(write.query.output.zip(newQuery.output)) - val newChild = WriteFiles(newQuery) + val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns, + write.bucketSpec, write.options, write.requiredOrdering) val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions { case a: Attribute if attrMap.contains(a) => a.withExprId(attrMap(a).exprId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala index 5bc8f9db32bcb..f880574a3cd66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala @@ -23,12 +23,12 @@ import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec -import org.apache.spark.sql.internal.WriteSpec /** * The write files spec holds all information of [[V1WriteCommand]] if its provider is @@ -38,13 +38,18 @@ case class WriteFilesSpec( description: WriteJobDescription, committer: FileCommitProtocol, concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec]) - extends WriteSpec /** * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query. * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]]. */ -case class WriteFiles(child: LogicalPlan) extends UnaryNode { +case class WriteFiles( + child: LogicalPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + requiredOrdering: Seq[SortOrder]) extends UnaryNode { override def output: Seq[Attribute] = child.output override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles = copy(child = newChild) @@ -53,13 +58,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode { /** * Responsible for writing files. 
*/ -case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode { +case class WriteFilesExec( + child: SparkPlan, + fileFormat: FileFormat, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + requiredOrdering: Seq[SortOrder]) extends UnaryExecNode { override def output: Seq[Attribute] = Seq.empty - override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = { - assert(writeSpec.isInstanceOf[WriteFilesSpec]) - val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec] - + override protected def doExecuteWrite( + writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { val rdd = child.execute() // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single // partition rdd to make sure we at least set up one write task to write the metadata. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index af727f966e531..ac1772b4f4ec4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -238,7 +238,8 @@ case class RelationConversions( // that only matches table insertion inside Hive CTAS. // This pattern would not cause conflicts because this rule is always applied before // `HiveAnalysis` and both of these rules are running once. - case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _) + case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, + _, _, _) if query.resolved && DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) && conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index fb15432013d91..9c2032967eaf0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -99,21 +99,23 @@ case class InsertIntoHiveDirCommand( } // The temporary path must be a HDFS path, not a local path. 
- val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath) + val (stagingDir, tmpPath) = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath) val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc( tmpPath.toString, tableDesc, false) + InsertIntoHiveTable.setupCompression(fileSinkConf, hadoopConf, sparkSession) + createExternalTmpPath(stagingDir, hadoopConf) try { saveAsHiveFile( sparkSession = sparkSession, plan = child, hadoopConf = hadoopConf, - fileSinkConf = fileSinkConf, + fileFormat = new HiveFileFormat(fileSinkConf), outputLocation = tmpPath.toString) if (overwrite && fs.exists(writeToPath)) { fs.listStatus(writeToPath).foreach { existFile => - if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true) + if (existFile.getPath != stagingDir) fs.delete(existFile.getPath, true) } } @@ -131,7 +133,7 @@ case class InsertIntoHiveDirCommand( throw new SparkException( "Failed inserting overwrite directory " + storage.locationUri.get, e) } finally { - deleteExternalTmpPath(hadoopConf) + deleteExternalTmpPath(stagingDir, hadoopConf) } Seq.empty[Row] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8c3aa0a80c1b7..6f1cda92458dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} @@ -30,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils -import org.apache.spark.sql.execution.datasources.{V1WriteCommand, V1WritesUtils} +import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils} import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveClientImpl @@ -73,16 +74,18 @@ case class InsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, - outputColumnNames: Seq[String] + outputColumnNames: Seq[String], + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + options: Map[String, String], + fileFormat: FileFormat, + externalTmpPath: String, + @transient stagingDir: Path, + @transient hadoopConf: Configuration ) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils { - override lazy val partitionColumns: Seq[Attribute] = { - getDynamicPartitionColumns(table, partition, query) - } - override def requiredOrdering: Seq[SortOrder] = { - val options = getOptionsWithHiveBucketWrite(table.bucketSpec) - V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options) + V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options) } /** @@ -92,29 +95,13 @@ case class InsertIntoHiveTable( */ override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val externalCatalog = 
sparkSession.sharedState.externalCatalog - val hadoopConf = sparkSession.sessionState.newHadoopConf() - - val hiveQlTable = HiveClientImpl.toHiveTable(table) - // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer - // instances within the closure, since Serializer is not serializable while TableDesc is. - val tableDesc = new TableDesc( - hiveQlTable.getInputFormatClass, - // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because - // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to - // substitute some output formats, e.g. substituting SequenceFileOutputFormat to - // HiveSequenceFileOutputFormat. - hiveQlTable.getOutputFormatClass, - hiveQlTable.getMetadata - ) - val tableLocation = hiveQlTable.getDataLocation - val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) - + createExternalTmpPath(stagingDir, hadoopConf) try { - processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child) + processInsert(sparkSession, externalCatalog, child) } finally { // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. - deleteExternalTmpPath(hadoopConf) + deleteExternalTmpPath(stagingDir, hadoopConf) } // un-cache this table. @@ -133,24 +120,20 @@ case class InsertIntoHiveTable( private def processInsert( sparkSession: SparkSession, externalCatalog: ExternalCatalog, - hadoopConf: Configuration, - tableDesc: TableDesc, - tmpLocation: Path, child: SparkPlan): Unit = { - val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val numDynamicPartitions = partition.values.count(_.isEmpty) val partitionSpec = getPartitionSpec(partition) - val partitionAttributes = getDynamicPartitionColumns(table, partition, query) val writtenParts = saveAsHiveFile( sparkSession = sparkSession, plan = child, hadoopConf = hadoopConf, - fileSinkConf = fileSinkConf, - outputLocation = tmpLocation.toString, - partitionAttributes = partitionAttributes, - bucketSpec = table.bucketSpec) + fileFormat = fileFormat, + outputLocation = externalTmpPath, + partitionAttributes = partitionColumns, + bucketSpec = bucketSpec, + options = options) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { @@ -204,7 +187,7 @@ case class InsertIntoHiveTable( externalCatalog.loadDynamicPartitions( db = table.database, table = table.identifier.table, - tmpLocation.toString, + externalTmpPath, partitionSpec, overwrite, numDynamicPartitions) @@ -274,7 +257,7 @@ case class InsertIntoHiveTable( externalCatalog.loadPartition( table.database, table.identifier.table, - tmpLocation.toString, + externalTmpPath, partitionSpec, isOverwrite = doHiveOverwrite, inheritTableSpecs = inheritTableSpecs, @@ -285,7 +268,7 @@ case class InsertIntoHiveTable( externalCatalog.loadTable( table.database, table.identifier.table, - tmpLocation.toString, // TODO: URI + externalTmpPath, // TODO: URI overwrite, isSrcLocal = false) } @@ -294,3 +277,40 @@ case class InsertIntoHiveTable( override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable = copy(query = newChild) } +object InsertIntoHiveTable extends V1WritesHiveUtils with Logging { + def apply( + table: CatalogTable, + partition: Map[String, Option[String]], + query: LogicalPlan, + overwrite: Boolean, + ifPartitionNotExists: Boolean, + outputColumnNames: Seq[String]): InsertIntoHiveTable = { + val sparkSession = 
SparkSession.getActiveSession.orNull + val hiveQlTable = HiveClientImpl.toHiveTable(table) + // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer + // instances within the closure, since Serializer is not serializable while TableDesc is. + val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because + // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to + // substitute some output formats, e.g. substituting SequenceFileOutputFormat to + // HiveSequenceFileOutputFormat. + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata + ) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val tableLocation = hiveQlTable.getDataLocation + val (stagingDir, externalTmpPath) = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) + val fileSinkConf = new FileSinkDesc(externalTmpPath.toString, tableDesc, false) + setupCompression(fileSinkConf, hadoopConf, sparkSession) + val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf) + + val partitionColumns = getDynamicPartitionColumns(table, partition, query) + val bucketSpec = table.bucketSpec + val options = getOptionsWithHiveBucketWrite(bucketSpec) + + new InsertIntoHiveTable(table, partition, query, overwrite, ifPartitionNotExists, + outputColumnNames, partitionColumns, bucketSpec, options, fileFormat, + externalTmpPath.toString, stagingDir, hadoopConf) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 799cea42e1e8a..db42777e73b5c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -18,16 +18,12 @@ package org.apache.spark.sql.hive.execution import java.io.IOException -import java.net.URI -import java.text.SimpleDateFormat -import java.util.{Date, Locale, Random} import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils -import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.SparkSession @@ -37,61 +33,31 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand -import org.apache.spark.sql.execution.datasources.FileFormatWriter -import org.apache.spark.sql.hive.HiveExternalCatalog -import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.hive.client.HiveVersion +import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter} // Base trait from which all hive insert statement physical execution extends. 
private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveUtils { - var createdTempDir: Option[Path] = None - protected def saveAsHiveFile( sparkSession: SparkSession, plan: SparkPlan, hadoopConf: Configuration, - fileSinkConf: FileSinkDesc, + fileFormat: FileFormat, outputLocation: String, customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty, partitionAttributes: Seq[Attribute] = Nil, - bucketSpec: Option[BucketSpec] = None): Set[String] = { - - val isCompressed = - fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { - case formatName if formatName.endsWith("orcoutputformat") => - // For ORC,"mapreduce.output.fileoutputformat.compress", - // "mapreduce.output.fileoutputformat.compress.codec", and - // "mapreduce.output.fileoutputformat.compress.type" - // have no impact because it uses table properties to store compression information. - false - case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean - } - - if (isCompressed) { - hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.codec")) - fileSinkConf.setCompressType(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.type")) - } else { - // Set compression by priority - HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) - .foreach { case (compression, codec) => hadoopConf.set(compression, codec) } - } + bucketSpec: Option[BucketSpec] = None, + options: Map[String, String] = Map.empty): Set[String] = { val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, outputPath = outputLocation) - val options = getOptionsWithHiveBucketWrite(bucketSpec) - FileFormatWriter.write( sparkSession = sparkSession, plan = plan, - fileFormat = new HiveFileFormat(fileSinkConf), + fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns), @@ -102,53 +68,14 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU options = options) } - protected def getExternalTmpPath( - sparkSession: SparkSession, - hadoopConf: Configuration, - path: Path): Path = { - import org.apache.spark.sql.hive.client.hive._ - - // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under - // a common scratch directory. After the writing is finished, Hive will simply empty the table - // directory and move the staging directory to it. - // After Hive 1.1, Hive will create the staging directory under the table directory, and when - // moving staging directory to table directory, Hive will still empty the table directory, but - // will exclude the staging directory there. - // We have to follow the Hive behavior here, to avoid troubles. For example, if we create - // staging directory under the table director for Hive prior to 1.1, the staging directory will - // be removed by Hive when Hive is trying to empty the table directory. - val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) - val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = - Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) - - // Ensure all the supported versions are considered here. 
- assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == - allSupportedHiveVersions) - - val externalCatalog = sparkSession.sharedState.externalCatalog - val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") - - if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { - oldVersionExternalTempPath(path, hadoopConf, scratchDir) - } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, hadoopConf, stagingDir) - } else { - throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) - } - } - - protected def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = { + protected def deleteExternalTmpPath(dir: Path, hadoopConf: Configuration) : Unit = { // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. try { - createdTempDir.foreach { path => - val fs = path.getFileSystem(hadoopConf) - if (fs.delete(path, true)) { - // If we successfully delete the staging directory, remove it from FileSystem's cache. - fs.cancelDeleteOnExit(path) - } + val fs = dir.getFileSystem(hadoopConf) + if (fs.delete(dir, true)) { + // If we successfully delete the staging directory, remove it from FileSystem's cache. + fs.cancelDeleteOnExit(dir) } } catch { case NonFatal(e) => @@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU } } - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - private def oldVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - scratchDir: String): Path = { - val extURI: URI = path.toUri - val scratchPath = new Path(scratchDir, executionId) - var dirPath = new Path( - extURI.getScheme, - extURI.getAuthority, - scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - - try { - val fs: FileSystem = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - - if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) - } - createdTempDir = Some(dirPath) - fs.deleteOnExit(dirPath) - } catch { - case e: IOException => - throw QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e) - } - dirPath - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val extURI: URI = path.toUri - if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path, hadoopConf, stagingDir) - } else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") - } - } - - private def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { - getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - } - - private[hive] def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val 
inputPathName: String = inputPath.toString - val fs: FileSystem = inputPath.getFileSystem(hadoopConf) - var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { - new Path(inputPathName, stagingDir).toString - } else { - inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - - // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the - // staging directory needs to avoid being deleted when users set hive.exec.stagingdir - // under the table directory. - if (isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + - "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + - "directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString - } - - val dir: Path = - fs.makeQualified( - new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) - logDebug("Created staging dir = " + dir + " for path = " + inputPath) + protected def createExternalTmpPath(dir: Path, hadoopConf: Configuration): Unit = { + val fs: FileSystem = dir.getFileSystem(hadoopConf) try { if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") } - createdTempDir = Some(dir) fs.deleteOnExit(dir) } catch { case e: IOException => throw QueryExecutionErrors.cannotCreateStagingDirError( s"'${dir.toString}': ${e.getMessage}", e) } - dir - } - - // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir(). - private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = { - val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR - val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR - path1.startsWith(path2) - } - - private def executionId: String = { - val rand: Random = new Random - val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) - "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala index 752753f334a23..ad22f62317e96 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala @@ -17,21 +17,28 @@ package org.apache.spark.sql.hive.execution -import java.util.Locale +import java.net.URI +import java.text.SimpleDateFormat +import java.util.{Date, Locale, Random} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.ql.ErrorMsg -import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.exec.TaskRunner +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} import org.apache.spark.SparkException +import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import 
org.apache.spark.sql.execution.datasources.BucketingUtils -import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion} -trait V1WritesHiveUtils { +trait V1WritesHiveUtils extends Logging { def getPartitionSpec(partition: Map[String, Option[String]]): Map[String, String] = { partition.map { case (key, Some(null)) => key -> ExternalCatalogUtils.DEFAULT_PARTITION_NAME @@ -105,4 +112,164 @@ trait V1WritesHiveUtils { .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) .getOrElse(Map.empty) } + + def setupCompression( + fileSinkConf: FileSinkDesc, + hadoopConf: Configuration, + sparkSession: SparkSession): Unit = { + val isCompressed = + fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { + case formatName if formatName.endsWith("orcoutputformat") => + // For ORC,"mapreduce.output.fileoutputformat.compress", + // "mapreduce.output.fileoutputformat.compress.codec", and + // "mapreduce.output.fileoutputformat.compress.type" + // have no impact because it uses table properties to store compression information. + false + case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean + } + + if (isCompressed) { + hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.codec")) + fileSinkConf.setCompressType(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.type")) + } else { + // Set compression by priority + HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) + .foreach { case (compression, codec) => hadoopConf.set(compression, codec) } + } + } + + /** + * Return two paths: + * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath` + * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000` + * The call side should create `stagingDir` before using `externalTmpPath` and + * delete `stagingDir` at the end. + */ + protected def getExternalTmpPath( + sparkSession: SparkSession, + hadoopConf: Configuration, + path: Path): (Path, Path) = { + import org.apache.spark.sql.hive.client.hive._ + + // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under + // a common scratch directory. After the writing is finished, Hive will simply empty the table + // directory and move the staging directory to it. + // After Hive 1.1, Hive will create the staging directory under the table directory, and when + // moving staging directory to table directory, Hive will still empty the table directory, but + // will exclude the staging directory there. + // We have to follow the Hive behavior here, to avoid troubles. For example, if we create + // staging directory under the table director for Hive prior to 1.1, the staging directory will + // be removed by Hive when Hive is trying to empty the table directory. + val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) + val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = + Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) + + // Ensure all the supported versions are considered here. 
+ assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == + allSupportedHiveVersions) + + val externalCatalog = sparkSession.sharedState.externalCatalog + val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + + if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { + oldVersionExternalTempPath(path, hadoopConf, scratchDir) + } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { + newVersionExternalTempPath(path, hadoopConf, stagingDir) + } else { + throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) + } + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 + private def oldVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + scratchDir: String): (Path, Path) = { + val extURI: URI = path.toUri + val scratchPath = new Path(scratchDir, executionId) + var dirPath = new Path( + extURI.getScheme, + extURI.getAuthority, + scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) + + val fs = dirPath.getFileSystem(hadoopConf) + dirPath = new Path(fs.makeQualified(dirPath).toString()) + (dirPath, dirPath) + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 + private def newVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + stagingDir: String): (Path, Path) = { + val extURI: URI = path.toUri + if (extURI.getScheme == "viewfs") { + val qualifiedStagingDir = getStagingDir(path, hadoopConf, stagingDir) + // Hive uses 10000 + (qualifiedStagingDir, new Path(qualifiedStagingDir, "-ext-10000")) + } else { + val qualifiedStagingDir = getExternalScratchDir(extURI, hadoopConf, stagingDir) + (qualifiedStagingDir, new Path(qualifiedStagingDir, "-ext-10000")) + } + } + + private def getExternalScratchDir( + extURI: URI, + hadoopConf: Configuration, + stagingDir: String): Path = { + getStagingDir( + new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), + hadoopConf, + stagingDir) + } + + private[hive] def getStagingDir( + inputPath: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + val inputPathName: String = inputPath.toString + val fs: FileSystem = inputPath.getFileSystem(hadoopConf) + var stagingPathName: String = + if (inputPathName.indexOf(stagingDir) == -1) { + new Path(inputPathName, stagingDir).toString + } else { + inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) + } + + // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the + // staging directory needs to avoid being deleted when users set hive.exec.stagingdir + // under the table directory. + if (isSubDir(new Path(stagingPathName), inputPath, fs) && + !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { + logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + + "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + + "directory.") + stagingPathName = new Path(inputPathName, ".hive-staging").toString + } + + val dir: Path = + fs.makeQualified( + new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) + logDebug("Created staging dir = " + dir + " for path = " + inputPath) + dir + } + + // HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir(). 
+ private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = { + val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR + val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR + path1.startsWith(path2) + } + + private def executionId: String = { + val rand: Random = new Random + val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) + "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index f62d941746b40..b15c8b6f9e5dc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -541,25 +541,24 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter val conf = spark.sessionState.newHadoopConf() val inputPath = new Path("/tmp/b/c") var stagingDir = "tmp/b" - val saveHiveFile = InsertIntoHiveTable(null, Map.empty, null, false, false, null) val getStagingDir = PrivateMethod[Path](Symbol("getStagingDir")) - var path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + var path = InsertIntoHiveTable invokePrivate getStagingDir(inputPath, conf, stagingDir) assert(path.toString.indexOf("/tmp/b_hive_") != -1) stagingDir = "tmp/b/c" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = InsertIntoHiveTable invokePrivate getStagingDir(inputPath, conf, stagingDir) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = "d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = InsertIntoHiveTable invokePrivate getStagingDir(inputPath, conf, stagingDir) assert(path.toString.indexOf("/tmp/b/c/.hive-staging_hive_") != -1) stagingDir = ".d/e" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = InsertIntoHiveTable invokePrivate getStagingDir(inputPath, conf, stagingDir) assert(path.toString.indexOf("/tmp/b/c/.d/e_hive_") != -1) stagingDir = "/tmp/c/" - path = saveHiveFile invokePrivate getStagingDir(inputPath, conf, stagingDir) + path = InsertIntoHiveTable invokePrivate getStagingDir(inputPath, conf, stagingDir) assert(path.toString.indexOf("/tmp/c_hive_") != -1) }
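For readers skimming the patch, the two structural changes are easier to see outside the diff: doExecuteWrite now takes the concrete WriteFilesSpec instead of the empty WriteSpec marker interface (so the asInstanceOf downcast in WriteFilesExec disappears), and the Hive temp-path helpers now return a (stagingDir, externalTmpPath) pair whose creation and deletion the caller owns, replacing the mutable createdTempDir field on SaveAsHiveFile. The following sketch is illustrative only and is not the Spark or Hive API: every name prefixed with Fake or Demo, plus the StagingDirs helper, is an invented stand-in used to model the pattern under those assumptions.

// Illustrative sketch only: simplified stand-ins (not the real Spark/Hive classes) showing
// (1) a typed write spec passed straight to doExecuteWrite, replacing the old WriteSpec
// marker interface plus downcast, and (2) an explicit (stagingDir, tmpPath) pair whose
// lifecycle is owned by the caller instead of a mutable createdTempDir field.

import java.nio.file.{Files, Path, Paths}
import scala.util.control.NonFatal

// (1) Typed spec: the executor receives the concrete spec type, no isInstanceOf/asInstanceOf.
final case class FakeWriteFilesSpec(description: String)          // stand-in for WriteFilesSpec

trait FakePlan {
  // Before the patch the parameter was the empty WriteSpec interface and every
  // implementation had to downcast; now the signature names the concrete type.
  def doExecuteWrite(spec: FakeWriteFilesSpec): Seq[String]
}

final class FakeWriteFilesExec extends FakePlan {
  override def doExecuteWrite(spec: FakeWriteFilesSpec): Seq[String] =
    Seq(s"committed: ${spec.description}")
}

// (2) Staging-dir contract: return both the directory to create/delete and the tmp output path.
object StagingDirs {
  def externalTmpPath(tableDir: Path): (Path, Path) = {
    val stagingDir = tableDir.resolve(".hive-staging_demo")   // caller creates and deletes this
    (stagingDir, stagingDir.resolve("-ext-10000"))            // Hive's conventional suffix
  }
}

object WritePathDemo {
  def main(args: Array[String]): Unit = {
    val (stagingDir, tmpPath) = StagingDirs.externalTmpPath(Paths.get("/tmp/demo_table"))
    Files.createDirectories(stagingDir)                       // createExternalTmpPath analogue
    try {
      val messages =
        new FakeWriteFilesExec().doExecuteWrite(FakeWriteFilesSpec(tmpPath.toString))
      messages.foreach(println)
    } finally {
      try Files.deleteIfExists(stagingDir)                    // deleteExternalTmpPath analogue
      catch { case NonFatal(e) => println(s"cleanup failed: ${e.getMessage}") }
    }
  }
}

The payoff is the same in both halves of the sketch: state that used to be discovered or mutated at execution time (the downcast spec, the lazily created temp dir) is carried explicitly by the caller, which is what allows InsertIntoHiveTable.apply in the patch to precompute fileFormat, bucketSpec, options, and the staging paths when the logical plan is built.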