diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 9b9ed28412cac..2a5579465ed8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -33,40 +33,32 @@ class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Bo
   extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val clazz = context.getConfiguration
+      .getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+    if (clazz != null) {
+      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
 
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(path), context)
+      } else {
+        // The specified output committer is just an OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
       }
+    } else {
+      val committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+      logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
+      committer
     }
-    logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
-    committer
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index d23b66a5300e7..e0195715e5803 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -783,52 +783,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  test("SPARK-8578 specified custom output committer will not be used to append data") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-        classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      val extraOptions = Map[String, String](
-        SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName,
-        // Since Parquet has its own output committer setting, also set it
-        // to AlwaysFailParquetOutputCommitter at here.
-        "spark.sql.parquet.output.committer.class" ->
-          classOf[AlwaysFailParquetOutputCommitter].getName
-      )
-
-      val df = spark.range(1, 10).toDF("i")
-      withTempPath { dir =>
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        // Because there data already exists,
-        // this append should succeed because we will use the output committer associated
-        // with file format and AlwaysFailOutputCommitter will not be used.
-        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
-        checkAnswer(
-          spark.read
-            .format(dataSourceName)
-            .option("dataSchema", df.schema.json)
-            .options(extraOptions)
-            .load(dir.getCanonicalPath),
-          df.union(df))
-
-        // This will fail because AlwaysFailOutputCommitter is used when we do append.
-        intercept[Exception] {
-          df.write.mode("overwrite")
-            .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath)
-        }
-      }
-      withTempPath { dir =>
-        // Because there is no existing data,
-        // this append will fail because AlwaysFailOutputCommitter is used when we do append
-        // and there is no existing data.
-        intercept[Exception] {
-          df.write.mode("append")
-            .options(extraOptions)
-            .format(dataSourceName)
-            .save(dir.getCanonicalPath)
-        }
-      }
-    }
-  }
-
   test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
     val df = Seq(
       (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
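
With this patch, `setupCommitter` honors `SQLConf.OUTPUT_COMMITTER_CLASS` regardless of whether the write is an append, and falls back to the output format's default committer only when the config is unset, which is why the SPARK-8578 test is removed. As a minimal sketch (not part of the patch; the class name `LoggingFileOutputCommitter` is hypothetical), a user-defined committer that the reflective lookup above could instantiate might look like this:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical committer, not part of this patch: it extends FileOutputCommitter,
// so setupCommitter's reflective lookup uses the (Path, TaskAttemptContext) constructor.
class LoggingFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  // Delegate to the standard commit logic; a real committer would hook custom
  // behavior (metrics, validation, cleanup) around these calls.
  override def commitTask(taskContext: TaskAttemptContext): Unit = {
    super.commitTask(taskContext)
  }
}
```

A committer that extends `OutputCommitter` directly would instead be built through the no-argument constructor branch shown in the diff. The removed test illustrates how such a class is selected: pass `SQLConf.OUTPUT_COMMITTER_CLASS.key` (and, for Parquet, `spark.sql.parquet.output.committer.class`) as write options so they reach the task attempt's Hadoop configuration.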