From 337785dac6e06a9b75751ab370df989a135d6968 Mon Sep 17 00:00:00 2001
From: Yuchen Liu
Date: Fri, 28 Jun 2024 17:07:18 -0700
Subject: [PATCH 1/2] address comments from Anish

---
 .../state/HDFSBackedStateStoreProvider.scala  |  12 +-
 .../state/RocksDBStateStoreProvider.scala     |   8 +
 .../streaming/state/StateStore.scala          |   1 +
 .../streaming/state/StateStoreErrors.scala    |   4 +-
 .../v2/state/StateDataSourceReadSuite.scala   | 164 +++++++++---------
 5 files changed, 99 insertions(+), 90 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3b8a647f371d4..6d31acb784aee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -891,15 +891,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   /**
-   * Get the state store of endVersion for reading by applying delta files on the snapshot of
-   * snapshotVersion. If snapshot for snapshotVersion does not exist, an error will be thrown.
+   * Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
+   * If the snapshot for snapshotVersion does not exist, an error will be thrown.
    *
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion checkpoint version to end with
    * @return [[HDFSBackedStateStore]]
    */
   override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
-    val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
+    val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
       log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
       log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
     new HDFSBackedStateStore(endVersion, newMap)
@@ -917,7 +917,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    */
   override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
     ReadStateStore = {
-    val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
+    val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
       log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
       log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for read-only")
     new HDFSBackedReadStateStore(endVersion, newMap)
@@ -926,12 +926,12 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   /**
-   * Consturct the state at endVersion from snapshot from snapshotVersion.
+   * Construct the state map at endVersion from the snapshot of version snapshotVersion.
    * Returns a new [[HDFSBackedStateStoreMap]]
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion checkpoint version to end with
    */
-  private def replayLoadedMapForStoreFromSnapshot(snapshotVersion: Long, endVersion: Long):
+  private def replayLoadedMapFromSnapshot(snapshotVersion: Long, endVersion: Long):
     HDFSBackedStateStoreMap = synchronized {
     try {
       if (snapshotVersion < 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 6eaad8add2bbc..a555f9a40044a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -369,6 +369,14 @@ private[sql] class RocksDBStateStoreProvider
     if (!condition) { throw new IllegalStateException(msg) }
   }
 
+  /**
+   * Get the state store of endVersion by applying delta files on the snapshot of snapshotVersion.
+   * If the snapshot for snapshotVersion does not exist, an error will be thrown.
+   *
+   * @param snapshotVersion checkpoint version of the snapshot to start with
+   * @param endVersion checkpoint version to end with
+   * @return [[StateStore]]
+   */
   override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
     try {
       if (snapshotVersion < 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index c3d2c5f67aebb..b8739fb8e4cf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -445,6 +445,7 @@ object StateStoreProvider {
  * snapshotStartBatchId option in state data source.
  */
 trait SupportsFineGrainedReplay {
+
   /**
    * Return an instance of [[StateStore]] representing state data of the given version.
    * The State Store will be constructed from the snapshot at snapshotVersion, and applying delta
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 4e623d0fd5ea8..026a1eac166ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -272,7 +272,7 @@ class StateStoreValueSchemaNotCompatible(
       "newValueSchema" -> newValueSchema))
 
 class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
-  extends SparkUnsupportedOperationException(
+  extends SparkRuntimeException(
     errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
     messageParameters = Map(
       "fileToRead" -> fileToRead,
@@ -280,7 +280,7 @@ class StateStoreSnapshotFileNotFound(fileToRead: String, clazz: String)
 
 class StateStoreSnapshotPartitionNotFound(
     snapshotPartitionId: Long, operatorId: Int, checkpointLocation: String)
-  extends SparkUnsupportedOperationException(
+  extends SparkRuntimeException(
     errorClass = "CANNOT_LOAD_STATE_STORE.SNAPSHOT_PARTITION_ID_NOT_FOUND",
     messageParameters = Map(
       "snapshotPartitionId" -> snapshotPartitionId.toString,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 7872d8de8b8a5..e6cdd0dce9efa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -1065,27 +1065,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnLimitState(providerName: String): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = inputData.toDF().limit(10)
-      testStream(query)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = inputData.toDF().limit(10)
+        testStream(query)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (7, 7L), (8, 8L), (9, 9L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (10, 10L), (11, 11L), (12, 12L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
      */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/limit/"
@@ -1096,27 +1096,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnAggregateState(providerName: String): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = inputData.toDF().groupBy("_1").count()
-      testStream(query, OutputMode.Update)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = inputData.toDF().groupBy("_1").count()
+        testStream(query, OutputMode.Update)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
      */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
@@ -1127,27 +1127,27 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnDeduplicateState(providerName: String): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = inputData.toDF().dropDuplicates("_1")
-      testStream(query)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = inputData.toDF().dropDuplicates("_1")
+        testStream(query)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (2, 2L), (3, 3L), (4, 4L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (3, 3L), (4, 4L), (5, 5L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (4, 4L), (5, 5L), (6, 6L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
      */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/dedup/"
@@ -1158,25 +1158,25 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
 
   protected def testSnapshotOnJoinState(providerName: String, stateVersion: Int): Unit = {
     /** The golden files are generated by:
-    withSQLConf({
-      SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString
-      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
-    }) {
-      val inputData = MemoryStream[(Int, Long)]
-      val query = getStreamStreamJoinQuery(inputData)
-      testStream(query)(
-        StartStream(checkpointLocation = <...>),
-        AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) },
-        AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
-        ProcessAllAvailable(),
-        Execute { _ => Thread.sleep(2000) }
-      )
-    }
+      withSQLConf({
+        SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> stateVersion.toString,
+        SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100"
+      }) {
+        val inputData = MemoryStream[(Int, Long)]
+        val query = getStreamStreamJoinQuery(inputData)
+        testStream(query)(
+          StartStream(checkpointLocation = <...>),
+          AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) },
+          AddData(inputData, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
+          ProcessAllAvailable(),
+          Execute { _ => Thread.sleep(2000) }
+        )
+      }
      */
     val resourceUri = this.getClass.getResource(
       s"/structured-streaming/checkpoint-version-4.0.0/$providerName/join$stateVersion/"

From 9dbe2959ee1c046b0502d579a090983ce7242ebc Mon Sep 17 00:00:00 2001
From: Yuchen Liu
Date: Mon, 1 Jul 2024 21:54:33 -0700
Subject: [PATCH 2/2] minor

---
 .../streaming/state/HDFSBackedStateStoreProvider.scala    | 4 ++--
 .../sql/execution/streaming/state/StateStoreErrors.scala  | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 6d31acb784aee..c4a41ceb4caf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -901,7 +901,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
     val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
-      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
+      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
       log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for update")
     new HDFSBackedStateStore(endVersion, newMap)
@@ -919,7 +919,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     ReadStateStore = {
     val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
-      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
+      log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, endVersion)} of " +
       log"${MDC(LogKeys.STATE_STORE_PROVIDER, HDFSBackedStateStoreProvider.this)} for read-only")
     new HDFSBackedReadStateStore(endVersion, newMap)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 026a1eac166ae..0844a87739765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -298,6 +298,6 @@ class StateStoreValueRowFormatValidationFailure(errorMsg: String)
     messageParameters = Map("errorMsg" -> errorMsg))
 
 class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
-    extends SparkUnsupportedOperationException(
-      errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
-      messageParameters = Map("inputClass" -> inputClass))
+  extends SparkUnsupportedOperationException(
+    errorClass = "STATE_STORE_PROVIDER_DOES_NOT_SUPPORT_FINE_GRAINED_STATE_REPLAY",
+    messageParameters = Map("inputClass" -> inputClass))
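
A minimal usage sketch of the replay API these two patches touch. Only StateStoreProvider, ReadStateStore, SupportsFineGrainedReplay, replayReadStateFromSnapshot, and StateStoreProviderDoesNotSupportFineGrainedReplay come from the diffs above; the helper function, its name, and the bounds check are illustrative assumptions, not code from this PR:

// Sketch only: assumed to live in org.apache.spark.sql.execution.streaming.state,
// so the classes patched above are in scope without imports.
object FineGrainedReplayExample {
  // Hypothetical helper, not part of this PR.
  def replayForSnapshotScan(
      provider: StateStoreProvider,
      snapshotVersion: Long,
      endVersion: Long): ReadStateStore = {
    // Guard mirroring the `snapshotVersion < 1` checks visible in both providers.
    require(snapshotVersion >= 1 && endVersion >= snapshotVersion)
    provider match {
      case replayable: SupportsFineGrainedReplay =>
        // Rebuilds the state as of endVersion by loading the snapshot written at
        // snapshotVersion and applying the delta files up to endVersion. A missing
        // snapshot file now surfaces as a SparkRuntimeException
        // (CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE) after the
        // exception-type change in patch 1.
        replayable.replayReadStateFromSnapshot(snapshotVersion, endVersion)
      case other =>
        // Providers without fine-grained replay still fail with the unchanged
        // SparkUnsupportedOperationException.
        throw new StateStoreProviderDoesNotSupportFineGrainedReplay(
          other.getClass.toString)
    }
  }
}

replayStateFromSnapshot is the updatable counterpart for callers that must write to the replayed store, and, per the StateStore.scala context above, the snapshotStartBatchId option of the state data source is the intended consumer of this trait.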