[SPARK-48903][SS] Set the RocksDB last snapshot version correctly on remote load

### What changes were proposed in this pull request?
Set the RocksDB last snapshot version to the latest available snapshot version on remote load.

### Why are the changes needed?
Avoid creating a full snapshot on the first batch after every restart, and also reset a snapshot that is likely no longer valid.
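
The effect on snapshot cadence can be sketched with a toy model. This is not the actual RocksDB state store code: `SnapshotTracker`, `resetOnLoad`, and `snapshotsAfterRestart` are hypothetical names, and the sketch assumes a snapshot is taken once the committed version is at least `minDeltasForSnapshot` ahead of the last snapshot version, with a threshold of 3 chosen only for illustration.

```scala
// Toy model only: all names here are hypothetical, not Spark APIs.
final class SnapshotTracker(minDeltasForSnapshot: Long, resetOnLoad: Boolean) {
  // A freshly created instance (for example after a restart) starts at 0.
  private var lastSnapshotVersion: Long = 0L
  private var loadedVersion: Long = 0L

  /** Simulates a remote load of `version` when the newest remote snapshot is `latestSnapshotVersion`. */
  def load(version: Long, latestSnapshotVersion: Long): Unit = {
    if (resetOnLoad) {
      // Assumed behaviour after this change: track the newest available snapshot.
      lastSnapshotVersion = latestSnapshotVersion
    } else if (lastSnapshotVersion > latestSnapshotVersion) {
      // Assumed behaviour before this change: only reset (to 0) when ahead of the remote.
      lastSnapshotVersion = 0L
    }
    loadedVersion = version
  }

  /** Simulates a commit; returns Some(version) if a snapshot would be taken. */
  def commit(): Option[Long] = {
    val newVersion = loadedVersion + 1
    loadedVersion = newVersion
    if (newVersion - lastSnapshotVersion >= minDeltasForSnapshot) {
      lastSnapshotVersion = newVersion
      Some(newVersion)
    } else {
      None
    }
  }
}

object SnapshotTrackerDemo {
  /** Restarts at version 18, commits three times, and returns the versions that snapshot. */
  def snapshotsAfterRestart(resetOnLoad: Boolean): Seq[Long] = {
    val tracker = new SnapshotTracker(minDeltasForSnapshot = 3L, resetOnLoad = resetOnLoad)
    tracker.load(version = 18L, latestSnapshotVersion = 18L)
    (1 to 3).flatMap(_ => tracker.commit().toList)
  }
}
```

Under these assumptions, a restart at version 18 produces the next snapshot at version 21 when the version is reset on load, and at version 19 when it is not, which mirrors the expectation change in the unit test.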

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests
```
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true), ForkJoinPool.common...
[info] Run completed in 4 minutes, 40 seconds.
[info] Total number of tests run: 176
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47363 from anishshri-db/task/SPARK-48903.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
anishshri-db authored and jingz-db committed Jul 22, 2024
1 parent c9c0ab2 commit b9254d0
Showing 2 changed files with 20 additions and 5 deletions.
@@ -189,8 +189,16 @@ class RocksDB(
 // reset last snapshot version
 if (lastSnapshotVersion > latestSnapshotVersion) {
   // discard any newer snapshots
-  lastSnapshotVersion = 0L
+  synchronized {
+    if (latestSnapshot.isDefined) {
+      oldSnapshots += latestSnapshot.get
+      latestSnapshot = None
+    }
+  }
 }
+
+// reset the last snapshot version to the latest available snapshot version
+lastSnapshotVersion = latestSnapshotVersion
 openDB()

 numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
@@ -306,11 +306,19 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
   }

+  // pick up from the last snapshot and the next upload will be for version 21
   withDB(remoteDir, conf = conf) { db =>
     db.load(18)
     db.commit()
     db.doMaintenance()
-    assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 19))
+    assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
+
+    for (version <- 19 to 20) {
+      db.load(version)
+      db.commit()
+    }
+    db.doMaintenance()
+    assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 21))
   }
 }

@@ -1645,7 +1653,6 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared

 testWithChangelogCheckpointingEnabled("time travel 5 -" +
   "validate successful RocksDB load when metadata file is not overwritten") {
-  // Ensure commit doesn't modify the latestSnapshot that doMaintenance will upload
   val fmClass = "org.apache.spark.sql.execution.streaming.state." +
     "NoOverwriteFileSystemBasedCheckpointFileManager"
   withTempDir { dir =>
@@ -1663,9 +1670,9 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
 db.load(0)
 db.put("a", "1")

-// upload version 1 snapshot created above
+// do not upload version 1 snapshot created previously
 db.doMaintenance()
-assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+assert(snapshotVersionsPresent(remoteDir) == Seq.empty)

 db.commit() // create snapshot again
