Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48889][SS] testStream to unload state stores before finishing #47339

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,13 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
case (key, None) => sparkSession.conf.unset(key)
}
sparkSession.streams.removeListener(listener)
// The state store is stopped here to unload all state stores and terminate all maintenance
// threads. It is necessary because the temp directory used by the checkpoint directory
// may be deleted soon after, and the maintenance thread may see unexpected error and
// cause unexpected behavior. Doing it after a test finishes might be too late because
// sometimes the checkpoint directory is under `withTempDir`, and in this case the temp
// directory is deleted before the test finishes.
StateStore.stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we leave a code comment for the reason we put this here? We already have StateStore.stop() in afterEach in various test suites, and future reviewer would like to understand why we can't simply put StateStore.stop() in afterEach. (I get that, just wanted to help future reviewers.)

}
}

Expand Down