Skip to content

Commit

Permalink
Merge branch 'skipSnapshotAtBatch' into state-cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
eason-yuchen-liu committed Jul 2, 2024
2 parents 6d6d511 + 9dbe295 commit 7354408
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -891,17 +891,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}

/**
* Get the state store of endVersion for reading by applying delta files on the snapshot of
* snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
* Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
* If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
new HDFSBackedStateStore(endVersion, newMap)
Expand All @@ -917,21 +917,21 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
*/
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for read-only")
new HDFSBackedReadStateStore(endVersion, newMap)
}

/**
* Consturct the state at endVersion from snapshot from snapshotVersion.
* Construct the state map at endVersion from snapshot of version snapshotVersion.
* Returns a new [[HDFSBackedStateStoreMap]]
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
private def replayLoadedMapForStoreFromSnapshot(snapshotVersion: Long, endVersion: Long):
private def replayLoadedMapFromSnapshot(snapshotVersion: Long, endVersion: Long):
HDFSBackedStateStoreMap = synchronized {
try {
if (snapshotVersion < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,14 @@ private[sql] class RocksDBStateStoreProvider
if (!condition) { throw new IllegalStateException(msg) }
}

/**
* Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
* If snapshot for snapshotVersion does not exist, an error will be thrown.
*
* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
* @return [[StateStore]]
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
try {
if (snapshotVersion < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ object StateStoreProvider {
* snapshotStartBatchId or readChangeFeed.
*/
trait SupportsFineGrainedReplay {

/**
* Return an instance of [[StateStore]] representing state data of the given version.
* The State Store will be constructed from the snapshot at snapshotVersion, and applying delta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,15 @@ class StateStoreValueSchemaNotCompatible(
"newValueSchema" -> newValueSchema))

class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
extends SparkUnsupportedOperationException(
extends SparkRuntimeException(
errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead,
"clazz" -> clazz))

class StateStoreSnapshotPartitionNotFound(
snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String)
extends SparkUnsupportedOperationException(
extends SparkRuntimeException(
errorClass = "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND",
messageParameters = Map(
"snapshotPartitionId" -> snapshotPartitionId.toString,
Expand All @@ -298,6 +298,6 @@ class StateStoreValueRowFormatValidationFailure(errorMsg: String)
messageParameters = Map("errorMsg" -> errorMsg))

class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
messageParameters = Map("inputClass" -> inputClass))
extends SparkUnsupportedOperationException(
errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
messageParameters = Map("inputClass" -> inputClass))
Original file line number Diff line number Diff line change
Expand Up @@ -1065,27 +1065,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnLimitState(providerName: String): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().limit(10)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().limit(10)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/limit/"
Expand All @@ -1096,27 +1096,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnAggregateState(providerName: String): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().groupBy("_1").count()
testStream(query, OutputMode.Update)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().groupBy("_1").count()
testStream(query, OutputMode.Update)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
Expand All @@ -1127,27 +1127,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnDeduplicateState(providerName: String): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().dropDuplicates("_1")
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = inputData.toDF().dropDuplicates("_1")
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
Expand All @@ -1158,25 +1158,25 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass

protected def testSnapshotOnJoinState(providerName: String, stateVersion: Int): Unit = {
/** The golden files are generated by:
withSQLConf({
SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = getStreamStreamJoinQuery(inputData)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
withSQLConf({
SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
}) {
val inputData = MemoryStream[(Int, Long)]
val query = getStreamStreamJoinQuery(inputData)
testStream(query)(
StartStream(checkpointLocation = <...>),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) },
AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
ProcessAllAvailable(),
Execute { _ => Thread.sleep(2000) }
)
}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/join$stateVersion/"
Expand Down

0 comments on commit 7354408

Please sign in to comment.