diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java index 3da634fb8d6..dba81476359 100644 --- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java +++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java @@ -72,13 +72,15 @@ public void start(List sources, List tran sink.outputBatch(flinkEnvironment, dataSet); } - try { - LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan()); - JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName()); - LOGGER.info(execute.toString()); - } catch (Exception e) { - LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName()); - throw e; + if (whetherExecute(sinks)) { + try { + LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan()); + JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName()); + LOGGER.info(execute.toString()); + } catch (Exception e) { + LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName()); + throw e; + } } } @@ -116,4 +118,7 @@ public Config getConfig() { return config; } + private boolean whetherExecute(List sinks) { + return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName())); + } } diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java index 27318143b89..a7394c12728 100644 --- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java +++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java @@ -48,7 +48,6 @@ public void outputBatch(FlinkEnvironment env, DataSet rowDataSet) { } catch (Exception e) { LOGGER.error("Failed to print result! ", e); } - rowDataSet.output(this); } @Override