[SPARK-41708][SQL] Pull v1write information to WriteFiles #39277

Closed · wants to merge 6 commits
WriteSpec.scala (this file was deleted)

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag, UnaryLike}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
+ import org.apache.spark.sql.execution.datasources.WriteFilesSpec
import org.apache.spark.sql.execution.metric.SQLMetric
- import org.apache.spark.sql.internal.{SQLConf, WriteSpec}
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.NextIterator
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -230,11 +231,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
*
* Concrete implementations of SparkPlan should override `doExecuteWrite`.
*/
- def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {
+ def executeWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = executeQuery {
if (isCanonicalizedPlan) {
throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
}
- doExecuteWrite(writeSpec)
+ doExecuteWrite(writeFilesSpec)
}

/**
@@ -343,7 +344,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
*
* Overridden by concrete implementations of SparkPlan.
*/
- protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
+ protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
throw SparkException.internalError(s"Internal Error ${this.getClass} has write support" +
s" mismatch:\n${this}")
}
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -895,8 +895,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE")
case logical.CollectMetrics(name, metrics, child) =>
execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil
- case WriteFiles(child) =>
- WriteFilesExec(planLater(child)) :: Nil
+ case WriteFiles(child, fileFormat, partitionColumns, bucket, options, staticPartitions) =>
+ WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket, options,
+ staticPartitions) :: Nil
case _ => Nil
}
}
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.BucketSpec
+ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
@@ -31,11 +32,31 @@ import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

trait V1WriteCommand extends DataWritingCommand {
+ /**
+ * Specify the [[FileFormat]] of the provider of V1 write command.
+ */
+ def fileFormat: FileFormat
+
+ /**
+ * Specify the partition columns of the V1 write command.
+ */
+ def partitionColumns: Seq[Attribute]
+
+ /**
+ * Specify the partition spec of the V1 write command.
+ */
+ def staticPartitions: TablePartitionSpec
+
+ /**
+ * Specify the bucket spec of the V1 write command.
+ */
+ def bucketSpec: Option[BucketSpec]
+
+ /**
+ * Specify the storage options of the V1 write command.
+ */
+ def options: Map[String, String]
+
/**
* Specify the required ordering for the V1 write command. `FileFormatWriter` will
* add SortExec if necessary when the requiredOrdering is empty.
@@ -56,7 +77,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
val newQuery = prepareQuery(write, write.query)
val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
- val newChild = WriteFiles(newQuery)
+ val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
+ write.bucketSpec, write.options, write.staticPartitions)
val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {
case a: Attribute if attrMap.contains(a) =>
a.withExprId(attrMap(a).exprId)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala
@@ -23,12 +23,13 @@ import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+ import org.apache.spark.sql.catalyst.catalog.BucketSpec
+ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
- import org.apache.spark.sql.internal.WriteSpec

/**
* The write files spec holds all information of [[V1WriteCommand]] if its provider is
@@ -38,13 +39,18 @@ case class WriteFilesSpec(
description: WriteJobDescription,
committer: FileCommitProtocol,
concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
- extends WriteSpec

/**
* During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query.
* [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
*/
- case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+ case class WriteFiles(
+ child: LogicalPlan,
+ fileFormat: FileFormat,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ staticPartitions: TablePartitionSpec) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles =
copy(child = newChild)
@@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode {
/**
* Responsible for writing files.
*/
- case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
+ case class WriteFilesExec(
+ child: SparkPlan,
+ fileFormat: FileFormat,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ options: Map[String, String],
+ staticPartitions: TablePartitionSpec) extends UnaryExecNode {
override def output: Seq[Attribute] = Seq.empty

- override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
- assert(writeSpec.isInstanceOf[WriteFilesSpec])
- val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]
-
+ override protected def doExecuteWrite(
+ writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
Contributor:

Should WriteFilesSpec include less information, since some of it is already available in WriteFilesExec?

Contributor (Author):

It seems a bit hard. Look at the current information:

case class WriteFilesSpec(
    description: WriteJobDescription,
    committer: FileCommitProtocol,
    concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
  • ConcurrentOutputWriterSpec and FileCommitProtocol contain the output spec, so we cannot replace them.
  • WriteJobDescription contains a lot of information, including what we pull out, but if we want to trim something from WriteJobDescription, we need to create a new class to hold the rest. I'm not sure it's worth doing that.
class WriteJobDescription(
    val uuid: String,
    val serializableHadoopConf: SerializableConfiguration,
    val outputWriterFactory: OutputWriterFactory,
    val allColumns: Seq[Attribute],
    val dataColumns: Seq[Attribute],
    val partitionColumns: Seq[Attribute],
    val bucketSpec: Option[WriterBucketSpec],
    val path: String,
    val customPartitionLocations: Map[TablePartitionSpec, String],
    val maxRecordsPerFile: Long,
    val timeZoneId: String,
    val statsTrackers: Seq[WriteJobStatsTracker])

val rdd = child.execute()
// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
// partition rdd to make sure we at least set up one write task to write the metadata.
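
To make the tradeoff described in the reply above concrete, here is a rough sketch of what trimming would entail: the fields WriteFilesExec now carries would be dropped from the job description, and the remaining fields would need a new holder class. This is only an illustration; TrimmedWriteJobDescription and TrimmedWriteFilesSpec are hypothetical names, not part of this PR.

package org.apache.spark.sql.execution.datasources

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
import org.apache.spark.util.SerializableConfiguration

// Hypothetical: WriteJobDescription without the partition/bucket fields that would be
// duplicated on WriteFilesExec. Everything else still has to travel with the spec,
// because it is produced while planning the write job, not the query.
class TrimmedWriteJobDescription(
    val uuid: String,
    val serializableHadoopConf: SerializableConfiguration,
    val outputWriterFactory: OutputWriterFactory,
    val allColumns: Seq[Attribute],
    val dataColumns: Seq[Attribute],
    val path: String,
    val customPartitionLocations: Map[TablePartitionSpec, String],
    val maxRecordsPerFile: Long,
    val timeZoneId: String,
    val statsTrackers: Seq[WriteJobStatsTracker])
  extends Serializable

// The spec itself cannot shrink much: the committer and the concurrent-writer function
// both encode the output spec and cannot be recovered from WriteFilesExec alone.
case class TrimmedWriteFilesSpec(
    description: TrimmedWriteJobDescription,
    committer: FileCommitProtocol,
    concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])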
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -238,7 +238,8 @@ case class RelationConversions(
// that only matches table insertion inside Hive CTAS.
// This pattern would not cause conflicts because this rule is always applied before
// `HiveAnalysis` and both of these rules are running once.
- case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _)
+ case InsertIntoHiveTable(
+ tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _)
if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala (new file)
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveVersion

class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)
extends Logging {
private var stagingDirForCreating: Option[Path] = None

lazy val externalTempPath: Path = getExternalTmpPath(path)

private def getExternalTmpPath(path: Path): Path = {
import org.apache.spark.sql.hive.client.hive._

// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
// a common scratch directory. After the writing is finished, Hive will simply empty the table
// directory and move the staging directory to it.
// After Hive 1.1, Hive will create the staging directory under the table directory, and when
// moving staging directory to table directory, Hive will still empty the table directory, but
// will exclude the staging directory there.
// We have to follow the Hive behavior here, to avoid troubles. For example, if we create
// staging directory under the table director for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)

// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
allSupportedHiveVersions)

val externalCatalog = session.sharedState.externalCatalog
val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")

if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
oldVersionExternalTempPath(path, scratchDir)
} else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
newVersionExternalTempPath(path, stagingDir)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
}
}

// Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = {
val extURI: URI = path.toUri
val scratchPath = new Path(scratchDir, executionId)
var dirPath = new Path(
extURI.getScheme,
extURI.getAuthority,
scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())

val fs = dirPath.getFileSystem(hadoopConf)
dirPath = new Path(fs.makeQualified(dirPath).toString())
stagingDirForCreating = Some(dirPath)
dirPath
}

// Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
val qualifiedStagingDir = getStagingDir(path, stagingDir)
stagingDirForCreating = Some(qualifiedStagingDir)
// Hive uses 10000
new Path(qualifiedStagingDir, "-ext-10000")
} else {
val qualifiedStagingDir = getExternalScratchDir(extURI, stagingDir)
stagingDirForCreating = Some(qualifiedStagingDir)
new Path(qualifiedStagingDir, "-ext-10000")
}
}

private def getExternalScratchDir(extURI: URI, stagingDir: String): Path = {
getStagingDir(
new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
stagingDir)
}

private[hive] def getStagingDir(inputPath: Path, stagingDir: String): Path = {
val inputPathName: String = inputPath.toString
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
var stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) {
new Path(inputPathName, stagingDir).toString
} else {
inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
}

// SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
// staging directory needs to avoid being deleted when users set hive.exec.stagingdir
// under the table directory.
if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
!stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
"with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
"directory.")
stagingPathName = new Path(inputPathName, ".hive-staging").toString
}

val dir: Path =
fs.makeQualified(
new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
logDebug("Created staging dir = " + dir + " for path = " + inputPath)
dir
}

// HIVE-14259 removed FileUtils.isSubDir(). Adapted it from Hive 1.2's FileUtils.isSubDir().
private def isSubDir(p1: Path, p2: Path, fs: FileSystem): Boolean = {
val path1 = fs.makeQualified(p1).toString + Path.SEPARATOR
val path2 = fs.makeQualified(p2).toString + Path.SEPARATOR
path1.startsWith(path2)
}

private def executionId: String = {
val rand: Random = new Random
val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}

def deleteTmpPath() : Unit = {
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
try {
stagingDirForCreating.foreach { stagingDir =>
val fs = stagingDir.getFileSystem(hadoopConf)
if (fs.delete(stagingDir, true)) {
// If we successfully delete the staging directory, remove it from FileSystem's cache.
fs.cancelDeleteOnExit(stagingDir)
}
}
} catch {
case NonFatal(e) =>
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
}
}

def createTmpPath(): Unit = {
try {
stagingDirForCreating.foreach { stagingDir =>
val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
throw new IllegalStateException(
"Cannot create staging directory '" + stagingDir.toString + "'")
}
fs.deleteOnExit(stagingDir)
}
} catch {
case e: IOException =>
throw QueryExecutionErrors.cannotCreateStagingDirError(
s"'${stagingDirForCreating.toString}': ${e.getMessage}", e)
}
}

def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = {
if (Option(path) != stagingDirForCreating) fs.delete(path, true)
Contributor (Author):

One more method, for InsertIntoHiveDirCommand, so we can hide the staging dir.

}
}
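
The inline comment above notes that deleteIfNotStagingDir was added for InsertIntoHiveDirCommand so the staging dir can be hidden from cleanup. A minimal sketch of the intended call pattern, assuming a command that overwrites an existing target directory (the object and method names below are illustrative, not taken from this PR): delete what already sits under the target path while leaving this write's staging directory untouched.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.execution.HiveTempPath

// Illustrative only: clear an existing output directory on overwrite without
// deleting the staging directory that HiveTempPath manages for this write.
object HiveDirCleanupSketch {
  def clearTargetDir(session: SparkSession, hadoopConf: Configuration, targetPath: Path): Unit = {
    val tempPath = new HiveTempPath(session, hadoopConf, targetPath)
    // Resolving externalTempPath also records stagingDirForCreating inside HiveTempPath,
    // which is what deleteIfNotStagingDir compares incoming paths against.
    tempPath.externalTempPath
    val fs: FileSystem = targetPath.getFileSystem(hadoopConf)
    if (fs.exists(targetPath)) {
      fs.listStatus(targetPath).foreach { existing =>
        // Keep the staging dir for this write; delete everything else under the target.
        tempPath.deleteIfNotStagingDir(existing.getPath, fs)
      }
    }
  }
}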