[SPARK-49363][SS][TESTS] Add unit tests for potential RocksDB state store SST file mismatch #47850

Closed · wants to merge 7 commits
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.Random

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
@@ -1770,6 +1771,138 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}

testWithChangelogCheckpointingEnabled("reloading the same version") {
// Keep executing the same batch two or more times. Some queries that use ForEachBatch
// will cause this behavior.
// The bug this test covers was accidentally fixed by SPARK-48586 (https://github.com/apache/spark/pull/47130)
val remoteDir = Utils.createTempDir().toString
val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
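// Note: with minDeltasForSnapshot = 2, a local snapshot is expected roughly every
// second commit (see the per-commit comments below).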
new File(remoteDir).delete() // to make sure that the directory gets created
withDB(remoteDir, conf = conf) { db =>
// Load the same version while its snapshot upload is still pending.
Contributor

I don't get how this test simulates the scenario described here. We are talking about a race condition, but this test does not trigger the race condition, because doMaintenance() is synchronous.

Same with the randomized test below. Neither test fails after reverting SPARK-48931.

Contributor Author

Sorry, the ticket I gave was the wrong one. I updated it, and it should now work.

The commit cannot be easily reverted now, but I applied the test to the parent hash of the commit (b5a55e4) and confirmed that it fails:

[info] - reloading the same version (with changelog checkpointing) *** FAILED *** (292 milliseconds)
[info]   org.rocksdb.RocksDBException: Mismatch in unique ID on table file 8. Expected: {12521394303566436904,8218421418606057953} Actual: {12521394303566436907,7844327260652356763}  The file /home/siying.dong/spark2/target/tmp/spark-3438851b-7e84-4630-a96d-22b595c58b1b/workingDir-a30c2e57-73c7-49fa-b96b-5b6d8b863383/MANIFEST-000005 may be corrupted.
[info]   at org.rocksdb.RocksDB.open(Native Method)
[info]   at org.rocksdb.RocksDB.open(RocksDB.java:325)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDB.openDB(RocksDB.scala:901)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:194)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$267(RocksDBSuite.scala:2052)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$267$adapted(RocksDBSuite.scala:2020)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.withDB(RocksDBSuite.scala:2411)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$266(RocksDBSuite.scala:2020)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(RocksDBSuite.scala:165)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:248)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:246)
[info]   at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.withSQLConf(RocksDBSuite.scala:165)
[info]   at org.apache.spark.sql.execution.streaming.state.AlsoTestWithChangelogCheckpointingEnabled.$anonfun$testWithChangelogCheckpointingEnabled$1(RocksDBSuite.scala:107)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)

and after applying the commit 40ad829, the test passes.

I can explain why it fails in person, but it has nothing to do with a data race (I hope I didn't say that in the comments). It's related to the following sequence:

  1. create a snapshot for version n
  2. reload another version n from the cloud
  3. upload snapshot n (overwriting the existing one)

and this upload is the problematic step.
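
Condensed, the sequence looks roughly like this (a sketch reusing the db calls from the test below; the version numbers are illustrative):

db.load(1)
db.put("foo", "bar")
db.commit()          // step 1: commit creates a local snapshot for version 2
db.doMaintenance()   // the snapshot is uploaded to the cloud as 2.zip

db.load(1)           // step 2: the batch is replayed, reloading version 1 from the cloud
db.put("foo", "bar")
db.commit()          // another local snapshot for version 2 is created

db.doMaintenance()   // step 3: the new snapshot is uploaded, overwriting the existing 2.zip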

Contributor

Yeah, now I get the picture of it. I was confused due to the incorrect JIRA ticket. Thanks for the detailed explanation!

// This is possible because after committing version x, we can continue to x+1 and then
// replay x+1. The replay will load the checkpoint for version x, and at that moment the
// snapshot upload may not have finished yet.
// Previously this caused a problem: new files generated by the reload were added to the
// local -> cloud file map, and that information was used to skip uploading some files,
// which is wrong because those files aren't part of the RocksDB checkpoint.
// This test was accidentally fixed by
// SPARK-48931 (https://github.com/apache/spark/pull/47393)

db.load(0)
Contributor

Sorry to ask for more effort, but shall we leave a "walkthrough" code comment for future readers? I think it'd be much easier to understand than reading the scenario described above and trying to think it through by themselves. Let's ensure that the test is understandable for most readers.

Please refer to the test suites for stateful operators where we track the watermark value (and state rows for the complicated cases) - we put a code comment per microbatch as a walkthrough.

db.put("foo", "bar")
// Snapshot checkpoint not needed
db.commit()

// Continue using local DB
db.load(1)
db.put("foo", "bar")
// Should create a local RocksDB snapshot
db.commit()
// Upload the local RocksDB snapshot to the cloud as 2.zip
db.doMaintenance()

// This will reload the DB from the cloud.
db.load(1)
db.put("foo", "bar")
// Should create another local snapshot
db.commit()

// Continue using local DB
db.load(2)
db.put("foo", "bar")
// Snapshot checkpoint not needed
db.commit()

// Reload DB from the cloud, loading from 2.zip
db.load(2)
db.put("foo", "bar")
// Snapshot checkpoint not needed
db.commit()

// Will upload local snapshot and overwrite 2.zip
db.doMaintenance()

// Reload the new 2.zip that was just uploaded, to validate that it is not corrupted.
db.load(2)
db.put("foo", "bar")
db.commit()

// Test the case where the maintenance task is delayed until after the next snapshot
// is created. There will be two outstanding snapshots.
for (batchVersion <- 3 to 6) {
db.load(batchVersion)
db.put("foo", "bar")
// For batch versions 3 and 5, a local snapshot will be generated but not uploaded.
db.commit()
}
db.doMaintenance()

// Test that maintenance is called after each batch. This tests a common case where
// maintenance tasks finish quickly.
for (batchVersion <- 7 to 10) {
for (j <- 0 to 1) {
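// Run the same batch version twice, to simulate the batch being replayed.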
db.load(batchVersion)
db.put("foo", "bar")
db.commit()
db.doMaintenance()
}
}
}
}

for (randomSeed <- 1 to 8) {
for (ifTestSkipBatch <- 0 to 1) {
testWithChangelogCheckpointingEnabled(
s"randomized snapshotting $randomSeed ifTestSkipBatch $ifTestSkipBatch") {
// The unit test simulates the case where batches can be reloaded and maintenance tasks
// can be delayed. After each batch, we randomly decide whether to move on to the
// next batch and whether the maintenance task is executed.
val remoteDir = Utils.createTempDir().toString
val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = false)
new File(remoteDir).delete() // to make sure that the directory gets created
withDB(remoteDir, conf = conf) { db =>
// A second DB is opened to simulate another executor that runs the batches that are
// skipped in the current DB.
withDB(remoteDir, conf = conf) { db2 =>
val random = new Random(randomSeed)
var curVer: Int = 0
for (i <- 1 to 100) {
db.load(curVer)
db.put("foo", "bar")
db.commit()
// With a one-in-five chance, the maintenance task is executed. This simulates the case
// where a snapshot isn't uploaded immediately, and may even be delayed until the next
// snapshot is ready. Since we create a snapshot every 3 batches, the 1/5 chance makes
// longer maintenance delays more likely.
if (random.nextInt(5) == 0) {
db.doMaintenance()
}
// With a 50% chance we move on to the next version, and with a 50% chance we keep the
// same version. When the same version is kept, the DB will be reloaded.
if (random.nextInt(2) == 0) {
val inc = if (ifTestSkipBatch == 1) {
random.nextInt(3)
} else {
1
}
if (inc > 1) {
// Create changelog files in the gap
for (j <- 1 to inc - 1) {
db2.load(curVer + j)
db2.put("foo", "bar")
db2.commit()
}
}
curVer = curVer + inc
}
}
}
}
}
}
}

test("validate Rocks DB SST files do not have a VersionIdMismatch" +
" when metadata file is not overwritten - scenario 1") {
val fmClass = "org.apache.spark.sql.execution.streaming.state." +