[SPARK-44872][CONNECT][FOLLOWUP] Deflake ReattachableExecuteSuite and increase retry buffer #42908

Closed
@@ -139,7 +139,7 @@ object Connect {
"With any value greater than 0, the last sent response will always be buffered.")
.version("3.5.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1m")
.createWithDefaultString("10m")
Member commented:

Since the purpose is to deflake ReattachableExecuteSuite and increase the retry buffer, shall we increase it only in ReattachableExecuteSuite instead of touching the default value?
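The per-suite override the reviewer suggests would look roughly like the following. This is a sketch, not code from the PR: it assumes `SparkConnectServerTest` exposes the usual `sparkConf` hook from Spark's shared-session test base, which suites can override.

```scala
class ReattachableExecuteSuite extends SparkConnectServerTest {

  // Hypothetical per-suite override instead of changing the global default:
  // bump the retry buffer only for this stress-test suite.
  override protected def sparkConf: SparkConf =
    super.sparkConf.set(
      Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE.key,
      "10m")

  // ... tests ...
}
```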

Contributor (Author) commented:

I explained in the PR description that I think increasing this default is a genuine improvement that will make reconnects more robust in case of actual network issues, while not increasing memory pressure in the normal scenario (where the client controls the flow with ReleaseExecute).
Since Spark 3.5 was released before that suite was added, making this change now, ahead of the next release, is low risk, and it will have good baking-in time before that release.

Contributor (Author) commented:

I would rather not change it in the suite, because that suite is supposed to stress-test how the actual client behaves when faced with disconnects. Changing it only in the suite would sweep under the carpet what the experiments performed now suggest: the buffer was a bit too small for retry robustness.
This is not a major issue, and in practice it only applies when Connect faces real intermittent connectivity issues; before reattachable execution was implemented, such a situation would simply fail.


val CONNECT_EXTENSIONS_RELATION_CLASSES =
buildStaticConf("spark.connect.extensions.relation.classes")
@@ -35,7 +35,7 @@ import org.apache.spark.sql.test.SharedSparkSession
* Base class and utilities for a test suite that starts and tests the real SparkConnectService
* with a real SparkConnectClient, communicating over RPC, but both in-process.
*/
class SparkConnectServerTest extends SharedSparkSession {
trait SparkConnectServerTest extends SharedSparkSession {
Contributor (Author) @juliuszsompolski commented on Sep 14, 2023:
Having it as a class made it execute as a suite with no tests, while still running beforeAll / afterAll.
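A minimal illustration of the problem, as a sketch assuming ScalaTest's `AnyFunSuite` and `BeforeAndAfterAll` (the names and fixture bodies here are illustrative, not the actual Spark test code):

```scala
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

// As a concrete class, the ScalaTest runner discovers this base as a
// zero-test suite of its own, so beforeAll/afterAll still run (starting
// and stopping a real server for nothing):
//   class ServerFixtureBase extends AnyFunSuite with BeforeAndAfterAll { ... }

// As a trait, it is not discoverable on its own; the hooks only run for
// suites that mix it in.
trait ServerFixture extends BeforeAndAfterAll { self: AnyFunSuite =>
  override def beforeAll(): Unit = {
    // start in-process server here
    super.beforeAll()
  }
  override def afterAll(): Unit = {
    // stop in-process server here
    super.afterAll()
  }
}

class RealSuite extends AnyFunSuite with ServerFixture {
  test("uses the shared server") { assert(true) }
}
```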


// Server port
val serverPort: Int =
@@ -22,7 +22,7 @@ import io.grpc.StatusRuntimeException
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.sql.connect.SparkConnectServerTest
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.service.SparkConnectService
@@ -32,7 +32,7 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
// Tests assume that this query will result in at least a couple ExecutePlanResponses on the
// stream. If this is no longer the case because of changes in how much is returned in a single
// ExecutePlanResponse, it may need to be adjusted.
val MEDIUM_RESULTS_QUERY = "select * from range(1000000)"
val MEDIUM_RESULTS_QUERY = "select * from range(10000000)"

test("reattach after initial RPC ends") {
withClient { client =>
@@ -138,13 +138,12 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
val reattachIter = stub.reattachExecute(
buildReattachExecuteRequest(operationId, Some(response.getResponseId)))
assert(reattachIter.hasNext)
reattachIter.next()

// Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error
iter.next()
// iterator changed because it had to reconnect
assert(reattachableIter.innerIterator ne initialInnerIter)
}

// Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error
iter.next()
// iterator changed because it had to reconnect
assert(reattachableIter.innerIterator ne initialInnerIter)
}
}

@@ -246,19 +245,26 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
val iter = stub.executePlan(
buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
var lastSeenResponse: String = null
val serverRetryBuffer = SparkEnv.get.conf
.get(Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
.toLong

iter.hasNext // open iterator
val execution = getExecutionHolder

// after consuming enough from the iterator, server should automatically start releasing
var lastSeenIndex = 0
while (iter.hasNext && execution.responseObserver.releasedUntilIndex == 0) {
var totalSizeSeen = 0
while (iter.hasNext && totalSizeSeen <= 1.1 * serverRetryBuffer) {
val r = iter.next()
lastSeenResponse = r.getResponseId()
totalSizeSeen += r.getSerializedSize
lastSeenIndex += 1
}
assert(iter.hasNext)
assert(execution.responseObserver.releasedUntilIndex > 0)
Eventually.eventually(timeout(eventuallyTimeout)) {
assert(execution.responseObserver.releasedUntilIndex > 0)
}

// Reattach from the beginning is not available.
val reattach = stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
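The buffering policy the updated test exercises can be sketched as a pure function. This is an illustration of the idea only, not the actual `ExecuteResponseObserver` code: keep the most recently sent responses until their cumulative size covers the retry-buffer budget, and allow everything older to be released.

```scala
// Sketch: given the sizes of already-sent responses (oldest first) and a
// buffer budget in bytes, return how many of the oldest responses may be
// released while still retaining at least `budget` bytes of the most
// recent ones for a possible reattach.
def releasableCount(sentSizes: Seq[Long], budget: Long): Int = {
  var kept = 0L
  var keepFrom = sentSizes.length
  // Walk from the newest response backwards, retaining responses until
  // the kept total reaches the budget.
  while (keepFrom > 0 && kept < budget) {
    keepFrom -= 1
    kept += sentSizes(keepFrom)
  }
  keepFrom // everything before this index may be released
}
```

With a 10m budget, a client that acknowledges promptly via ReleaseExecute keeps the retained window near zero, while a client recovering from a dropped connection can still reattach anywhere inside the last ~10m of responses.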