[BEAM-6488] Portable Flink runner support for running cross-language … #7709
run java precommit |
run java postcommit |
…transforms Multi-language support in DefaultJobBundleFactory
Nice! This is great because it allows us to run multiple Environments of the same type but with different configuration, e.g. Python/Java/Go environments in cross-language pipelines.
A few comments inline.
CC @tweise
...ution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -164,7 +164,7 @@ public void createsMultipleEnvironmentOfSingleType() throws Exception {
     verify(envFactoryA, Mockito.times(0)).createEnvironment(environmentAA);

     bundleFactory.forStage(getExecutableStage(environmentAA));
-    verify(environmentProviderFactoryA, Mockito.times(1))
+    verify(environmentProviderFactoryA, Mockito.times(2))
This does not seem correct. The factory should only be provided once.
Thanks for your feedback! This PR allows creating multiple ServerInfo instances and their matching EnvironmentFactory instances when two environments have the same URN but different payloads (for example, environment(urn: "docker", payload: "beam/python") and environment(urn: "docker", payload: "beam/java")). Is there a reason I'm not aware of why environments with the same URN should use a single EnvironmentFactory?
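To illustrate the keying question discussed here, the following is a minimal Java sketch, not the PR's actual code. It assumes that per-environment resources are cached under a key made of both URN and payload, so two "docker" environments with different payloads get separate entries while a repeated request reuses the existing one. EnvKey, PerEnvironmentCache, and the strings standing in for gRPC servers are hypothetical names introduced only for illustration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an environment: a URN plus a payload. */
final class EnvKey {
  final String urn;
  final String payload;

  EnvKey(String urn, String payload) {
    this.urn = urn;
    this.payload = payload;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof EnvKey)) {
      return false;
    }
    EnvKey other = (EnvKey) o;
    return urn.equals(other.urn) && payload.equals(other.payload);
  }

  @Override
  public int hashCode() {
    return Objects.hash(urn, payload);
  }
}

/** Hypothetical cache: one set of resources per (urn, payload) pair. */
final class PerEnvironmentCache {
  // A String stands in for the real per-environment servers and client.
  private final Map<EnvKey, String> resources = new ConcurrentHashMap<>();
  // Counts how many times resources were actually created.
  final AtomicInteger creations = new AtomicInteger();

  /** Returns the cached resources for this environment, creating them at most once. */
  String forEnvironment(String urn, String payload) {
    return resources.computeIfAbsent(
        new EnvKey(urn, payload),
        key -> "servers-" + creations.incrementAndGet() + " for " + key.urn + ":" + key.payload);
  }
}
```

Because ConcurrentHashMap.computeIfAbsent runs the creation function at most once per key, this kind of keyed cache also gives thread-safe lazy initialization without explicit locking.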
I think it makes sense for two Environments with the same URN also to share the same EnvironmentFactory. I don't think it is necessary to store the environment factory with the ServerInfo because there will only ever be one factory anyway. We don't allow multiple EnvironmentFactories.
I saw you changed the code to allow different environment types, e.g. Docker and Process-based environments at the same time. I don't think this is necessary because it is sufficient to have a single EnvironmentFactory (i.e. Docker) to run cross-language pipelines.
However, I don't see why we would restrict to only one environment type. So fine with me!
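The arrangement described in this comment can be sketched roughly as follows: one factory per environment URN, shared by every environment with that URN, with more than one URN allowed at the same time. This is a minimal illustration under stated assumptions, not Beam's API. FactoryRegistrySketch, the use of Function<String, String> as a factory, and the "process" URN string are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry holding exactly one factory per environment URN. */
final class FactoryRegistrySketch {
  // URN -> factory that turns a payload into a (here: String) environment handle.
  private final Map<String, Function<String, String>> factoriesByUrn = new HashMap<>();

  /** Registers the single factory responsible for the given URN. */
  void register(String urn, Function<String, String> factory) {
    factoriesByUrn.put(urn, factory);
  }

  /** Looks up the factory for the URN and applies it to the payload. */
  String createEnvironment(String urn, String payload) {
    Function<String, String> factory = factoriesByUrn.get(urn);
    if (factory == null) {
      throw new IllegalArgumentException("No factory registered for URN: " + urn);
    }
    return factory.apply(payload);
  }
}
```

With this layout, environments that share a URN share a factory, and registering a second URN is all it takes to mix environment types in one pipeline.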
run java precommit |
run java postcommit |
Thanks for your code and addressing the comments. Looks great!
In addition to being able to construct multiple Environments with the same URN (e.g. Docker Python and Docker Java), you pushed an additional change to allow mixing of environment types, e.g. (Docker Java, Process Python). I think that makes sense for some situations and there is no reason to restrict environment types.
If there are no further comments, I'd merge this later today.
Thanks, but I believe this still isn't on par with previous behavior and is causing noise in the logs. https://builds.apache.org/job/beam_PreCommit_Portable_Python_Commit/1433/
@tweise What exactly do you mean by noise? If you mean the "call already cancelled" exceptions, I guess something else is causing them. I found a few logs from previous PRs that show the same error message.
I'll look into it but maybe it's better to create a new ticket. @mxm: Any comment? I know you're the original author of
Run Portable_Python PreCommit |
@ihji after repeating It would be great if you could create a new ticket and take a look at eliminating these exceptions (listed below). Thanks!
During job termination:
During job submission:
The following seems to appear consistently in the PVR tests:
Could we simply restore the original order of the close calls? (edit: saw you already did that) IMHO the last two errors above are unrelated. The first one originates from the client opening the connection to the JobServer too early. The second one is a problem with the embedded SDK harness which does not clean up GRPC channels correctly.
I agree that we should track the above issues and fix them.
Multi-language support in DefaultJobBundleFactory.
Current status:
DefaultJobBundleFactory reuses a set of grpc servers if a new environment shares the same URN.
DefaultJobBundleFactory initializes a set of grpc servers as instance variables and creates WrappedSdkHarnessClient from them.
Proposed changes:
AutoValue_ServerInfo to enable thread-safe initialization of WrappedSdkHarnessClient.