[DELTA-OSS-EXTERNAL] Improved Delta concurrency with finer-grained conflict detection in OptTxnImpl

This is a modified version of the original PR delta-io#114 by `tomasbartalos` (kudos, it was a very good PR!). This PR tracks transaction changes at a finer granularity (no new columns required in the RemoveFile action), thus allowing more concurrent operations to succeed.

closes delta-io#228 and delta-io#72

This PR improves the conflict detection logic in OptTxn using the following strategy (a simplified sketch follows this list).
- OptTxn tracks two additional things:
  - All the partitions read by the query using the OptTxn
  - All the files read by the query
- When committing a txn, it checks this txn's actions against the actions of concurrently committed txns using the following rules:
  1. If any of the concurrently added files are in the partitions read by this txn, then fail, because this txn should have read them.
      - It's okay for files to have been removed from the partitions read by this txn as long as this txn never read those files. This is checked by the next rule.
  2. If any of the files read by this txn have already been removed by concurrent txns, then fail.
  3. If any of the files removed by this txn have already been removed by concurrent txns, then fail.
- In addition, I have made another change: setting `dataChange` to `false` in all the actions (enabled by delta-io#223) ensures the txn will not conflict with any other concurrent txn based on predicates.
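
A minimal, self-contained sketch of the three rules above (an editor's illustration with simplified stand-in types, not the actual OptTxnImpl code, which appears in the diff below):

import java.util.ConcurrentModificationException

object ConflictRulesSketch {
  // Simplified stand-in for Delta's AddFile action.
  case class FileInfo(path: String, partitionValues: Map[String, String])

  def checkConflicts(
      readPartitions: Map[String, String] => Boolean, // partition predicate of this txn's reads
      readFilePaths: Set[String],                     // files read by this txn
      txnRemovedPaths: Set[String],                   // files removed by this txn
      winnerAdded: Seq[FileInfo],                     // files added by a concurrently committed txn
      winnerRemovedPaths: Set[String]): Unit = {      // files removed by that txn
    // Rule 1: the winning txn added files into a partition this txn read.
    winnerAdded.find(f => readPartitions(f.partitionValues)).foreach { f =>
      throw new ConcurrentModificationException(s"concurrent append in a read partition: ${f.path}")
    }
    // Rule 2: the winning txn removed a file this txn read.
    (readFilePaths intersect winnerRemovedPaths).headOption.foreach { p =>
      throw new ConcurrentModificationException(s"file read by this txn was concurrently removed: $p")
    }
    // Rule 3: both txns removed the same file.
    (txnRemovedPaths intersect winnerRemovedPaths).headOption.foreach { p =>
      throw new ConcurrentModificationException(s"file removed by both txns: $p")
    }
  }
}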

Tests were written by `tomasbartalos` in the original PR. Some tests were changed because scenarios that were blocked in the original PR are now allowed thanks to the more granular and permissive conflict detection logic, and some test names were tweaked for clarity.
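
For reference, the `dataChange` flag maps directly onto the isolation level used for conflict detection at commit time. Below is a sketch with simplified stand-in types (not Delta's real classes), mirroring the change in OptimisticTransaction.scala further down:

object IsolationChoiceSketch {
  sealed trait IsolationLevel
  case object Serializable extends IsolationLevel
  case object SnapshotIsolation extends IsolationLevel
  case class FileAction(path: String, dataChange: Boolean)

  // If no action changed data (files were only rearranged, e.g. by compaction),
  // SnapshotIsolation already yields a serializable outcome, so the reduced
  // conflict detection is safe; otherwise use the stricter Serializable level.
  def isolationForCommit(actions: Seq[FileAction]): IsolationLevel =
    if (actions.forall(a => !a.dataChange)) SnapshotIsolation else Serializable
}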

GitOrigin-RevId: f02a8f48838f86d256a86cd40241cdbfa74addb4

Lead-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: Tomas Bartalos <tomas.bartalos@nike.sk>
2 people authored and zsxwing committed Dec 10, 2019
1 parent 48a74fa commit f328300
Showing 4 changed files with 805 additions and 50 deletions.
@@ -227,6 +227,7 @@ object DeltaOperations {
"newSchema" -> JsonUtils.toJson(newSchema))
}


private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Map(
"name" -> UnresolvedAttribute(colPath :+ field.name).name,
152 changes: 128 additions & 24 deletions src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -21,7 +21,7 @@ import java.util.ConcurrentModificationException
import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.util.control.NonFatal

import com.databricks.spark.util.TagDefinitions.TAG_LOG_STORE_CLASS
@@ -145,6 +145,15 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
/** Tracks the appIds that have been seen by this transaction. */
protected val readTxn = new ArrayBuffer[String]

/**
* Tracks the data that could have been seen by recording the partition
* predicates by which files have been queried by this transaction.
*/
protected val readPredicates = new ArrayBuffer[Expression]

/** Tracks specific files that have been seen by this transaction. */
protected val readFiles = new HashSet[AddFile]

/** Tracks if this transaction has already committed. */
protected var committed = false

@@ -155,12 +164,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
protected var commitStartNano = -1L
protected var commitInfo: CommitInfo = _

/**
* Tracks if this transaction depends on any data files. This flag must be set if this transaction
* reads any data explicitly or implicitly (e.g., delete, update and overwrite).
*/
protected var dependsOnFiles: Boolean = false

/** The version that this transaction is reading from. */
def readVersion: Long = snapshot.version

@@ -213,13 +216,18 @@ trait OptimisticTransactionImpl extends TransactionalWrite {

/** Returns files matching the given predicates. */
def filterFiles(filters: Seq[Expression]): Seq[AddFile] = {
dependsOnFiles = true
snapshot.filesForScan(Nil, filters).files
val scan = snapshot.filesForScan(Nil, filters)
val partitionFilters = filters.filter { f =>
DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark)
}
readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
readFiles ++= scan.files
scan.files
}

/** Mark the entire table as tainted by this transaction. */
def readWholeTable(): Unit = {
dependsOnFiles = true
readPredicates += Literal(true)
}

/**
Expand All @@ -246,7 +254,19 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
// Try to commit at the next version.
var finalActions = prepareCommit(actions, op)

// Find the isolation level to use for this commit
val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
val isolationLevelToUse = if (noDataChanged) {
// If no data has changed (i.e. it is only being rearranged), then SnapshotIsolation
// provides a Serializable guarantee. Hence, allow reduced conflict detection by using
// SnapshotIsolation regardless of what the table isolation level is.
SnapshotIsolation
} else {
Serializable
}

val isBlindAppend = {
val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
val onlyAddFiles =
finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
onlyAddFiles && !dependsOnFiles
@@ -270,7 +290,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
registerPostCommitHook(GenerateSymlinkManifest)
}

val commitVersion = doCommit(snapshot.version + 1, finalActions, 0)
val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse)
logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
postCommit(commitVersion, finalActions)
commitVersion
@@ -358,13 +378,17 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
private def doCommit(
attemptVersion: Long,
actions: Seq[Action],
attemptNumber: Int): Long = deltaLog.lockInterruptibly {
attemptNumber: Int,
isolationLevel: IsolationLevel): Long = deltaLog.lockInterruptibly {
try {
logDebug(s"Attempting to commit version $attemptVersion with ${actions.size} actions")
logDebug(
s"Attempting to commit version $attemptVersion with ${actions.size} actions with " +
s"$isolationLevel isolation level")

deltaLog.store.write(
deltaFile(deltaLog.logPath, attemptVersion),
actions.map(_.json).toIterator)

val commitTime = System.nanoTime()
val postCommitSnapshot = deltaLog.update()
if (postCommitSnapshot.version < attemptVersion) {
@@ -406,7 +430,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
attemptVersion
} catch {
case e: java.nio.file.FileAlreadyExistsException =>
checkAndRetry(attemptVersion, actions, attemptNumber)
checkAndRetry(attemptVersion, actions, attemptNumber, isolationLevel)
}
}

@@ -418,22 +442,38 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
protected def checkAndRetry(
checkVersion: Long,
actions: Seq[Action],
attemptNumber: Int): Long = recordDeltaOperation(
attemptNumber: Int,
commitIsolationLevel: IsolationLevel): Long = recordDeltaOperation(
deltaLog,
"delta.commit.retry",
tags = Map(TAG_LOG_STORE_CLASS -> deltaLog.store.getClass.getName)) {
deltaLog.update()
val nextAttempt = deltaLog.snapshot.version + 1

(checkVersion until nextAttempt).foreach { version =>
import _spark.implicits._

val nextAttemptVersion = getNextAttemptVersion(checkVersion)
(checkVersion until nextAttemptVersion).foreach { version =>
// Actions of a commit which went in before ours
val winningCommitActions =
deltaLog.store.read(deltaFile(deltaLog.logPath, version)).map(Action.fromJson)

// Categorize all the actions that have happened since the transaction read.
val metadataUpdates = winningCommitActions.collect { case a: Metadata => a }
val removedFiles = winningCommitActions.collect { case a: RemoveFile => a }
val txns = winningCommitActions.collect { case a: SetTransaction => a }
val protocol = winningCommitActions.collect { case a: Protocol => a }
val commitInfo = winningCommitActions.collectFirst { case a: CommitInfo => a }.map(
ci => ci.copy(version = Some(version)))
val fileActions = winningCommitActions.collect { case f: FileAction => f }

val blindAppendAddedFiles = mutable.ArrayBuffer[AddFile]()
val changedDataAddedFiles = mutable.ArrayBuffer[AddFile]()

val isBlindAppendOption = commitInfo.flatMap(_.isBlindAppend)
if (isBlindAppendOption.getOrElse(false)) {
blindAppendAddedFiles ++= winningCommitActions.collect { case a: AddFile => a }
} else {
changedDataAddedFiles ++= winningCommitActions.collect { case a: AddFile => a }
}

// If the log protocol version was upgraded, make sure we are still okay.
// Fail the transaction if we're trying to upgrade protocol ourselves.
if (protocol.nonEmpty) {
@@ -446,22 +486,86 @@ trait OptimisticTransactionImpl extends TransactionalWrite {
case _ =>
}
}

// Fail if the metadata is different than what the txn read.
if (metadataUpdates.nonEmpty) {
throw new MetadataChangedException(commitInfo)
}
// Fail if the data is different than what the txn read.
if (dependsOnFiles && fileActions.nonEmpty) {
throw new ConcurrentWriteException(commitInfo)

// Fail if new files have been added that the txn should have read.
val addedFilesToCheckForConflicts = commitIsolationLevel match {
case Serializable => changedDataAddedFiles ++ blindAppendAddedFiles
case WriteSerializable => changedDataAddedFiles // don't conflict with blind appends
case SnapshotIsolation => Seq.empty
}
val predicatesMatchingAddedFiles = ExpressionSet(readPredicates).iterator.flatMap { p =>
val conflictingFile = DeltaLog.filterFileList(
metadata.partitionColumns,
addedFilesToCheckForConflicts.toDF(), p :: Nil).as[AddFile].take(1)

conflictingFile.headOption.map(f => getPrettyPartitionMessage(f.partitionValues))
}.take(1).toArray

if (predicatesMatchingAddedFiles.nonEmpty) {
val isWriteSerializable = commitIsolationLevel == WriteSerializable
val onlyAddFiles =
winningCommitActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])

val retryMsg =
if (isWriteSerializable && onlyAddFiles && isBlindAppendOption.isEmpty) {
// This transaction was made by an older version which did not set the `isBlindAppend` flag.
// So even if it looks like an append, we don't know for sure whether it was a blind append
// or not, so we suggest upgrading all concurrent writers to the latest version.
Some(
"Upgrading all your concurrent writers to use the latest Delta Lake may " +
"avoid this error. Please upgrade and then retry this operation again.")
} else None
throw new ConcurrentAppendException(commitInfo, predicatesMatchingAddedFiles.head, retryMsg)
}

// Fail if files have been deleted that the txn read.
val readFilePaths = readFiles.map(f => f.path -> f.partitionValues).toMap
val deleteReadOverlap = removedFiles.find(r => readFilePaths.contains(r.path))
if (deleteReadOverlap.nonEmpty) {
val filePath = deleteReadOverlap.get.path
val partition = getPrettyPartitionMessage(readFilePaths(filePath))
throw new ConcurrentDeleteReadException(commitInfo, s"$filePath in $partition")
}

// Fail if a file is deleted twice.
val txnDeletes = actions.collect { case r: RemoveFile => r }.map(_.path).toSet
val deleteOverlap = removedFiles.map(_.path).toSet intersect txnDeletes
if (deleteOverlap.nonEmpty) {
throw new ConcurrentDeleteDeleteException(commitInfo, deleteOverlap.head)
}

// Fail if idempotent transactions have conflicted.
val txnOverlap = txns.map(_.appId).toSet intersect readTxn.toSet
if (txnOverlap.nonEmpty) {
throw new ConcurrentTransactionException(commitInfo)
}
}
logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttempt), retrying.")
doCommit(nextAttempt, actions, attemptNumber + 1)

logInfo(s"No logical conflicts with deltas [$checkVersion, $nextAttemptVersion), retrying.")
doCommit(nextAttemptVersion, actions, attemptNumber + 1, commitIsolationLevel)
}

/** Returns the next attempt version given the last attempted version */
protected def getNextAttemptVersion(previousAttemptVersion: Long): Long = {
deltaLog.update()
deltaLog.snapshot.version + 1
}

/** A helper function for pretty printing a specific partition directory. */
protected def getPrettyPartitionMessage(partitionValues: Map[String, String]): String = {
if (metadata.partitionColumns.isEmpty) {
"the root of the table"
} else {
val partition = metadata.partitionColumns.map { name =>
s"$name=${partitionValues(name)}"
}.mkString("[", ", ", "]")
s"partition ${partition}"
}
}

/** Register a hook that will be executed once a commit is successful. */
91 changes: 91 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/isolationLevels.scala
@@ -0,0 +1,91 @@
/*
* Copyright 2019 Databricks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

/**
* Trait that defines the level of consistency guarantee that is going to be provided by
* `OptimisticTransaction.commit()`. [[Serializable]] is the most
* strict level and [[SnapshotIsolation]] is the least strict one.
*
* @see [[IsolationLevel.allLevelsInDescOrder]] for all the levels in the descending order
* of strictness and [[IsolationLevel.DEFAULT]] for the default table isolation level.
*/
sealed trait IsolationLevel {
override def toString: String = this.getClass.getSimpleName.stripSuffix("$")
}

/**
* This isolation level will ensure serializability between all read and write operations.
* Specifically, for write operations, this mode will ensure that the result of
* the table will be perfectly consistent with the visible history of operations, that is,
* as if all the operations were executed sequentially one by one.
*/
case object Serializable extends IsolationLevel

/**
* This isolation level will ensure snapshot isolation consistency guarantee between write
* operations only. In other words, if only the write operations are considered, then
* there exists a serializable sequence between them that would produce the same result
* as seen in the table. However, if both read and write operations are considered, then
* there may not exist a serializable sequence that would explain all the observed reads.
*
* This provides a lower consistency guarantee than [[Serializable]] but higher
* availability. For example, unlike [[Serializable]], this level allows an UPDATE
* operation to be committed even if there was a concurrent INSERT operation that has already
* added data that should have been read by the UPDATE. It will be as if the UPDATE was executed
* before the INSERT even if the former was committed after the latter. As a side effect,
* the visible history of operations may not be consistent with the
* result expected if these operations were executed sequentially one by one.
*/
case object WriteSerializable extends IsolationLevel

/**
* This isolation level will ensure that all reads will see a consistent
* snapshot of the table and any transactional write will successfully commit only
* if the values updated by the transaction have not been changed externally since
* the snapshot was read by the transaction.
*
* This provides a lower consistency guarantee than [[WriteSerializable]] but higher
* availability. For example, unlike [[WriteSerializable]], this level allows two
* concurrent UPDATE operations reading the same data to be committed successfully as long as
* they don't modify the same data.
*
* Note that for operations that do not modify data in the table, Snapshot isolation is the same
* as Serializability. Hence, such operations can be safely committed at the Snapshot isolation level.
*/
case object SnapshotIsolation extends IsolationLevel


object IsolationLevel {

val DEFAULT = WriteSerializable

/** All possible isolation levels in descending order of guarantees provided */
val allLevelsInDescOrder: Seq[IsolationLevel] = Seq(
Serializable,
WriteSerializable,
SnapshotIsolation)

/** All the valid isolation levels that can be specified as the table isolation level */
val validTableIsolationLevels = Set[IsolationLevel](Serializable, WriteSerializable)

def fromString(s: String): IsolationLevel = {
allLevelsInDescOrder.find(_.toString.equalsIgnoreCase(s)).getOrElse {
throw new IllegalArgumentException(s"invalid isolation level '$s'")
}
}
}
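
For reference, a short usage sketch of the new `IsolationLevel` API introduced by this file (an editor's illustration, not part of the commit):

import org.apache.spark.sql.delta._

object IsolationLevelUsageSketch {
  // fromString resolves names case-insensitively; unknown names throw IllegalArgumentException.
  val level: IsolationLevel = IsolationLevel.fromString("writeserializable")
  assert(level == WriteSerializable)
  assert(IsolationLevel.DEFAULT == WriteSerializable)
  // Only Serializable and WriteSerializable are valid table-level settings;
  // SnapshotIsolation is chosen internally for commits with no data change.
  assert(!IsolationLevel.validTableIsolationLevels.contains(SnapshotIsolation))
}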
