From f5ba2b3988de000e682aa3e75de8f593e131f3c6 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Tue, 17 Oct 2023 22:48:00 -0700 Subject: [PATCH 1/4] Add documentation for in inner class --- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +- .../sql/connect/planner/StreamingForeachBatchHelper.scala | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index fa964c02a253e..299f4f8830ad8 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2927,7 +2927,7 @@ class SparkConnectPlanner( } // This is filled when a foreach batch runner started for Python. - var foreachBatchRunnerCleaner: Option[StreamingForeachBatchHelper.RunnerCleaner] = None + var foreachBatchRunnerCleaner: Option[AutoCloseable] = None if (writeOp.hasForeachBatch) { val foreachBatchFn = writeOp.getForeachBatch.getFunctionCase match { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index b8097b235503f..9995c6ff72d06 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -40,7 +40,8 @@ object StreamingForeachBatchHelper extends Logging { type ForeachBatchFnType = (DataFrame, Long) => Unit - case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable { + /** An AutoClosable to clean up resources on query termination. Stops Python worker. */ + private case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable { override def close(): Unit = { try runner.stop() catch { @@ -98,11 +99,12 @@ object StreamingForeachBatchHelper extends Logging { /** * Starts up Python worker and initializes it with Python function. Returns a foreachBatch * function that sets up the session and Dataframe cache and and interacts with the Python - * worker to execute user's function. + * worker to execute user's function. In addition, it returns an AutoClosable. The caller + * should close it */ def pythonForeachBatchWrapper( pythonFn: SimplePythonFunction, - sessionHolder: SessionHolder): (ForeachBatchFnType, RunnerCleaner) = { + sessionHolder: SessionHolder): (ForeachBatchFnType, AutoCloseable) = { val port = SparkConnectService.localPort val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}" From e3058a917d83b1e8d589b0509009ae874fdb3fa5 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 18 Oct 2023 11:50:50 -0700 Subject: [PATCH 2/4] Fix build --- .../sql/connect/planner/StreamingForeachBatchHelper.scala | 5 +++-- .../connect/service/SparkConnectSessionHodlerSuite.scala | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index 9995c6ff72d06..c06c17e3c3c70 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -40,8 +40,9 @@ object StreamingForeachBatchHelper extends Logging { type ForeachBatchFnType = (DataFrame, Long) => Unit + // Visible for testing. /** An AutoClosable to clean up resources on query termination. Stops Python worker. */ - private case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable { + private[connect] case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable { override def close(): Unit = { try runner.stop() catch { @@ -100,7 +101,7 @@ object StreamingForeachBatchHelper extends Logging { * Starts up Python worker and initializes it with Python function. Returns a foreachBatch * function that sets up the session and Dataframe cache and and interacts with the Python * worker to execute user's function. In addition, it returns an AutoClosable. The caller - * should close it + * must ensure it is closed so that worker process and related resources are released. */ def pythonForeachBatchWrapper( pythonFn: SimplePythonFunction, diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala index a6451de8fc27b..2e34923768ff7 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.api.python.SimplePythonFunction import org.apache.spark.sql.IntegratedUDFTestUtils import org.apache.spark.sql.connect.common.InvalidPlanInput import org.apache.spark.sql.connect.planner.{PythonStreamingQueryListener, StreamingForeachBatchHelper} +import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper.RunnerCleaner import org.apache.spark.sql.test.SharedSparkSession class SparkConnectSessionHolderSuite extends SharedSparkSession { @@ -206,7 +207,10 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession { sessionHolder.streamingForeachBatchRunnerCleanerCache .registerCleanerForQuery(query2, cleaner2) - val (runner1, runner2) = (cleaner1.runner, cleaner2.runner) + val (runner1, runner2) = ( + cleaner1.asInstanceOf[RunnerCleaner].runner, + cleaner2.asInstanceOf[RunnerCleaner].runner + ) // assert both python processes are running assert(!runner1.isWorkerStopped().get) From 998429cdbdf74cbabe3c0e83f382d716aac3f659 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 18 Oct 2023 18:08:05 -0700 Subject: [PATCH 3/4] Fix flake in Protobuf test --- .../sql/protobuf/ProtobufCatalystDataConversionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala index d3e63a11a66bf..6135cb2d59246 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala @@ -137,7 +137,7 @@ class ProtobufCatalystDataConversionSuite while ( data != null && (data.get(0) == defaultValue || - (dt == BinaryType && + (dt.fields(0).dataType == BinaryType && data.get(0).asInstanceOf[Array[Byte]].isEmpty))) data = generator().asInstanceOf[Row] From 085dee3d85e61a201b4bd0a9624f5e3ecb212341 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Wed, 18 Oct 2023 18:10:17 -0700 Subject: [PATCH 4/4] linter fix --- .../sql/connect/planner/StreamingForeachBatchHelper.scala | 4 ++-- .../connect/service/SparkConnectSessionHodlerSuite.scala | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index c06c17e3c3c70..ce75ba3eb5986 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -100,8 +100,8 @@ object StreamingForeachBatchHelper extends Logging { /** * Starts up Python worker and initializes it with Python function. Returns a foreachBatch * function that sets up the session and Dataframe cache and and interacts with the Python - * worker to execute user's function. In addition, it returns an AutoClosable. The caller - * must ensure it is closed so that worker process and related resources are released. + * worker to execute user's function. In addition, it returns an AutoClosable. The caller must + * ensure it is closed so that worker process and related resources are released. */ def pythonForeachBatchWrapper( pythonFn: SimplePythonFunction, diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala index 2e34923768ff7..910c2a2650c6b 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala @@ -207,10 +207,8 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession { sessionHolder.streamingForeachBatchRunnerCleanerCache .registerCleanerForQuery(query2, cleaner2) - val (runner1, runner2) = ( - cleaner1.asInstanceOf[RunnerCleaner].runner, - cleaner2.asInstanceOf[RunnerCleaner].runner - ) + val (runner1, runner2) = + (cleaner1.asInstanceOf[RunnerCleaner].runner, cleaner2.asInstanceOf[RunnerCleaner].runner) // assert both python processes are running assert(!runner1.isWorkerStopped().get)