Skip to content

Commit

Permalink
[Improve] [Connector-V2] Remove Clickhouse Fields Config (#3826)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Jan 9, 2023
1 parent 8b914de commit 74704c3
Show file tree
Hide file tree
Showing 13 changed files with 15 additions and 100 deletions.
9 changes: 3 additions & 6 deletions docs/en/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Write data to Clickhouse can also be done using JDBC
| table | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| fields | string | yes | - |
| clickhouse.config | map | no | |
| bulk_size | string | no | 20000 |
| split_mode | string | no | false |
Expand Down Expand Up @@ -59,10 +58,6 @@ The table name

`ClickHouse` user password

### fields [array]

The data field that needs to be output to `ClickHouse` , if not configured, it will be automatically adapted according to the sink table `schema` .

### clickhouse.config [map]

In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc` .
Expand Down Expand Up @@ -188,4 +183,6 @@ sink {
- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)

- [Improve] Remove Clickhouse Fields Config ([3826](https://github.com/apache/incubator-seatunnel/pull/3826))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public class ClickhouseConfig {
public static final Option<Integer> BULK_SIZE = Options.key("bulk_size").intType()
.defaultValue(20000).withDescription("Bulk size of clickhouse jdbc");

/**
* Clickhouse fields
*/
public static final Option<String> FIELDS = Options.key("fields").stringType()
.noDefaultValue().withDescription("Clickhouse fields");

public static final Option<String> SQL = Options.key("sql").stringType()
.noDefaultValue().withDescription("Clickhouse sql used to query data");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.Setter;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Properties;

Expand All @@ -34,7 +33,6 @@
public class ReaderOption implements Serializable {

private ShardMetadata shardMetadata;
private List<String> fields;
private String[] primaryKeys;
private boolean allowExperimentalLightweightDelete;
private boolean supportUpsert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
Expand Down Expand Up @@ -51,7 +50,6 @@ public OptionRule optionRule() {
.optional(CLICKHOUSE_CONFIG,
BULK_SIZE,
SPLIT_MODE,
FIELDS,
SHARDING_KEY,
PRIMARY_KEY,
SUPPORT_UPSERT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
Expand All @@ -45,7 +44,6 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
Expand All @@ -63,7 +61,6 @@
import com.google.common.collect.ImmutableMap;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -161,18 +158,6 @@ public void prepare(Config config) throws PrepareFailException {
new Shard(1, 1, nodes.get(0)));
}

List<String> fields = new ArrayList<>();
if (config.hasPath(FIELDS.key())) {
fields.addAll(config.getStringList(FIELDS.key()));
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
}
}
} else {
fields.addAll(tableSchema.keySet());
}
proxy.close();

String[] primaryKeys = null;
Expand All @@ -195,7 +180,6 @@ public void prepare(Config config) throws PrepareFailException {
this.option = ReaderOption.builder()
.shardMetadata(metadata)
.properties(clickhouseProperties)
.fields(fields)
.tableEngine(table.getEngine())
.tableSchema(tableSchema)
.bulkSize(config.getInt(BULK_SIZE.key()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
.setRowType(option.getSeaTunnelRowType())
.setPrimaryKeys(option.getPrimaryKeys())
.setClickhouseTableSchema(option.getTableSchema())
.setProjectionFields(option.getFields().toArray(new String[0]))
.setAllowExperimentalLightweightDelete(option.isAllowExperimentalLightweightDelete())
.setSupportUpsert(option.isSupportUpsert())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public void closeStatements() throws SQLException {
if (!buffer.isEmpty()) {
executeBatch();
}
if (statementExecutor != null) {
statementExecutor.closeStatements();
}
statementExecutor.closeStatements();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class JdbcBatchStatementExecutorBuilder {
private String tableEngine;
private SeaTunnelRowType rowType;
private String[] primaryKeys;
private String[] projectionFields;
private Map<String, String> clickhouseTableSchema;
private boolean supportUpsert;
private boolean allowExperimentalLightweightDelete;
Expand All @@ -53,28 +52,25 @@ private boolean supportMergeTreeEngineExperimentalLightweightDelete() {

private boolean supportReplacingMergeTreeTableUpsert() {
return tableEngine.endsWith(REPLACING_MERGE_TREE_ENGINE_SUFFIX)
&& Objects.equals(primaryKeys, orderByKeys);
&& Arrays.equals(primaryKeys, orderByKeys);
}

private String[] getDefaultProjectionFields() {
List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
return clickhouseTableSchema.keySet()
.stream()
.filter(field -> fieldNames.contains(field))
.toArray(value -> new String[0]);
.filter(fieldNames::contains)
.toArray(String[]::new);
}

public JdbcBatchStatementExecutor build() {
Objects.requireNonNull(table);
Objects.requireNonNull(tableEngine);
Objects.requireNonNull(rowType);
Objects.requireNonNull(clickhouseTableSchema);
if (projectionFields == null) {
projectionFields = getDefaultProjectionFields();
}

JdbcRowConverter valueRowConverter = new JdbcRowConverter(
rowType, clickhouseTableSchema, projectionFields);
rowType, clickhouseTableSchema, getDefaultProjectionFields());
if (primaryKeys == null || primaryKeys.length == 0) {
// INSERT: writer all events when primary-keys is empty
return createInsertBufferedExecutor(table, rowType, valueRowConverter);
Expand Down Expand Up @@ -192,8 +188,8 @@ private static JdbcBatchStatementExecutor createSimpleExecutor(String sql,

private static SeaTunnelDataType[] getKeyTypes(int[] pkFields, SeaTunnelRowType rowType) {
return Arrays.stream(pkFields)
.mapToObj((IntFunction<SeaTunnelDataType>) index -> rowType.getFieldType(index))
.toArray(length -> new SeaTunnelDataType[length]);
.mapToObj((IntFunction<SeaTunnelDataType>) rowType::getFieldType)
.toArray(SeaTunnelDataType[]::new);
}

private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
Expand All @@ -205,7 +201,7 @@ private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkF
SeaTunnelRow newRow = new SeaTunnelRow(fields);
newRow.setTableId(row.getTableId());
newRow.setRowKind(row.getRowKind());
return row;
return newRow;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static String quoteIdentifier(String identifier) {
public static String getInsertIntoStatement(String tableName,
String[] fieldNames) {
String columns = Arrays.stream(fieldNames)
.map(fieldName -> quoteIdentifier(fieldName))
.map(SqlUtils::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames)
.map(fieldName -> "?")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
Expand All @@ -48,7 +47,6 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
Expand Down Expand Up @@ -120,18 +118,7 @@ public void prepare(Config config) throws PrepareFailException {
table.getEngine(),
false, // we don't need to set splitMode in clickhouse file mode.
new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
List<String> fields;
if (config.hasPath(FIELDS.key())) {
fields = config.getStringList(FIELDS.key());
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
}
}
} else {
fields = new ArrayList<>(tableSchema.keySet());
}
List<String> fields = new ArrayList<>(tableSchema.keySet());
Map<String, String> nodeUser = config.getObjectList(NODE_PASS.key()).stream()
.collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS),
configObject -> configObject.toConfig().hasPath(USERNAME.key()) ? configObject.toConfig().getString(USERNAME.key()) : "root"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
Expand All @@ -48,6 +47,6 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH)
.optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
.optional(COPY_METHOD, SHARDING_KEY, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,7 @@ private void compareResult() throws SQLException, IOException {
private Boolean compare(String sql) {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
return false;
}
return true;
return !resultSet.next();
} catch (SQLException e) {
throw new RuntimeException("result compare error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,6 @@ sink {
host = "clickhouse:8123"
database = "default"
table = "sink_table"
fields = [
"id",
"c_map",
"c_array_string",
"c_array_short",
"c_array_int",
"c_array_long",
"c_array_float",
"c_array_double",
"c_string",
"c_boolean",
"c_int8",
"c_int16",
"c_int32",
"c_int64",
"c_float32",
"c_float64",
"c_decimal",
"c_date",
"c_datetime",
"c_nullable",
"c_lowcardinality",
"c_nested.int",
"c_nested.double",
"c_nested.string",
"c_int128",
"c_uint128",
"c_int256",
"c_uint256",
"c_point",
"c_ring"
]
username = "default"
password = ""
}
Expand Down

0 comments on commit 74704c3

Please sign in to comment.