Skip to content

Commit

Permalink
[Bug][Connector] Fixed ConsoleSink submit job twice. (#1710)
Browse files Browse the repository at this point in the history
* fix ConsoleSink submit job twice
  • Loading branch information
Hisoka-X authored and ruanwenjun committed Apr 18, 2022
1 parent 34090bf commit ec62ef0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ public void start(List<FlinkBatchSource> sources, List<FlinkBatchTransform> tran
sink.outputBatch(flinkEnvironment, dataSet);
}

try {
LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
LOGGER.info(execute.toString());
} catch (Exception e) {
LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
throw e;
if (whetherExecute(sinks)) {
try {
LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
LOGGER.info(execute.toString());
} catch (Exception e) {
LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
throw e;
}
}
}

Expand Down Expand Up @@ -116,4 +118,7 @@ public Config getConfig() {
return config;
}

private boolean whetherExecute(List<FlinkBatchSink> sinks) {
return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public void outputBatch(FlinkEnvironment env, DataSet<Row> rowDataSet) {
} catch (Exception e) {
LOGGER.error("Failed to print result! ", e);
}
rowDataSet.output(this);
}

@Override
Expand Down

0 comments on commit ec62ef0

Please sign in to comment.