[SPARK-41708][SQL] Pull v1write information to WriteFiles
#39277
Conversation
 * The call side should create `stagingDir` before using `externalTmpPath` and
 * delete `stagingDir` at the end.
 */
protected def getExternalTmpPath(
This is the key change for Hive insertion. Before, this method had a side effect of creating the stagingDir. Now the method returns two paths: one is the staging dir to be created, and the other is the original externalTmpPath.
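A rough sketch of the reshaped method described here (the parameter list and helper calls are illustrative, based on the snippets quoted later in the thread, not the exact final code; old- vs. new-Hive-version handling is omitted):

// Returns (stagingDir, externalTmpPath) without touching the file system;
// the caller creates stagingDir before the write and deletes it afterwards.
protected def getExternalTmpPath(
    path: Path,
    hadoopConf: Configuration): (Path, Path) = {
  val stagingName = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
  val stagingDir = getExternalScratchDir(path.toUri, hadoopConf, stagingName)
  val externalTmpPath = new Path(stagingDir, "-ext-10000")
  (stagingDir, externalTmpPath)
}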
try {
  if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
    throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
  }
  createdTempDir = Some(dir)
The global variable createdTempDir is really hacky. Since we now have the staging dir explicitly, we can pass it to the method deleteExternalTmpPath, and then we do not need the global anymore.
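A minimal sketch of the implied signature change (parameter names are illustrative, not the exact PR code):

// The caller passes the staging dir explicitly, so the createdTempDir global is no longer needed.
protected def deleteExternalTmpPath(stagingDir: Path, hadoopConf: Configuration): Unit = {
  val fs = stagingDir.getFileSystem(hadoopConf)
  if (fs.delete(stagingDir, true)) {
    // If the delete succeeded, the deleteOnExit registration is no longer needed either.
    fs.cancelDeleteOnExit(stagingDir)
  }
}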
try {
  processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
  processInsert(sparkSession, externalCatalog, child)
now the code looks like:
create stagingDir
try {
processInsert
} finally {
delete stagingDir
}
cc @cloud-fan
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    requiredOrdering: Seq[SortOrder]) extends UnaryNode {
This doesn't seem like logical write information, but more like internal information. Do we really need it here?
How about pulling out partitionSpec instead? partitionColumns does not contain the information of the insertion's partition spec.
SGTM
    options: Map[String, String],
    fileFormat: FileFormat,
    externalTmpPath: String,
    @transient stagingDir: Path,
What's the difference between externalTmpPath and stagingDir?
For old Hive versions, externalTmpPath and stagingDir are the same:
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala (lines 129 to 136 in a3c837a)

val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
  oldVersionExternalTempPath(path, hadoopConf, scratchDir)
} else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
  newVersionExternalTempPath(path, hadoopConf, stagingDir)
For new Hive versions:
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala (lines 189 to 197 in a3c837a)

private def newVersionExternalTempPath(
    path: Path,
    hadoopConf: Configuration,
    stagingDir: String): Path = {
  val extURI: URI = path.toUri
  if (extURI.getScheme == "viewfs") {
    getExtTmpPathRelTo(path, hadoopConf, stagingDir)
  } else {
    new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
- externalTmpPath:
new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
- stagingDir:
getExternalScratchDir(extURI, hadoopConf, stagingDir)
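To make the difference concrete, a hypothetical example for the new-version code path (the values below are illustrative, roughly what the default hive.exec.stagingdir of ".hive-staging" would produce for a table located at /warehouse/db.db/tbl):

// stagingDir:      /warehouse/db.db/tbl/.hive-staging_hive_2023-01-01_00-00-00_000_1234567890
// externalTmpPath: /warehouse/db.db/tbl/.hive-staging_hive_2023-01-01_00-00-00_000_1234567890/-ext-10000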
/**
 * Return two paths:
 * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
 * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000`
Does Hadoop Path provide an API to get the parent? If it does, then we don't need to return 2 paths.
It is not always just the parent: for old-version Hive the two paths are the same. So if we want to return only one path, we would have to check the Hive version again before using it.
val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]

override protected def doExecuteWrite(
    writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
Should WriteFilesSpec include less information, since some of it is already available in WriteFilesExec?
Seems it's a bit hard. Look at the current information:

case class WriteFilesSpec(
    description: WriteJobDescription,
    committer: FileCommitProtocol,
    concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])

ConcurrentOutputWriterSpec and FileCommitProtocol contain the output spec, so we can not replace them. WriteJobDescription contains a lot of information, including what we pull out, but if we want to remove something from WriteJobDescription, we would need to create a new class to hold the rest. I'm not sure it's worth doing that.
class WriteJobDescription(
val uuid: String,
val serializableHadoopConf: SerializableConfiguration,
val outputWriterFactory: OutputWriterFactory,
val allColumns: Seq[Attribute],
val dataColumns: Seq[Attribute],
val partitionColumns: Seq[Attribute],
val bucketSpec: Option[WriterBucketSpec],
val path: String,
val customPartitionLocations: Map[TablePartitionSpec, String],
val maxRecordsPerFile: Long,
val timeZoneId: String,
val statsTrackers: Seq[WriteJobStatsTracker])
 * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
 * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-10000`
 * The call side should create `stagingDir` before using `externalTmpPath` and
 * delete `stagingDir` at the end.
Instead of adding a lot of comments to explain it, let's create a wrapper class
class HiveTableTempPath(session: SparkSession, conf: HadoopConf, path: Path) {
...
def stagingDir: Path = ...
def externalTempPath: Path = ...
}
Wrapped it as HiveTempPath, since it would also be used by InsertIntoHiveDirCommand.
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    fileFormat: FileFormat,
    @transient externalTmpPath: HiveTempPath
Suggested change:
    @transient externalTmpPath: HiveTempPath
    @transient hiveTmpPath: HiveTempPath
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}

class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)
can we move it to a new file?
@@ -105,4 +227,33 @@ trait V1WritesHiveUtils {
      .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
      .getOrElse(Map.empty)
  }

  def setupCompression(
I think setupHadoopConfForCompression is more accurate.
} finally {
  // Attempt to delete the staging directory and the inclusive files. If failed, the files are
  // expected to be dropped at the normal termination of VM since deleteOnExit is used.
  deleteExternalTmpPath(hadoopConf)
  deleteExternalTmpPath(stagingDir, hadoopConf)
Can we add def createTempPath() and def deleteTempPath() in HiveTempPath? Then we don't even need to expose the stagingDir, which makes the interface cleaner.
addressed
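For reference, a rough sketch of what the addressed interface could look like (the method and field names follow the suggestions in this thread; the actual class in the PR may differ, and the staging-dir naming below is purely illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path) {
  // Staging dir derived from `path`; kept private so callers never touch it directly.
  // The suffix is illustrative -- the real code derives it from hive.exec.stagingdir.
  private lazy val stagingDir: Path = new Path(path, ".hive-staging_" + java.util.UUID.randomUUID())

  // The location the writer actually writes to, e.g. `$stagingDir/-ext-10000`.
  lazy val externalTempPath: Path = new Path(stagingDir, "-ext-10000")

  def createTempPath(): Unit = {
    val fs = stagingDir.getFileSystem(hadoopConf)
    if (!fs.mkdirs(stagingDir)) {
      throw new IllegalStateException(s"Cannot create staging directory '$stagingDir'")
    }
    // Best-effort cleanup if the JVM exits before deleteTempPath() runs.
    fs.deleteOnExit(stagingDir)
  }

  def deleteTempPath(): Unit = {
    val fs = stagingDir.getFileSystem(hadoopConf)
    fs.delete(stagingDir, true)
  }

  // Extra helper for InsertIntoHiveDirCommand: delete `p` unless it is the staging dir itself.
  def deleteIfNotStagingDir(p: Path, fs: FileSystem): Unit = {
    if (p != null && p != stagingDir) fs.delete(p, true)
  }
}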
}

def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = {
  if (Option(path) != stagingDirForCreating) fs.delete(path, true)
One more method for InsertIntoHiveDirCommand, so we can hide the staging dir.
thanks, merging to master!
@@ -294,3 +285,40 @@ case class InsertIntoHiveTable(
  override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable =
    copy(query = newChild)
}

object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {
what do we log inside this object?
Oh, I missed cleaning it up. Will remove it when I touch the related code.
…ew query

### What changes were proposed in this pull request?
This is the followup of #39277, does three things:
- replace WriteFiles attribute exprId using new query to avoid potential issue
- remove unnecessary explain info with `WriteFiles`
- cleanup unnecessary `Logging`

### Why are the changes needed?
Improve the implementation of `WriteFiles`

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add test

Closes #39468 from ulysses-you/SPARK-41708-followup.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… before write command

### What changes were proposed in this pull request?
This is a followup of #39277. With planned write, the write command requires neither columnar nor row-based execution. It invokes a new API `executeWrite`, which returns commit messages, not columnar or row-based data. This PR updates `ApplyColumnarRulesAndInsertTransitions` to take this case into consideration.

### Why are the changes needed?
If people replace `WriteFilesExec` with a columnar version, the plan can't be executed due to an extra columnar-to-row transition between `WriteFilesExec` and the write command.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
new test

Closes #39922 from cloud-fan/write.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This PR aims to pull out the v1 write information from V1WriteCommand to WriteFiles. Also, this PR cleans up WriteSpec, which is unnecessary.

Why are the changes needed?
After this PR, WriteFiles will hold the write information, which can help developers.

Does this PR introduce any user-facing change?
no

How was this patch tested?
Pass CI
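For context, a rough sketch of the shape of the WriteFiles logical node after this change; the field list is approximated from the diff fragments and review discussion above (e.g. requiredOrdering was dropped and the insertion's static partition spec pulled in), so the actual definition may differ:

case class WriteFiles(
    child: LogicalPlan,
    fileFormat: FileFormat,
    partitionColumns: Seq[Attribute],
    bucketSpec: Option[BucketSpec],
    options: Map[String, String],
    staticPartitions: TablePartitionSpec) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override protected def withNewChildInternal(newChild: LogicalPlan): WriteFiles =
    copy(child = newChild)
}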