[SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper #43424

Closed · wants to merge 4 commits
@@ -2927,7 +2927,7 @@ class SparkConnectPlanner(
     }
 
     // This is filled when a foreach batch runner started for Python.
-    var foreachBatchRunnerCleaner: Option[StreamingForeachBatchHelper.RunnerCleaner] = None
+    var foreachBatchRunnerCleaner: Option[AutoCloseable] = None
 
     if (writeOp.hasForeachBatch) {
      val foreachBatchFn = writeOp.getForeachBatch.getFunctionCase match {
@@ -40,7 +40,9 @@ object StreamingForeachBatchHelper extends Logging {
 
   type ForeachBatchFnType = (DataFrame, Long) => Unit
 
-  case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
+  // Visible for testing.
+  /** An AutoCloseable to clean up resources on query termination. Stops Python worker. */
+  private[connect] case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable {
     override def close(): Unit = {
       try runner.stop()
       catch {
@@ -98,11 +100,12 @@
   /**
    * Starts up Python worker and initializes it with Python function. Returns a foreachBatch
    * function that sets up the session and Dataframe cache and and interacts with the Python
-   * worker to execute user's function.
+   * worker to execute user's function. In addition, it returns an AutoCloseable. The caller must
+   * ensure it is closed so that worker process and related resources are released.
    */
   def pythonForeachBatchWrapper(
       pythonFn: SimplePythonFunction,
-      sessionHolder: SessionHolder): (ForeachBatchFnType, RunnerCleaner) = {
+      sessionHolder: SessionHolder): (ForeachBatchFnType, AutoCloseable) = {
 
     val port = SparkConnectService.localPort
     val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}"
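
The scaladoc added above spells out a contract: pythonForeachBatchWrapper returns both the foreachBatch function and an AutoCloseable, and the caller must close the latter so the Python worker and related resources are released. A minimal sketch of that shape in plain Scala (Worker and makeBatchFn are invented names for illustration, not Spark's actual classes):

```scala
object CleanerContractSketch extends App {

  // Stand-in for the Python worker process; stops when closed.
  final class Worker extends AutoCloseable {
    def process(batch: Seq[Int], batchId: Long): Unit =
      println(s"batch $batchId -> ${batch.sum}")
    override def close(): Unit = println("worker stopped")
  }

  // Same shape as the contract above: start the worker, then return both the
  // batch function and an AutoCloseable that releases the worker.
  def makeBatchFn(): ((Seq[Int], Long) => Unit, AutoCloseable) = {
    val worker = new Worker
    val fn: (Seq[Int], Long) => Unit = (batch, id) => worker.process(batch, id)
    (fn, worker)
  }

  val (fn, cleaner) = makeBatchFn()
  try {
    fn(Seq(1, 2, 3), 0L)
  } finally {
    // The caller is responsible for closing the cleaner.
    cleaner.close()
  }
}
```

In the PR itself the cleaner is not closed inline; the test hunk further down shows it being registered via streamingForeachBatchRunnerCleanerCache.registerCleanerForQuery so that it is closed when the query terminates.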
@@ -31,6 +31,7 @@ import org.apache.spark.api.python.SimplePythonFunction
 import org.apache.spark.sql.IntegratedUDFTestUtils
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.planner.{PythonStreamingQueryListener, StreamingForeachBatchHelper}
+import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper.RunnerCleaner
 import org.apache.spark.sql.test.SharedSparkSession
 
 class SparkConnectSessionHolderSuite extends SharedSparkSession {
@@ -206,7 +207,8 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
     sessionHolder.streamingForeachBatchRunnerCleanerCache
       .registerCleanerForQuery(query2, cleaner2)
 
-    val (runner1, runner2) = (cleaner1.runner, cleaner2.runner)
+    val (runner1, runner2) =
+      (cleaner1.asInstanceOf[RunnerCleaner].runner, cleaner2.asInstanceOf[RunnerCleaner].runner)
 
     // assert both python processes are running
     assert(!runner1.isWorkerStopped().get)
@@ -137,7 +137,7 @@ class ProtobufCatalystDataConversionSuite
     while (
       data != null &&
         (data.get(0) == defaultValue ||
-          (dt == BinaryType &&
+          (dt.fields(0).dataType == BinaryType &&
Review comment from the PR author on the changed line above:
This fixes the flake reported in #43413 (comment). The type check was incorrect, so when the random value was an empty array we did not skip it; the original intention is to skip default values for these types. Verified by manually setting the seed to 399 (at line 128).
cc: @HeartSaVioR

Reply from a reviewer:
Ah OK, so it actually fails consistently for that specific value, but the test itself uses a random seed, hence the flakiness...

             data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
       data = generator().asInstanceOf[Row]

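
The review comment above explains the bug being fixed: dt is the row's StructType, so comparing it directly to BinaryType never matches and empty byte arrays were not skipped. A small sketch of the difference, assuming a single-column schema (the field name col_0 is invented for illustration):

```scala
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}

object BinaryTypeCheckSketch extends App {
  // Hypothetical single-column schema, similar to the ones the test generates.
  val dt = StructType(Seq(StructField("col_0", BinaryType)))

  // Old check: a StructType is never equal to BinaryType, so the guard never fired
  // and empty byte arrays (the default value for binary) slipped through.
  println(dt == BinaryType)                     // false

  // Fixed check: inspect the data type of the struct's first field instead.
  println(dt.fields(0).dataType == BinaryType)  // true
}
```

With the corrected check, the while loop keeps regenerating rows until the value is neither the default nor an empty binary, which removes the seed-dependent failure.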