Skip to content

Commit

Permalink
[SPARK-44421][CONNECT][FOLLOWUP] Minor comment improvements
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Improve some comments about iterator retries.

### Why are the changes needed?

Improve comments based on followup questions.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No code changes, only comment changes.

Closes apache#42281 from juliuszsompolski/SPARK-44624-comment-only.

Lead-authored-by: Juliusz Sompolski <julek@databricks.com>
Co-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
2 people authored and ragnarok56 committed Mar 2, 2024
1 parent 294e04d commit c72645c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ import org.apache.spark.internal.Logging
* ExecutePlanResponse on the iterator to return a new iterator from server that continues after
* that.
*
* Since in reattachable execute the server does buffer some responses in case the client needs to
* backtrack
* In reattachable execute the server does buffer some responses in case the client needs to
* backtrack. To let server release this buffer sooner, this iterator asynchronously sends
* ReleaseExecute RPCs that instruct the server to release responses that it already processed.
*
* Note: If the initial ExecutePlan did not even reach the server and execution didn't start, the
* ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, failing the whole
* operation.
*/
class ExecutePlanResponseReattachableIterator(
request: proto.ExecutePlanRequest,
Expand Down Expand Up @@ -86,6 +91,8 @@ class ExecutePlanResponseReattachableIterator(
private var responseComplete: Boolean = false

// Initial iterator comes from ExecutePlan request.
// Note: This is not retried, because no error would ever be thrown here, and GRPC will only
// throw error on first iterator.hasNext() or iterator.next()
private var iterator: java.util.Iterator[proto.ExecutePlanResponse] =
rawBlockingStub.executePlan(initialRequest)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler
extends StreamObserver[U] {

private var opened = false // only retries on first call

// Note: This is not retried, because no error would ever be thrown here, and GRPC will only
// throw error on first iterator.hasNext() or iterator.next()
private var streamObserver = call(request)

override def onNext(v: U): Unit = {
Expand Down

0 comments on commit c72645c

Please sign in to comment.