
[SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation #43413

Closed

Conversation

anishshri-db
Contributor

What changes were proposed in this pull request?

Ensure that store instance is not used after calling commit within output mode streaming aggregation

Why are the changes needed?

Without these changes, we were accessing the store instance to retrieve the iterator even after commit had been called. When commit is called, we release the DB instance lock, so it is possible for a task retry to acquire the instance lock and close the DB instance. When the original thread then tries to access the DB, it can run into a null pointer exception. This change fixes the issue:

        org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 492) (ip-10-110-25-116.us-west-2.compute.internal executor driver): java.lang.NullPointerException
	at org.apache.spark.sql.execution.streaming.state.RocksDB.iterator(RocksDB.scala:337)
	at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.iterator(RocksDBStateStoreProvider.scala:79)
	at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV1.values(StreamingAggregationStateManager.scala:130)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:543)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:63)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
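The race described above can be sketched as follows. This is a minimal simulation in Python rather than Spark's Scala, with a hypothetical `StateStore` stand-in (its names and behavior are illustrative assumptions, not the actual RocksDB state store API): once `commit()` releases the instance lock, a task retry may close the DB, so the original thread must materialize its iterator before committing.

```python
import threading

class StateStore:
    """Hypothetical stand-in for a RocksDB-backed state store whose
    commit() releases the instance lock, allowing another thread
    (e.g. a task retry) to acquire the lock and close the DB."""
    def __init__(self):
        self._lock = threading.Lock()
        self._db = {"k1": 1, "k2": 2}  # stands in for the RocksDB instance
        self._lock.acquire()           # the running task holds the instance lock

    def commit(self):
        self._lock.release()           # lock released; a retry may now close the DB

    def close(self):
        with self._lock:
            self._db = None            # DB instance torn down by the retry

    def values(self):
        if self._db is None:           # models the NullPointerException above
            raise RuntimeError("DB instance is closed")
        return iter(list(self._db.values()))

# Unsafe ordering (what this PR fixes): commit first, then iterate.
store = StateStore()
store.commit()
store.close()                          # simulates a task retry closing the instance
try:
    rows = list(store.values())
except RuntimeError:
    rows = None                        # original thread hits the closed DB

# Safe ordering (the fix): consume the store before calling commit.
store2 = StateStore()
safe_rows = list(store2.values())      # read everything while the lock is held
store2.commit()
store2.close()
```

With the unsafe ordering the read fails; with the safe ordering the rows are fully materialized before the lock is released, so a concurrent close cannot invalidate them.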

Does this PR introduce any user-facing change?

No

How was this patch tested?

RocksDBStreamingAggregationSuite

18:12:00.242 WARN org.apache.spark.sql.streaming.RocksDBStateStoreStreamingAggregationSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.RocksDBStateStoreStreamingAggregationSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinP...
[info] Run completed in 5 minutes, 8 seconds.
[info] Total number of tests run: 80
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

StreamingSessionWindowSuite

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.StreamingSessionWindowSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), state-store-maintenance-thread-0 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), state-store-maintenance-thread-1 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoin...
[info] Run completed in 3 minutes, 38 seconds.
[info] Total number of tests run: 48
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 48, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

Was this patch authored or co-authored using generative AI tooling?

No

@anishshri-db anishshri-db changed the title [SPARK-45582] Ensure that store instance is not used after calling commit within output mode streaming aggregation [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation Oct 18, 2023
@anishshri-db
Contributor Author

@HeartSaVioR - PTAL, thx !

Contributor

@HeartSaVioR HeartSaVioR left a comment


+1 Nice finding!

@anishshri-db
Contributor Author

@HeartSaVioR - the failure seems the same as the Kafka one from a couple of days back. Will retry running the action again.

@HeartSaVioR
Contributor

@anishshri-db We weren't lucky. Could you push an empty commit to retrigger CI?

@anishshri-db
Contributor Author

@HeartSaVioR - I don't think the failure is related to my change

[info] - single StructType(StructField(bytes_type,BinaryType,true)) with seed 399 *** FAILED *** (14 milliseconds)
[info]   (Eval check with Java class name) Incorrect evaluation (codegen off): from_protobuf(to_protobuf([[B@3ac95708], org.apache.spark.sql.protobuf.protos.CatalystTypes$BytesMsg, None), org.apache.spark.sql.protobuf.protos.CatalystTypes$BytesMsg, None), actual: [null], expected: [[B@3061d6e0] (ExpressionEvalHelper.scala:254)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Assertions.fail(Assertions.scala:933)
[info]   at org.scalatest.Assertions.fail$(Assertions.scala:929)
[info]   at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1564)
[info]   at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper.checkEvaluationWithoutCodegen(ExpressionEvalHelper.scala:254)
[info]   at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper.checkEvaluationWithoutCodegen$(ExpressionEvalHelper.scala:244)
[info]   at org.apache.spark.sql.protobuf.ProtobufCatalystDataConversionSuite.checkEvaluationWithoutCodegen(ProtobufCatalystDataConversionSuite.scala:34)
[info]   at org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper.checkEvaluation(ExpressionEvalHelper.scala:87)

I ran the tests locally and they seem fine:

[info] ProtobufCatalystDataConversionSuite:
[info] Passed: Total 0, Failed 0, Errors 0, Passed 0
[info] No tests to run for sql / Test / testOnly
[info] - single StructType(StructField(int32_type,IntegerType,true)) with seed 305 (916 milliseconds)
[info] - single StructType(StructField(double_type,DoubleType,true)) with seed 710 (145 milliseconds)
[info] - single StructType(StructField(float_type,FloatType,true)) with seed 900 (146 milliseconds)
[info] - single StructType(StructField(bytes_type,BinaryType,true)) with seed 128 (143 milliseconds)
[info] - single StructType(StructField(string_type,StringType,true)) with seed 856 (139 milliseconds)
[info] - Handle unsupported input of message type (369 milliseconds)
[info] - filter push-down to Protobuf deserializer (176 milliseconds)
[info] - ProtobufDeserializer with binary type (3 milliseconds)
[info] - Full names for message using descriptor file (2 milliseconds)
[warn] 22 warnings found
[info] Run completed in 4 seconds, 546 milliseconds.
[info] Total number of tests run: 9
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 9, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

So it seems like some flakiness in this test.

@HeartSaVioR
Contributor

cc. @rangadi
https://github.com/anishshri-db/spark/actions/runs/6556849347/job/17830698193
Looks like protobuf test is flaky. Would you mind looking into it? I can file a JIRA ticket if you prefer to. Thanks!

@HeartSaVioR
Contributor

I'm going to merge this as the test failure is not related. Thanks! Merging to master.

@rangadi
Contributor

rangadi commented Oct 18, 2023

@HeartSaVioR thanks. I hadn't seen the flake with the Protobuf tests. Will take a look.
