
[SPARK-21400] Don't overwrite output committers on append (#227)
robert3005 authored Jul 15, 2017
1 parent 68ef3f5 commit fac457d
Showing 2 changed files with 23 additions and 77 deletions.
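In effect, this commit makes SQLHadoopMapReduceCommitProtocol honor a user-configured output committer for append writes as well; previously, the committer set in SQLConf.OUTPUT_COMMITTER_CLASS was ignored whenever isAppend was true. A minimal sketch of the configuration this affects (the committer class name is a placeholder, and the key shown is the one SQLConf.OUTPUT_COMMITTER_CLASS resolved to in Spark 2.x):

// Sketch only: com.example.MyCommitter is hypothetical, not part of this commit.
// Assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.sources.outputCommitterClass", "com.example.MyCommitter")

// After this change, the configured committer is used for appends too:
val df = spark.range(0, 10).toDF("i")
df.write.mode("append").format("parquet").save("/tmp/example-table")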
@@ -33,40 +33,32 @@ class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
   extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val clazz = context.getConfiguration
+      .getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+    if (clazz != null) {
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
 
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(path), context)
+      } else {
+        // The specified output committer is just an OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
      }
+    } else {
+      val committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+      logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
+      committer
    }
-    logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
-    committer
   }
 }
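As the diff shows, setupCommitter instantiates the configured class reflectively, so it expects one of two constructor shapes: (Path, TaskAttemptContext) for FileOutputCommitter subclasses, or a no-argument constructor for plain OutputCommitters. A sketch of a conforming custom committer (the class name and commitJob body are illustrative assumptions, not from this commit):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Extends FileOutputCommitter, so setupCommitter will find and invoke the
// (Path, TaskAttemptContext) constructor via reflection.
class MyCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    super.commitJob(jobContext)
    // Illustrative post-commit hook, e.g. notify an external catalog.
  }
}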
@@ -783,52 +783,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("SPARK-8578 specified custom output committer will not be used to append data") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      val extraOptions = Map[String, String](
-        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
-        // Since Parquet has its own output committer setting, also set it
-        // to AlwaysFailParquetOutputCommitter here.
-        "spark.sql.parquet.output.committer.class" ->
-          classOf[AlwaysFailParquetOutputCommitter].getName
-      )
-
-      val df = spark.range(1, 10).toDF("i")
-      withTempPath { dir =>
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        // Because the data already exists, this append should succeed:
-        // we will use the output committer associated with the file format,
-        // and AlwaysFailOutputCommitter will not be used.
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        checkAnswer(
-          spark.read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .load(dir.getCanonicalPath),
-          df.union(df))
-
-        // This will fail because AlwaysFailOutputCommitter is used when we do append.
-        intercept[Exception] {
-          df.write.mode("overwrite")
-            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
-        }
-      }
-      withTempPath { dir =>
-        // Because there is no existing data, this append will fail:
-        // AlwaysFailOutputCommitter is used when we append to a
-        // directory that does not exist yet.
-        intercept[Exception] {
-          df.write.mode("append")
-            .options(extraOptions)
-            .format(dataSourceName)
-            .save(dir.getCanonicalPath)
-        }
-      }
-    }
-  }
-
   test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
     val df = Seq(
       (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
