[Bug] [Connector] Use Console sink will submit two job in flink cluster #1702

Closed
3 tasks done
Hisoka-X opened this issue Apr 15, 2022 · 4 comments · Fixed by #1710
Labels
bug connectors-v1 SeaTunnel connectors, include sink, source

Comments

@Hisoka-X
Member

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

When using the console sink and submitting a job to a Flink cluster, the client submits two jobs.

SeaTunnel Version

dev

SeaTunnel Config

env {
  execution.parallelism = 2
}

source {

  JdbcSource {
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false"
    username = "root"
    password = "123456"
    query = "SELECT int_type,char_type,bigint_type from test_test where 1=1"
    result_table_name = "user"
    partition_column = "int_type"
    partition_upper_bound = 50
    partition_lower_bound = 0
    parallelism = 4
  }

}

transform {

}
sink {
  ConsoleSink {}
}

Running Command

./bin/start-seatunnel-flink.sh --config ./config/flink.batch.conf

Error Exception

-

Flink or Spark Version

Flink 1.13.6

Java or Scala Version

Java8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


@Hisoka-X Hisoka-X added the bug label Apr 15, 2022
@ruanwenjun
Member

Yes, I have seen this too. It happens in batch jobs.

@ruanwenjun
Member

ruanwenjun commented Apr 16, 2022

@benjfan Could this be caused by ConsoleSink adding two DataSinks? https://github.com/apache/incubator-seatunnel/blob/3a4ef4c60d8397fbf7ee7b16a06494072d3f0245/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java#L47-L54

After changing it to the code below, the bug is fixed.

    @Override
    public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> rowDataSet) {
        return rowDataSet.output(this);
    }

    @SuppressWarnings("RegexpSingleline")
    @Override
    public void writeRecord(Row record) {
        if (limit <= 0) {
            return;
        }
        System.out.println(record.toString());
        limit--;
    }

@Hisoka-X
Member Author

With this change, the records are printed on the TaskManager, not on the client side.

@ruanwenjun
Member

Yes, you are right, we cannot easily make this change. The main reason is that Flink calls execute inside the collect method, which conflicts with our own execute call.
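To make the mechanism described above concrete, here is a minimal toy model in plain Java. It does not use Flink or SeaTunnel classes: `MiniEnv`, `collect`, `addSink`, and `execute` are hypothetical stand-ins, and the only assumption carried over from this thread is that a collect-style call submits a job on its own, so a later explicit execute submits a second one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoJobsSketch {

    /** Hypothetical stand-in for an execution environment; it only counts job submissions. */
    static class MiniEnv {
        private final List<String> pendingSinks = new ArrayList<>();
        private int submittedJobs = 0;

        /** Lazy: registers a sink in the plan and submits nothing. */
        void addSink(String name) {
            pendingSinks.add(name);
        }

        /** Eager: models a collect-style call that submits a job immediately to fetch rows. */
        List<String> collect(List<String> rows) {
            submittedJobs++;
            return new ArrayList<>(rows);
        }

        /** Submits one job covering every sink registered so far. */
        void execute() {
            if (pendingSinks.isEmpty()) {
                throw new IllegalStateException("no sinks defined");
            }
            submittedJobs++;
            pendingSinks.clear();
        }

        int submittedJobs() {
            return submittedJobs;
        }
    }

    /** Eager collect inside the sink, followed by the framework's own execute. */
    static int eagerThenExecute(List<String> rows) {
        MiniEnv env = new MiniEnv();
        env.collect(rows);       // first submission, triggered by the eager call
        env.addSink("console");  // lazy sink that is still part of the plan
        env.execute();           // second submission, triggered by the framework
        return env.submittedJobs();
    }

    /** Only a lazy sink, followed by the framework's own execute. */
    static int lazyOnly() {
        MiniEnv env = new MiniEnv();
        env.addSink("console");
        env.execute();
        return env.submittedJobs();
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("1,a,10", "2,b,20");
        System.out.println("eager + execute: " + eagerThenExecute(rows) + " job(s)");
        System.out.println("lazy only:       " + lazyOnly() + " job(s)");
    }
}
```

The trade-off matches the one raised in this thread: the lazy-only path submits a single job, but the printing then runs wherever the sink runs rather than on the client.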

@ruanwenjun ruanwenjun added the connectors-v1 SeaTunnel connectors, include sink, source label Apr 16, 2022