[SC-5834][DIRECTORYCOMMIT] Move directory commit protocol configs to DatabricksSQLConf

## What changes were proposed in this pull request?

There were a number of hard-coded config params used for the _Directory Atomic Commit_ protocol implementation. We're moving them to the newly created `DatabricksSQLConf` file for better visibility and long-term maintainability. The before/after pattern is sketched below.
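
For reference, here is a condensed sketch of the pattern applied throughout. The config entry shown is copied from the diff in this commit; the `deleteEnabled` local, the `sparkSession` value, and the import line are illustrative assumptions rather than part of the change itself:

```scala
import org.apache.spark.sql.internal.SQLConf.buildConf // assumed; buildStaticConf is imported the same way in the diff

// Before: the key and its default were hard-coded at each call site, e.g.
//   sparkSession.sqlContext.getConf("spark.databricks.sql.enableLogicalDelete", "true").toBoolean

// After: a typed ConfigEntry is declared once in DatabricksSQLConf...
val DIRECTORY_COMMIT_ENABLE_LOGICAL_DELETE =
  buildConf("spark.databricks.directoryCommit.enableLogicalDelete")
    .internal()
    .doc("Flag specifying whether or not atomic overwrites should be enabled.")
    .booleanConf
    .createWithDefault(true)

// ...and call sites read it through the session's typed accessor:
val deleteEnabled =
  sparkSession.sessionState.conf.getConf(DIRECTORY_COMMIT_ENABLE_LOGICAL_DELETE)
```

Note that the keys also move from the `spark.databricks.sql.*` namespace to `spark.databricks.directoryCommit.*`.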

## How was this patch tested?

`testOnly *DatabricksAtomicCommitProtocolSuite`

Author: Adrian Ionescu <adrian@databricks.com>

Closes apache#240 from adrian-ionescu/SC-5834.
adrian-ionescu authored and rxin committed Feb 21, 2017
1 parent f687d15 commit 45652f6
Showing 4 changed files with 121 additions and 65 deletions.
129 changes: 94 additions & 35 deletions sql/core/src/main/scala/com/databricks/sql/DatabricksSQLConf.scala
@@ -17,30 +17,86 @@ import org.apache.spark.sql.internal.SQLConf.buildStaticConf
*/
object DatabricksSQLConf {

  val FILES_ASYNC_IO = buildConf("spark.sql.files.asyncIO")
    .internal()
    .doc("If true, attempts to asynchronously do IO when reading data.")
    .booleanConf
    .createWithDefault(true)

  val DYNAMIC_PARTITION_PRUNING = buildConf("spark.sql.dynamicPartitionPruning")
    .internal()
    .doc("When true, we will generate predicate for partition column when it's used as join key")
    .booleanConf
    .createWithDefault(true)
  val FILES_ASYNC_IO =
    buildConf("spark.sql.files.asyncIO")
      .internal()
      .doc("If true, attempts to asynchronously do IO when reading data.")
      .booleanConf
      .createWithDefault(true)

  val DYNAMIC_PARTITION_PRUNING =
    buildConf("spark.sql.dynamicPartitionPruning")
      .internal()
      .doc("When true, we will generate predicate for partition column when it's used as join key")
      .booleanConf
      .createWithDefault(true)

  /**
   * Use an optimizer rule doing advanced query pushdown into Redshift.
   *
   * The rule is injected into extraOptimizations of the [[org.apache.spark.sql.SparkSession]]
   * the first time a RedshiftRelation is constructed.
   */
  val REDSHIFT_ADVANCED_PUSHDOWN = buildConf("spark.databricks.redshift.pushdown")
    .internal()
    .doc("When true, advanced query pushdown into Redshift is used.")
    .booleanConf
    .createWithDefault(true)
  val REDSHIFT_ADVANCED_PUSHDOWN =
    buildConf("spark.databricks.redshift.pushdown")
      .internal()
      .doc("When true, advanced query pushdown into Redshift is used.")
      .booleanConf
      .createWithDefault(true)

  val DIRECTORY_COMMIT_FILTER_UNCOMMITTED =
    buildConf("spark.databricks.directoryCommit.enableFilterUncommitted")
      .internal()
      .doc("If true, enable the read protocol, ensuring that files pertaining to uncommitted " +
        "transactions are filtered out.")
      .booleanConf
      .createWithDefault(true)

  val DIRECTORY_COMMIT_IGNORE_CORRUPT_MARKERS =
    buildConf("spark.databricks.directoryCommit.ignoreCorruptCommitMarkers")
      .internal()
      .doc("If true, unreadable commit markers will be ignored rather than raising an error.")
      .booleanConf
      .createWithDefault(false)

  val DIRECTORY_COMMIT_ENABLE_LOGICAL_DELETE =
    buildConf("spark.databricks.directoryCommit.enableLogicalDelete")
      .internal()
      .doc("Flag specifying whether or not atomic overwrites should be enabled.")
      .booleanConf
      .createWithDefault(true)

  val DIRECTORY_COMMIT_AUTO_VACUUM_ON_COMMIT =
    buildConf("spark.databricks.directoryCommit.autoVacuumOnCommit")
      .internal()
      .doc("If true, every Commit will trigger a Vacuum operation on all the affected paths.")
      .booleanConf
      .createWithDefault(true)

  val DIRECTORY_COMMIT_VACUUM_DATA_HORIZON_HRS =
    buildConf("spark.databricks.directoryCommit.vacuum.dataHorizonHours")
      .internal()
      .doc("Pending jobs which are older than the specified number of hours will be considered " +
        "failed and any files written by them will be vacuumed as well.")
      .doubleConf
      .createWithDefault(48.0) // 2 days

  val DIRECTORY_COMMIT_VACUUM_METADATA_HORIZON_HRS =
    buildConf("spark.databricks.directoryCommit.vacuum.metadataHorizonHours")
      .internal()
      .doc("Vacuum will remove commit markers that are older than this number of hours. " +
        "This should be greater than the max amount of time we think a zombie executor can " +
        "hang around and write output after the job has finished.")
      .doubleConf
      .createWithDefault(0.5) // 30 minutes

  val DIRECTORY_COMMIT_WRITE_REORDERING_HORIZON_MS =
    buildConf("spark.databricks.directoryCommit.writeReorderingHorizon")
      .internal()
      .doc("A List operation is considered unaffected by write reordering issues if all files " +
        "are older than the specified number of milliseconds. Otherwise an extra List is issued.")
      .longConf
      .createWithDefault(5 * 60 * 1000) // 5 minutes
}


@@ -49,24 +49,105 @@ object DatabricksSQLConf {
*/
object DatabricksStaticSQLConf {

  val ACL_PROVIDER = buildStaticConf("spark.databricks.acl.provider")
    .internal()
    .doc("Name of the AclProvider. This class is responsible for creating an AclClient. This " +
      "class should implement the com.databricks.sql.acl.AclProvider trait and provide a " +
      "no-args constructor.")
    .stringConf
    .createOptional

  val ACL_CLIENT_BACKEND = buildStaticConf("spark.databricks.acl.client")
    .internal()
    .doc("Name of the ACL client backend used by the ReflectionBackedAclClient.")
    .stringConf
    .createOptional

  val ACL_ENABLED = buildStaticConf("spark.databricks.acl.enabled")
    .internal()
    .doc("Whether the SQL-based Access Control is enabled.")
    .booleanConf
    .createWithDefault(false)
  val ACL_PROVIDER =
    buildStaticConf("spark.databricks.acl.provider")
      .internal()
      .doc("Name of the AclProvider. This class is responsible for creating an AclClient. This " +
        "class should implement the com.databricks.sql.acl.AclProvider trait and provide a " +
        "no-args constructor.")
      .stringConf
      .createOptional

  val ACL_CLIENT_BACKEND =
    buildStaticConf("spark.databricks.acl.client")
      .internal()
      .doc("Name of the ACL client backend used by the ReflectionBackedAclClient.")
      .stringConf
      .createOptional

  val ACL_ENABLED =
    buildStaticConf("spark.databricks.acl.enabled")
      .internal()
      .doc("Whether the SQL-based Access Control is enabled.")
      .booleanConf
      .createWithDefault(false)

}
@@ -14,13 +14,13 @@ import java.nio.charset.StandardCharsets
import scala.collection.mutable
import scala.util.control.NonFatal

import com.databricks.sql.DatabricksSQLConf._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem => HadoopFileSystem, _}
import org.apache.hadoop.mapreduce._
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
@@ -82,8 +82,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)
override def deleteWithJob(_fs: HadoopFileSystem, path: Path, recursive: Boolean): Boolean = {
val fs = testingFs.getOrElse(_fs)
val sparkSession = SparkSession.getActiveSession.get
if (!sparkSession.sqlContext.getConf(
"spark.databricks.sql.enableLogicalDelete", "true").toBoolean) {
if (!sparkSession.sessionState.conf.getConf(DIRECTORY_COMMIT_ENABLE_LOGICAL_DELETE)) {
return super.deleteWithJob(fs, path, recursive)
}
if (recursive && fs.getFileStatus(path).isFile) {
@@ -169,8 +168,7 @@ class DatabricksAtomicCommitProtocol(jobId: String, path: String)

// Optional auto-vacuum.
val sparkSession = SparkSession.getActiveSession.get
if (sparkSession.sqlContext.getConf(
"spark.databricks.sql.autoVacuumOnCommit", "true").toBoolean) {
if (sparkSession.sessionState.conf.getConf(DIRECTORY_COMMIT_AUTO_VACUUM_ON_COMMIT)) {
logInfo("Auto-vacuuming directories updated by " + jobId)
try {
vacuum(sparkSession, dirs.seq.toSeq, None)
@@ -216,16 +214,16 @@ object DatabricksAtomicCommitProtocol extends Logging {
def vacuum(
sparkSession: SparkSession, paths: Seq[Path], horizonHours: Option[Double]): List[Path] = {
val now = clock.getTimeMillis
val defaultDataHorizonHours = sparkSession.sqlContext.getConf(
"spark.databricks.sql.vacuum.dataHorizonHours", "48.0").toDouble
val defaultDataHorizonHours = sparkSession.sessionState.conf.getConf(
DIRECTORY_COMMIT_VACUUM_DATA_HORIZON_HRS)
val dataHorizonHours = horizonHours.getOrElse(defaultDataHorizonHours)
val dataHorizon = now - (dataHorizonHours * 60 * 60 * 1000).toLong

// Vacuum will start removing commit markers after this time has passed, so this should be
// greater than the max amount of time we think a zombie executor can hang around and write
// output after the job has finished. TODO(ekl) move to spark edge conf
val metadataHorizonHours = sparkSession.sqlContext.getConf(
"spark.databricks.sql.vacuum.metadataHorizonHours", "0.5").toDouble
// output after the job has finished.
val metadataHorizonHours = sparkSession.sessionState.conf.getConf(
DIRECTORY_COMMIT_VACUUM_METADATA_HORIZON_HRS)
val metadataHorizon = math.max(
dataHorizon,
now - metadataHorizonHours * 60 * 60 * 1000).toLong
@@ -15,8 +15,8 @@ import scala.collection.mutable
import scala.util.Try
import scala.util.control.NonFatal

import com.databricks.sql.DatabricksSQLConf._
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce._
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

@@ -57,7 +57,7 @@ object DatabricksAtomicReadProtocol extends Logging {
def filterDirectoryListing(
fs: FileSystem, dir: Path, initialFiles: Seq[FileStatus]): Seq[FileStatus] = {
// we use SparkEnv for this escape-hatch flag since this may be called on executors
if (!SparkEnv.get.conf.getBoolean("spark.databricks.sql.enableFilterUncommitted", true)) {
if (!SparkEnv.get.conf.get(DIRECTORY_COMMIT_FILTER_UNCOMMITTED)) {
return initialFiles
}

@@ -180,8 +180,7 @@ object DatabricksAtomicReadProtocol extends Logging {
val state = resolveCommitState0(fs, dir, initialFiles)

// Optimization: can assume the list request was atomic if the files have not changed recently.
val horizonMillis = SparkEnv.get.conf.getLong(
"spark.databricks.sql.writeReorderingHorizonMillis", 5 * 60 * 1000)
val horizonMillis = SparkEnv.get.conf.get(DIRECTORY_COMMIT_WRITE_REORDERING_HORIZON_MS)

if ((state.missingMarkers.nonEmpty || state.missingDataFiles.nonEmpty) &&
state.lastModified > clock.getTimeMillis - horizonMillis) {
@@ -288,8 +287,7 @@ object DatabricksAtomicReadProtocol extends Logging {

case NonFatal(e) =>
// we use SparkEnv for this escape-hatch flag since this may be called on executors
if (SparkEnv.get.conf.getBoolean(
"spark.databricks.sql.ignoreCorruptCommitMarkers", false)) {
if (SparkEnv.get.conf.get(DIRECTORY_COMMIT_IGNORE_CORRUPT_MARKERS)) {
logWarning("Failed to read job commit marker: " + stat, e)
corruptCommitMarkers.add(txnId)
} else {
@@ -12,6 +12,7 @@ import java.io._

import scala.collection.mutable

import com.databricks.sql.DatabricksSQLConf._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

@@ -37,11 +38,11 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
create(dir, "part-r-00001-tid-77777-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv")
create(dir, "part-r-00001-tid-12345-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-0_00003.csv")
try {
SparkEnv.get.conf.set("spark.databricks.sql.enableFilterUncommitted", "false")
SparkEnv.get.conf.set(DIRECTORY_COMMIT_FILTER_UNCOMMITTED.key, "false")
assert(spark.read.csv(dir.getAbsolutePath).count == 2)
assert(spark.read.csv(dir.getAbsolutePath).inputFiles.length == 2)
} finally {
SparkEnv.get.conf.remove("spark.databricks.sql.enableFilterUncommitted")
SparkEnv.get.conf.remove(DIRECTORY_COMMIT_FILTER_UNCOMMITTED.key)
}
}
}
@@ -119,13 +120,13 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex

test("logical delete can be flag disabled") {
withTempDir { dir =>
withSQLConf("spark.databricks.sql.enableLogicalDelete" -> "true") {
withSQLConf(DIRECTORY_COMMIT_ENABLE_LOGICAL_DELETE.key -> "true") {
spark.range(10).repartition(1).write.mode("overwrite").parquet(dir.getAbsolutePath)
assert(dir.listFiles().count(_.getName.startsWith("part")) == 1)
spark.range(10).repartition(1).write.mode("overwrite").parquet(dir.getAbsolutePath)
assert(dir.listFiles().count(_.getName.startsWith("part")) == 2)
}
withSQLConf("spark.databricks.sql.enableLogicalDelete" -> "false") {
withSQLConf(DIRECTORY_COMMIT_ENABLE_LOGICAL_DELETE.key -> "false") {
spark.range(10).repartition(1).write.mode("overwrite").parquet(dir.getAbsolutePath)
assert(dir.listFiles().count(_.getName.startsWith("part")) == 1)
}
@@ -223,10 +224,10 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
}
assert(error.getMessage.contains("Failed to read job commit marker"))
try {
SparkEnv.get.conf.set("spark.databricks.sql.ignoreCorruptCommitMarkers", "true")
SparkEnv.get.conf.set(DIRECTORY_COMMIT_IGNORE_CORRUPT_MARKERS.key, "true")
assert(spark.read.csv(dir.getAbsolutePath).count == 1)
} finally {
SparkEnv.get.conf.remove("spark.databricks.sql.ignoreCorruptCommitMarkers")
SparkEnv.get.conf.remove(DIRECTORY_COMMIT_IGNORE_CORRUPT_MARKERS.key)
}
}
}
@@ -368,7 +369,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
}

test("non-overwrite metadata files deleted before data files are") {
withSQLConf("spark.databricks.sql.vacuum.metadataHorizonHours" -> (1d / 60).toString) {
withSQLConf(DIRECTORY_COMMIT_VACUUM_METADATA_HORIZON_HRS.key -> (1d / 60).toString) {
withTempDir { dir =>
withFakeClockAndFs { clock =>
spark.range(10).repartition(1).write.mode("append").parquet(dir.getAbsolutePath)
@@ -407,7 +408,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
}

test("auto-vacuum cleans up all markers early in append loop") {
withSQLConf("spark.databricks.sql.vacuum.metadataHorizonHours" -> (1d / 60).toString) {
withSQLConf(DIRECTORY_COMMIT_VACUUM_METADATA_HORIZON_HRS.key -> (1d / 60).toString) {
withTempDir { dir =>
withFakeClockAndFs { clock =>
for (i <- 1 to 20) {
@@ -423,7 +424,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
}

test("auto-vacuum does not clean up committed markers early in overwrite loop") {
withSQLConf("spark.databricks.sql.vacuum.metadataHorizonHours" -> (1d / 60).toString) {
withSQLConf(DIRECTORY_COMMIT_VACUUM_METADATA_HORIZON_HRS.key -> (1d / 60).toString) {
withTempDir { dir =>
withFakeClockAndFs { clock =>
for (i <- 1 to 20) {
@@ -441,8 +442,8 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex

test("auto-vacuum does clean up everything past horizon in overwrite loop") {
withSQLConf(
"spark.databricks.sql.vacuum.dataHorizonHours" -> (1d / 60).toString,
"spark.databricks.sql.vacuum.metadataHorizonHours" -> (1d / 60).toString) {
DIRECTORY_COMMIT_VACUUM_DATA_HORIZON_HRS.key -> (1d / 60).toString,
DIRECTORY_COMMIT_VACUUM_METADATA_HORIZON_HRS.key -> (1d / 60).toString) {
withTempDir { dir =>
withFakeClockAndFs { clock =>
for (i <- 1 to 20) {
@@ -463,7 +464,7 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
df.write.partitionBy("A", "B").mode("overwrite").parquet(dir.getAbsolutePath)
df.write.partitionBy("A", "B").mode("overwrite").parquet(dir.getAbsolutePath)
assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 0)
withSQLConf("spark.databricks.sql.vacuum.dataHorizonHours" -> "0.0") {
withSQLConf(DIRECTORY_COMMIT_VACUUM_DATA_HORIZON_HRS.key -> "0.0") {
assert(sql(s"VACUUM '${dir.getAbsolutePath}' RETAIN 1 HOURS").count == 0)
// removes the data files and commit markers from the first job, start markers from 2nd
assert(sql(s"VACUUM '${dir.getAbsolutePath}'").count == 30)
@@ -482,13 +483,13 @@ class DatabricksAtomicCommitProtocolSuite extends QueryTest with SharedSQLContex
assert(new File(dir, "A=0/B=0").listFiles().count(_.getName.startsWith("part")) == 4)

// autovacuum will remove the prior 4 files
withSQLConf("spark.databricks.sql.vacuum.dataHorizonHours" -> "0.0") {
withSQLConf(DIRECTORY_COMMIT_VACUUM_DATA_HORIZON_HRS.key -> "0.0") {
df.write.partitionBy("A", "B").mode("overwrite").parquet(dir.getAbsolutePath)
assert(new File(dir, "A=0/B=0").listFiles().count(_.getName.startsWith("part")) == 1)
}

// autovacuum disabled
withSQLConf("spark.databricks.sql.autoVacuumOnCommit" -> "false") {
withSQLConf(DIRECTORY_COMMIT_AUTO_VACUUM_ON_COMMIT.key -> "false") {
df.write.partitionBy("A", "B").mode("overwrite").parquet(dir.getAbsolutePath)
assert(new File(dir, "A=0/B=0").listFiles().count(_.getName.startsWith("part")) == 2)
}
