From 4b2f2f6580c67edfd75938a3820dd5f4e98e8aa1 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Fri, 12 Jul 2024 14:32:27 -0700 Subject: [PATCH 1/2] [SPARK-48888] Remove snapshot creation based on changelog ops size --- .../apache/spark/sql/execution/streaming/state/RocksDB.scala | 3 +-- .../sql/execution/streaming/state/StateStoreChangelog.scala | 5 ----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 6215d1aaf4b55..c748e53467191 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -573,8 +573,7 @@ class RocksDB( if (enableChangelogCheckpointing) { assert(changelogWriter.isDefined) val newVersion = loadedVersion + 1 - newVersion - lastSnapshotVersion >= conf.minDeltasForSnapshot || - changelogWriter.get.size > 10000 + newVersion - lastSnapshotVersion >= conf.minDeltasForSnapshot } else true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala index d189daa6e841b..9c8803c8a17c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala @@ -96,7 +96,6 @@ abstract class StateStoreChangelogWriter( protected var backingFileStream: CancellableFSDataOutputStream = fm.createAtomic(file, overwriteIfPossible = true) protected var compressedStream: DataOutputStream = compressStream(backingFileStream) - var size = 0 def put(key: Array[Byte], value: Array[Byte]): Unit @@ -147,7 +146,6 @@ class StateStoreChangelogWriterV1( compressedStream.write(key) compressedStream.writeInt(value.size) compressedStream.write(value) - size += 1 } override def delete(key: Array[Byte]): Unit = { @@ -156,7 +154,6 @@ class StateStoreChangelogWriterV1( compressedStream.write(key) // -1 in the value field means record deletion. compressedStream.writeInt(-1) - size += 1 } override def merge(key: Array[Byte], value: Array[Byte]): Unit = { @@ -208,7 +205,6 @@ class StateStoreChangelogWriterV2( compressedStream.write(key) // -1 in the value field means record deletion. compressedStream.writeInt(-1) - size += 1 } override def merge(key: Array[Byte], value: Array[Byte]): Unit = { @@ -225,7 +221,6 @@ class StateStoreChangelogWriterV2( compressedStream.write(key) compressedStream.writeInt(value.size) compressedStream.write(value) - size += 1 } def commit(): Unit = { From 90446b334ec5024eef963bbd8ec1996941a5b4d9 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Fri, 12 Jul 2024 15:11:28 -0700 Subject: [PATCH 2/2] Fix test --- .../streaming/state/RocksDBSuite.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index e309b3842cd6f..5e9a5ecf44fc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -291,25 +291,26 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.doMaintenance() assert(snapshotVersionsPresent(remoteDir) === Seq(3)) db.load(3) - for (i <- 1 to 10001) { - db.put(i.toString, i.toString) - } - db.commit() - db.doMaintenance() - // Snapshot should be created this time because the size of the change log > 1000 - assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4)) - for (version <- 4 to 7) { + + for (version <- 3 to 7) { db.load(version) db.commit() db.doMaintenance() } - assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7)) - for (version <- 8 to 20) { + assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6)) + for (version <- 8 to 17) { db.load(version) db.commit() } db.doMaintenance() - assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7, 19)) + assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18)) + } + + withDB(remoteDir, conf = conf) { db => + db.load(18) + db.commit() + db.doMaintenance() + assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 19)) } }