Skip to content

Commit

Permalink
Merge branch 'skipSnapshotAtBatch' into state-cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jun 27, 2024
2 parents 271b98e + d140708 commit cd6a39b
Show file tree
Hide file tree
Showing 889 changed files with 187 additions and 386 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3775,7 +3775,7 @@
},
"STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY" : {
"message" : [
"The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplayFromSnapshot.",
"The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.SupportsFineGrainedReplay.",
"Therefore, it does not support option snapshotStartBatchId in state data source."
],
"sqlState" : "42K06"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ class StatePartitionReader(
case None => provider.getReadStore(partition.sourceOptions.batchId + 1)

case Some(snapshotStartBatchId) =>
if (!provider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
if (!provider.isInstanceOf[SupportsFineGrainedReplay]) {
throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
provider.getClass.toString)
}
provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
provider.asInstanceOf[SupportsFineGrainedReplay]
.replayReadStateFromSnapshot(
snapshotStartBatchId + 1,
partition.sourceOptions.batchId + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ import org.apache.spark.util.ArrayImplicits._
* store.
*/
private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging
with SupportsFineGrainedReplayFromSnapshot with SupportsStateStoreChangeDataFeed {
with SupportsFineGrainedReplay {

private val providerName = "HDFSBackedStateStoreProvider"

Expand Down Expand Up @@ -892,86 +892,89 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* startVersion. If snapshot for startVersion does not exist, an error will be thrown.
* snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param startVersion checkpoint version of the snapshot to start with
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
override def replayStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(startVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, startVersion)} to " +
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
new HDFSBackedStateStore(endVersion, newMap)
}

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* startVersion. If snapshot for startVersion does not exist, an error will be thrown.
* snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param startVersion checkpoint version of the snapshot to start with
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedReadStateStore]]
*/
override def replayReadStateFromSnapshot(startVersion: Long, endVersion: Long): ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(startVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, startVersion)} to " +
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for readonly")
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for read-only")
new HDFSBackedReadStateStore(endVersion, newMap)
}

