Skip to content

Commit

Permalink
Update SparkHadoopMapRedUtil.scala
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed May 27, 2022
1 parent b4e11cc commit 7642cdb
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ object SparkHadoopMapRedUtil extends Logging {
val taskAttemptOutputs: Seq[Path] = if (shouldCoordinateWithTaskAttemptOutputs) {
committer match {
case f: FileOutputCommitter =>
val taskAttemptPath = f.getTaskAttemptPath(mrTaskContext)
if (f.isCommitJobRepeatable(mrTaskContext)) {
// If algorithmVersion is 2, Spark should get file under final output path.
def getTaskAttemptCommittedPaths(
Expand All @@ -102,6 +101,7 @@ object SparkHadoopMapRedUtil extends Logging {
}
}

val taskAttemptPath = f.getTaskAttemptPath(mrTaskContext)
val fs: FileSystem = taskAttemptPath.getFileSystem(mrTaskContext.getConfiguration)
try {
val taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath)
Expand All @@ -111,8 +111,8 @@ object SparkHadoopMapRedUtil extends Logging {
Seq.empty
}
} else {
// If algorithmVersion is 1, task commit final path is `taskAttemptPath`.
Seq(taskAttemptPath)
// If algorithmVersion is 1, task commit final path is `committedTaskAttemptPath`.
Seq(f.getCommittedTaskPath(mrTaskContext))
}
case _ => Seq.empty
}
Expand Down

0 comments on commit 7642cdb

Please sign in to comment.