
[SPARK-10063][SQL] Remove DirectParquetOutputCommitter
rxin committed Apr 7, 2016
1 parent 21d5ca1 commit 8719c26
Showing 5 changed files with 5 additions and 190 deletions.
33 changes: 0 additions & 33 deletions docs/sql-programming-guide.md
@@ -1466,37 +1466,6 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
support.
</td>
</tr>
-<tr>
-  <td><code>spark.sql.parquet.output.committer.class</code></td>
-  <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
-  <td>
-    <p>
-      The output committer class used by Parquet. The specified class needs to be a subclass of
-      <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
-      subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
-    </p>
-    <p>
-      <b>Note:</b>
-      <ul>
-        <li>
-          This option is automatically ignored if <code>spark.speculation</code> is turned on.
-        </li>
-        <li>
-          This option must be set via Hadoop <code>Configuration</code> rather than Spark
-          <code>SQLConf</code>.
-        </li>
-        <li>
-          This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
-        </li>
-      </ul>
-    </p>
-    <p>
-      Spark SQL comes with a builtin
-      <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
-      efficient than the default Parquet output committer when writing data to S3.
-    </p>
-  </td>
-</tr>
<tr>
<td><code>spark.sql.parquet.mergeSchema</code></td>
<td><code>false</code></td>
@@ -2165,8 +2134,6 @@ options.
- In the `sql` dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains
unchanged.
- The canonical names of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).
-- It has been determined that using the DirectOutputCommitter when speculation is enabled is unsafe
-  and thus this output committer will not be used when speculation is on, independent of configuration.
- JSON data source will not automatically load new files that are created by other applications
(i.e. files that are not inserted to the dataset through Spark SQL).
For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore),
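For context on the documentation removed above: the option could not be set through `SQLContext.setConf`, only through the Hadoop `Configuration`. A minimal sketch of the old usage, assuming a `SparkContext` named `sc` (the option no longer exists after this commit):

```scala
// Sketch of pre-removal usage; `sc` is an assumed SparkContext.
// This config key is deleted by this commit, so this is historical only.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.parquet.hadoop.ParquetOutputCommitter")
```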
@@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
} catch {
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
-        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
-          // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+        if (outputCommitter.getClass.getName.contains("Direct")) {
+          // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
           // attempts, the task will fail because the output file is created from a prior attempt.
           // This often means the most visible error to the user is misleading. Augment the error
           // to tell the user to look for the actual error.
           throw new SparkException("The output file already exists but this could be due to a " +
             "failure from an earlier attempt. Look through the earlier logs or stage page for " +
-            "the first error.\n File exists error: " + e)
+            "the first error.\n File exists error: " + e.getLocalizedMessage, e)
+        } else {
+          throw e
         }
-        throw e
}
}
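A side improvement in this hunk: the rethrown `SparkException` now chains the original exception as its cause instead of only concatenating it into the message, so the underlying stack trace survives in the logs. A minimal standalone sketch of the pattern (the object and method names here are illustrative, not from the patch):

```scala
import org.apache.hadoop.fs.FileAlreadyExistsException

object RetryErrorDemo {
  // Hypothetical stand-in for the task's write path.
  def writeOutput(): Unit =
    throw new FileAlreadyExistsException("part-00000 already exists")

  def main(args: Array[String]): Unit =
    try writeOutput()
    catch {
      case e: FileAlreadyExistsException =>
        // Chaining `e` as the cause keeps its stack trace in the logs, while
        // the message points the user at the real, earlier failure.
        throw new RuntimeException(
          "Output file exists; this may be a retry of a failed attempt. " +
            "Check the earlier logs for the first error: " + e.getLocalizedMessage, e)
    }
}
```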

@@ -156,15 +157,6 @@
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
-    } else if (speculationEnabled) {
-      // When speculation is enabled, it's not safe to use customized output committer classes,
-      // especially direct output committers (e.g. `DirectParquetOutputCommitter`).
-      //
-      // See SPARK-9899 for more details.
-      logInfo(
-        s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
-          "because spark.speculation is configured to be true.")
-      defaultOutputCommitter
} else {
val configuration = context.getConfiguration
val committerClass = configuration.getClass(

This file (DirectParquetOutputCommitter.scala) was deleted.

@@ -76,13 +76,6 @@ private[sql] class DefaultSource

val conf = ContextUtil.getConfiguration(job)

-    // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
-    val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
-    if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
-      conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
-        classOf[DirectParquetOutputCommitter].getCanonicalName)
-    }
-
val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
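The lookup truncated above uses Hadoop's `Configuration.getClass`, which returns the configured class if the key is set, falls back to a default otherwise, and verifies the result against an expected supertype. A minimal sketch of that API, with the key taken from the docs section above and an illustrative fallback (Spark's real default is the Parquet committer):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

val hadoopConf = new Configuration()
// getClass(key, default, supertype): configured class if set, else the
// default; the third argument enforces that it extends OutputCommitter.
val committerClass: Class[_ <: OutputCommitter] = hadoopConf.getClass(
  "spark.sql.parquet.output.committer.class", // key from the docs above
  classOf[FileOutputCommitter],               // illustrative fallback
  classOf[OutputCommitter])
```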
@@ -445,55 +445,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
val clonedConf = new Configuration(hadoopConfiguration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
classOf[DirectParquetOutputCommitter].getCanonicalName)
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}

testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatibility") {
val clonedConf = new Configuration(hadoopConfiguration)

// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
"org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}


test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
val clonedConf = new Configuration(hadoopConfiguration)
