-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[bug][plugin]Fix: Correct the way to determine the yarn queue in Flink CommandLine and SQL mode #14237
Conversation
run ci |
Codecov Report
@@ Coverage Diff @@
## dev #14237 +/- ##
============================================
+ Coverage 38.39% 38.43% +0.04%
- Complexity 4478 4502 +24
============================================
Files 1229 1235 +6
Lines 42936 43001 +65
Branches 4763 4767 +4
============================================
+ Hits 16485 16528 +43
- Misses 24625 24646 +21
- Partials 1826 1827 +1
... and 4 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { | ||
String queue = flinkParameters.getQueue(); | ||
if (StringUtils.isNotEmpty(queue)) { // -yqu | ||
args.add(FlinkConstants.FLINK_QUEUE); | ||
args.add(queue); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should add if-else statement here to set different arg name for yarn queue according to Flink version. Please remove judgement from L195-L244, in which just build run command.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should add if-else statement here to set different arg name for yarn queue according to Flink version. Please remove judgement from L195-L244, in which just build run command.
done, and fix the same wrong in sql mode, pls have a look
@@ -197,16 +196,19 @@ private static List<String> buildRunCommandLineForOthers(TaskExecutionContext ta | |||
args.add(FlinkConstants.FLINK_RUN); // run | |||
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t | |||
args.add(FlinkConstants.FLINK_YARN_PER_JOB); // yarn-per-job | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove redundant blank line.
} else { | ||
args.add(FlinkConstants.FLINK_RUN); // run | ||
args.add(FlinkConstants.FLINK_RUN_MODE); // -m | ||
args.add(FlinkConstants.FLINK_YARN_CLUSTER); // yarn-cluster | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
} | ||
break; | ||
case APPLICATION: | ||
args.add(FlinkConstants.FLINK_RUN_APPLICATION); // run-application | ||
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t | ||
args.add(FlinkConstants.FLINK_YARN_APPLICATION); // yarn-application | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
String others = flinkParameters.getOthers(); | ||
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) { | ||
String queue = flinkParameters.getQueue(); | ||
if (StringUtils.isNotEmpty(queue)) { | ||
initOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue)); | ||
} | ||
String queue = flinkParameters.getQueue(); | ||
if (StringUtils.isNotEmpty(queue)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will check whether user defines this arg by themselves in others
, so you just have to modify condition in the original way: !others.contains(FlinkConstants. FLINK_QUEUE_FOR_TARGETS)
, btw, may I ask why you name them FLINK_QUEUE_FOR_MODE
and FLINK_QUEUE_FOR_TARGETS
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The YARN queue should be assigned by property yarn.application.queue
rather than -yqu option, this option is not available in sql-client.sh. If we want to specify the YARN queue used by a specific Flink SQL task, it would be more appropriate to have an explicit queue option in the task submission form rather than relying on parameters in the 'others' section. Regarding the naming here, it is because in flink-run, the -yqu option only takes effect within the -m option (for mode). When using the -t option (for target), it is necessary to specify it using -Dyarn.application.queue=%s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The execution of a workflow is tied to a specific tenant, and this tenant holds a queue attribute. This attribute is assigned to the processInstance being executed by the tenant. The queue attribute of the executionContext for any taskInstance belonging to this processInstance will also be consistent. Therefore, the queue of a task is ultimately determined by the queue attribute of the runtime tenant if not explicitly specified.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this logic is missing, I've debugged and find the queue of processInstance
is null. Also in codes, I don't find where it's assigned by tenant's queue. Would like to help check in your local env?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. From my practice, the queue is assigned. This logic should be correctly implemented in the current version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's weird, do you test on branch dev?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My dolphin env is 3.1.7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you tested the modification in this PR on branch dev?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will test it tomorrow, in my env I cp this pr to my 3.1.7.
if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE_FOR_TARGETS)) { | ||
String queue = flinkParameters.getQueue(); | ||
if (StringUtils.isNotEmpty(queue)) { // -Dyarn.application.queue=%s | ||
args.add(String.format(FlinkConstants.FLINK_QUEUE_FOR_TARGETS, queue)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some duplicated code. Can we clean up the logic and try to avoid introducing this new method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
Hi, @ORuteMa , any feedback? |
Sorry I am busy these days, I will test it later. |
@caishunfeng @zhongjiajie pls approve run ci, thanks. |
Done. |
I see some fail in this run, it may have no business to do with my pr. Is it a network problem or something else? Pls have a look. |
|
||
private static void determinedYarnQueue(List<String> args, FlinkParameters flinkParameters, | ||
FlinkDeployMode deployMode, String flinkVersion) { | ||
switch (deployMode) { |
Check warning
Code scanning / CodeQL
Missing enum case in switch
restarted the failed E2E test |
SonarCloud Quality Gate failed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, overall do you have addition suggestion? @Radeity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
…k CommandLine and SQL mode (#14237) * Fix: Correct the way to determine the yarn queue in Flink CommandLine * fix the yarn queue in sql mode && refine the code * refine code * remove unnecessary comment * fix yarn queue properties * remove redundant variable
Purpose of the pull request
Correct the way to determine the yarn queue in Flink CommandLine.
closed #14236
Brief change log
In Flink command line with -t option, Yarn queue should be determined by -Dyarn.application.name=%s rather than -yqu.
Verify this pull request
This pull request is code cleanup without any test coverage. I test it manually.