From bb02162ec4bf6af4dd945d2d30cff961520c273e Mon Sep 17 00:00:00 2001 From: Guangdong Liu <804167098@qq.com> Date: Tue, 8 Aug 2023 10:39:44 +0800 Subject: [PATCH] [Improve] [Connector-V2] Remove scheduler in JDBC sink #4736 (#5168) --------- Co-authored-by: gdliu3 --- docs/en/connector-v2/sink/DB2.md | 3 +- docs/en/connector-v2/sink/Jdbc.md | 8 +--- docs/en/connector-v2/sink/Mysql.md | 3 +- docs/en/connector-v2/sink/OceanBase.md | 3 +- docs/en/connector-v2/sink/PostgreSql.md | 3 +- docs/en/connector-v2/sink/Snowflake.md | 3 +- docs/en/connector-v2/sink/Vertica.md | 3 +- .../jdbc/config/JdbcConnectionConfig.java | 13 ------ .../seatunnel/jdbc/config/JdbcOptions.java | 6 --- .../jdbc/internal/JdbcOutputFormat.java | 44 ------------------- .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 2 - 11 files changed, 7 insertions(+), 84 deletions(-) diff --git a/docs/en/connector-v2/sink/DB2.md b/docs/en/connector-v2/sink/DB2.md index 8f5a7285e35d..fc0aaca0943c 100644 --- a/docs/en/connector-v2/sink/DB2.md +++ b/docs/en/connector-v2/sink/DB2.md @@ -65,8 +65,7 @@ semantics (using XA transaction guarantee). | support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | | connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | | max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | -| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | | is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | | generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | | xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, DB2 is `com.db2.cj.jdbc.Db2XADataSource`, and
please refer to appendix for other data sources | diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 9d68278cf51e..755de8bb9a7c 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -41,7 +41,6 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. | connection_check_timeout_sec | Int | No | 30 | | max_retries | Int | No | 0 | | batch_size | Int | No | 1000 | -| batch_interval_ms | Int | No | 1000 | | is_exactly_once | Boolean | No | false | | generate_sink_sql | Boolean | No | false | | xa_data_source_class_name | String | No | - | @@ -107,12 +106,7 @@ The number of retries to submit failed (executeBatch) ### batch_size[int] -For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms` -, the data will be flushed into the database - -### batch_interval_ms[int] - -For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms` +For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval` , the data will be flushed into the database ### is_exactly_once[boolean] diff --git a/docs/en/connector-v2/sink/Mysql.md b/docs/en/connector-v2/sink/Mysql.md index 92254c1b54fa..55c825ed168e 100644 --- a/docs/en/connector-v2/sink/Mysql.md +++ b/docs/en/connector-v2/sink/Mysql.md @@ -67,8 +67,7 @@ semantics (using XA transaction guarantee). | support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | | connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | | max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | -| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | | is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | | generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | | xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and
please refer to appendix for other data sources | diff --git a/docs/en/connector-v2/sink/OceanBase.md b/docs/en/connector-v2/sink/OceanBase.md index ec87ce3d36d1..3cea0b5e6e6d 100644 --- a/docs/en/connector-v2/sink/OceanBase.md +++ b/docs/en/connector-v2/sink/OceanBase.md @@ -81,8 +81,7 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre | support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | | connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | | max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | -| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | | generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | | max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | | transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md index f7d6469b60fc..3cb2b82811e6 100644 --- a/docs/en/connector-v2/sink/PostgreSql.md +++ b/docs/en/connector-v2/sink/PostgreSql.md @@ -74,8 +74,7 @@ semantics (using XA transaction guarantee). | support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | | connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | | max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | -| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | | is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | | generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to. | | xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, PostgreSQL is `org.postgresql.xa.PGXADataSource`, and
please refer to appendix for other data sources | diff --git a/docs/en/connector-v2/sink/Snowflake.md b/docs/en/connector-v2/sink/Snowflake.md index 21bfb175ef7e..1dfff5e09c74 100644 --- a/docs/en/connector-v2/sink/Snowflake.md +++ b/docs/en/connector-v2/sink/Snowflake.md @@ -61,8 +61,7 @@ Write data through jdbc. Support Batch mode and Streaming mode, support concurre | support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | | connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | | max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | -| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | | max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | | transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | | auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | diff --git a/docs/en/connector-v2/sink/Vertica.md b/docs/en/connector-v2/sink/Vertica.md index 0db8571d55f2..9a6244076828 100644 --- a/docs/en/connector-v2/sink/Vertica.md +++ b/docs/en/connector-v2/sink/Vertica.md @@ -67,8 +67,7 @@ semantics (using XA transaction guarantee). | support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | | connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | | max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | -| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | -| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | | is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | | generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | | xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, vertical is `com.vertical.cj.jdbc.VerticalXADataSource`, and
please refer to appendix for other data sources | diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java index 6e2147c03c86..555963af2cf7 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java @@ -38,7 +38,6 @@ public class JdbcConnectionConfig implements Serializable { public boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue(); public int batchSize = JdbcOptions.BATCH_SIZE.defaultValue(); - public int batchIntervalMs = JdbcOptions.BATCH_INTERVAL_MS.defaultValue(); public String xaDataSourceClassName; @@ -55,7 +54,6 @@ public static JdbcConnectionConfig of(ReadonlyConfig config) { builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES)); builder.connectionCheckTimeoutSeconds(config.get(JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC)); builder.batchSize(config.get(JdbcOptions.BATCH_SIZE)); - builder.batchIntervalMs(config.get(JdbcOptions.BATCH_INTERVAL_MS)); if (config.get(JdbcOptions.IS_EXACTLY_ONCE)) { builder.xaDataSourceClassName(config.get(JdbcOptions.XA_DATA_SOURCE_CLASS_NAME)); builder.maxCommitAttempts(config.get(JdbcOptions.MAX_COMMIT_ATTEMPTS)); @@ -104,10 +102,6 @@ public int getBatchSize() { return batchSize; } - public int getBatchIntervalMs() { - return batchIntervalMs; - } - public String getXaDataSourceClassName() { return xaDataSourceClassName; } @@ -136,7 +130,6 @@ public static final class Builder { private String query; private boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue(); private int batchSize = JdbcOptions.BATCH_SIZE.defaultValue(); - private int batchIntervalMs = JdbcOptions.BATCH_INTERVAL_MS.defaultValue(); private String xaDataSourceClassName; private int maxCommitAttempts = JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue(); private int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue(); @@ -193,11 +186,6 @@ public Builder batchSize(int batchSize) { return this; } - public Builder batchIntervalMs(int batchIntervalMs) { - this.batchIntervalMs = batchIntervalMs; - return this; - } - public Builder xaDataSourceClassName(String xaDataSourceClassName) { this.xaDataSourceClassName = xaDataSourceClassName; return this; @@ -216,7 +204,6 @@ public Builder transactionTimeoutSec(int transactionTimeoutSec) { public JdbcConnectionConfig build() { JdbcConnectionConfig jdbcConnectionConfig = new JdbcConnectionConfig(); jdbcConnectionConfig.batchSize = this.batchSize; - jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs; jdbcConnectionConfig.driverName = this.driverName; jdbcConnectionConfig.compatibleMode = this.compatibleMode; jdbcConnectionConfig.maxRetries = this.maxRetries; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index f5d1613c53ef..207995d0b451 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -71,12 +71,6 @@ public interface JdbcOptions { "For queries that return a large number of objects, " + "you can configure the row fetch size used in the query to improve performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value."); - Option BATCH_INTERVAL_MS = - Options.key("batch_interval_ms") - .intType() - .defaultValue(0) - .withDescription("batch interval milliSecond"); - Option IS_EXACTLY_ONCE = Options.key("is_exactly_once") .booleanType() diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java index d47814f15314..a7d791252213 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java @@ -34,11 +34,6 @@ import java.io.Serializable; import java.sql.Connection; import java.sql.SQLException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkNotNull; @@ -58,9 +53,6 @@ public class JdbcOutputFormat> implem private transient E jdbcStatementExecutor; private transient int batchCount = 0; private transient volatile boolean closed = false; - - private transient ScheduledExecutorService scheduler; - private transient ScheduledFuture scheduledFuture; private transient volatile Exception flushException; public JdbcOutputFormat( @@ -83,37 +75,6 @@ public void open() throws IOException { e); } jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory); - - if (jdbcConnectionConfig.getBatchIntervalMs() != 0 - && jdbcConnectionConfig.getBatchSize() != 1) { - this.scheduler = - Executors.newScheduledThreadPool( - 1, - runnable -> { - AtomicInteger cnt = new AtomicInteger(0); - Thread thread = new Thread(runnable); - thread.setDaemon(true); - thread.setName( - "jdbc-upsert-output-format" + "-" + cnt.incrementAndGet()); - return thread; - }); - this.scheduledFuture = - this.scheduler.scheduleWithFixedDelay( - () -> { - synchronized (JdbcOutputFormat.this) { - if (!closed) { - try { - flush(); - } catch (Exception e) { - flushException = e; - } - } - } - }, - jdbcConnectionConfig.getBatchIntervalMs(), - jdbcConnectionConfig.getBatchIntervalMs(), - TimeUnit.MILLISECONDS); - } } private E createAndOpenStatementExecutor(StatementExecutorFactory statementExecutorFactory) { @@ -209,11 +170,6 @@ public synchronized void close() { if (!closed) { closed = true; - if (this.scheduledFuture != null) { - scheduledFuture.cancel(false); - this.scheduler.shutdown(); - } - if (batchCount > 0) { try { flush(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index 
a26628ff3a43..8209533f9d55 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -45,7 +45,6 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT; -import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_SIZE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC; @@ -166,7 +165,6 @@ public OptionRule optionRule() { PASSWORD, CONNECTION_CHECK_TIMEOUT_SEC, BATCH_SIZE, - BATCH_INTERVAL_MS, IS_EXACTLY_ONCE, GENERATE_SINK_SQL, AUTO_COMMIT,
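
With the scheduler gone, `JdbcOutputFormat` only flushes when the number of buffered records reaches `batch_size`; time-based flushing is now driven by the engine's checkpoint cycle (`checkpoint.interval`) rather than by the removed `jdbc-upsert-output-format` background thread that fired every `batch_interval_ms`. The sketch below is only an illustration of the resulting flush points; the class and method names (`BatchFlushSketch`, `flushOnCheckpoint`) are simplified stand-ins, not the connector's actual writer API.

```java
import java.io.IOException;

// Minimal sketch of the post-change flush behaviour (illustrative names only).
public class BatchFlushSketch {

    private final int batchSize; // corresponds to the `batch_size` option
    private int batchCount = 0;

    public BatchFlushSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per incoming record.
    public synchronized void write(Object record) throws IOException {
        bufferIntoBatch(record);
        batchCount++;
        // Size-based flush: the only trigger left inside the output format itself.
        if (batchSize > 0 && batchCount >= batchSize) {
            flush();
        }
    }

    // Invoked when a checkpoint barrier arrives, i.e. roughly every
    // `checkpoint.interval`; this takes over the role of the removed
    // scheduler that used to call flush() every `batch_interval_ms`.
    public synchronized void flushOnCheckpoint() throws IOException {
        if (batchCount > 0) {
            flush();
        }
    }

    private void bufferIntoBatch(Object record) {
        // addToBatch(record) on the JDBC statement executor (omitted here)
    }

    private void flush() throws IOException {
        // executeBatch() and clear the buffer (omitted here)
        batchCount = 0;
    }
}
```

Jobs that previously tuned `batch_interval_ms` to bound write latency can adjust `checkpoint.interval` (typically set in the job's `env` block) instead, since the checkpoint period now bounds how long rows may sit in the buffer before being flushed to the database.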