-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update BQIO to a single scheduled executor service reduce threads #23234
Update BQIO to a single scheduled executor service reduce threads #23234
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
run java precommit |
Assigning reviewers. If you would like to opt out of this review, comment R: @kileys for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
|
Run Java_Examples_Dataflow_Java17 PreCommit |
r: @lukecwik can you take a look at this? I believe we will need to have a separate executor service to preserve backwards compatibility, because if someone provides a non-scheduled executor service then BQIO will fail |
*/ | ||
|
||
return Executors.newScheduledThreadPool( | ||
Math.max(4, Runtime.getRuntime().availableProcessors()), threadFactoryBuilder.build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What was your reasoning around choosing this as the minimum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dug into the default configuration in BQ. When we set the minimum to 0, threads weren't firing, so I looked to the known working configuration in BQ, and used that
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Show resolved
Hide resolved
*/ | ||
@JsonIgnore | ||
@Description( | ||
"The ScheduledExecutorService instance to use to create threads, can be overridden to specify " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes a lot of sense to not declare this pipeline option in GcsOptions since the original choice of having this in GcsOptions was done a long time ago since GCS needed it but it is used in many places.
Since it is used both during pipeline construction and pipeline execution it makes sense to move it to its own options class in org/apache/beam/sdk/options/ExecutorOptions.java
in sdks/java/core
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, I'll refactor this
...e-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
Show resolved
Hide resolved
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Show resolved
Hide resolved
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All nits in the current review.
Since we are returning a ScheduledExecutorService now for GcsOptions#getExecutorService, do we not get test failures anymore (e.g. RemoteExecutionTest)?
In a follow-up can you migrate all usages of GcsOptions#getExecutorService to ExecutorOptions#getScheduledExecutorService?
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExecutorOptions.java
Outdated
Show resolved
Hide resolved
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
ExecutorService getExecutorService(); | ||
|
||
/** | ||
* @deprecated use {@link ExecutorOptions#setScheduledExecutorService(ScheduledExecutorService)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* @deprecated use {@link ExecutorOptions#setScheduledExecutorService(ScheduledExecutorService)} | |
* @deprecated use {@link ExecutorOptions#setScheduledExecutorService} |
I don't think we can do this, because if someone is using GcsOptions#setExecutorService, this change could break them |
This does not do what you want since a |
ah, thats unfortunate, but you're right. I'm not confident in implementing my own scheduler. Looking through some of the guarantees given by the interface definitions, it may be quite challenging. |
I don't think I can accurately implement this in < several weeks |
A number of the guarantees around periodic and delayed execution depend on methods that are package private, so I can't just copy the scheduled executor code either |
Give me a couple of hours to take a deeper look. |
I think I got an implementation for an unbounded ScheduledExecutorService with the properties that we want. |
@johnjcasey Ping me when you have merged your changes with the new |
…educe-gax-threads # Conflicts: # sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All my comments are minor, please fix and merge when addressed.
@@ -287,7 +287,8 @@ def buildAndPushDockerJavaContainer = tasks.register("buildAndPushDockerJavaCont | |||
commandLine "docker", "tag", "${defaultDockerImageName}", "${dockerJavaImageName}" | |||
} | |||
exec { | |||
commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}" | |||
// commandLine "gcloud", "docker", "--", "push", "${dockerJavaImageName}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was commited in error
import java.util.concurrent.TimeUnit; | ||
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; | ||
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; | ||
|
||
public interface ExecutorOptions extends PipelineOptions { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment and @Description
needs to be updated since the core pool size is irrelevant now.
import java.util.concurrent.ScheduledExecutorService; | ||
import org.apache.beam.sdk.util.UnboundedScheduledExecutorService; | ||
|
||
public interface ExecutorOptions extends PipelineOptions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class comment?
/* The SDK requires an unbounded thread pool because a step may create X writers | ||
* each requiring their own thread to perform the writes otherwise a writer may | ||
* block causing deadlock for the step because the writers buffer is full. | ||
* Also, the MapTaskExecutor launches the steps in reverse order and completes | ||
* them in forward order thus requiring enough threads so that each step's writers | ||
* can be active. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this comment into ScheduledExecutorServiceFactory just above new UnboundedScheduledExecutorService()
...cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
Outdated
Show resolved
Hide resolved
…org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
This PR causes the BigQuery client to hang on shutdown. |
…eads (apache#23234)" This reverts commit 8e2431c.
…duce threads (apache#23234)" (apache#23793)" This reverts commit 01da3fc.
…eads (apache#23234)" (apache#23793) This reverts commit 8e2431c. (cherry picked from commit 01da3fc)
* Merge pull request #23795: Revert 23234: issue #23794 * Revert "Update BQIO to a single scheduled executor service reduce threads (#23234)" (#23793) This reverts commit 8e2431c. (cherry picked from commit 01da3fc) * Merge pull request #23795: Revert 23234: issue #23794 (cherry picked from commit d38f577) Co-authored-by: reuvenlax <relax@google.com>
@reuvenlax We observed the behaviour of hanging threads in our Dataflow jobs during In our pipelines we see continously increasing number of threads with stack traces like these:
We think this is related to using a pretty old version of the dependency |
Shutdown hangs were definitely being caused by that PR. Note that the hang
seen in that PR was shutting down the RPC client, not shutting down the
StreamWriter. This appears to be a different issue.
FYI the current head Beam is depending on 2.24.2.
…On Mon, Nov 7, 2022 at 2:42 AM Thorsten Madlener ***@***.***> wrote:
This PR causes the BigQuery client to hang on shutdown.
@reuvenlax <https://github.com/reuvenlax> We observed the behaviour of
hanging threads in our Dataflow jobs during StreamWriter.close with Beam
Version 1.41.0. So maybe the PR does not cause this problem but might make
it more visible.
In our pipelines we see continously increasing number of threads with
stack traces like these:
"pool-3-thread-61" #1767 prio=5 os_prio=0 cpu=0.22ms elapsed=337556.83s tid=0x00007f2d982459d0 nid=0x6f7 in Object.wait() [0x00007f2d886f1000]
java.lang.Thread.State: WAITING (on object monitor)
at ***@***.***/Native Method)
- waiting on <no object reference available>
at ***@***.***/Thread.java:1304)
- locked <merged>(a java.lang.Thread)
at ***@***.***/Thread.java:1372)
at com.google.cloud.bigquery.storage.v1.StreamWriter.close(StreamWriter.java:369)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.close(BigQueryServicesImpl.java:1339)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$$Lambda$692/0x00000008015dd4c8.run(Unknown Source)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords.lambda$runAsyncIgnoreFailure$1(StorageApiWritesShardedRecords.java:138)
at org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$$Lambda$685/0x00000008015d6b58.run(Unknown Source)
at ***@***.***/Executors.java:539)
at ***@***.***/FutureTask.java:264)
at ***@***.***/ThreadPoolExecutor.java:1136)
at ***@***.***/ThreadPoolExecutor.java:635)
at ***@***.***/Thread.java:833)
We think this is related to using a pretty old version of the dependency
com.google.cloud:google-cloud-bigquerystorage:2.12.2. The library has
since made many changes in the StreamWriter class which could fix this
issue. Is there anything that prevents updating a newer released version?
—
Reply to this email directly, view it on GitHub
<#23234 (comment)>, or
unsubscribe
<https://github.com/notifications/unsubscribe-auth/AFAYJVNYIU64P4TB2UNE7XTWHDMIHANCNFSM6AAAAAAQMVGVQM>
.
You are receiving this because you were mentioned.Message ID:
***@***.***>
|
fixes #21368
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.