[SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator

### What changes were proposed in this pull request?

Swallow non-fatal exceptions in the maintenance task to avoid a deadlock between the maintenance thread and the streaming aggregation operator.

### Why are the changes needed?

This change fixes a race condition that causes a deadlock between the task thread and the maintenance thread. It is primarily possible with the streaming aggregation operator, which uses two physical operators: `StateStoreRestoreExec` and `StateStoreSaveExec`. The first opens the store in read-only mode and the second performs the actual commit. The following sequence of events creates the issue:

1. The task thread runs `StateStoreRestoreExec`, gets the store instance, and thereby acquires the DB instance lock.
2. The maintenance thread fails with an error for some reason.
3. The maintenance thread takes the `loadedProviders` lock and tries to call `close` on all the loaded providers.
4. The task thread executes the `StateStoreRDD` for the `StateStoreSaveExec` operator and tries to acquire the `loadedProviders` lock, which is held by the maintenance thread.

So if the maintenance thread interleaves between the restore and save operations, the two threads deadlock on the `loadedProviders` lock and the DB instance lock. The fix is to simply release the resources at the end of the `StateStoreRestoreExec` operator (note that `abort` for `ReadStateStore` is likely a misnomer, but we choose to follow the already provided API in this case).
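To make the shape of the fix concrete, here is a minimal Scala sketch of the two ideas described above. It is not the code changed by this PR; `MaintenanceTaskSketch`, `ReadOnlyStore`, `runMaintenance`, and `restoreAndIterate` are illustrative names only, and the real change lives in the state store provider and the `StateStoreRestoreExec` operator.

```scala
import scala.util.control.NonFatal

// Hypothetical simplification of the two ideas in this PR; names are illustrative
// and do not mirror the actual Spark classes touched by this change.
object MaintenanceTaskSketch {

  trait ReadOnlyStore {
    def iterator(): Iterator[(String, String)]
    // "abort" on a read-only store is effectively just a release of the
    // instance lock, which is why the PR calls the name a misnomer.
    def abort(): Unit
  }

  // Idea 1: a non-fatal failure in the periodic maintenance work is logged and
  // swallowed, so it never reaches the code path that closes all loaded
  // providers while a task thread still holds a store instance lock.
  def runMaintenance(doMaintenance: () => Unit): Unit = {
    try {
      doMaintenance()
    } catch {
      case NonFatal(e) =>
        // Log and move on instead of tearing down every provider.
        Console.err.println(s"Warning: maintenance failed, will retry later: $e")
    }
  }

  // Idea 2: the restore (read-only) path releases the store as soon as it is
  // done, so the subsequent save path can re-acquire it without racing the
  // maintenance thread that holds the loadedProviders lock.
  def restoreAndIterate(store: ReadOnlyStore): Seq[(String, String)] = {
    try {
      store.iterator().toSeq
    } finally {
      store.abort() // release the DB instance lock held for the read
    }
  }
}
```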
Relevant logs:

Link - https://github.com/anishshri-db/spark/actions/runs/7356847259/job/20027577445?pr=4

```
2023-12-27T09:59:02.6362466Z 09:59:02.635 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error in maintenanceThreadPool
2023-12-27T09:59:02.6365616Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
2023-12-27T09:59:02.6367861Z   at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
2023-12-27T09:59:02.6369383Z   at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
2023-12-27T09:59:02.6370693Z   at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
2023-12-27T09:59:02.6371781Z   at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
2023-12-27T09:59:02.6372876Z   at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
2023-12-27T09:59:02.6373967Z   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
2023-12-27T09:59:02.6375104Z   at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
2023-12-27T09:59:02.6376676Z 09:59:02.636 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error running maintenance thread
2023-12-27T09:59:02.6379079Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
2023-12-27T09:59:02.6381083Z   at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
2023-12-27T09:59:02.6382490Z   at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
2023-12-27T09:59:02.6383816Z   at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
2023-12-27T09:59:02.6384875Z   at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
2023-12-27T09:59:02.6386294Z   at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
2023-12-27T09:59:02.6387439Z   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
2023-12-27T09:59:02.6388674Z   at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
...
2023-12-27T10:01:02.4292831Z [info] - changing schema of state when restarting query - state format version 2 (RocksDBStateStore) *** FAILED *** (2 minutes)
2023-12-27T10:01:02.4295311Z [info]   Timed out waiting for stream: The code passed to failAfter did not complete within 120 seconds.
2023-12-27T10:01:02.4297271Z [info]   java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
2023-12-27T10:01:02.4299084Z [info]   org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277)
2023-12-27T10:01:02.4300948Z [info]   org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
...
2023-12-27T10:01:02.6474472Z 10:01:02.646 WARN org.apache.spark.sql.execution.streaming.state.RocksDB StateStoreId(opId=0,partId=0,name=default): Error closing RocksDB
2023-12-27T10:01:02.6482792Z org.apache.spark.SparkException: [CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR] An error occurred during loading state. StateStoreId(opId=0,partId=0,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1858)] as it was not released by [ThreadId: Some(3835), task: partition 0.0 in stage 513.0, TID 1369] after 120009 ms.
2023-12-27T10:01:02.6488483Z Thread holding the lock has trace: app//org.apache.spark.sql.execution.streaming.state.StateStore$.getStateStoreProvider(StateStore.scala:577)
2023-12-27T10:01:02.6490896Z app//org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:565)
2023-12-27T10:01:02.6493072Z app//org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:128)
2023-12-27T10:01:02.6494915Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
2023-12-27T10:01:02.6496232Z app//org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
2023-12-27T10:01:02.6497655Z app//org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
2023-12-27T10:01:02.6499153Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
2023-12-27T10:01:02.6556758Z 10:01:02.654 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 513.0 (TID 1369) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 260 cancelled part of cancelled job group cf26288c-0158-48ce-8a86-00a596dd45d8 SQLSTATE: XXKDA)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests

```
[info] Run completed in 6 minutes, 20 seconds.
[info] Total number of tests run: 80
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?

Yes

Closes apache#44542 from anishshri-db/task/SPARK-46547.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>