[SPARK-44705][PYTHON] Make PythonRunner single-threaded #42385
Conversation
Already reviewed this actually. LGTM if tests pass. This is a nice fix to have.
Hi, @utkarsh39. Could you address @ueshin and @HyukjinKwon's review comments?
Merged to master.
### What changes were proposed in this pull request?
PythonRunner, a utility that executes Python UDFs in Spark, uses two threads in a producer-consumer model today. This multi-threading model is problematic and confusing, as Spark's execution model within a task is commonly understood to be single-threaded. More importantly, this departure into double-threaded execution resulted in a series of customer issues involving [race conditions](https://issues.apache.org/jira/browse/SPARK-33277) and [deadlocks](https://issues.apache.org/jira/browse/SPARK-38677) between threads, as the code was hard to reason about. There have been multiple attempts to rein in these issues, viz., [fix 1](https://issues.apache.org/jira/browse/SPARK-22535), [fix 2](apache#30177), [fix 3](apache@243c321). Moreover, the fixes have made the code base somewhat abstruse by introducing multiple daemon [monitor threads](https://github.com/apache/spark/blob/a3a32912be04d3760cb34eb4b79d6d481bbec502/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L579) to detect deadlocks. This PR makes PythonRunner single-threaded, making it easier to reason about and improving code health.

#### Current Execution Model in Spark for Python UDFs
For queries containing Python UDFs, the main Java task thread spins up a new writer thread to pipe data from the child Spark plan into the Python worker evaluating the UDF. The writer thread runs in a tight loop: it evaluates the child Spark plan and feeds the resulting output to the Python worker. The main task thread simultaneously consumes the Python UDF's output and evaluates the parent Spark plan to produce the final result.

The I/O to/from the Python worker uses blocking Java sockets, necessitating the use of two threads, one responsible for input to the Python worker and the other for output. Without two threads, it is easy to run into a deadlock. For example, the task can block forever waiting for output from the Python worker. The output will never arrive until the input is supplied to the Python worker, which is not possible as the task thread is blocked while waiting on output.

#### Proposed Fix
The proposed fix is to move to the standard single-threaded execution model within a task, i.e., to do away with the writer thread. In addition to mitigating the crashes, the fix reduces the complexity of the existing code by doing away with many safety checks in place to track deadlocks in the double-threaded execution model.

In the new model, the main task thread alternates between consuming and feeding data to the Python worker using asynchronous I/O through Java's [SocketChannel](https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html). See the `read()` method in the code below for approximately how this is achieved.

```scala
class PythonUDFRunner {

  private var nextRow: Row = _
  private var endOfStream = false
  private var childHasNext = true
  private var buffer: ByteBuffer = _

  def hasNext(): Boolean = nextRow != null || {
    if (!endOfStream) {
      read(buffer)
      nextRow = deserialize(buffer)
      hasNext()
    } else {
      false
    }
  }

  def next(): Row = {
    if (hasNext()) {
      val outputRow = nextRow
      nextRow = null
      outputRow
    } else {
      null
    }
  }

  def read(buf: ByteBuffer): Unit = {
    var n = 0
    while (n == 0) {
      // Alternate between reading from and writing to the Python worker using async I/O.
      if (pythonWorker.isReadable) {
        n = pythonWorker.read(buf)
      }
      if (pythonWorker.isWritable) {
        consumeChildPlanAndWriteDataToPythonWorker()
      }
    }
  }

  def consumeChildPlanAndWriteDataToPythonWorker(): Unit = {
    // Tracks whether the connection to the Python worker can be written to.
    var socketAcceptsInput = true
    while (socketAcceptsInput && (childHasNext || buffer.hasRemaining)) {
      if (!buffer.hasRemaining && childHasNext) {
        // Consume data from the child and buffer it.
        writeToBuffer(childPlan.next(), buffer)
        childHasNext = childPlan.hasNext()
        if (!childHasNext) {
          // Exhausted the child plan's output. Write a keyword to the Python worker
          // signaling the end of data input.
          writeToBuffer(endOfStream)
        }
      }
      // Try to write as much buffered data as possible to the Python worker.
      while (buffer.hasRemaining && socketAcceptsInput) {
        val n = writeToPythonWorker(buffer)
        // `writeToPythonWorker()` returns 0 when the socket cannot accept more data right now.
        socketAcceptsInput = n > 0
      }
    }
  }
}
```

### Why are the changes needed?
This PR makes PythonRunner single-threaded, making it easier to reason about and improving code health.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes apache#42385 from utkarsh39/SPARK-44705.

Authored-by: Utkarsh <utkarsh.agarwal@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
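As a rough, self-contained illustration of the non-blocking alternation the pseudocode above relies on (the class and method names here are hypothetical, not Spark code), the Java sketch below drives both ends of a loopback `SocketChannel` pair from a single thread. In non-blocking mode, `write()` returns 0 instead of parking the thread when the peer cannot accept more data, so the one thread can switch to draining reads and never deadlocks:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class SingleThreadedPipe {

  // Round-trips a small payload through a loopback "worker" without a writer thread.
  static String roundTrip() throws IOException {
    ServerSocketChannel server = ServerSocketChannel.open();
    server.bind(new InetSocketAddress("127.0.0.1", 0));
    SocketChannel taskSide = SocketChannel.open(server.getLocalAddress());
    SocketChannel workerSide = server.accept();
    // Non-blocking mode: read()/write() return immediately instead of blocking.
    taskSide.configureBlocking(false);
    workerSide.configureBlocking(false);

    ByteBuffer out = StandardCharsets.UTF_8.encode("row-1|row-2|row-3");
    int expected = out.remaining();
    ByteBuffer echo = ByteBuffer.allocate(64);
    ByteBuffer in = ByteBuffer.allocate(64);

    // One loop alternates between feeding input and draining output, the same
    // shape as the PR's read() / consumeChildPlanAndWriteDataToPythonWorker() pair.
    while (in.position() < expected) {
      if (out.hasRemaining()) {
        taskSide.write(out);            // returns 0, does not block, if the socket is full
      }
      echo.clear();
      if (workerSide.read(echo) > 0) {  // the stand-in worker consumes the input...
        echo.flip();
        workerSide.write(echo);         // ...and echoes it back as "UDF output"
      }
      taskSide.read(in);                // the task drains the worker's output, non-blocking
    }
    taskSide.close();
    workerSide.close();
    server.close();
    in.flip();
    return StandardCharsets.UTF_8.decode(in).toString();
  }

  public static void main(String[] args) throws IOException {
    System.out.println(roundTrip());
  }
}
```

This sketch busy-polls for brevity; real code (including Spark's) would multiplex on readiness rather than spin, and the actual PythonRunner implementation differs in its framing and error handling.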
This PR caused the Scala 2.13 MiMa check to fail: #42479
I found that this PR may have caused some PySpark test cases to fail in the Java 17 daily tests (pyspark-sql and pyspark-connect modules):
To verify this, I conducted some local testing using Java 17.
The tests in
There are 34 test failures after this one was merged. @utkarsh39 Do you have time to fix these test cases? For this, I have created SPARK-44797. Or should we revert this PR to restore the Java 17 daily tests first? @HyukjinKwon @ueshin @dongjoon-hyun
I will try to get these tests fixed ASAP.
I think #42422 includes the fix. Could you take a look?
I merged #42422. Let's see the next daily tests. Thanks.
Thanks ~
```scala
@DeveloperApi
@deprecated("Only usage for Python evaluation is now extinct", "3.5.0")
```
@utkarsh39 This should be `4.0.0`.
PR to fix it: #42494
https://github.com/apache/spark/actions/runs/5861115482/job/15890643041 Some tests still failed; I can't determine the reason for now, further investigation is needed.
@LuciferYang The error is different from the previous one, which seems to be fixed.
I re-ran the failed ones; there are 26 left.
The tests passed after retrying, thanks for your work ~ @ueshin
### What changes were proposed in this pull request?
#42385 deprecated `ContextAwareIterator`, but the deprecation version was incorrectly set to 3.5. This PR fixes it to be 4.0.

### Why are the changes needed?
Fix the deprecation version.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not needed.

Closes #42494 from utkarsh39/SPARK-44705-fix-deprecation-version.

Authored-by: Utkarsh <utkarsh.agarwal@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
```scala
val workerFactory =
  new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap)
val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true)
```
What is this change about?
It broke the `stop()` method below.
cc: @WweiL, @HyukjinKwon
Yes, it breaks the `stop()` method below. It should be updated like this:
https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala#L109-L113
@utkarsh39 We will create a follow-up ticket to fix this.
Thanks @WweiL
Thanks guys.