/**
* Consturct the state at endVersion from snapshot from startVersion.
* Consturct the state at endVersion from snapshot from snapshotVersion.
* Returns a new [[HDFSBackedStateStoreMap]]
* @param startVersion checkpoint version of the snapshot to start with
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
private def replayLoadedMapForStoreFromSnapshot(startVersion: Long, endVersion: Long):
private def replayLoadedMapForStoreFromSnapshot(snapshotVersion: Long, endVersion: Long):
HDFSBackedStateStoreMap = synchronized {
try {
if (startVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < startVersion || endVersion < 0) {
if (endVersion < snapshotVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}

val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
if (endVersion != 0) {
newMap.putAll(constructMapFromSnapshot(startVersion, endVersion))
}
newMap.putAll(constructMapFromSnapshot(snapshotVersion, endVersion))

newMap
}
catch {
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}

private def constructMapFromSnapshot(startVersion: Long, endVersion: Long):
private def constructMapFromSnapshot(snapshotVersion: Long, endVersion: Long):
HDFSBackedStateStoreMap = {
val (result, elapsedMs) = Utils.timeTakenMs {
val startVersionMap = synchronized { Option(loadedMaps.get(startVersion)) } match {
val startVersionMap = synchronized { Option(loadedMaps.get(snapshotVersion)) } match {
case Some(value) => Option(value)
case None => readSnapshotFile(startVersion)
case None => readSnapshotFile(snapshotVersion)
}
if (startVersionMap.isEmpty) {
throw StateStoreErrors.stateStoreSnapshotFileNotFound(
snapshotFile(startVersion).toString, toString())
snapshotFile(snapshotVersion).toString, toString())
}

// Load all the deltas from the version after the start version up to the end version.
val resultMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
resultMap.putAll(startVersionMap.get)
for (deltaVersion <- startVersion + 1 to endVersion) {
for (deltaVersion <- snapshotVersion + 1 to endVersion) {
updateFromDeltaFile(deltaVersion, resultMap)
}

resultMap
}

logDebug(s"Loading state from $startVersion to $endVersion takes $elapsedMs ms.")
logDebug(s"Loading snapshot at version $snapshotVersion and apply delta files to version " +
s"$endVersion takes $elapsedMs ms.")

result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,25 +231,25 @@ class RocksDB(
* end version. Note that this will copy all the necessary files from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*
* @param startVersion version of the snapshot to start with
* @param snapshotVersion version of the snapshot to start with
* @param endVersion end version
* @return A RocksDB instance loaded with the state endVersion replayed from startVersion.
* @return A RocksDB instance loaded with the state endVersion replayed from snapshotVersion.
* Note that the instance will be read-only since this method is only used in State Data
* Source.
*/
def loadFromSnapshot(startVersion: Long, endVersion: Long): RocksDB = {
assert(startVersion >= 0 && endVersion >= startVersion)
def loadFromSnapshot(snapshotVersion: Long, endVersion: Long): RocksDB = {
assert(snapshotVersion >= 0 && endVersion >= snapshotVersion)
acquire(LoadStore)
recordedMetrics = None
logInfo(
log"Loading ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, startVersion)}")
log"Loading snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
try {
replayFromCheckpoint(startVersion, endVersion)
replayFromCheckpoint(snapshotVersion, endVersion)

logInfo(
log"Loaded ${MDC(LogKeys.VERSION_NUM, endVersion)} from " +
log"${MDC(LogKeys.VERSION_NUM, startVersion)}")
log"Loaded snapshot at version ${MDC(LogKeys.VERSION_NUM, snapshotVersion)} and apply " +
log"changelog files to version ${MDC(LogKeys.VERSION_NUM, endVersion)}.")
} catch {
case t: Throwable =>
loadedVersion = -1 // invalidate loaded data
Expand All @@ -263,33 +263,31 @@ class RocksDB(
* end version.
* If the start version does not exist, it will throw an exception.
*
* @param startVersion start checkpoint version
* @param snapshotVersion start checkpoint version
* @param endVersion end version
*/
private def replayFromCheckpoint(startVersion: Long, endVersion: Long): Any = {
if (loadedVersion != startVersion) {
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(startVersion, workingDir)
loadedVersion = startVersion

// reset last snapshot version
if (lastSnapshotVersion > startVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = {
closeDB()
val metadata = fileManager.loadCheckpointFromDfs(snapshotVersion, workingDir)
loadedVersion = snapshotVersion

// reset last snapshot version
if (lastSnapshotVersion > snapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
latestSnapshot = None
}
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being track
-1L
} else if (metadata.numKeys < 0) {
// we track the total number of rows, but the snapshot doesn't have tracking number
// need to count keys now
countKeys()
} else {
metadata.numKeys
}
if (loadedVersion != endVersion) replayChangelog(endVersion)
// After changelog replay the numKeysOnWritingVersion will be updated to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.util.Utils

private[sql] class RocksDBStateStoreProvider
extends StateStoreProvider with Logging with Closeable
with SupportsFineGrainedReplayFromSnapshot with SupportsStateStoreChangeDataFeed {
with SupportsFineGrainedReplay {
import RocksDBStateStoreProvider._

class RocksDBStateStore(lastVersion: Long) extends StateStore {
Expand Down Expand Up @@ -371,31 +371,15 @@ private[sql] class RocksDBStateStoreProvider
if (!condition) { throw new IllegalStateException(msg) }
}

override def replayStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
try {
if (startVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
if (snapshotVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
}
if (endVersion < startVersion) {
if (endVersion < snapshotVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(startVersion, endVersion)
new RocksDBStateStore(endVersion)
}
catch {
case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
}
}

override def replayReadStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
try {
if (startVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
}
if (endVersion < startVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(startVersion, endVersion)
rocksDB.loadFromSnapshot(snapshotVersion, endVersion)
new RocksDBStateStore(endVersion)
}
catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,32 +444,31 @@ object StateStoreProvider {
* grained state data which is replayed from a specific snapshot version. It is used by the
* snapshotStartBatchId option in state data source.
*/
trait SupportsFineGrainedReplayFromSnapshot {
trait SupportsFineGrainedReplay {
/**
* Used by snapshotStartBatchId option when reading state generated by join operation as data
* source.
* Return an instance of [[StateStore]] representing state data of the given version.
* The State Store will be constructed from the snapshot at startVersion, and applying delta files
* up to the endVersion. If there is no snapshot file at startVersion, an exception will be
* thrown.
* The State Store will be constructed from the snapshot at snapshotVersion, and applying delta
* files up to the endVersion. If there is no snapshot file at snapshotVersion, an exception will
* be thrown.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore

/**
* Used by snapshotStartBatchId option when reading state generated by all stateful operations
* except join as data source.
* Return an instance of [[ReadStateStore]] representing state data of the given version.
* The State Store will be constructed from the snapshot at startVersion, and applying delta files
* up to the endVersion. If there is no snapshot file at startVersion, an exception will be
* thrown.
* The State Store will be constructed from the snapshot at snapshotVersion, and applying delta
* files up to the endVersion. If there is no snapshot file at snapshotVersion, an exception will
* be thrown.
* Only implement this if there is read-only optimization for the state store.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore
def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion))
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,11 @@ class SymmetricHashJoinStateManager(
// This class will manage the state store provider by itself.
initializeStateStoreProvider(keySchema, valueSchema)
if (snapshotStartVersion.isDefined) {
if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplay]) {
throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
stateStoreProvider.getClass.toString)
}
stateStoreProvider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
stateStoreProvider.asInstanceOf[SupportsFineGrainedReplay]
.replayStateFromSnapshot(snapshotStartVersion.get, stateInfo.get.storeVersion)
} else {
stateStoreProvider.getStore(stateInfo.get.storeVersion)
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit cd6a39b

Please sign in to comment.