Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48903][SS] Set the RocksDB last snapshot version correctly on remote load #47363

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,16 @@ class RocksDB(
// reset last snapshot version
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
synchronized {
if (latestSnapshot.isDefined) {
oldSnapshots += latestSnapshot.get
latestSnapshot = None
}
}
}

// reset the last snapshot version to the latest available snapshot version
lastSnapshotVersion = latestSnapshotVersion
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,19 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
}

// pick up from the last snapshot and the next upload will be for version 21
withDB(remoteDir, conf = conf) { db =>
db.load(18)
db.commit()
db.doMaintenance()
assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 19))
assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))

for (version <- 19 to 20) {
db.load(version)
db.commit()
}
db.doMaintenance()
assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 21))
}
}

Expand Down Expand Up @@ -1645,7 +1653,6 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared

testWithChangelogCheckpointingEnabled("time travel 5 -" +
"validate successful RocksDB load when metadata file is not overwritten") {
// Ensure commit doesn't modify the latestSnapshot that doMaintenance will upload
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
"NoOverwriteFileSystemBasedCheckpointFileManager"
withTempDir { dir =>
Expand All @@ -1663,9 +1670,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
db.load(0)
db.put("a", "1")

// upload version 1 snapshot created above
// do not upload version 1 snapshot created previously
db.doMaintenance()
assert(snapshotVersionsPresent(remoteDir) == Seq(1))
anishshri-db marked this conversation as resolved.
Show resolved Hide resolved

db.commit() // create snapshot again

Expand Down