From 227ff647550967ca98405fdf487f2159621a65a0 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 17 Jan 2017 10:57:22 -0800 Subject: [PATCH] [SC-5557] Initial implementation of directory level atomicity (omits delete hook) ## What changes were proposed in this pull request? This implements a file commit protocol optimized for cloud storage. Files are written directly to their final locations. Their commit status is determined by the presence of specially named marker files. Job commit proceeds as follows: 1) When tasks request a file to write, we create a `_started_$txnId` marker in the output directory. The output files are hidden from readers while the start transaction marker is present. 2) We commit the job by replacing the _started marker with a `_committed_$txnId` file that contains a list of files added and removed in that directory. The protocol is fail-open. That is, if a start marker is not present, we assume the file is committed. Note that this is only atomic per-directory, and may suffer from update anomalies if there are multiple concurrent writers. To clean up garbage files, we provide the sql VACUUM command that takes a time horizon. `VACUUM '/path/to/directory' [RETAIN HOURS]` will remove garbage / uncommitted files after the specified number of hours, defaulting to 48. ### Config flags: spark.sql.sources.commitProtocolClass -- commit protocol (default DatabricksAtomicCommitProtocol) com.databricks.sql.enableFilterUncommitted -- whether to enable the read protocol (default true) com.databricks.sql.ignoreCorruptCommitMarkers -- whether to ignore unreadable commit markers rather than raising error (default false) ## How was this patch tested? Unit tests. TODO(ekl): randomized tests in a follow-up to verify correctness Author: Eric Liang Author: Eric Liang Closes #164 from ericl/directory-atomicity. --- .../spark/sql/catalyst/parser/SqlBase.g4 | 5 + .../DatabricksAtomicCommitProtocol.scala | 240 +++++++++++++ .../DatabricksAtomicReadProtocol.scala | 282 +++++++++++++++ .../spark/sql/execution/SparkSqlParser.scala | 12 + .../sql/execution/command/VacuumCommand.scala | 35 ++ .../PartitioningAwareFileIndex.scala | 9 +- .../apache/spark/sql/internal/SQLConf.scala | 4 +- .../DatabricksAtomicCommitProtocolSuite.scala | 339 ++++++++++++++++++ .../datasources/HadoopFsRelationSuite.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala | 122 ++++--- .../ParquetPartitionDiscoverySuite.scala | 9 +- .../parquet/ParquetQuerySuite.scala | 5 + .../hive/orc/OrcHadoopFsRelationSuite.scala | 4 +- .../sql/sources/HadoopFsRelationTest.scala | 77 ++-- .../ParquetHadoopFsRelationSuite.scala | 6 +- 15 files changed, 1048 insertions(+), 103 deletions(-) create mode 100644 sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala create mode 100644 sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/VacuumCommand.scala create mode 100644 sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index fccd6b2302021..7f85a82c224ad 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -150,6 +150,7 @@ statement | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration | RESET #resetConfiguration + | VACUUM path=STRING (RETAIN number HOURS)? #vacuumPath | unsupportedHiveNativeCommands .*? #failNativeCommand ; @@ -701,6 +702,7 @@ nonReserved | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN | UNBOUNDED | WHEN | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP + | VACUUM | RETAIN | HOURS ; SELECT: 'SELECT'; @@ -810,6 +812,9 @@ START: 'START'; TRANSACTION: 'TRANSACTION'; COMMIT: 'COMMIT'; ROLLBACK: 'ROLLBACK'; +VACUUM: 'VACUUM'; +RETAIN: 'RETAIN'; +HOURS: 'HOURS'; MACRO: 'MACRO'; IF: 'IF'; diff --git a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala new file mode 100644 index 0000000000000..2b150e5dec797 --- /dev/null +++ b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala @@ -0,0 +1,240 @@ +/* Copyright © 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package org.apache.spark.sql.transaction + +import java.io._ +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.util.control.NonFatal + +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapreduce._ +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.SparkSession + +/** + * File commit protocol optimized for cloud storage. Files are written directly to their final + * locations. Their commit status is determined by the presence of specially named marker files. + * + * Job commit proceeds as follows: + * + * 1) When tasks request a file to write, we create a `_started_$txnId` marker in the output + * directory. The output files, which have $txnId embedded in their name, are hidden from + * readers while the start marker is present. + * 2) We commit the job by creating a new `_committed_$txnId` marker that contains a list of + * files added and removed in that directory. + * + * Note that this is only atomic per-directory, and that we only provide snapshot isolation and + * not serializability. + */ +class DatabricksAtomicCommitProtocol(jobId: String, path: String) + extends FileCommitProtocol with Serializable with Logging { + + import FileCommitProtocol._ + import DatabricksAtomicReadProtocol._ + import DatabricksAtomicCommitProtocol._ + + // Globally unique alphanumeric string. We decouple this from jobId for possible future use. + private val txnId: TxnId = math.abs(scala.util.Random.nextLong).toString + + // The list of files staged by this committer. These are collected to the driver on task commit. + private val stagedFiles = mutable.Set[String]() + + // The list of files staged for deletion by the driver. + @transient private val stagedDeletions = mutable.Set[Path]() + + override def newTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { + if (dir.isDefined) { + newTaskTempFileAbsPath(taskContext, new Path(path, dir.get).toString, ext) + } else { + newTaskTempFileAbsPath(taskContext, new Path(path).toString, ext) + } + } + + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { + val filename = getFilename(taskContext, ext) + val finalPath = new Path(absoluteDir, filename) + val fs = finalPath.getFileSystem(taskContext.getConfiguration) + val startMarker = new Path(finalPath.getParent, new Path(s"_started_$txnId")) + if (!fs.exists(startMarker)) { + fs.create(startMarker, true).close() + logDebug("Created start marker: " + startMarker) + } + stagedFiles += finalPath.toString + finalPath.toString + } + + private def getFilename(taskContext: TaskAttemptContext, ext: String): String = { + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + + // Include the job and task attempt ids so that file writes never collide. + val taskAttemptId = taskContext.getTaskAttemptID.getId + + // e.g. part-00001-tid-177723428-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.gz.parquet + f"part-$split%05d-tid-$txnId-$jobId-$taskAttemptId$ext" + } + + override def setupJob(jobContext: JobContext): Unit = { + val root = new Path(path) + root.getFileSystem(jobContext.getConfiguration).mkdirs(root) + } + + override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { + logInfo("Committing job " + jobId) + val root = new Path(path) + val fs = root.getFileSystem(jobContext.getConfiguration) + def qualify(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + + // Collects start markers and staged task files. + taskCommits.foreach { t => + val task = t.obj.asInstanceOf[this.type] + stagedFiles ++= task.stagedFiles + } + + val addedByDir = stagedFiles.toSeq.map(new Path(_)).map(qualify) + .groupBy(_.getParent).map(kv => (kv._1, kv._2.map(_.getName.toString))) + + val removedByDir = stagedDeletions.toSeq.map(qualify) + .groupBy(_.getParent).map(kv => (kv._1, kv._2.map(_.getName.toString))) + + // Commit each updated directory in parallel. + val dirs = (addedByDir.keys ++ removedByDir.keys).toSet.par + dirs.tasksupport = DatabricksAtomicCommitProtocol.tasksupport + dirs.foreach { dir => + val commitMarker = new Path(dir, s"_committed_$txnId") + val output = fs.create(commitMarker) + try { + serializeFileChanges( + addedByDir.getOrElse(dir, Nil), removedByDir.getOrElse(dir, Nil), output) + } finally { + output.close() + } + // We don't delete the start marker here since from a correctness perspective, it is + // possible a concurrent reader sees neither the start nor end marker even with a re-list + } + logInfo("Job commit completed for " + jobId) + } + + override def abortJob(jobContext: JobContext): Unit = { + /* no-op */ + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + /* no-op */ + } + + override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { + new TaskCommitMessage(this) + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = { + // We must leave the start markers since other stray tasks may be writing to this same + // directory, and we need to ensure their files stay hidden. + stagedFiles.map(new Path(_)).foreach { f => + val fs = f.getFileSystem(taskContext.getConfiguration) + fs.delete(f, false) + } + } +} + +object DatabricksAtomicCommitProtocol extends Logging { + import DatabricksAtomicReadProtocol._ + + private val sparkSession = SparkSession.builder.getOrCreate() + + import scala.collection.parallel.ThreadPoolTaskSupport + import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit} + + private lazy val tasksupport = new ThreadPoolTaskSupport({ + val pool = new ThreadPoolExecutor( + 100, + 100, + 100L, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable]) + pool.setThreadFactory(new ThreadFactory { + override def newThread(task: Runnable): Thread = { + val thread = new Thread(task, "DatabricksAtomicCommitProtocolWorker") + thread.setDaemon(true) + thread + } + }) + pool + }) + + /** + * Traverses the given directories and cleans up uncommitted or garbage files and markers. A + * horizon may be specified beyond which we assume pending jobs have failed. Files written by + * those jobs will be removed as well. Vacuuming will be done in parallel if possible. + * + * @return the list of deleted files + */ + def vacuum(path: Path, horizon: Long): List[Path] = { + val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration) + val (dirs, initialFiles) = fs.listStatus(path).partition(_.isDirectory) + + def checkPositive(time: Long): Long = { assert(time > 0); time } + var deletedPaths: List[Path] = Nil + def delete(p: Path): Unit = { + deletedPaths ::= p + fs.delete(p, false) + } + + val (state, resolvedFiles) = resolveCommitState(fs, path, initialFiles) + + // remove uncommitted and timed-out file outputs + for (file <- resolvedFiles) { + file.getPath.getName match { + // we wait for a horizon to avoid killing Spark jobs using those files + case name if state.getDeletionTime(name) > 0 && state.getDeletionTime(name) < horizon => + logInfo(s"Garbage collecting ${file.getPath} since it is marked as deleted.") + delete(file.getPath) + + case name @ FILE_WITH_TXN_ID(txnId) if state.isCommitted(txnId) && + !state.isFileCommitted(txnId, name) => + logInfo(s"Garbage collecting ${file.getPath} since it was written by a failed task.") + delete(file.getPath) + + case name @ FILE_WITH_TXN_ID(txnId) if !state.isCommitted(txnId) && + checkPositive(state.getStartTime(txnId)) < horizon => + logInfo(s"Garbage collecting ${file.getPath} since its job has timed out " + + s"(${state.getStartTime(txnId)} < $horizon).") + delete(file.getPath) + + case STARTED_MARKER(txnId) if state.isCommitted(txnId) && + checkPositive(file.getModificationTime) < horizon => + logInfo(s"Garbage collecting start marker ${file.getPath} of committed job.") + delete(file.getPath) + + case _ => + } + } + + // recurse + for (d <- dirs) { + deletedPaths :::= vacuum(d.getPath, horizon) + if (fs.listStatus(d.getPath).isEmpty) { + logInfo(s"Garbage collecting empty directory ${d.getPath}") + delete(d.getPath) + } + } + + deletedPaths + } +} diff --git a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala new file mode 100644 index 0000000000000..8165f538548c7 --- /dev/null +++ b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala @@ -0,0 +1,282 @@ +/* Copyright © 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package org.apache.spark.sql.transaction + +import java.io._ +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.util.control.NonFatal + +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapreduce._ +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, SystemClock} + +/** + * Read-side support for DatabricksAtomicCommitProtocol. + */ +object DatabricksAtomicReadProtocol extends Logging { + type TxnId = String + + val STARTED_MARKER = "_started_(.*)".r + val COMMITTED_MARKER = "_committed_(.*)".r + val FILE_WITH_TXN_ID = "[^_].*-tid-([a-z0-9]+)-.*".r + + private implicit val formats = Serialization.formats(NoTypeHints) + + // Visible for testing. + private[spark] var testingFs: Option[FileSystem] = None + + // Visible for testing. + private[spark] var clock: Clock = new SystemClock + + /** + * Given a directory listing, filters out files that are uncommitted. A file is considered + * committed if it is named in a `_committed-$txnId` marker, OR if there is no corresponding + * `_committed-$txnId` or `_started-$txnId` marker for the file. + * + * @return the filtered list of files + */ + def filterDirectoryListing( + fs: FileSystem, dir: Path, initialFiles: Seq[FileStatus]): Seq[FileStatus] = { + // we use SparkEnv for this escape-hatch flag since this may be called on executors + if (!SparkEnv.get.conf.getBoolean("com.databricks.sql.enableFilterUncommitted", true)) { + return initialFiles + } + + val (state, resolvedFiles) = resolveCommitState(testingFs.getOrElse(fs), dir, initialFiles) + resolvedFiles.filter { f => + val name = f.getPath.getName + name match { + case _ if state.getDeletionTime(name) > 0 => + logInfo(s"Ignoring ${f.getPath} since it is marked as deleted.") + false + case FILE_WITH_TXN_ID(txnId) if !state.isFileCommitted(txnId, name) => + logInfo(s"Ignoring ${f.getPath} since it is not marked as committed.") + false + case _ => + true + } + } + } + + /** + * Holds the parsed commit state of files local to a single directory. + * + * @param lastModified max modification time of files in this dir + * @param trackedFiles list of all files with txn ids + * @param startMarkers set of start markers found, and their creation times + * @param commitMarkers set of commit markers found, and their added files + * @param corruptCommitMarkers set of commit markers we could not read + * @param deletedFiles set of files marked as deleted by committed transactions + */ + private[transaction] class CommitState( + val lastModified: Long, + trackedFiles: Map[String, TxnId], + startMarkers: Map[TxnId, Long], + commitMarkers: Map[TxnId, Set[String]], + corruptCommitMarkers: Set[TxnId], + deletedFiles: Map[String, Long]) { + + // The set of transaction ids from which we might be missing start markers. + val missingMarkers: Set[TxnId] = { + trackedFiles.values.toSet.diff( + (startMarkers.keys ++ commitMarkers.keys ++ corruptCommitMarkers).toSet) + } + + // The set of files which are should be present but are missing. + val missingDataFiles: Set[String] = { + commitMarkers.values.flatten.toSet -- trackedFiles.keys -- deletedFiles.keys + } + + /** + * @return whether the given transaction has committed (untracked txns are also committed). + */ + def isCommitted(txnId: TxnId): Boolean = { + commitMarkers.contains(txnId) || !startMarkers.contains(txnId) + } + + /** + * @return whether the given file is committed (untracked files are also considered committed). + */ + def isFileCommitted(txnId: TxnId, filename: String): Boolean = { + isCommitted(txnId) && + (!commitMarkers.contains(txnId) || commitMarkers(txnId).contains(filename)) + } + + /** + * @return the approximate start timestamp of the pending transaction, otherwise throws. + */ + def getStartTime(txnId: TxnId): Long = startMarkers(txnId) + + /** + * @return the deletion time of the file, or zero if it is not marked as deleted. + */ + def getDeletionTime(filename: String): Long = deletedFiles.getOrElse(filename, 0L) + } + + /** + * Given the list of files in a directory, parses and returns the per-file commit state. This + * may require addition IOs to resolve apparent write reordering and read commit file contents. + * + * Details on apparent write reordering: + * + * S3 will (soon) provide consistent LIST-after-PUT for single keys. This does not mean that + * readers will observe writes in order, however, due to the lack of snapshot isolation within + * a single LIST operation. + * + * Write order visibility is a problem if a start marker PUT get re-ordered after a data file + * write from the reader perspective. To work around this issue, we list the directory again + * if a start marker is suspected to be missing. + * + * The same issue can occur with data file writes re-ordered after commit marker creation. In + * this situation we also must re-list if data files are suspected to be missing. + */ + private[transaction] def resolveCommitState( + fs: FileSystem, + dir: Path, + initialFiles: Seq[FileStatus]): (CommitState, Seq[FileStatus]) = { + val state = resolveCommitState0(fs, dir, initialFiles) + + // Optimization: can assume the list request was atomic if the files have not changed recently. + val horizonMillis = SparkEnv.get.conf.getLong( + "com.databricks.sql.writeReorderingHorizonMillis", 5 * 60 * 1000) + + if ((state.missingMarkers.nonEmpty || state.missingDataFiles.nonEmpty) && + state.lastModified > clock.getTimeMillis - horizonMillis) { + logDebug("Repeating list request since some files are suspected to be missing.") + val newlyCommitted = mutable.Set[TxnId]() + val extraStatuses = fs.listStatus(dir).filter { f => + f.isFile && (f.getPath.getName match { + case COMMITTED_MARKER(txnId) if + f.getLen > 0 && state.missingMarkers.contains(txnId) => + // We choose to drop all files from transactions that committed during the re-list. + // Otherwise, we'd have to do another round of re-listing to resolve ordering issues. + newlyCommitted += txnId + false + case STARTED_MARKER(txnId) if state.missingMarkers.contains(txnId) => true + case name @ FILE_WITH_TXN_ID(_) if state.missingDataFiles.contains(name) => true + case _ => false + }) + } + + // log a debug message if data files are still missing + state.missingDataFiles.diff(extraStatuses.map(_.getPath.getName).toSet) match { + case missing if missing.nonEmpty => + logWarning( + "These files are still missing after a re-list (maybe manually deleted): " + missing) + case _ => + } + + if (extraStatuses.nonEmpty || newlyCommitted.nonEmpty) { + if (extraStatuses.nonEmpty) { + logWarning( + "Found these missing files on the second read: " + extraStatuses.map(_.getPath).toSeq) + } + if (newlyCommitted.nonEmpty) { + logWarning( + "Found these newly committed jobs on the second read: " + newlyCommitted) + } + val newFiles = (initialFiles ++ extraStatuses).filter { f => + f.getPath.getName match { + case name @ FILE_WITH_TXN_ID(txnId) if newlyCommitted.contains(txnId) => false + case _ => true + } + } + (resolveCommitState0(fs, dir, newFiles), newFiles) + } else { + (state, initialFiles) + } + } else { + (state, initialFiles) + } + } + + private def resolveCommitState0( + fs: FileSystem, + dir: Path, + filesAndMarkers: Seq[FileStatus]): CommitState = { + + var lastModified: Long = 0L + val trackedFiles = mutable.Map[String, TxnId]() + val startMarkers = mutable.Map[TxnId, Long]() + val commitMarkers = mutable.Map[TxnId, Set[String]]() + val corruptCommitMarkers = mutable.Set[TxnId]() + val deletedFiles = mutable.Map[String, Long]() + + filesAndMarkers.foreach { stat => + if (stat.getModificationTime > lastModified) { + lastModified = stat.getModificationTime + } + stat.getPath.getName match { + // We ignore zero-length commit markers (this is a commonly observed symptom of DBFS + // cancellation bugs in practice). + case COMMITTED_MARKER(txnId) if stat.getLen > 0 => + val commitFile = new Path(dir, "_committed_" + txnId) + try { + val (filesAdded, filesRemoved) = deserializeFileChanges(fs.open(commitFile)) + filesRemoved.foreach { file => + assert(stat.getModificationTime > 0) + deletedFiles(file) = stat.getModificationTime + } + commitMarkers(txnId) = filesAdded.toSet + } catch { + case NonFatal(e) => + // we use SparkEnv for this escape-hatch flag since this may be called on executors + if (SparkEnv.get.conf.getBoolean( + "com.databricks.sql.ignoreCorruptCommitMarkers", false)) { + logWarning("Failed to read job commit marker: " + stat, e) + corruptCommitMarkers += txnId + } else { + throw new IOException("Failed to read job commit marker: " + stat, e) + } + } + + case STARTED_MARKER(txnId) => + assert(stat.getModificationTime > 0) + startMarkers(txnId) = stat.getModificationTime + + case FILE_WITH_TXN_ID(txnId) => + trackedFiles(stat.getPath.getName) = txnId + + case _ => + } + } + + new CommitState( + lastModified, + trackedFiles.toMap, + startMarkers.toMap, + commitMarkers.toMap, + corruptCommitMarkers.toSet, + deletedFiles.toMap) + } + + def serializeFileChanges( + filesAdded: Seq[String], filesRemoved: Seq[String], out: OutputStream): Unit = { + val changes = Map("added" -> filesAdded, "removed" -> filesRemoved) + logDebug("Writing out file changes: " + changes) + Serialization.write(changes, out) + } + + def deserializeFileChanges(in: InputStream): (Seq[String], Seq[String]) = { + val reader = new InputStreamReader(in, StandardCharsets.UTF_8) + try { + val changes = Serialization.read[Map[String, Any]](reader) + (changes("added").asInstanceOf[Seq[String]], changes("removed").asInstanceOf[Seq[String]]) + } finally { + reader.close() + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 67d9a4fb33cce..086694f2b6a01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -86,6 +86,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ResetCommand } + /** + * Create a [[VacuumPath]] logical plan. + * Example SQL : + * {{{ + * VACUUM path [RETAIN number HOURS]; + * }}} + */ + override def visitVacuumPath( + ctx: VacuumPathContext): LogicalPlan = withOrigin(ctx) { + VacuumPath(string(ctx.path), Option(ctx.number).map(_.getText.toDouble).getOrElse(48.0)) + } + /** * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. * Example SQL for analyzing table : diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/VacuumCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/VacuumCommand.scala new file mode 100644 index 0000000000000..58954858d05c8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/VacuumCommand.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.transaction.DatabricksAtomicCommitProtocol +import org.apache.spark.sql.types._ + +case class VacuumPath(path: String, horizonHours: Double) extends RunnableCommand { + override val output: Seq[Attribute] = + Seq(AttributeReference("path", StringType, nullable = true)()) + + override def run(sparkSession: SparkSession): Seq[Row] = { + val horizon = System.currentTimeMillis - (horizonHours * 60 * 60 * 1000).toLong + DatabricksAtomicCommitProtocol.vacuum(new Path(path), horizon).map(p => Row(p.toString)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 825a0f70dda64..2fe9fe9b79f22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -30,6 +30,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.transaction._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -391,7 +392,10 @@ object PartitioningAwareFileIndex extends Logging { // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { + val statuses = try { + // This is a hack to inject a filesystem for testing purposes. + DatabricksAtomicReadProtocol.testingFs.getOrElse(fs).listStatus(path) + } catch { case _: FileNotFoundException => logWarning(s"The directory $path was not found. Was it deleted very recently?") Array.empty[FileStatus] @@ -405,7 +409,8 @@ object PartitioningAwareFileIndex extends Logging { case _ => dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt)) } - val allFiles = topLevelFiles ++ nestedFiles + val allFiles = DatabricksAtomicReadProtocol.filterDirectoryListing( + fs, path, topLevelFiles) ++ nestedFiles if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 34c74868146e6..ceb472e32000e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -30,8 +30,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol +import org.apache.spark.sql.transaction.DatabricksAtomicCommitProtocol import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -380,7 +380,7 @@ object SQLConf { SQLConfigBuilder("spark.sql.sources.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName) + .createWithDefault(classOf[DatabricksAtomicCommitProtocol].getName) val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold") diff --git a/sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala b/sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala new file mode 100644 index 0000000000000..392ebfe0d324b --- /dev/null +++ b/sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala @@ -0,0 +1,339 @@ +/* + * Copyright © 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.apache.spark.sql.transaction + +import java.io._ + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.datasources.InMemoryFileIndex +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.{ManualClock, SystemClock} + +class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContext { + test("read protocol ignores uncommitted jobs") { + withTempDir { dir => + create(dir, "_started_12345") + create(dir, "part-r-00001-tid-77777-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + create(dir, "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 1) + } + } + + test("read protocol can be flag disabled") { + withTempDir { dir => + create(dir, "_started_12345") + create(dir, "part-r-00001-tid-77777-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + create(dir, "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + try { + SparkEnv.get.conf.set("com.databricks.sql.enableFilterUncommitted", "false") + assert(spark.read.csv(dir.getAbsolutePath).count == 2) + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + } finally { + SparkEnv.get.conf.remove("com.databricks.sql.enableFilterUncommitted") + } + } + } + + test("read protocol ignores uncommitted files of committed job") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, f3) + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 3) + create(dir, "_committed_12345", s"""{"added": ["$f1", "$f2"], "removed": []}""") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + create(dir, "_started_12345") // shouldn't matter if _started marker exists + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + } + } + + test("read protocol ignores committed deletes") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f4 = "foo" + create(dir, f1) + create(dir, f2) + create(dir, f3) + create(dir, f4) + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 4) + create(dir, "_committed_12345", s"""{"added": ["$f1", "$f2", "$f3"], "removed": []}""") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 4) + create(dir, "_committed_99999", s"""{"added": ["foo"], "removed": ["$f3", "$f4"]}""") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + } + } + + test("vacuum removes files from failed tasks immediately") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, "_committed_12345", s"""{"added": ["$f2"], "removed": []}""") + assert(list(dir) == Set(f1, f2, "_committed_12345")) + // removes f1 + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 1) + assert(list(dir) == Set(f2, "_committed_12345")) + // removes nothing + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 0) + assert(list(dir) == Set(f2, "_committed_12345")) + } + } + + test("vacuum removes uncommitted files after timeout") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-9999-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, f3) + create(dir, "_started_9999") + create(dir, "_started_55555") + create(dir, "_committed_55555", s"""{"added": [], "removed": []}""") + create(dir, "_committed_12345", s"""{"added": ["$f2"], "removed": []}""") + assert(list(dir).size == 7) + // removes f1, since this is immediately useless + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 1) + assert(list(dir) == + Set(f2, f3, "_started_9999", "_started_55555", "_committed_55555", "_committed_12345")) + // removes f3, unnecessary start marker after horizon + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 2) + assert(list(dir) == + Set(f2, "_started_9999", "_committed_55555", "_committed_12345")) + } + } + + test("vacuum removes deleted files after timeout") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, f3) + create(dir, "_committed_99999", s"""{"added": ["foo"], "removed": ["$f3"]}""") + assert(list(dir).size == 4) + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 0) + assert(list(dir).size == 4) + // removes f3 + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 1) + assert(list(dir) == Set(f1, f2, "_committed_99999")) + } + } + + test("zero-length commit markers are ignored") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, "_committed_12345", "") + assert(spark.read.csv(dir.getAbsolutePath).count == 1) + } + } + + test("zero-length commit markers mean txn is pending") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, "_started_12345") + create(dir, "_committed_12345", "") + // nothing removed with normal horizon since the txn counts as pending + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 0) + // removes f1 + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 1) + } + } + + test("corrupt commit markers raises error unless configured not to") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, "_committed_12345", "corrupt_file") + val error = intercept[IOException] { + spark.read.csv(dir.getAbsolutePath) + } + assert(error.getMessage.contains("Failed to read job commit marker")) + try { + SparkEnv.get.conf.set("com.databricks.sql.ignoreCorruptCommitMarkers", "true") + assert(spark.read.csv(dir.getAbsolutePath).count == 1) + } finally { + SparkEnv.get.conf.remove("com.databricks.sql.ignoreCorruptCommitMarkers") + } + } + } + + test("reader re-lists directory when files may be missing from the initial list") { + withTempDir { dir => + var listCount = 0 + val fs = new RawLocalFileSystem() { + override def listStatus(path: Path): Array[FileStatus] = { + listCount += 1 + super.listStatus(path) + } + } + fs.initialize(dir.toURI, new Configuration()) + val testPath = new Path(dir.getAbsolutePath) + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + + // should trigger a re-list since f1 had no associated marker + val in1 = fs.listStatus(testPath) + listCount = 0 + val (out1, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 1) + assert(out1.isCommitted("12345")) // couldn't find any marker, assumed committed + + // should trigger a re-list that picks up _started_12345 + create(dir, "_started_12345") + listCount = 0 + val in2 = in1 + val (out2, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in2) + assert(listCount == 1) + assert(!out2.isCommitted("12345")) // marker found on the second list + + // should NOT trigger a re-list since f1 had an associated marker + val in3 = fs.listStatus(testPath) + listCount = 0 + val (out3, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in3) + assert(listCount == 0) + assert(!out3.isCommitted("12345")) + + // also should not trigger a re-list + fs.delete(new Path(testPath, "_started_12345"), false) + create(dir, "_committed_12345", s"""{"added": [], "removed": []}""") + val in4 = fs.listStatus(testPath) + listCount = 0 + val (out4, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in4) + assert(listCount == 0) + assert(out4.isCommitted("12345")) + + // should trigger a re-list that picks up f2 + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, "_committed_12345", s"""{"added": ["$f2"], "removed": []}""") + val in5 = fs.listStatus(testPath) + create(dir, f2) + listCount = 0 + val (out5, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in5) + assert(listCount == 1) + assert(out5.isCommitted("12345")) + assert(out5.isFileCommitted("12345", f2)) + } + } + + test("re-list is avoided after a grace period") { + withTempDir { dir => + var listCount = 0 + val fs = new RawLocalFileSystem() { + override def listStatus(path: Path): Array[FileStatus] = { + listCount += 1 + super.listStatus(path) + } + } + fs.initialize(dir.toURI, new Configuration()) + val testPath = new Path(dir.getAbsolutePath) + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + + try { + val clock = new ManualClock(System.currentTimeMillis) + DatabricksAtomicReadProtocol.clock = clock + + // should trigger a re-list since f1 had no associated marker + val in1 = fs.listStatus(testPath) + listCount = 0 + DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 1) + + clock.advance(3 * 60 * 1000) + DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 2) + + clock.advance(3 * 60 * 1000) + DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 2) + } finally { + DatabricksAtomicReadProtocol.clock = new SystemClock + } + } + } + + test("randomized consistency stress test") { + val seed = System.currentTimeMillis + val random = new scala.util.Random(seed) + // scalastyle:off println + println("Random seed used was: " + seed) + // scalastyle:on println + + /** + * Emulates S3 list consistency guarantees. We assume read-after-write for single keys, + * however a list call is not atomic and so may observe writes out of order. + */ + var numLists = 0 + val inconsistentFs = new RawLocalFileSystem() { + val consistentFiles = mutable.Set[Path]() + override def listStatus(path: Path): Array[FileStatus] = { + numLists += 1 + super.listStatus(path).filter { stat => + stat.getPath match { + case path if consistentFiles.contains(path) => true + case path => + consistentFiles.add(path) + random.nextDouble > 0.5 // emulate write re-ordering + } + } + } + } + def countFiles(dir: File): Long = { + val idx = new InMemoryFileIndex(spark, Seq(new Path(dir.getAbsolutePath)), Map.empty, None) + idx.allFiles().length + } + inconsistentFs.initialize(new File("/").toURI, new Configuration()) + + // tests retry on missing data file or start marker + for (i <- 1 to 10) { + withTempDir { dir => + spark.range(10).repartition(3).write.mode("overwrite").parquet(dir.getAbsolutePath) + try { + DatabricksAtomicReadProtocol.testingFs = Some(inconsistentFs) + assert(Set(0L, 3L).contains(countFiles(dir))) // should never see {1, 2} + assert(countFiles(dir) == 3) + } finally { + DatabricksAtomicReadProtocol.testingFs = None + } + } + } + + // check we actually used the inconsistent test fs + assert(numLists >= 10) + } + + private def create(dir: File, name: String, contents: String = "foo"): Unit = { + val printWriter = new PrintWriter(new File(dir, name)) + try { + printWriter.print(contents) + } finally { + printWriter.close() + } + } + + private def list(dir: File): Set[String] = { + dir.listFiles().map(_.getName).filterNot(_.startsWith(".")).toSet + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala index 89d57653adcbd..631176d60142a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala @@ -31,7 +31,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext { // ignore hidden files val allFiles = dir.listFiles(new FilenameFilter { override def accept(dir: File, name: String): Boolean = { - !name.startsWith(".") + !name.startsWith(".") && !name.startsWith("_") } }) val totalSize = allFiles.map(_.length()).sum diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 716269401fe1a..aa40b4623ec47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -466,16 +467,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") { - val extraOptions = Map( - SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, - "spark.sql.parquet.output.committer.class" -> - classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName - ) - withTempPath { dir => - val message = intercept[SparkException] { - spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(message === "Intentional exception for testing purposes") + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + val extraOptions = Map( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, + "spark.sql.parquet.output.committer.class" -> + classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName + ) + withTempPath { dir => + val message = intercept[SparkException] { + spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(message === "Intentional exception for testing purposes") + } } } @@ -492,58 +496,64 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-7837 Do not close output writer twice when commitTask() fails") { - // Using a output committer that always fail when committing a task, so that both - // `commitTask()` and `abortTask()` are invoked. - val extraOptions = Map[String, String]( - "spark.sql.parquet.output.committer.class" -> - classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName - ) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + // Using a output committer that always fail when committing a task, so that both + // `commitTask()` and `abortTask()` are invoked. + val extraOptions = Map[String, String]( + "spark.sql.parquet.output.committer.class" -> + classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName + ) + + // Before fixing SPARK-7837, the following code results in an NPE because both + // `commitTask()` and `abortTask()` try to close output writers. - // Before fixing SPARK-7837, the following code results in an NPE because both - // `commitTask()` and `abortTask()` try to close output writers. - - withTempPath { dir => - val m1 = intercept[SparkException] { - spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m1.contains("Intentional exception for testing purposes")) - } + withTempPath { dir => + val m1 = intercept[SparkException] { + spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m1.contains("Intentional exception for testing purposes")) + } - withTempPath { dir => - val m2 = intercept[SparkException] { - val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1) - df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m2.contains("Intentional exception for testing purposes")) + withTempPath { dir => + val m2 = intercept[SparkException] { + val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1) + df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m2.contains("Intentional exception for testing purposes")) + } } } test("SPARK-11044 Parquet writer version fixed as version1 ") { - // For dictionary encoding, Parquet changes the encoding types according to its writer - // version. So, this test checks one of the encoding types in order to ensure that - // the file is written with writer version2. - val extraOptions = Map[String, String]( - // Write a Parquet file with writer version2. - ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, - // By default, dictionary encoding is enabled from Parquet 1.2.0 but - // it is enabled just in case. - ParquetOutputFormat.ENABLE_DICTIONARY -> "true" - ) - - val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) - - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/part-r-0.parquet" - spark.range(1 << 16).selectExpr("(id % 4) AS i") - .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) - - val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head - val columnChunkMetadata = blockMetadata.getColumns.asScala.head - - // If the file is written with version2, this should include - // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY - assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + // For dictionary encoding, Parquet changes the encoding types according to its writer + // version. So, this test checks one of the encoding types in order to ensure that + // the file is written with writer version2. + val extraOptions = Map[String, String]( + // Write a Parquet file with writer version2. + ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, + // By default, dictionary encoding is enabled from Parquet 1.2.0 but + // it is enabled just in case. + ParquetOutputFormat.ENABLE_DICTIONARY -> "true" + ) + + val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) + + withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/part-r-0.parquet" + spark.range(1 << 16).selectExpr("(id % 4) AS i") + .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) + + val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head + val columnChunkMetadata = blockMetadata.getColumns.asScala.head + + // If the file is written with version2, this should include + // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY + assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index f433a74da8cb9..420cff878fa0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -455,7 +455,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha assert(partDf.schema.map(_.name) === Seq("intField", "stringField")) path.listFiles().foreach { f => - if (f.getName.toLowerCase().endsWith(".parquet")) { + if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file val df = spark.read.parquet(f.getCanonicalPath) assert(df.schema.map(_.name) === Seq("intField", "stringField")) @@ -463,7 +463,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } path.listFiles().foreach { f => - if (f.getName.toLowerCase().endsWith(".parquet")) { + if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file but `basePath` is overridden to // the base path containing partitioning directories val df = spark @@ -932,7 +932,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha withTempPath { dir => val path = dir.getCanonicalPath - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withSQLConf( + ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true", + "spark.sql.sources.commitProtocolClass" -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { spark.range(3).write.parquet(s"$path/p0=0/p1=0") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 4c4a7d86f2bd3..ee7f2d060e275 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -178,6 +179,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } withSQLConf( + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName, SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true", ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true" @@ -186,6 +189,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } withSQLConf( + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName, SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false" ) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 463c368fc42b1..ff35956674f85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -95,7 +95,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { // Check if this is compressed as ZLIB. val conf = spark.sessionState.newHadoopConf() val fs = FileSystem.getLocal(conf) - val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) + val maybeOrcFile = new File(path).listFiles().find { f => + !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc") + } assert(maybeOrcFile.isDefined) val orcFilePath = maybeOrcFile.get.toPath.toString val expectedCompressionKind = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 22f13a494cd4c..770c16a82fe3d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -784,44 +784,47 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("SPARK-8578 specified custom output committer will not be used to append data") { - val extraOptions = Map[String, String]( - SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName, - // Since Parquet has its own output committer setting, also set it - // to AlwaysFailParquetOutputCommitter at here. - "spark.sql.parquet.output.committer.class" -> - classOf[AlwaysFailParquetOutputCommitter].getName - ) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + val extraOptions = Map[String, String]( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName, + // Since Parquet has its own output committer setting, also set it + // to AlwaysFailParquetOutputCommitter at here. + "spark.sql.parquet.output.committer.class" -> + classOf[AlwaysFailParquetOutputCommitter].getName + ) - val df = spark.range(1, 10).toDF("i") - withTempPath { dir => - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - // Because there data already exists, - // this append should succeed because we will use the output committer associated - // with file format and AlwaysFailOutputCommitter will not be used. - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - checkAnswer( - spark.read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .load(dir.getCanonicalPath), - df.union(df)) - - // This will fail because AlwaysFailOutputCommitter is used when we do append. - intercept[Exception] { - df.write.mode("overwrite") - .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath) + val df = spark.range(1, 10).toDF("i") + withTempPath { dir => + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + // Because there data already exists, + // this append should succeed because we will use the output committer associated + // with file format and AlwaysFailOutputCommitter will not be used. + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + spark.read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .load(dir.getCanonicalPath), + df.union(df)) + + // This will fail because AlwaysFailOutputCommitter is used when we do append. + intercept[Exception] { + df.write.mode("overwrite") + .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath) + } } - } - withTempPath { dir => - // Because there is no existing data, - // this append will fail because AlwaysFailOutputCommitter is used when we do append - // and there is no existing data. - intercept[Exception] { - df.write.mode("append") - .options(extraOptions) - .format(dataSourceName) - .save(dir.getCanonicalPath) + withTempPath { dir => + // Because there is no existing data, + // this append will fail because AlwaysFailOutputCommitter is used when we do append + // and there is no existing data. + intercept[Exception] { + df.write.mode("append") + .options(extraOptions) + .format(dataSourceName) + .save(dir.getCanonicalPath) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 8aa018d0a9ee5..03207ab869d12 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -125,7 +126,10 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-8604: Parquet data source should write summary file while doing appending") { - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withSQLConf( + ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true", + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { withTempPath { dir => val path = dir.getCanonicalPath val df = spark.range(0, 5).toDF()