diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 7ec45db2cdf17..869c4f3c2e5e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -891,17 +891,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   /**
-   * Get the state store of endVersion for reading by applying delta files on the snapshot of
-   * snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
+   * Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
+   * If snapshot for snapshotVersion does not exist, an error will be thrown.
    *
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion checkpoint version to end with
    * @return [[HDFSBackedStateStore]]
    */
   override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
-    val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
+    val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
-      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
+      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
       log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
     new HDFSBackedStateStore(endVersion, newMap)
@@ -917,21 +917,21 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    */
   override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
     ReadStateStore = {
-    val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
+    val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
-      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
+      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
       log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for read-only")
     new HDFSBackedReadStateStore(endVersion, newMap)
   }
 
   /**
-   * Consturct the state at endVersion from snapshot from snapshotVersion.
+   * Construct the state map at endVersion from snapshot of version snapshotVersion.
    * Returns a new [[HDFSBackedStateStoreMap]]
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion checkpoint version to end with
    */
-  private def replayLoadedMapForStoreFromSnapshot(snapshotVersion: Long, endVersion: Long):
+  private def replayLoadedMapFromSnapshot(snapshotVersion: Long, endVersion: Long):
     HDFSBackedStateStoreMap = synchronized {
     try {
       if (snapshotVersion < 1) {
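Note for reviewers: the renamed replayLoadedMapFromSnapshot follows the standard snapshot-plus-deltas recovery scheme. Below is a minimal, self-contained Scala sketch of that idea only; readSnapshotFile and updateFromDeltaFile are hypothetical stand-ins for the provider's private file readers, not the actual method bodies.

import scala.collection.mutable

// Hypothetical stand-in: would deserialize the <version>.snapshot file.
def readSnapshotFile(version: Long): mutable.HashMap[String, String] =
  mutable.HashMap.empty

// Hypothetical stand-in: would apply the puts/removes recorded in <version>.delta.
def updateFromDeltaFile(version: Long, map: mutable.HashMap[String, String]): Unit = ()

// Rebuild the state map at endVersion from the snapshot at snapshotVersion,
// replaying one delta file per intermediate version.
def replaySketch(snapshotVersion: Long, endVersion: Long): mutable.HashMap[String, String] = {
  require(snapshotVersion >= 1 && snapshotVersion <= endVersion)
  val map = readSnapshotFile(snapshotVersion)
  ((snapshotVersion + 1) to endVersion).foreach(v => updateFromDeltaFile(v, map))
  map
}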
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 4281c0ce24477..3d2606f3ed0e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -373,6 +373,14 @@ private[sql] class RocksDBStateStoreProvider
     if (!condition) { throw new IllegalStateException(msg) }
   }
 
+  /**
+   * Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
+   * If snapshot for snapshotVersion does not exist, an error will be thrown.
+   *
+   * @param snapshotVersion checkpoint version of the snapshot to start with
+   * @param endVersion checkpoint version to end with
+   * @return [[StateStore]]
+   */
   override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
     try {
       if (snapshotVersion < 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 3afd2b38591f5..00a697fec3729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -445,6 +445,7 @@ object StateStoreProvider {
  * snapshotStartBatchId or readChangeFeed.
  */
 trait SupportsFineGrainedReplay {
+
   /**
    * Return an instance of [[StateStore]] representing state data of the given version.
    * The State Store will be constructed from the snapshot at snapshotVersion, and applying delta
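Note: providers advertise this capability by mixing in SupportsFineGrainedReplay, so call sites have to downcast defensively. A hedged sketch of that dispatch, assuming the types from org.apache.spark.sql.execution.streaming.state and the error class touched in StateStoreErrors below:

// Sketch only: route snapshot reads to providers that opt into fine-grained replay.
def getStoreFromSnapshot(
    provider: StateStoreProvider,
    snapshotVersion: Long,
    endVersion: Long): StateStore = provider match {
  case replayable: SupportsFineGrainedReplay =>
    replayable.replayStateFromSnapshot(snapshotVersion, endVersion)
  case other =>
    // Mirrors the error raised for providers without this trait.
    throw new StateStoreProviderDoesNotSupportFineGrainedReplay(other.getClass.toString)
}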
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 4e623d0fd5ea8..0844a87739765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -272,7 +272,7 @@ class StateStoreValueSchemaNotCompatible(
       "newValueSchema" -> newValueSchema))
 
 class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
-  extends SparkUnsupportedOperationException(
+  extends SparkRuntimeException(
     errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
     messageParameters = Map(
       "fileToRead" -> fileToRead,
@@ -280,7 +280,7 @@ class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
 
 class StateStoreSnapshotPartitionNotFound(
     snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String)
-  extends SparkUnsupportedOperationException(
+  extends SparkRuntimeException(
     errorClass = "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND",
     messageParameters = Map(
       "snapshotPartitionId" -> snapshotPartitionId.toString,
@@ -298,6 +298,6 @@ class StateStoreValueRowFormatValidationFailure(errorMsg: String)
     messageParameters = Map("errorMsg" -> errorMsg))
 
 class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
-    extends SparkUnsupportedOperationException(
-      errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
-      messageParameters = Map("inputClass" -> inputClass))
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
+    messageParameters = Map("inputClass" -> inputClass))
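Note on the exception reparenting above: a missing snapshot file or partition looks like a runtime data condition (the snapshot may simply not have been produced yet), while a provider lacking the trait remains an unsupported operation. A hedged caller-side sketch of that distinction; classifyReplayFailure and its loadSnapshot argument are hypothetical, not part of this change:

import org.apache.spark.{SparkRuntimeException, SparkUnsupportedOperationException}

// Sketch only: the two failure modes now surface as different exception families.
def classifyReplayFailure(loadSnapshot: () => Unit): Unit =
  try loadSnapshot() catch {
    case e: SparkUnsupportedOperationException =>
      throw e // provider cannot do fine-grained replay at all; not retryable
    case e: SparkRuntimeException
        if Option(e.getErrorClass).exists(_.startsWith("CANNOT_LOAD_STATE_STORE")) =>
      throw e // snapshot file/partition missing at runtime; another version may exist
  }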
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 7872d8de8b8a5..e6cdd0dce9efa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -1065,27 +1065,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnLimitState(providerName: String): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = inputData.toDF().limit(10)
-      testStream(query)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = inputData.toDF().limit(10)
+        testStream(query)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
     */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/limit/"
@@ -1096,27 +1096,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnAggregateState(providerName: String): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = inputData.toDF().groupBy("_1").count()
-      testStream(query, OutputMode.Update)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = inputData.toDF().groupBy("_1").count()
+        testStream(query, OutputMode.Update)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
     */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
@@ -1127,27 +1127,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnDeduplicateState(providerName: String): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = inputData.toDF().dropDuplicates("_1")
-      testStream(query)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = inputData.toDF().dropDuplicates("_1")
+        testStream(query)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
     */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
@@ -1158,25 +1158,25 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnJoinState(providerName: String, stateVersion: Int): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = getStreamStreamJoinQuery(inputData)
-      testStream(query)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = getStreamStreamJoinQuery(inputData)
+        testStream(query)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
     */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/join$stateVersion/"
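Note: these helpers exercise the replay path end to end through the state data source. A hedged sketch of the kind of read the golden files validate; the option names are taken from the SupportsFineGrainedReplay scaladoc and the error classes in this diff, and the exact spellings should be checked against StateSourceOptions:

// Assumes an active SparkSession `spark` and a checkpoint written by one of the
// streaming queries shown in the generation comments above.
val stateDf = spark.read
  .format("statestore")
  .option("snapshotStartBatchId", "0")  // batch whose snapshot to start from
  .option("snapshotPartitionId", "0")   // single state partition to rebuild
  .option("batchId", "3")               // batch/version to replay up to
  .load("<checkpoint-root>")

stateDf.show()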