diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index fccd6b2302021..7f85a82c224ad 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -150,6 +150,7 @@ statement | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration | RESET #resetConfiguration + | VACUUM path=STRING (RETAIN number HOURS)? #vacuumPath | unsupportedHiveNativeCommands .*? #failNativeCommand ; @@ -701,6 +702,7 @@ nonReserved | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN | UNBOUNDED | WHEN | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP + | VACUUM | RETAIN | HOURS ; SELECT: 'SELECT'; @@ -810,6 +812,9 @@ START: 'START'; TRANSACTION: 'TRANSACTION'; COMMIT: 'COMMIT'; ROLLBACK: 'ROLLBACK'; +VACUUM: 'VACUUM'; +RETAIN: 'RETAIN'; +HOURS: 'HOURS'; MACRO: 'MACRO'; IF: 'IF'; diff --git a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala new file mode 100644 index 0000000000000..2b150e5dec797 --- /dev/null +++ b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocol.scala @@ -0,0 +1,240 @@ +/* Copyright © 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package org.apache.spark.sql.transaction + +import java.io._ +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.util.control.NonFatal + +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapreduce._ +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.SparkSession + +/** + * File commit protocol optimized for cloud storage. Files are written directly to their final + * locations. Their commit status is determined by the presence of specially named marker files. + * + * Job commit proceeds as follows: + * + * 1) When tasks request a file to write, we create a `_started_$txnId` marker in the output + * directory. The output files, which have $txnId embedded in their name, are hidden from + * readers while the start marker is present. + * 2) We commit the job by creating a new `_committed_$txnId` marker that contains a list of + * files added and removed in that directory. + * + * Note that this is only atomic per-directory, and that we only provide snapshot isolation and + * not serializability. + */ +class DatabricksAtomicCommitProtocol(jobId: String, path: String) + extends FileCommitProtocol with Serializable with Logging { + + import FileCommitProtocol._ + import DatabricksAtomicReadProtocol._ + import DatabricksAtomicCommitProtocol._ + + // Globally unique alphanumeric string. We decouple this from jobId for possible future use. + private val txnId: TxnId = math.abs(scala.util.Random.nextLong).toString + + // The list of files staged by this committer. 
These are collected to the driver on task commit. + private val stagedFiles = mutable.Set[String]() + + // The list of files staged for deletion by the driver. + @transient private val stagedDeletions = mutable.Set[Path]() + + override def newTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { + if (dir.isDefined) { + newTaskTempFileAbsPath(taskContext, new Path(path, dir.get).toString, ext) + } else { + newTaskTempFileAbsPath(taskContext, new Path(path).toString, ext) + } + } + + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { + val filename = getFilename(taskContext, ext) + val finalPath = new Path(absoluteDir, filename) + val fs = finalPath.getFileSystem(taskContext.getConfiguration) + val startMarker = new Path(finalPath.getParent, new Path(s"_started_$txnId")) + if (!fs.exists(startMarker)) { + fs.create(startMarker, true).close() + logDebug("Created start marker: " + startMarker) + } + stagedFiles += finalPath.toString + finalPath.toString + } + + private def getFilename(taskContext: TaskAttemptContext, ext: String): String = { + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + + // Include the job and task attempt ids so that file writes never collide. + val taskAttemptId = taskContext.getTaskAttemptID.getId + + // e.g. part-00001-tid-177723428-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.gz.parquet + f"part-$split%05d-tid-$txnId-$jobId-$taskAttemptId$ext" + } + + override def setupJob(jobContext: JobContext): Unit = { + val root = new Path(path) + root.getFileSystem(jobContext.getConfiguration).mkdirs(root) + } + + override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { + logInfo("Committing job " + jobId) + val root = new Path(path) + val fs = root.getFileSystem(jobContext.getConfiguration) + def qualify(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory) + + // Collects start markers and staged task files. + taskCommits.foreach { t => + val task = t.obj.asInstanceOf[this.type] + stagedFiles ++= task.stagedFiles + } + + val addedByDir = stagedFiles.toSeq.map(new Path(_)).map(qualify) + .groupBy(_.getParent).map(kv => (kv._1, kv._2.map(_.getName.toString))) + + val removedByDir = stagedDeletions.toSeq.map(qualify) + .groupBy(_.getParent).map(kv => (kv._1, kv._2.map(_.getName.toString))) + + // Commit each updated directory in parallel. 
+ val dirs = (addedByDir.keys ++ removedByDir.keys).toSet.par + dirs.tasksupport = DatabricksAtomicCommitProtocol.tasksupport + dirs.foreach { dir => + val commitMarker = new Path(dir, s"_committed_$txnId") + val output = fs.create(commitMarker) + try { + serializeFileChanges( + addedByDir.getOrElse(dir, Nil), removedByDir.getOrElse(dir, Nil), output) + } finally { + output.close() + } + // We don't delete the start marker here since from a correctness perspective, it is + // possible a concurrent reader sees neither the start nor end marker even with a re-list + } + logInfo("Job commit completed for " + jobId) + } + + override def abortJob(jobContext: JobContext): Unit = { + /* no-op */ + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + /* no-op */ + } + + override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { + new TaskCommitMessage(this) + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = { + // We must leave the start markers since other stray tasks may be writing to this same + // directory, and we need to ensure their files stay hidden. + stagedFiles.map(new Path(_)).foreach { f => + val fs = f.getFileSystem(taskContext.getConfiguration) + fs.delete(f, false) + } + } +} + +object DatabricksAtomicCommitProtocol extends Logging { + import DatabricksAtomicReadProtocol._ + + private val sparkSession = SparkSession.builder.getOrCreate() + + import scala.collection.parallel.ThreadPoolTaskSupport + import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit} + + private lazy val tasksupport = new ThreadPoolTaskSupport({ + val pool = new ThreadPoolExecutor( + 100, + 100, + 100L, + TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable]) + pool.setThreadFactory(new ThreadFactory { + override def newThread(task: Runnable): Thread = { + val thread = new Thread(task, "DatabricksAtomicCommitProtocolWorker") + thread.setDaemon(true) + thread + } + }) + pool + }) + + /** + * Traverses the given directories and cleans up uncommitted or garbage files and markers. A + * horizon may be specified beyond which we assume pending jobs have failed. Files written by + * those jobs will be removed as well. Vacuuming will be done in parallel if possible. 
+ * + * @return the list of deleted files + */ + def vacuum(path: Path, horizon: Long): List[Path] = { + val fs = path.getFileSystem(sparkSession.sparkContext.hadoopConfiguration) + val (dirs, initialFiles) = fs.listStatus(path).partition(_.isDirectory) + + def checkPositive(time: Long): Long = { assert(time > 0); time } + var deletedPaths: List[Path] = Nil + def delete(p: Path): Unit = { + deletedPaths ::= p + fs.delete(p, false) + } + + val (state, resolvedFiles) = resolveCommitState(fs, path, initialFiles) + + // remove uncommitted and timed-out file outputs + for (file <- resolvedFiles) { + file.getPath.getName match { + // we wait for a horizon to avoid killing Spark jobs using those files + case name if state.getDeletionTime(name) > 0 && state.getDeletionTime(name) < horizon => + logInfo(s"Garbage collecting ${file.getPath} since it is marked as deleted.") + delete(file.getPath) + + case name @ FILE_WITH_TXN_ID(txnId) if state.isCommitted(txnId) && + !state.isFileCommitted(txnId, name) => + logInfo(s"Garbage collecting ${file.getPath} since it was written by a failed task.") + delete(file.getPath) + + case name @ FILE_WITH_TXN_ID(txnId) if !state.isCommitted(txnId) && + checkPositive(state.getStartTime(txnId)) < horizon => + logInfo(s"Garbage collecting ${file.getPath} since its job has timed out " + + s"(${state.getStartTime(txnId)} < $horizon).") + delete(file.getPath) + + case STARTED_MARKER(txnId) if state.isCommitted(txnId) && + checkPositive(file.getModificationTime) < horizon => + logInfo(s"Garbage collecting start marker ${file.getPath} of committed job.") + delete(file.getPath) + + case _ => + } + } + + // recurse + for (d <- dirs) { + deletedPaths :::= vacuum(d.getPath, horizon) + if (fs.listStatus(d.getPath).isEmpty) { + logInfo(s"Garbage collecting empty directory ${d.getPath}") + delete(d.getPath) + } + } + + deletedPaths + } +} diff --git a/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala new file mode 100644 index 0000000000000..8165f538548c7 --- /dev/null +++ b/sql/core/src/main/scala/com/databricks/sql/transaction/DatabricksAtomicReadProtocol.scala @@ -0,0 +1,282 @@ +/* Copyright © 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +package org.apache.spark.sql.transaction + +import java.io._ +import java.nio.charset.StandardCharsets + +import scala.collection.mutable +import scala.util.control.NonFatal + +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapreduce._ +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, SystemClock} + +/** + * Read-side support for DatabricksAtomicCommitProtocol. + */ +object DatabricksAtomicReadProtocol extends Logging { + type TxnId = String + + val STARTED_MARKER = "_started_(.*)".r + val COMMITTED_MARKER = "_committed_(.*)".r + val FILE_WITH_TXN_ID = "[^_].*-tid-([a-z0-9]+)-.*".r + + private implicit val formats = Serialization.formats(NoTypeHints) + + // Visible for testing. + private[spark] var testingFs: Option[FileSystem] = None + + // Visible for testing. 
+ private[spark] var clock: Clock = new SystemClock + + /** + * Given a directory listing, filters out files that are uncommitted. A file is considered + * committed if it is named in a `_committed-$txnId` marker, OR if there is no corresponding + * `_committed-$txnId` or `_started-$txnId` marker for the file. + * + * @return the filtered list of files + */ + def filterDirectoryListing( + fs: FileSystem, dir: Path, initialFiles: Seq[FileStatus]): Seq[FileStatus] = { + // we use SparkEnv for this escape-hatch flag since this may be called on executors + if (!SparkEnv.get.conf.getBoolean("com.databricks.sql.enableFilterUncommitted", true)) { + return initialFiles + } + + val (state, resolvedFiles) = resolveCommitState(testingFs.getOrElse(fs), dir, initialFiles) + resolvedFiles.filter { f => + val name = f.getPath.getName + name match { + case _ if state.getDeletionTime(name) > 0 => + logInfo(s"Ignoring ${f.getPath} since it is marked as deleted.") + false + case FILE_WITH_TXN_ID(txnId) if !state.isFileCommitted(txnId, name) => + logInfo(s"Ignoring ${f.getPath} since it is not marked as committed.") + false + case _ => + true + } + } + } + + /** + * Holds the parsed commit state of files local to a single directory. + * + * @param lastModified max modification time of files in this dir + * @param trackedFiles list of all files with txn ids + * @param startMarkers set of start markers found, and their creation times + * @param commitMarkers set of commit markers found, and their added files + * @param corruptCommitMarkers set of commit markers we could not read + * @param deletedFiles set of files marked as deleted by committed transactions + */ + private[transaction] class CommitState( + val lastModified: Long, + trackedFiles: Map[String, TxnId], + startMarkers: Map[TxnId, Long], + commitMarkers: Map[TxnId, Set[String]], + corruptCommitMarkers: Set[TxnId], + deletedFiles: Map[String, Long]) { + + // The set of transaction ids from which we might be missing start markers. + val missingMarkers: Set[TxnId] = { + trackedFiles.values.toSet.diff( + (startMarkers.keys ++ commitMarkers.keys ++ corruptCommitMarkers).toSet) + } + + // The set of files which are should be present but are missing. + val missingDataFiles: Set[String] = { + commitMarkers.values.flatten.toSet -- trackedFiles.keys -- deletedFiles.keys + } + + /** + * @return whether the given transaction has committed (untracked txns are also committed). + */ + def isCommitted(txnId: TxnId): Boolean = { + commitMarkers.contains(txnId) || !startMarkers.contains(txnId) + } + + /** + * @return whether the given file is committed (untracked files are also considered committed). + */ + def isFileCommitted(txnId: TxnId, filename: String): Boolean = { + isCommitted(txnId) && + (!commitMarkers.contains(txnId) || commitMarkers(txnId).contains(filename)) + } + + /** + * @return the approximate start timestamp of the pending transaction, otherwise throws. + */ + def getStartTime(txnId: TxnId): Long = startMarkers(txnId) + + /** + * @return the deletion time of the file, or zero if it is not marked as deleted. + */ + def getDeletionTime(filename: String): Long = deletedFiles.getOrElse(filename, 0L) + } + + /** + * Given the list of files in a directory, parses and returns the per-file commit state. This + * may require addition IOs to resolve apparent write reordering and read commit file contents. + * + * Details on apparent write reordering: + * + * S3 will (soon) provide consistent LIST-after-PUT for single keys. 
This does not mean that + * readers will observe writes in order, however, due to the lack of snapshot isolation within + * a single LIST operation. + * + * Write order visibility is a problem if a start marker PUT get re-ordered after a data file + * write from the reader perspective. To work around this issue, we list the directory again + * if a start marker is suspected to be missing. + * + * The same issue can occur with data file writes re-ordered after commit marker creation. In + * this situation we also must re-list if data files are suspected to be missing. + */ + private[transaction] def resolveCommitState( + fs: FileSystem, + dir: Path, + initialFiles: Seq[FileStatus]): (CommitState, Seq[FileStatus]) = { + val state = resolveCommitState0(fs, dir, initialFiles) + + // Optimization: can assume the list request was atomic if the files have not changed recently. + val horizonMillis = SparkEnv.get.conf.getLong( + "com.databricks.sql.writeReorderingHorizonMillis", 5 * 60 * 1000) + + if ((state.missingMarkers.nonEmpty || state.missingDataFiles.nonEmpty) && + state.lastModified > clock.getTimeMillis - horizonMillis) { + logDebug("Repeating list request since some files are suspected to be missing.") + val newlyCommitted = mutable.Set[TxnId]() + val extraStatuses = fs.listStatus(dir).filter { f => + f.isFile && (f.getPath.getName match { + case COMMITTED_MARKER(txnId) if + f.getLen > 0 && state.missingMarkers.contains(txnId) => + // We choose to drop all files from transactions that committed during the re-list. + // Otherwise, we'd have to do another round of re-listing to resolve ordering issues. + newlyCommitted += txnId + false + case STARTED_MARKER(txnId) if state.missingMarkers.contains(txnId) => true + case name @ FILE_WITH_TXN_ID(_) if state.missingDataFiles.contains(name) => true + case _ => false + }) + } + + // log a debug message if data files are still missing + state.missingDataFiles.diff(extraStatuses.map(_.getPath.getName).toSet) match { + case missing if missing.nonEmpty => + logWarning( + "These files are still missing after a re-list (maybe manually deleted): " + missing) + case _ => + } + + if (extraStatuses.nonEmpty || newlyCommitted.nonEmpty) { + if (extraStatuses.nonEmpty) { + logWarning( + "Found these missing files on the second read: " + extraStatuses.map(_.getPath).toSeq) + } + if (newlyCommitted.nonEmpty) { + logWarning( + "Found these newly committed jobs on the second read: " + newlyCommitted) + } + val newFiles = (initialFiles ++ extraStatuses).filter { f => + f.getPath.getName match { + case name @ FILE_WITH_TXN_ID(txnId) if newlyCommitted.contains(txnId) => false + case _ => true + } + } + (resolveCommitState0(fs, dir, newFiles), newFiles) + } else { + (state, initialFiles) + } + } else { + (state, initialFiles) + } + } + + private def resolveCommitState0( + fs: FileSystem, + dir: Path, + filesAndMarkers: Seq[FileStatus]): CommitState = { + + var lastModified: Long = 0L + val trackedFiles = mutable.Map[String, TxnId]() + val startMarkers = mutable.Map[TxnId, Long]() + val commitMarkers = mutable.Map[TxnId, Set[String]]() + val corruptCommitMarkers = mutable.Set[TxnId]() + val deletedFiles = mutable.Map[String, Long]() + + filesAndMarkers.foreach { stat => + if (stat.getModificationTime > lastModified) { + lastModified = stat.getModificationTime + } + stat.getPath.getName match { + // We ignore zero-length commit markers (this is a commonly observed symptom of DBFS + // cancellation bugs in practice). 
+ case COMMITTED_MARKER(txnId) if stat.getLen > 0 => + val commitFile = new Path(dir, "_committed_" + txnId) + try { + val (filesAdded, filesRemoved) = deserializeFileChanges(fs.open(commitFile)) + filesRemoved.foreach { file => + assert(stat.getModificationTime > 0) + deletedFiles(file) = stat.getModificationTime + } + commitMarkers(txnId) = filesAdded.toSet + } catch { + case NonFatal(e) => + // we use SparkEnv for this escape-hatch flag since this may be called on executors + if (SparkEnv.get.conf.getBoolean( + "com.databricks.sql.ignoreCorruptCommitMarkers", false)) { + logWarning("Failed to read job commit marker: " + stat, e) + corruptCommitMarkers += txnId + } else { + throw new IOException("Failed to read job commit marker: " + stat, e) + } + } + + case STARTED_MARKER(txnId) => + assert(stat.getModificationTime > 0) + startMarkers(txnId) = stat.getModificationTime + + case FILE_WITH_TXN_ID(txnId) => + trackedFiles(stat.getPath.getName) = txnId + + case _ => + } + } + + new CommitState( + lastModified, + trackedFiles.toMap, + startMarkers.toMap, + commitMarkers.toMap, + corruptCommitMarkers.toSet, + deletedFiles.toMap) + } + + def serializeFileChanges( + filesAdded: Seq[String], filesRemoved: Seq[String], out: OutputStream): Unit = { + val changes = Map("added" -> filesAdded, "removed" -> filesRemoved) + logDebug("Writing out file changes: " + changes) + Serialization.write(changes, out) + } + + def deserializeFileChanges(in: InputStream): (Seq[String], Seq[String]) = { + val reader = new InputStreamReader(in, StandardCharsets.UTF_8) + try { + val changes = Serialization.read[Map[String, Any]](reader) + (changes("added").asInstanceOf[Seq[String]], changes("removed").asInstanceOf[Seq[String]]) + } finally { + reader.close() + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 67d9a4fb33cce..086694f2b6a01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -86,6 +86,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ResetCommand } + /** + * Create a [[VacuumPath]] logical plan. + * Example SQL : + * {{{ + * VACUUM path [RETAIN number HOURS]; + * }}} + */ + override def visitVacuumPath( + ctx: VacuumPathContext): LogicalPlan = withOrigin(ctx) { + VacuumPath(string(ctx.path), Option(ctx.number).map(_.getText.toDouble).getOrElse(48.0)) + } + /** * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. * Example SQL for analyzing table : diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/VacuumCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/VacuumCommand.scala new file mode 100644 index 0000000000000..58954858d05c8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/VacuumCommand.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.transaction.DatabricksAtomicCommitProtocol +import org.apache.spark.sql.types._ + +case class VacuumPath(path: String, horizonHours: Double) extends RunnableCommand { + override val output: Seq[Attribute] = + Seq(AttributeReference("path", StringType, nullable = true)()) + + override def run(sparkSession: SparkSession): Seq[Row] = { + val horizon = System.currentTimeMillis - (horizonHours * 60 * 60 * 1000).toLong + DatabricksAtomicCommitProtocol.vacuum(new Path(path), horizon).map(p => Row(p.toString)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 825a0f70dda64..2fe9fe9b79f22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -30,6 +30,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.transaction._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -391,7 +392,10 @@ object PartitioningAwareFileIndex extends Logging { // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { + val statuses = try { + // This is a hack to inject a filesystem for testing purposes. + DatabricksAtomicReadProtocol.testingFs.getOrElse(fs).listStatus(path) + } catch { case _: FileNotFoundException => logWarning(s"The directory $path was not found. 
Was it deleted very recently?") Array.empty[FileStatus] @@ -405,7 +409,8 @@ object PartitioningAwareFileIndex extends Logging { case _ => dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt)) } - val allFiles = topLevelFiles ++ nestedFiles + val allFiles = DatabricksAtomicReadProtocol.filterDirectoryListing( + fs, path, topLevelFiles) ++ nestedFiles if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 34c74868146e6..ceb472e32000e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -30,8 +30,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol +import org.apache.spark.sql.transaction.DatabricksAtomicCommitProtocol import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -380,7 +380,7 @@ object SQLConf { SQLConfigBuilder("spark.sql.sources.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName) + .createWithDefault(classOf[DatabricksAtomicCommitProtocol].getName) val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold") diff --git a/sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala b/sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala new file mode 100644 index 0000000000000..392ebfe0d324b --- /dev/null +++ b/sql/core/src/test/scala/com/databricks/sql/transaction/DatabricksAtomicCommitProtocolSuite.scala @@ -0,0 +1,339 @@ +/* + * Copyright © 2016 Databricks, Inc. 
+ * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package org.apache.spark.sql.transaction + +import java.io._ + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.datasources.InMemoryFileIndex +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.{ManualClock, SystemClock} + +class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContext { + test("read protocol ignores uncommitted jobs") { + withTempDir { dir => + create(dir, "_started_12345") + create(dir, "part-r-00001-tid-77777-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + create(dir, "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 1) + } + } + + test("read protocol can be flag disabled") { + withTempDir { dir => + create(dir, "_started_12345") + create(dir, "part-r-00001-tid-77777-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + create(dir, "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv") + try { + SparkEnv.get.conf.set("com.databricks.sql.enableFilterUncommitted", "false") + assert(spark.read.csv(dir.getAbsolutePath).count == 2) + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + } finally { + SparkEnv.get.conf.remove("com.databricks.sql.enableFilterUncommitted") + } + } + } + + test("read protocol ignores uncommitted files of committed job") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, f3) + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 3) + create(dir, "_committed_12345", s"""{"added": ["$f1", "$f2"], "removed": []}""") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + create(dir, "_started_12345") // shouldn't matter if _started marker exists + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + } + } + + test("read protocol ignores committed deletes") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f4 = "foo" + create(dir, f1) + create(dir, f2) + create(dir, f3) + create(dir, f4) + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 4) + create(dir, "_committed_12345", s"""{"added": ["$f1", "$f2", "$f3"], "removed": []}""") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 4) + create(dir, "_committed_99999", s"""{"added": ["foo"], "removed": ["$f3", "$f4"]}""") + assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2) + } + } + + test("vacuum removes files from failed tasks immediately") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val 
f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, "_committed_12345", s"""{"added": ["$f2"], "removed": []}""") + assert(list(dir) == Set(f1, f2, "_committed_12345")) + // removes f1 + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 1) + assert(list(dir) == Set(f2, "_committed_12345")) + // removes nothing + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 0) + assert(list(dir) == Set(f2, "_committed_12345")) + } + } + + test("vacuum removes uncommitted files after timeout") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-9999-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, f3) + create(dir, "_started_9999") + create(dir, "_started_55555") + create(dir, "_committed_55555", s"""{"added": [], "removed": []}""") + create(dir, "_committed_12345", s"""{"added": ["$f2"], "removed": []}""") + assert(list(dir).size == 7) + // removes f1, since this is immediately useless + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 1) + assert(list(dir) == + Set(f2, f3, "_started_9999", "_started_55555", "_committed_55555", "_committed_12345")) + // removes f3, unnecessary start marker after horizon + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 2) + assert(list(dir) == + Set(f2, "_started_9999", "_committed_55555", "_committed_12345")) + } + } + + test("vacuum removes deleted files after timeout") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + val f3 = "part-r-00003-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, f2) + create(dir, f3) + create(dir, "_committed_99999", s"""{"added": ["foo"], "removed": ["$f3"]}""") + assert(list(dir).size == 4) + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 0) + assert(list(dir).size == 4) + // removes f3 + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 1) + assert(list(dir) == Set(f1, f2, "_committed_99999")) + } + } + + test("zero-length commit markers are ignored") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, "_committed_12345", "") + assert(spark.read.csv(dir.getAbsolutePath).count == 1) + } + } + + test("zero-length commit markers mean txn is pending") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, "_started_12345") + create(dir, "_committed_12345", "") + // nothing removed with normal horizon since the txn counts as pending + assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 0) + // removes f1 + assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 0.0 HOURS").count == 1) + } + } + + test("corrupt commit markers raises error unless configured not to") { + withTempDir { dir => + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + create(dir, "_committed_12345", "corrupt_file") + val error = intercept[IOException] { + spark.read.csv(dir.getAbsolutePath) + } + assert(error.getMessage.contains("Failed to read job commit marker")) + try { 
+ SparkEnv.get.conf.set("com.databricks.sql.ignoreCorruptCommitMarkers", "true") + assert(spark.read.csv(dir.getAbsolutePath).count == 1) + } finally { + SparkEnv.get.conf.remove("com.databricks.sql.ignoreCorruptCommitMarkers") + } + } + } + + test("reader re-lists directory when files may be missing from the initial list") { + withTempDir { dir => + var listCount = 0 + val fs = new RawLocalFileSystem() { + override def listStatus(path: Path): Array[FileStatus] = { + listCount += 1 + super.listStatus(path) + } + } + fs.initialize(dir.toURI, new Configuration()) + val testPath = new Path(dir.getAbsolutePath) + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + + // should trigger a re-list since f1 had no associated marker + val in1 = fs.listStatus(testPath) + listCount = 0 + val (out1, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 1) + assert(out1.isCommitted("12345")) // couldn't find any marker, assumed committed + + // should trigger a re-list that picks up _started_12345 + create(dir, "_started_12345") + listCount = 0 + val in2 = in1 + val (out2, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in2) + assert(listCount == 1) + assert(!out2.isCommitted("12345")) // marker found on the second list + + // should NOT trigger a re-list since f1 had an associated marker + val in3 = fs.listStatus(testPath) + listCount = 0 + val (out3, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in3) + assert(listCount == 0) + assert(!out3.isCommitted("12345")) + + // also should not trigger a re-list + fs.delete(new Path(testPath, "_started_12345"), false) + create(dir, "_committed_12345", s"""{"added": [], "removed": []}""") + val in4 = fs.listStatus(testPath) + listCount = 0 + val (out4, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in4) + assert(listCount == 0) + assert(out4.isCommitted("12345")) + + // should trigger a re-list that picks up f2 + val f2 = "part-r-00002-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, "_committed_12345", s"""{"added": ["$f2"], "removed": []}""") + val in5 = fs.listStatus(testPath) + create(dir, f2) + listCount = 0 + val (out5, _) = DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in5) + assert(listCount == 1) + assert(out5.isCommitted("12345")) + assert(out5.isFileCommitted("12345", f2)) + } + } + + test("re-list is avoided after a grace period") { + withTempDir { dir => + var listCount = 0 + val fs = new RawLocalFileSystem() { + override def listStatus(path: Path): Array[FileStatus] = { + listCount += 1 + super.listStatus(path) + } + } + fs.initialize(dir.toURI, new Configuration()) + val testPath = new Path(dir.getAbsolutePath) + val f1 = "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv" + create(dir, f1) + + try { + val clock = new ManualClock(System.currentTimeMillis) + DatabricksAtomicReadProtocol.clock = clock + + // should trigger a re-list since f1 had no associated marker + val in1 = fs.listStatus(testPath) + listCount = 0 + DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 1) + + clock.advance(3 * 60 * 1000) + DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 2) + + clock.advance(3 * 60 * 1000) + DatabricksAtomicReadProtocol.resolveCommitState(fs, testPath, in1) + assert(listCount == 2) + } finally { + DatabricksAtomicReadProtocol.clock = new SystemClock + } + } + } + 
+ test("randomized consistency stress test") { + val seed = System.currentTimeMillis + val random = new scala.util.Random(seed) + // scalastyle:off println + println("Random seed used was: " + seed) + // scalastyle:on println + + /** + * Emulates S3 list consistency guarantees. We assume read-after-write for single keys, + * however a list call is not atomic and so may observe writes out of order. + */ + var numLists = 0 + val inconsistentFs = new RawLocalFileSystem() { + val consistentFiles = mutable.Set[Path]() + override def listStatus(path: Path): Array[FileStatus] = { + numLists += 1 + super.listStatus(path).filter { stat => + stat.getPath match { + case path if consistentFiles.contains(path) => true + case path => + consistentFiles.add(path) + random.nextDouble > 0.5 // emulate write re-ordering + } + } + } + } + def countFiles(dir: File): Long = { + val idx = new InMemoryFileIndex(spark, Seq(new Path(dir.getAbsolutePath)), Map.empty, None) + idx.allFiles().length + } + inconsistentFs.initialize(new File("/").toURI, new Configuration()) + + // tests retry on missing data file or start marker + for (i <- 1 to 10) { + withTempDir { dir => + spark.range(10).repartition(3).write.mode("overwrite").parquet(dir.getAbsolutePath) + try { + DatabricksAtomicReadProtocol.testingFs = Some(inconsistentFs) + assert(Set(0L, 3L).contains(countFiles(dir))) // should never see {1, 2} + assert(countFiles(dir) == 3) + } finally { + DatabricksAtomicReadProtocol.testingFs = None + } + } + } + + // check we actually used the inconsistent test fs + assert(numLists >= 10) + } + + private def create(dir: File, name: String, contents: String = "foo"): Unit = { + val printWriter = new PrintWriter(new File(dir, name)) + try { + printWriter.print(contents) + } finally { + printWriter.close() + } + } + + private def list(dir: File): Set[String] = { + dir.listFiles().map(_.getName).filterNot(_.startsWith(".")).toSet + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala index 89d57653adcbd..631176d60142a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala @@ -31,7 +31,7 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext { // ignore hidden files val allFiles = dir.listFiles(new FilenameFilter { override def accept(dir: File, name: String): Boolean = { - !name.startsWith(".") + !name.startsWith(".") && !name.startsWith("_") } }) val totalSize = allFiles.map(_.length()).sum diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 716269401fe1a..aa40b4623ec47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import 
org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -466,16 +467,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") { - val extraOptions = Map( - SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, - "spark.sql.parquet.output.committer.class" -> - classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName - ) - withTempPath { dir => - val message = intercept[SparkException] { - spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(message === "Intentional exception for testing purposes") + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + val extraOptions = Map( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[ParquetOutputCommitter].getCanonicalName, + "spark.sql.parquet.output.committer.class" -> + classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName + ) + withTempPath { dir => + val message = intercept[SparkException] { + spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(message === "Intentional exception for testing purposes") + } } } @@ -492,58 +496,64 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("SPARK-7837 Do not close output writer twice when commitTask() fails") { - // Using a output committer that always fail when committing a task, so that both - // `commitTask()` and `abortTask()` are invoked. - val extraOptions = Map[String, String]( - "spark.sql.parquet.output.committer.class" -> - classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName - ) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + // Using a output committer that always fail when committing a task, so that both + // `commitTask()` and `abortTask()` are invoked. + val extraOptions = Map[String, String]( + "spark.sql.parquet.output.committer.class" -> + classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName + ) + + // Before fixing SPARK-7837, the following code results in an NPE because both + // `commitTask()` and `abortTask()` try to close output writers. - // Before fixing SPARK-7837, the following code results in an NPE because both - // `commitTask()` and `abortTask()` try to close output writers. 
- - withTempPath { dir => - val m1 = intercept[SparkException] { - spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m1.contains("Intentional exception for testing purposes")) - } + withTempPath { dir => + val m1 = intercept[SparkException] { + spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m1.contains("Intentional exception for testing purposes")) + } - withTempPath { dir => - val m2 = intercept[SparkException] { - val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1) - df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) - }.getCause.getMessage - assert(m2.contains("Intentional exception for testing purposes")) + withTempPath { dir => + val m2 = intercept[SparkException] { + val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1) + df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) + }.getCause.getMessage + assert(m2.contains("Intentional exception for testing purposes")) + } } } test("SPARK-11044 Parquet writer version fixed as version1 ") { - // For dictionary encoding, Parquet changes the encoding types according to its writer - // version. So, this test checks one of the encoding types in order to ensure that - // the file is written with writer version2. - val extraOptions = Map[String, String]( - // Write a Parquet file with writer version2. - ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, - // By default, dictionary encoding is enabled from Parquet 1.2.0 but - // it is enabled just in case. - ParquetOutputFormat.ENABLE_DICTIONARY -> "true" - ) - - val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) - - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/part-r-0.parquet" - spark.range(1 << 16).selectExpr("(id % 4) AS i") - .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) - - val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head - val columnChunkMetadata = blockMetadata.getColumns.asScala.head - - // If the file is written with version2, this should include - // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY - assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + // For dictionary encoding, Parquet changes the encoding types according to its writer + // version. So, this test checks one of the encoding types in order to ensure that + // the file is written with writer version2. + val extraOptions = Map[String, String]( + // Write a Parquet file with writer version2. + ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString, + // By default, dictionary encoding is enabled from Parquet 1.2.0 but + // it is enabled just in case. 
+ ParquetOutputFormat.ENABLE_DICTIONARY -> "true" + ) + + val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) + + withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/part-r-0.parquet" + spark.range(1 << 16).selectExpr("(id % 4) AS i") + .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) + + val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head + val columnChunkMetadata = blockMetadata.getColumns.asScala.head + + // If the file is written with version2, this should include + // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY + assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index f433a74da8cb9..420cff878fa0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -455,7 +455,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha assert(partDf.schema.map(_.name) === Seq("intField", "stringField")) path.listFiles().foreach { f => - if (f.getName.toLowerCase().endsWith(".parquet")) { + if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file val df = spark.read.parquet(f.getCanonicalPath) assert(df.schema.map(_.name) === Seq("intField", "stringField")) @@ -463,7 +463,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } path.listFiles().foreach { f => - if (f.getName.toLowerCase().endsWith(".parquet")) { + if (!f.getName.startsWith("_") && f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file but `basePath` is overridden to // the base path containing partitioning directories val df = spark @@ -932,7 +932,10 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha withTempPath { dir => val path = dir.getCanonicalPath - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withSQLConf( + ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true", + "spark.sql.sources.commitProtocolClass" -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { spark.range(3).write.parquet(s"$path/p0=0/p1=0") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 4c4a7d86f2bd3..ee7f2d060e275 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, 
NestedStructUDT, SingleElement} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -178,6 +179,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } withSQLConf( + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName, SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true", ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true" @@ -186,6 +189,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } withSQLConf( + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName, SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false" ) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 463c368fc42b1..ff35956674f85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -95,7 +95,9 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { // Check if this is compressed as ZLIB. val conf = spark.sessionState.newHadoopConf() val fs = FileSystem.getLocal(conf) - val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) + val maybeOrcFile = new File(path).listFiles().find { f => + !f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc") + } assert(maybeOrcFile.isDefined) val orcFilePath = maybeOrcFile.get.toPath.toString val expectedCompressionKind = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 22f13a494cd4c..770c16a82fe3d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources.{FileScanRDD, HadoopFsRelation, LocalityTestFileSystem, LogicalRelation} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -784,44 +784,47 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("SPARK-8578 specified custom output committer will not be used to append data") { - val extraOptions = Map[String, String]( - SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName, - // Since Parquet has its own output committer setting, also set it - // to AlwaysFailParquetOutputCommitter at here. 
- "spark.sql.parquet.output.committer.class" -> - classOf[AlwaysFailParquetOutputCommitter].getName - ) + withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { + val extraOptions = Map[String, String]( + SQLConf.OUTPUT_COMMITTER_CLASS.key -> classOf[AlwaysFailOutputCommitter].getName, + // Since Parquet has its own output committer setting, also set it + // to AlwaysFailParquetOutputCommitter at here. + "spark.sql.parquet.output.committer.class" -> + classOf[AlwaysFailParquetOutputCommitter].getName + ) - val df = spark.range(1, 10).toDF("i") - withTempPath { dir => - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - // Because there data already exists, - // this append should succeed because we will use the output committer associated - // with file format and AlwaysFailOutputCommitter will not be used. - df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) - checkAnswer( - spark.read - .format(dataSourceName) - .option("dataSchema", df.schema.json) - .options(extraOptions) - .load(dir.getCanonicalPath), - df.union(df)) - - // This will fail because AlwaysFailOutputCommitter is used when we do append. - intercept[Exception] { - df.write.mode("overwrite") - .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath) + val df = spark.range(1, 10).toDF("i") + withTempPath { dir => + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + // Because there data already exists, + // this append should succeed because we will use the output committer associated + // with file format and AlwaysFailOutputCommitter will not be used. + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + spark.read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .options(extraOptions) + .load(dir.getCanonicalPath), + df.union(df)) + + // This will fail because AlwaysFailOutputCommitter is used when we do append. + intercept[Exception] { + df.write.mode("overwrite") + .options(extraOptions).format(dataSourceName).save(dir.getCanonicalPath) + } } - } - withTempPath { dir => - // Because there is no existing data, - // this append will fail because AlwaysFailOutputCommitter is used when we do append - // and there is no existing data. - intercept[Exception] { - df.write.mode("append") - .options(extraOptions) - .format(dataSourceName) - .save(dir.getCanonicalPath) + withTempPath { dir => + // Because there is no existing data, + // this append will fail because AlwaysFailOutputCommitter is used when we do append + // and there is no existing data. 
+ intercept[Exception] { + df.write.mode("append") + .options(extraOptions) + .format(dataSourceName) + .save(dir.getCanonicalPath) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 8aa018d0a9ee5..03207ab869d12 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -25,6 +25,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -125,7 +126,10 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-8604: Parquet data source should write summary file while doing appending") { - withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withSQLConf( + ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true", + SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key -> + classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) { withTempPath { dir => val path = dir.getCanonicalPath val df = spark.range(0, 5).toDF()
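
Illustrative note, not part of the patch: the sketch below spells out the on-disk layout that DatabricksAtomicCommitProtocol leaves behind and exercises the (de)serialization helpers added in DatabricksAtomicReadProtocol. The file names are made up for the example.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import org.apache.spark.sql.transaction.DatabricksAtomicReadProtocol

    object CommitMarkerSketch {
      def main(args: Array[String]): Unit = {
        // A directory written by txn id "12345" is expected to contain:
        //   _started_12345     -- empty start marker, hides the txn's files from readers
        //   _committed_12345   -- JSON body listing the files added and removed by the txn
        //   part-00000-tid-12345-<jobId>-0.gz.parquet -- data file, txn id embedded in the name
        val out = new ByteArrayOutputStream()
        DatabricksAtomicReadProtocol.serializeFileChanges(
          filesAdded = Seq("part-00000-tid-12345-job-0.gz.parquet"),
          filesRemoved = Seq("part-00000-tid-11111-job-0.gz.parquet"),
          out = out)
        // Prints something of the form {"added":[...],"removed":[...]}, matching the markers
        // written by hand in DatabricksAtomicCommitProtocolSuite.
        println(out.toString("UTF-8"))

        val (added, removed) = DatabricksAtomicReadProtocol.deserializeFileChanges(
          new ByteArrayInputStream(out.toByteArray))
        assert(added == Seq("part-00000-tid-12345-job-0.gz.parquet"))
        assert(removed == Seq("part-00000-tid-11111-job-0.gz.parquet"))
      }
    }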
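
Also not part of the patch: a usage sketch for the new VACUUM statement, based on the grammar rule and VacuumPath command above (the horizon defaults to 48 hours when RETAIN is omitted). The warehouse path is hypothetical.

    import org.apache.spark.sql.SparkSession

    object VacuumSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.appName("vacuum-sketch").getOrCreate()

        // Without a RETAIN clause the horizon defaults to 48 hours, so only output that is
        // already provably garbage is removed (e.g. files written by failed tasks of committed
        // jobs, or files whose committed deletion is older than 48 hours).
        val deleted = spark.sql("VACUUM '/tmp/warehouse/events'")

        // RETAIN n HOURS tightens the horizon; RETAIN 0.0 HOURS additionally reclaims the
        // output of uncommitted (presumed dead) jobs and stale _started_ markers.
        val deletedNow = spark.sql("VACUUM '/tmp/warehouse/events' RETAIN 0.0 HOURS")

        // Each command returns one row per deleted path.
        deleted.show(truncate = false)
        deletedNow.show(truncate = false)
      }
    }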
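
Finally, a sketch of the configuration surface this change touches. The key names are taken from the code above; setting them through SparkSession.builder is an assumption of the example, and the committer class simply restores the previous default.

    import org.apache.spark.sql.SparkSession

    object CommitProtocolConfigSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("commit-protocol-config")
          // Revert to the previous committer; the new default is DatabricksAtomicCommitProtocol.
          .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
          // Read-side escape hatch: stop hiding uncommitted files during directory listings.
          .config("com.databricks.sql.enableFilterUncommitted", "false")
          // Tolerate unreadable _committed_ markers instead of failing the read.
          .config("com.databricks.sql.ignoreCorruptCommitMarkers", "true")
          // Window (ms) within which a suspicious listing is retried; the default is 5 minutes.
          .config("com.databricks.sql.writeReorderingHorizonMillis", (10 * 60 * 1000).toString)
          .getOrCreate()

        spark.range(10).write.mode("overwrite").parquet("/tmp/commit-protocol-demo")
      }
    }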