From f7e7e2d6dcc85da03916ccb5f262a2c249f0bb59 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Thu, 29 Dec 2022 15:49:59 +0800 Subject: [PATCH 1/5] [Improve] [Connector-V2] Remove Clickhouse Fields Config --- docs/en/connector-v2/sink/Clickhouse.md | 5 ----- .../clickhouse/config/ClickhouseConfig.java | 6 ------ .../clickhouse/config/ReaderOption.java | 2 -- .../clickhouse/sink/client/ClickhouseSink.java | 16 ---------------- .../sink/client/ClickhouseSinkWriter.java | 1 - .../executor/BufferedBatchStatementExecutor.java | 4 +--- .../JdbcBatchStatementExecutorBuilder.java | 16 ++++++---------- .../sink/client/executor/SqlUtils.java | 2 +- 8 files changed, 8 insertions(+), 44 deletions(-) diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md index 90bec5fc9c1..5f36bf62325 100644 --- a/docs/en/connector-v2/sink/Clickhouse.md +++ b/docs/en/connector-v2/sink/Clickhouse.md @@ -30,7 +30,6 @@ Write data to Clickhouse can also be done using JDBC | table | string | yes | - | | username | string | yes | - | | password | string | yes | - | -| fields | string | yes | - | | clickhouse.* | string | no | | | bulk_size | string | no | 20000 | | split_mode | string | no | false | @@ -60,10 +59,6 @@ The table name `ClickHouse` user password -### fields [array] - -The data field that needs to be output to `ClickHouse` , if not configured, it will be automatically adapted according to the sink table `schema` . - ### clickhouse [string] In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc` . diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java index 9f93eb36ce7..ae6642a2518 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java @@ -33,12 +33,6 @@ public class ClickhouseConfig { public static final Option BULK_SIZE = Options.key("bulk_size").intType() .defaultValue(20000).withDescription("Bulk size of clickhouse jdbc"); - /** - * Clickhouse fields - */ - public static final Option FIELDS = Options.key("fields").stringType() - .noDefaultValue().withDescription("Clickhouse fields"); - public static final Option SQL = Options.key("sql").stringType() .noDefaultValue().withDescription("Clickhouse sql used to query data"); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java index 59f71174153..ac8a80f063e 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java @@ -25,7 +25,6 @@ import lombok.Setter; import java.io.Serializable; -import java.util.List; import java.util.Map; import java.util.Properties; @@ -34,7 +33,6 @@ public class ReaderOption implements Serializable { private ShardMetadata shardMetadata; - private List fields; private String[] primaryKeys; private boolean allowExperimentalLightweightDelete; private boolean supportUpsert; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java index 61149daad7e..5f8458f724b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java @@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY; @@ -46,7 +45,6 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; @@ -64,7 +62,6 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -164,18 +161,6 @@ public void prepare(Config config) throws PrepareFailException { new Shard(1, 1, nodes.get(0))); } - List fields = new ArrayList<>(); - if (config.hasPath(FIELDS.key())) { - fields.addAll(config.getStringList(FIELDS.key())); - // check if the fields exist in schema - for (String field : fields) { - if (!tableSchema.containsKey(field)) { - throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key())); - } - } - } else { - fields.addAll(tableSchema.keySet()); - } proxy.close(); String[] primaryKeys = null; @@ -198,7 +183,6 @@ public void prepare(Config config) throws PrepareFailException { this.option = ReaderOption.builder() .shardMetadata(metadata) .properties(clickhouseProperties) - .fields(fields) .tableEngine(table.getEngine()) .tableSchema(tableSchema) .bulkSize(config.getInt(BULK_SIZE.key())) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java index deb01545333..f1f7fa5315a 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java @@ -135,7 +135,6 @@ private Map initStatementMap() { .setRowType(option.getSeaTunnelRowType()) .setPrimaryKeys(option.getPrimaryKeys()) .setClickhouseTableSchema(option.getTableSchema()) - .setProjectionFields(option.getFields().toArray(new String[0])) .setAllowExperimentalLightweightDelete(option.isAllowExperimentalLightweightDelete()) .setSupportUpsert(option.isSupportUpsert()) .build(); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java index 7b5a4d24920..2f10565c384 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/BufferedBatchStatementExecutor.java @@ -63,8 +63,6 @@ public void closeStatements() throws SQLException { if (!buffer.isEmpty()) { executeBatch(); } - if (statementExecutor != null) { - statementExecutor.closeStatements(); - } + statementExecutor.closeStatements(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java index cf25088410d..2932c3ebd59 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java @@ -40,7 +40,6 @@ public class JdbcBatchStatementExecutorBuilder { private String tableEngine; private SeaTunnelRowType rowType; private String[] primaryKeys; - private String[] projectionFields; private Map clickhouseTableSchema; private boolean supportUpsert; private boolean allowExperimentalLightweightDelete; @@ -53,14 +52,14 @@ private boolean supportMergeTreeEngineExperimentalLightweightDelete() { private boolean supportReplacingMergeTreeTableUpsert() { return tableEngine.endsWith(REPLACING_MERGE_TREE_ENGINE_SUFFIX) - && Objects.equals(primaryKeys, orderByKeys); + && Arrays.equals(primaryKeys, orderByKeys); } private String[] getDefaultProjectionFields() { List fieldNames = Arrays.asList(rowType.getFieldNames()); return clickhouseTableSchema.keySet() .stream() - .filter(field -> fieldNames.contains(field)) + .filter(fieldNames::contains) .toArray(value -> new String[0]); } @@ -69,12 +68,9 @@ public JdbcBatchStatementExecutor build() { Objects.requireNonNull(tableEngine); Objects.requireNonNull(rowType); Objects.requireNonNull(clickhouseTableSchema); - if (projectionFields == null) { - projectionFields = getDefaultProjectionFields(); - } JdbcRowConverter valueRowConverter = new JdbcRowConverter( - rowType, clickhouseTableSchema, projectionFields); + rowType, clickhouseTableSchema, getDefaultProjectionFields()); if (primaryKeys == null || primaryKeys.length == 0) { // INSERT: writer all events when primary-keys is empty return createInsertBufferedExecutor(table, rowType, valueRowConverter); @@ -192,8 +188,8 @@ private static JdbcBatchStatementExecutor createSimpleExecutor(String sql, private static SeaTunnelDataType[] getKeyTypes(int[] pkFields, SeaTunnelRowType rowType) { return Arrays.stream(pkFields) - .mapToObj((IntFunction) index -> rowType.getFieldType(index)) - .toArray(length -> new SeaTunnelDataType[length]); + .mapToObj((IntFunction) rowType::getFieldType) + .toArray(SeaTunnelDataType[]::new); } private static Function createKeyExtractor(int[] pkFields) { @@ -205,7 +201,7 @@ private static Function createKeyExtractor(int[] pkF SeaTunnelRow newRow = new SeaTunnelRow(fields); newRow.setTableId(row.getTableId()); newRow.setRowKind(row.getRowKind()); - return row; + return newRow; }; } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java index 6582a174792..b0be465d463 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/SqlUtils.java @@ -30,7 +30,7 @@ public static String quoteIdentifier(String identifier) { public static String getInsertIntoStatement(String tableName, String[] fieldNames) { String columns = Arrays.stream(fieldNames) - .map(fieldName -> quoteIdentifier(fieldName)) + .map(SqlUtils::quoteIdentifier) .collect(Collectors.joining(", ")); String placeholders = Arrays.stream(fieldNames) .map(fieldName -> "?") From 4047b486b2fcaab16d8b69d94aa73c97ac4b32d7 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Thu, 29 Dec 2022 15:56:15 +0800 Subject: [PATCH 2/5] [Improve] [Connector-V2] Remove Clickhouse Fields Config --- docs/en/connector-v2/sink/Clickhouse.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md index 5f36bf62325..581210acb45 100644 --- a/docs/en/connector-v2/sink/Clickhouse.md +++ b/docs/en/connector-v2/sink/Clickhouse.md @@ -183,4 +183,6 @@ sink { - [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141)) -- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653)) \ No newline at end of file +- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653)) + +- [Improve] Remove Clickhouse Fields Config ([3826](https://github.com/apache/incubator-seatunnel/pull/3826)) \ No newline at end of file From 23b171cdbea1b2253110ae163aec008538352ba0 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 3 Jan 2023 10:50:13 +0800 Subject: [PATCH 3/5] [Improve] [Connector-V2] Remove Clickhouse Fields Config --- .../clickhouse/sink/ClickhouseSinkFactory.java | 2 -- .../clickhouse/sink/file/ClickhouseFileSink.java | 15 +-------------- .../sink/file/ClickhouseFileSinkFactory.java | 3 +-- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java index 0e343a7d834..fe685765096 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java @@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY; @@ -51,7 +50,6 @@ public OptionRule optionRule() { .optional(CLICKHOUSE_CONFIG, BULK_SIZE, SPLIT_MODE, - FIELDS, SHARDING_KEY, PRIMARY_KEY, SUPPORT_UPSERT, diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index 1da90545019..192ad68156c 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; @@ -48,7 +47,6 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; @@ -120,18 +118,7 @@ public void prepare(Config config) throws PrepareFailException { table.getEngine(), false, // we don't need to set splitMode in clickhouse file mode. new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), config.getString(PASSWORD.key())); - List fields; - if (config.hasPath(FIELDS.key())) { - fields = config.getStringList(FIELDS.key()); - // check if the fields exist in schema - for (String field : fields) { - if (!tableSchema.containsKey(field)) { - throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key())); - } - } - } else { - fields = new ArrayList<>(tableSchema.keySet()); - } + List fields = new ArrayList<>(tableSchema.keySet()); Map nodeUser = config.getObjectList(NODE_PASS.key()).stream() .collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS), configObject -> configObject.toConfig().hasPath(USERNAME.key()) ? configObject.toConfig().getString(USERNAME.key()) : "root")); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java index 829d8e005cf..fc5b2ecca84 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java @@ -21,7 +21,6 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; @@ -48,6 +47,6 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH) - .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build(); + .optional(COPY_METHOD, SHARDING_KEY, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build(); } } From 1b8e41769e81eb363cd2d787b378832cc4825f88 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 3 Jan 2023 14:56:35 +0800 Subject: [PATCH 4/5] [Improve] [Connector-V2] Remove Clickhouse Fields Config --- .../JdbcBatchStatementExecutorBuilder.java | 5 ++- .../seatunnel/clickhouse/ClickhouseIT.java | 5 +-- .../resources/clickhouse_to_clickhouse.conf | 32 ------------------- 3 files changed, 3 insertions(+), 39 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java index 2932c3ebd59..e8fb8025954 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java @@ -57,10 +57,9 @@ private boolean supportReplacingMergeTreeTableUpsert() { private String[] getDefaultProjectionFields() { List fieldNames = Arrays.asList(rowType.getFieldNames()); - return clickhouseTableSchema.keySet() + return (String[]) clickhouseTableSchema.keySet() .stream() - .filter(fieldNames::contains) - .toArray(value -> new String[0]); + .filter(fieldNames::contains).toArray(); } public JdbcBatchStatementExecutor build() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 05ca6d35990..c4baa787117 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -368,10 +368,7 @@ private void compareResult() throws SQLException, IOException { private Boolean compare(String sql) { try (Statement statement = connection.createStatement()) { ResultSet resultSet = statement.executeQuery(sql); - while (resultSet.next()) { - return false; - } - return true; + return !resultSet.next(); } catch (SQLException e) { throw new RuntimeException("result compare error", e); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf index 41ecae56963..1137c7c9375 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_clickhouse.conf @@ -43,38 +43,6 @@ sink { host = "clickhouse:8123" database = "default" table = "sink_table" - fields = [ - "id", - "c_map", - "c_array_string", - "c_array_short", - "c_array_int", - "c_array_long", - "c_array_float", - "c_array_double", - "c_string", - "c_boolean", - "c_int8", - "c_int16", - "c_int32", - "c_int64", - "c_float32", - "c_float64", - "c_decimal", - "c_date", - "c_datetime", - "c_nullable", - "c_lowcardinality", - "c_nested.int", - "c_nested.double", - "c_nested.string", - "c_int128", - "c_uint128", - "c_int256", - "c_uint256", - "c_point", - "c_ring" - ] username = "default" password = "" } From 81daf51373f0d31881f814a2feef43253841e644 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 3 Jan 2023 15:05:48 +0800 Subject: [PATCH 5/5] [Improve] [Connector-V2] Remove Clickhouse Fields Config --- .../client/executor/JdbcBatchStatementExecutorBuilder.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java index e8fb8025954..a13681d3a3e 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/executor/JdbcBatchStatementExecutorBuilder.java @@ -57,9 +57,10 @@ private boolean supportReplacingMergeTreeTableUpsert() { private String[] getDefaultProjectionFields() { List fieldNames = Arrays.asList(rowType.getFieldNames()); - return (String[]) clickhouseTableSchema.keySet() + return clickhouseTableSchema.keySet() .stream() - .filter(fieldNames::contains).toArray(); + .filter(fieldNames::contains) + .toArray(String[]::new); } public JdbcBatchStatementExecutor build() {