
[SPARK-43952][CORE][CONNECT][SQL] Add SparkContext APIs for query cancellation by tag #41440

Closed
wants to merge 28 commits

Conversation

juliuszsompolski
Contributor

@juliuszsompolski commented Jun 2, 2023

What changes were proposed in this pull request?

Currently, the only way to cancel running Spark jobs is SparkContext.cancelJobGroup, using a job group name that was previously set with SparkContext.setJobGroup. This is problematic when multiple different parts of the system want to perform cancellation and set their own IDs.

For example, BroadcastExchangeExec sets its own job group, which may override the job group set by the user. As a result, if the user cancels the job group they set in the "parent" execution, the broadcast jobs launched from within their jobs are not cancelled. It would also be useful, e.g. in Spark Connect, to be able to cancel jobs without overriding the jobGroupId, which may be needed for other purposes.

As a solution, this PR adds APIs to set tags on jobs, and to cancel jobs by tag:

  • SparkContext.addJobTag(tag: String): Unit
  • SparkContext.removeJobTag(tag: String): Unit
  • SparkContext.getJobTags(): Set[String]
  • SparkContext.clearJobTags(): Unit
  • SparkContext.cancelJobsWithTag(tag: String): Unit
  • DAGScheduler.cancelJobsWithTag(tag: String): Unit

Also added SparkContext.setInterruptOnCancel(interruptOnCancel: Boolean): Unit, which previously could only be set via setJobGroup.
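
To illustrate the intended use, here is a minimal, self-contained sketch (assuming tags propagate per submitting thread, mirroring the existing setJobGroup semantics; the tag name, dataset size, and sleeps are only illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TagCancellationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tag-cancel").setMaster("local[2]"))

    // Worker thread: every job submitted from this thread carries the tag.
    val runner = new Thread {
      override def run(): Unit = {
        sc.addJobTag("my-etl")
        sc.setInterruptOnCancel(true)
        try {
          // A deliberately slow job so there is something to cancel.
          sc.parallelize(1 to 1000000, 8).map { x => Thread.sleep(1); x }.count()
        } catch {
          case e: Exception => println(s"cancelled: ${e.getMessage}")
        } finally {
          sc.clearJobTags()
        }
      }
    }
    runner.start()

    // Another thread cancels by tag, without touching any job group.
    Thread.sleep(2000)
    sc.cancelJobsWithTag("my-etl")
    runner.join()
    sc.stop()
  }
}
```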

The tags are also added to JobData and AppStatusTracker. A new API is added to SparkStatusTracker (a small lookup sketch follows the bullet):

  • SparkStatusTracker.getJobIdsForTag(jobTag: String): Array[Int]
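
Continuing with the sc from the sketch above, a tag can then be resolved back to job IDs through the status tracker (the tag name is the same illustrative one):

```scala
// Look up the jobs carrying a given tag and print their current status.
val jobIds: Array[Int] = sc.statusTracker.getJobIdsForTag("my-etl")
jobIds.flatMap(sc.statusTracker.getJobInfo).foreach { info =>
  println(s"job ${info.jobId()}: ${info.status()}")
}
```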

The new API is used internally in BroadcastExchangeExec instead of cancellation via job group, fixing the issue of broadcast jobs not being cancelled by the user-set job group ID. Now the user-set job group ID should propagate into broadcast execution.

Also, cancellation in Spark Connect is switched to use tags instead of the job group.
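
A rough sketch of the shape of that change on the Connect server side; the identifiers below are hypothetical and do not reflect the actual Spark Connect internals, only how a per-operation tag would be used (reusing sc from the sketch above):

```scala
// Hypothetical per-operation tagging; the naming scheme is illustrative only.
val operationId  = java.util.UUID.randomUUID().toString
val operationTag = s"spark-connect-operation-$operationId"

// Execution thread: every job launched for this operation carries the tag.
sc.addJobTag(operationTag)
try {
  // ... execute the plan for this operation ...
} finally {
  sc.removeJobTag(operationTag)
}

// Interrupt handler (separate thread): cancel just this operation's jobs,
// leaving the user's jobGroupId untouched.
sc.cancelJobsWithTag(operationTag)
```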

Why are the changes needed?

Currently, multiple places may want to cancel a set of jobs, each with a different scope.

Does this PR introduce any user-facing change?

Yes, the APIs described above are added.

How was this patch tested?

Added test to JobCancellationSuite.

@juliuszsompolski
Contributor Author

@hvanhovell @cloud-fan

@juliuszsompolski
Contributor Author

If we don't want to add public APIs like that, I'm also fine with having all of this as private[spark]; my planned use is inside Spark, in Spark Connect.

@HyukjinKwon changed the title [SPARK-43952] Add SparkContext APIs for query cancellation by tag [SPARK-43952][CORE][CONNECT] Add SparkContext APIs for query cancellation by tag Jun 4, 2023
@HyukjinKwon
Member

If we don't want to add public APIs like that, I'm also fine with having all that as private[spark]; my planned use of it is inside Spark in Spark Connect.

I am fine with adding them as public APIs.

@juliuszsompolski changed the title [SPARK-43952][CORE][CONNECT] Add SparkContext APIs for query cancellation by tag [SPARK-43952][CORE][CONNECT][SQL] Add SparkContext APIs for query cancellation by tag Jun 5, 2023
@juliuszsompolski
Contributor Author

cc @gengliangwang - I also added the job tags to AppStatusTracker.

@juliuszsompolski
Contributor Author

@gengliangwang the MiMa check does complain about the incompatible constructor of JobData:

[error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.12:3.4.0! Found 1 potential problems (filtered 3979)
[error]  * method this(Int,java.lang.String,scala.Option,scala.Option,scala.Option,scala.collection.Seq,scala.Option,org.apache.spark.JobExecutionStatus,Int,Int,Int,Int,Int,Int,Int,Int,Int,Int,Int,scala.collection.immutable.Map)Unit in class org.apache.spark.status.api.v1.JobData does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.JobData.this")
[error] java.lang.RuntimeException: Failed binary compatibility check against org.apache.spark:spark-core_2.12:3.4.0! Found 1 potential problems (filtered 3979)
[error] 	at scala.sys.package$.error(package.scala:30)
[error] 	at com.typesafe.tools.mima.plugin.SbtMima$.reportModuleErrors(SbtMima.scala:89)
[error] 	at com.typesafe.tools.mima.plugin.MimaPlugin$.$anonfun$projectSettings$2(MimaPlugin.scala:36)
[error] 	at com.typesafe.tools.mima.plugin.MimaPlugin$.$anonfun$projectSettings$2$adapted(MimaPlugin.scala:26)
[error] 	at scala.collection.Iterator.foreach(Iterator.scala:943)
[error] 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
[error] 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[error] 	at com.typesafe.tools.mima.plugin.MimaPlugin$.$anonfun$projectSettings$1(MimaPlugin.scala:26)
[error] 	at com.typesafe.tools.mima.plugin.MimaPlugin$.$anonfun$projectSettings$1$adapted(MimaPlugin.scala:25)
[error] 	at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error] 	at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error] 	at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error] 	at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error] 	at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error] 	at sbt.Execute.work(Execute.scala:291)
[error] 	at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error] 	at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error] 	at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error] 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[error] 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[error] 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[error] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] 	at java.lang.Thread.run(Thread.java:750)
[error] (core / mimaReportBinaryIssues) Failed binary compatibility check against org.apache.spark:spark-core_2.12:3.4.0! Found 1 potential problems (filtered 3979)

Is this a part of the API that needs a compatible constructor to be added, or is it something to filter and ignore?
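
For reference, a sketch of what registering the suggested filter would look like in project/MimaExcludes.scala, if filtering turns out to be the right call. The surrounding names (object member, defaultExcludes) are assumptions about how that file is organised; only the ProblemFilters line comes from the MiMa output above:

```scala
import com.typesafe.tools.mima.core._

// Inside object MimaExcludes (structure assumed):
lazy val v35excludes = defaultExcludes ++ Seq(
  // [SPARK-43952]: JobData gained a jobTags field, changing its constructor.
  // JobData is only constructed by Spark itself, so the break could be filtered.
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.spark.status.api.v1.JobData.this")
)
```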

@github-actions bot added the BUILD label Jun 7, 2023
@juliuszsompolski
Contributor Author

Resolved the problem after merging.
@gengliangwang @rednaxelafx could you let me know what additional testing of the SHS, beyond what is described in #41440, would be needed?

@juliuszsompolski
Contributor Author

@gengliangwang @rednaxelafx I updated the SHS test expectation files. I claim it's fine because it's an additive change that should be backwards compatible (older versions ignore this field) and forwards compatible (newer versions reading old data without this field will get an empty value).

@gengliangwang
Member

LGTM on the UI-related changes.

@juliuszsompolski
Contributor Author

https://github.com/juliuszsompolski/apache-spark/actions/runs/5278491450/jobs/9547955928

======================================================================
FAIL [6.087s]: test_read_images (pyspark.ml.tests.test_image.ImageFileFormatTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/apache-spark/apache-spark/python/pyspark/ml/tests/test_image.py", line 34, in test_read_images
    self.assertEqual(df.count(), 4)
AssertionError: 0 != 4

----------------------------------------------------------------------
Ran 1 test in 10.116s

FAILED (failures=1)

Flaky again, retriggering again.

@juliuszsompolski
Contributor Author

https://github.com/juliuszsompolski/apache-spark/actions/runs/5280624891/jobs/9553060736
Flake: SparkR tests failed to initialize containers.

@HyukjinKwon @hvanhovell I have restarted CI on this PR 4 times, and each time I got a failed run due to a different flake. Do I need to keep restarting until I get a clean run?

@HyukjinKwon
Member

It should be deflaked now.

*
* @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()`
* being called on the job's executor threads. This is useful to help ensure that the tasks
* are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
Contributor

Someone should check if this is still a thing :)

Contributor Author
@juliuszsompolski Jun 20, 2023

The HDFS-1208 bug is still open... but multiple places in Spark core have by now elected to just pass true here, so it likely doesn't make sense for the user to set it to false, as those places would generate interrupts anyway. But removing it completely would be orthogonal to this PR.

Contributor
@hvanhovell left a comment

LGTM

Contributor
@jaceklaskowski left a comment

Just "nitting" 😉

@juliuszsompolski
Contributor Author

https://github.com/juliuszsompolski/apache-spark/actions/runs/5321069534/jobs/9636800137

[info] - compositeReadLimit *** FAILED *** (2 seconds, 830 milliseconds)
[info]   == Results ==
[info]   !== Correct Answer - 52 ==   == Spark Answer - 39 ==
[info]    struct<value:int>           struct<value:int>
[info]    [100]                       [100]
[info]    [101]                       [101]
[info]    [102]                       [102]
[info]    ...
[info]   ![122]                       [12]
[info]   ![123]                       [13]
[info]   ![124]                       [14]
[info]   ...
[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error] 	org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceWithAdminSuite

Unrelated failure.

@juliuszsompolski
Contributor Author

I guess https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/builds/47350391 is just having issues at the moment.

Start-FileDownloadInternal : Error downloading remote file: One or more errors occurred.
Inner Exception: Remote server returned 404: Not Found

@HyukjinKwon
Member

AppVeyor is fine.

Merged to master

HyukjinKwon pushed a commit that referenced this pull request Jul 3, 2023
### What changes were proposed in this pull request?

Add APIs from #41440 to SparkR:
* addJobTag(tag)
* removeJobTag(tag)
* getJobTags()
* clearJobTags()
* cancelJobsWithTag(tag)
* setInterruptOnCancel(interruptOnCancel)

Additionally:
* fix a bug in removeJobTag when the last tag is removed (it should leave an empty set of tags, not an empty-string tag)
* fix comments on cancelJobsWithTag
* add a few defensive guards against an empty-string tag resulting from a missing property or from removing the last tag.

### Why are the changes needed?

SparkR parity.

### Does this PR introduce _any_ user-facing change?

Yes, the APIs introduced in Scala in #41440 are added to SparkR.

### How was this patch tested?

Added test.

Closes #41742 from juliuszsompolski/SPARK-44195.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
interruptOnCancel = true)
// Setup a job tag here so later it may get cancelled by tag if necessary.
sparkContext.addJobTag(jobTag)
sparkContext.setInterruptOnCancel(true)
Member

cc @ulysses-you FYI. Now we can cancel the jobs in broadcast like this.

Contributor

Glad to see we support cancelling jobs by tag!

@shuwang21
Contributor

Hi @juliuszsompolski, I got a local build failure on the master branch (5d840eb), for example when I try to run ClientDistributedCacheManagerSuite. The error points to the following two lines, and I found nothing defining addJobTags and getJobTagsList. Do you have any suggestions? Thank you.

value addJobTags is not a member of org.apache.spark.status.protobuf.StoreTypes.JobData.Builder
    jobData.jobTags.foreach(jobDataBuilder.addJobTags)
value getJobTagsList is not a member of org.apache.spark.status.protobuf.StoreTypes.JobData
      jobTags = info.getJobTagsList.asScala,

@juliuszsompolski
Contributor Author

@shuwang21 these functions come from code generated by https://github.com/apache/spark/pull/41440/files#diff-28490b011ea75c4442d8ac95a3d8599abaffcd050672ed3416d7d259a22057c7R50
If it does not get generated, maybe something is wrong with the protobuf compiler in your dev env?

@shuwang21
Contributor

I see, thank you @juliuszsompolski. Solved; I needed to recompile the project.
