From 01d8854c9648083b54b77ce38fc8ac586161f5d3 Mon Sep 17 00:00:00 2001 From: zhouyao Date: Thu, 29 Jun 2023 17:58:11 +0800 Subject: [PATCH] [Feature][Connector-V2][Clickhouse] clickhouse writes with checkpoints --- docs/en/connector-v2/sink/Clickhouse.md | 2 +- .../sink/client/ClickhouseSinkWriter.java | 39 +++++++++++-------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md index 05d03330c70..7c4bab991ba 100644 --- a/docs/en/connector-v2/sink/Clickhouse.md +++ b/docs/en/connector-v2/sink/Clickhouse.md @@ -58,7 +58,7 @@ In addition to the above mandatory parameters that must be specified by `clickho ### bulk_size [number] -The number of rows written through [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the `default is 20000` . +The number of rows written through [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the `default is 20000`, if checkpoints are enabled, writing will also occur at the times when the checkpoints are satisfied . ### split_mode [boolean] diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java index 443eec921aa..235279b4d5a 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java @@ -90,6 +90,7 @@ public void write(SeaTunnelRow element) throws IOException { @Override public Optional prepareCommit() throws IOException { + flush(); return Optional.empty(); } @@ -99,23 +100,7 @@ public void abortPrepare() {} @Override public void close() throws IOException { this.proxy.close(); - for (ClickhouseBatchStatement batchStatement : statementMap.values()) { - try (ClickHouseConnectionImpl needClosedConnection = - batchStatement.getClickHouseConnection(); - JdbcBatchStatementExecutor needClosedStatement = - batchStatement.getJdbcBatchStatementExecutor()) { - IntHolder intHolder = batchStatement.getIntHolder(); - if (intHolder.getValue() > 0) { - flush(needClosedStatement); - intHolder.setValue(0); - } - } catch (SQLException e) { - throw new ClickhouseConnectorException( - CommonErrorCode.SQL_OPERATION_FAILED, - "Failed to close prepared statement.", - e); - } - } + flush(); } private void addIntoBatch(SeaTunnelRow row, JdbcBatchStatementExecutor clickHouseStatement) { @@ -138,6 +123,26 @@ private void flush(JdbcBatchStatementExecutor clickHouseStatement) { } } + private void flush() { + for (ClickhouseBatchStatement batchStatement : statementMap.values()) { + try (ClickHouseConnectionImpl needClosedConnection = + batchStatement.getClickHouseConnection(); + JdbcBatchStatementExecutor needClosedStatement = + batchStatement.getJdbcBatchStatementExecutor()) { + IntHolder intHolder = batchStatement.getIntHolder(); + if (intHolder.getValue() > 0) { + flush(needClosedStatement); + intHolder.setValue(0); + } + } catch (SQLException e) { + throw new ClickhouseConnectorException( + CommonErrorCode.SQL_OPERATION_FAILED, + "Failed to close prepared statement.", + e); + } + } + } + private Map initStatementMap() { Map result = new HashMap<>(Common.COLLECTION_SIZE); shardRouter