
Commit

Pull v1write information to write file node
ulysses-you committed Dec 30, 2022
1 parent cfdbfb7 commit 76c875d
Showing 11 changed files with 298 additions and 287 deletions.

This file was deleted.

@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, TreeNodeTag, UnaryLike}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.WriteFilesSpec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.{SQLConf, WriteSpec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.NextIterator
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -230,11 +231,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*
* Concrete implementations of SparkPlan should override `doExecuteWrite`.
*/
def executeWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = executeQuery {
def executeWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = executeQuery {
if (isCanonicalizedPlan) {
throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
}
doExecuteWrite(writeSpec)
doExecuteWrite(writeFilesSpec)
}

/**
@@ -343,7 +344,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*
* Overridden by concrete implementations of SparkPlan.
*/
protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
throw SparkException.internalError(s"Internal Error ${this.getClass} has write support" +
s" mismatch:\n${this}")
}
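
The signature change above drops the generic WriteSpec indirection: callers now pass a concrete WriteFilesSpec straight into the plan, and implementations of doExecuteWrite no longer downcast. A minimal call-site sketch, assuming the description, committer and concurrent-writer function have already been assembled by the caller (only the types and executeWrite itself come from this diff):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.WriteFilesSpec

// Hand the fully-typed spec to the planned write node; no
// asInstanceOf[WriteFilesSpec] cast is needed on the other side anymore.
def runV1Write(writePlan: SparkPlan, spec: WriteFilesSpec): RDD[WriterCommitMessage] =
  writePlan.executeWrite(spec)
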
@@ -895,8 +895,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("MERGE INTO TABLE")
case logical.CollectMetrics(name, metrics, child) =>
execution.CollectMetricsExec(name, metrics, planLater(child)) :: Nil
case WriteFiles(child) =>
WriteFilesExec(planLater(child)) :: Nil
case WriteFiles(child, fileFormat, partitionColumns, bucket, options, requiredOrdering) =>
WriteFilesExec(planLater(child), fileFormat, partitionColumns, bucket, options,
requiredOrdering) :: Nil
case _ => Nil
}
}
@@ -31,11 +31,26 @@ import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

trait V1WriteCommand extends DataWritingCommand {
/**
* Specify the [[FileFormat]] of the provider of V1 write command.
*/
def fileFormat: FileFormat

/**
* Specify the partition columns of the V1 write command.
*/
def partitionColumns: Seq[Attribute]

/**
* Specify the bucket spec of the V1 write command.
*/
def bucketSpec: Option[BucketSpec]

/**
* Specify the storage options of the V1 write command.
*/
def options: Map[String, String]

/**
* Specify the required ordering for the V1 write command. `FileFormatWriter` will
* add SortExec if necessary when the requiredOrdering is empty.
@@ -56,7 +71,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
case write: V1WriteCommand if !write.child.isInstanceOf[WriteFiles] =>
val newQuery = prepareQuery(write, write.query)
val attrMap = AttributeMap(write.query.output.zip(newQuery.output))
val newChild = WriteFiles(newQuery)
val newChild = WriteFiles(newQuery, write.fileFormat, write.partitionColumns,
write.bucketSpec, write.options, write.requiredOrdering)
val newWrite = write.withNewChildren(newChild :: Nil).transformExpressions {
case a: Attribute if attrMap.contains(a) =>
a.withExprId(attrMap(a).exprId)
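
To feed the rule above, every V1 write provider now has to expose its write information on the command itself. A hedged sketch of a hypothetical implementor; MyInsertCommand and its trivial run body are illustrative, only the trait members mirror the code in this diff:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand}

case class MyInsertCommand(
    query: LogicalPlan,
    outputColumnNames: Seq[String],
    format: FileFormat) extends V1WriteCommand {

  // The members V1Writes now copies onto the injected WriteFiles node.
  override def fileFormat: FileFormat = format
  override def partitionColumns: Seq[Attribute] = Seq.empty // unpartitioned
  override def bucketSpec: Option[BucketSpec] = None        // no bucketing
  override def options: Map[String, String] = Map.empty
  override def requiredOrdering: Seq[SortOrder] = Seq.empty

  // Inherited from DataWritingCommand; the real write would happen here.
  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = Seq.empty

  override protected def withNewChildInternal(newChild: LogicalPlan): MyInsertCommand =
    copy(query = newChild)
}
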
@@ -23,12 +23,12 @@ import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
import org.apache.spark.sql.internal.WriteSpec

/**
* The write files spec holds all information of [[V1WriteCommand]] if its provider is
@@ -38,13 +38,18 @@ case class WriteFilesSpec(
description: WriteJobDescription,
committer: FileCommitProtocol,
concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
extends WriteSpec

/**
* During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query.
* [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
*/
case class WriteFiles(child: LogicalPlan) extends UnaryNode {
case class WriteFiles(
child: LogicalPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
requiredOrdering: Seq[SortOrder]) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles =
copy(child = newChild)
@@ -53,13 +58,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode {
/**
* Responsible for writing files.
*/
case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
case class WriteFilesExec(
child: SparkPlan,
fileFormat: FileFormat,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
requiredOrdering: Seq[SortOrder]) extends UnaryExecNode {
override def output: Seq[Attribute] = Seq.empty

override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
assert(writeSpec.isInstanceOf[WriteFilesSpec])
val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]

override protected def doExecuteWrite(
writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
val rdd = child.execute()
// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
// partition rdd to make sure we at least set up one write task to write the metadata.
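
Because the physical node now carries the write information directly, code running after planning can read it off the plan instead of reaching back to the originating command. A small illustrative sketch; the helper below is not part of this commit:

import org.apache.spark.sql.execution.datasources.WriteFilesExec

// Summarize the pulled-up write information carried by WriteFilesExec.
def describeWrite(exec: WriteFilesExec): String = {
  val numBuckets = exec.bucketSpec.map(_.numBuckets).getOrElse(0)
  val partitions = exec.partitionColumns.map(_.name).mkString(", ")
  s"format=${exec.fileFormat.getClass.getSimpleName}, " +
    s"partitionedBy=[$partitions], buckets=$numBuckets"
}
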
@@ -238,7 +238,8 @@ case class RelationConversions(
// that only matches table insertion inside Hive CTAS.
// This pattern would not cause conflicts because this rule is always applied before
// `HiveAnalysis` and both of these rules are running once.
case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _)
case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _,
_, _, _)
if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
@@ -99,21 +99,23 @@ case class InsertIntoHiveDirCommand(
}

// The temporary path must be a HDFS path, not a local path.
val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
val (stagingDir, tmpPath) = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
tmpPath.toString, tableDesc, false)
InsertIntoHiveTable.setupCompression(fileSinkConf, hadoopConf, sparkSession)
createExternalTmpPath(stagingDir, hadoopConf)

try {
saveAsHiveFile(
sparkSession = sparkSession,
plan = child,
hadoopConf = hadoopConf,
fileSinkConf = fileSinkConf,
fileFormat = new HiveFileFormat(fileSinkConf),
outputLocation = tmpPath.toString)

if (overwrite && fs.exists(writeToPath)) {
fs.listStatus(writeToPath).foreach { existFile =>
if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
if (existFile.getPath != stagingDir) fs.delete(existFile.getPath, true)
}
}

@@ -131,7 +133,7 @@ case class InsertIntoHiveDirCommand(
throw new SparkException(
"Failed inserting overwrite directory " + storage.locationUri.get, e)
} finally {
deleteExternalTmpPath(hadoopConf)
deleteExternalTmpPath(stagingDir, hadoopConf)
}

Seq.empty[Row]
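
The tmp-path helpers in this hunk now hand back the staging directory explicitly instead of remembering it in command state (the old createdTempDir), and the create/delete helpers take it as an argument. A hedged sketch of the lifecycle these calls follow; the helper signatures are taken from the calls above, the try body is illustrative:

// Resolve the staging directory and temporary output path together, create the
// staging directory before writing, and always clean it up afterwards.
val (stagingDir, tmpPath) = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
createExternalTmpPath(stagingDir, hadoopConf)
try {
  // write the job output under tmpPath, e.g. via saveAsHiveFile(...)
} finally {
  deleteExternalTmpPath(stagingDir, hadoopConf)
}
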
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
@@ -30,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{V1WriteCommand, V1WritesUtils}
import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -73,16 +74,18 @@ case class InsertIntoHiveTable(
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
outputColumnNames: Seq[String]
outputColumnNames: Seq[String],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
options: Map[String, String],
fileFormat: FileFormat,
externalTmpPath: String,
@transient stagingDir: Path,
@transient hadoopConf: Configuration
) extends SaveAsHiveFile with V1WriteCommand with V1WritesHiveUtils {

override lazy val partitionColumns: Seq[Attribute] = {
getDynamicPartitionColumns(table, partition, query)
}

override def requiredOrdering: Seq[SortOrder] = {
val options = getOptionsWithHiveBucketWrite(table.bucketSpec)
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, table.bucketSpec, options)
V1WritesUtils.getSortOrder(outputColumns, partitionColumns, bucketSpec, options)
}

/**
@@ -92,29 +95,13 @@
*/
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
val externalCatalog = sparkSession.sharedState.externalCatalog
val hadoopConf = sparkSession.sessionState.newHadoopConf()

val hiveQlTable = HiveClientImpl.toHiveTable(table)
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = new TableDesc(
hiveQlTable.getInputFormatClass,
// The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
// getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
// substitute some output formats, e.g. substituting SequenceFileOutputFormat to
// HiveSequenceFileOutputFormat.
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata
)
val tableLocation = hiveQlTable.getDataLocation
val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)

createExternalTmpPath(stagingDir, hadoopConf)
try {
processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
processInsert(sparkSession, externalCatalog, child)
} finally {
// Attempt to delete the staging directory and the inclusive files. If failed, the files are
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
deleteExternalTmpPath(hadoopConf)
deleteExternalTmpPath(stagingDir, hadoopConf)
}

// un-cache this table.
@@ -133,24 +120,20 @@ case class InsertIntoHiveTable(
private def processInsert(
sparkSession: SparkSession,
externalCatalog: ExternalCatalog,
hadoopConf: Configuration,
tableDesc: TableDesc,
tmpLocation: Path,
child: SparkPlan): Unit = {
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)

val numDynamicPartitions = partition.values.count(_.isEmpty)
val partitionSpec = getPartitionSpec(partition)
val partitionAttributes = getDynamicPartitionColumns(table, partition, query)

val writtenParts = saveAsHiveFile(
sparkSession = sparkSession,
plan = child,
hadoopConf = hadoopConf,
fileSinkConf = fileSinkConf,
outputLocation = tmpLocation.toString,
partitionAttributes = partitionAttributes,
bucketSpec = table.bucketSpec)
fileFormat = fileFormat,
outputLocation = externalTmpPath,
partitionAttributes = partitionColumns,
bucketSpec = bucketSpec,
options = options)

if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
@@ -204,7 +187,7 @@ case class InsertIntoHiveTable(
externalCatalog.loadDynamicPartitions(
db = table.database,
table = table.identifier.table,
tmpLocation.toString,
externalTmpPath,
partitionSpec,
overwrite,
numDynamicPartitions)
@@ -274,7 +257,7 @@ case class InsertIntoHiveTable(
externalCatalog.loadPartition(
table.database,
table.identifier.table,
tmpLocation.toString,
externalTmpPath,
partitionSpec,
isOverwrite = doHiveOverwrite,
inheritTableSpecs = inheritTableSpecs,
@@ -285,7 +268,7 @@ case class InsertIntoHiveTable(
externalCatalog.loadTable(
table.database,
table.identifier.table,
tmpLocation.toString, // TODO: URI
externalTmpPath, // TODO: URI
overwrite,
isSrcLocal = false)
}
@@ -294,3 +277,40 @@ case class InsertIntoHiveTable(
override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable =
copy(query = newChild)
}
object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {
def apply(
table: CatalogTable,
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
outputColumnNames: Seq[String]): InsertIntoHiveTable = {
val sparkSession = SparkSession.getActiveSession.orNull
val hiveQlTable = HiveClientImpl.toHiveTable(table)
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = new TableDesc(
hiveQlTable.getInputFormatClass,
// The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
// getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
// substitute some output formats, e.g. substituting SequenceFileOutputFormat to
// HiveSequenceFileOutputFormat.
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata
)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val tableLocation = hiveQlTable.getDataLocation
val (stagingDir, externalTmpPath) = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
val fileSinkConf = new FileSinkDesc(externalTmpPath.toString, tableDesc, false)
setupCompression(fileSinkConf, hadoopConf, sparkSession)
val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf)

val partitionColumns = getDynamicPartitionColumns(table, partition, query)
val bucketSpec = table.bucketSpec
val options = getOptionsWithHiveBucketWrite(bucketSpec)

new InsertIntoHiveTable(table, partition, query, overwrite, ifPartitionNotExists,
outputColumnNames, partitionColumns, bucketSpec, options, fileFormat,
externalTmpPath.toString, stagingDir, hadoopConf)
}
}
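
Callers keep the original six-argument construction; the new companion apply above derives the file format, partition columns, bucket spec, options and the staging/tmp paths up front so the command no longer recomputes them at run time. A hedged usage sketch, with catalogTable and parsedQuery assumed to be in scope:

// The companion apply fills in the pulled-up write information.
val insert = InsertIntoHiveTable(
  table = catalogTable,
  partition = Map("ds" -> Some("2022-12-30")), // static partition value, illustrative
  query = parsedQuery,
  overwrite = true,
  ifPartitionNotExists = false,
  outputColumnNames = parsedQuery.output.map(_.name))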