Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] [Connector-V2] Remove Clickhouse Fields Config #3826

Merged
merged 6 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions docs/en/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Write data to Clickhouse can also be done using JDBC
| table | string | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| fields | string | yes | - |
| clickhouse.config | map | no | |
| bulk_size | string | no | 20000 |
| split_mode | string | no | false |
Expand Down Expand Up @@ -59,10 +58,6 @@ The table name

`ClickHouse` user password

### fields [array]

The data field that needs to be output to `ClickHouse` , if not configured, it will be automatically adapted according to the sink table `schema` .

### clickhouse.config [map]

In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc` .
Expand Down Expand Up @@ -188,4 +183,6 @@ sink {
- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)

- [Improve] Remove Clickhouse Fields Config ([3826](https://github.com/apache/incubator-seatunnel/pull/3826))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ public class ClickhouseConfig {
public static final Option<Integer> BULK_SIZE = Options.key("bulk_size").intType()
.defaultValue(20000).withDescription("Bulk size of clickhouse jdbc");

/**
* Clickhouse fields
*/
public static final Option<String> FIELDS = Options.key("fields").stringType()
.noDefaultValue().withDescription("Clickhouse fields");

public static final Option<String> SQL = Options.key("sql").stringType()
.noDefaultValue().withDescription("Clickhouse sql used to query data");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.Setter;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Properties;

Expand All @@ -34,7 +33,6 @@
public class ReaderOption implements Serializable {

private ShardMetadata shardMetadata;
private List<String> fields;
private String[] primaryKeys;
private boolean allowExperimentalLightweightDelete;
private boolean supportUpsert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
Expand Down Expand Up @@ -51,7 +50,6 @@ public OptionRule optionRule() {
.optional(CLICKHOUSE_CONFIG,
BULK_SIZE,
SPLIT_MODE,
FIELDS,
SHARDING_KEY,
PRIMARY_KEY,
SUPPORT_UPSERT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
Expand All @@ -45,7 +44,6 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
Expand All @@ -63,7 +61,6 @@
import com.google.common.collect.ImmutableMap;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -161,18 +158,6 @@ public void prepare(Config config) throws PrepareFailException {
new Shard(1, 1, nodes.get(0)));
}

List<String> fields = new ArrayList<>();
if (config.hasPath(FIELDS.key())) {
fields.addAll(config.getStringList(FIELDS.key()));
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
}
}
} else {
fields.addAll(tableSchema.keySet());
}
proxy.close();

String[] primaryKeys = null;
Expand All @@ -195,7 +180,6 @@ public void prepare(Config config) throws PrepareFailException {
this.option = ReaderOption.builder()
.shardMetadata(metadata)
.properties(clickhouseProperties)
.fields(fields)
.tableEngine(table.getEngine())
.tableSchema(tableSchema)
.bulkSize(config.getInt(BULK_SIZE.key()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
.setRowType(option.getSeaTunnelRowType())
.setPrimaryKeys(option.getPrimaryKeys())
.setClickhouseTableSchema(option.getTableSchema())
.setProjectionFields(option.getFields().toArray(new String[0]))
.setAllowExperimentalLightweightDelete(option.isAllowExperimentalLightweightDelete())
.setSupportUpsert(option.isSupportUpsert())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ public void closeStatements() throws SQLException {
if (!buffer.isEmpty()) {
executeBatch();
}
if (statementExecutor != null) {
statementExecutor.closeStatements();
}
statementExecutor.closeStatements();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class JdbcBatchStatementExecutorBuilder {
private String tableEngine;
private SeaTunnelRowType rowType;
private String[] primaryKeys;
private String[] projectionFields;
private Map<String, String> clickhouseTableSchema;
private boolean supportUpsert;
private boolean allowExperimentalLightweightDelete;
Expand All @@ -53,28 +52,25 @@ private boolean supportMergeTreeEngineExperimentalLightweightDelete() {

private boolean supportReplacingMergeTreeTableUpsert() {
return tableEngine.endsWith(REPLACING_MERGE_TREE_ENGINE_SUFFIX)
&& Objects.equals(primaryKeys, orderByKeys);
&& Arrays.equals(primaryKeys, orderByKeys);
Hisoka-X marked this conversation as resolved.
Show resolved Hide resolved
}

private String[] getDefaultProjectionFields() {
List<String> fieldNames = Arrays.asList(rowType.getFieldNames());
return clickhouseTableSchema.keySet()
.stream()
.filter(field -> fieldNames.contains(field))
.toArray(value -> new String[0]);
.filter(fieldNames::contains)
.toArray(String[]::new);
}

public JdbcBatchStatementExecutor build() {
Objects.requireNonNull(table);
Objects.requireNonNull(tableEngine);
Objects.requireNonNull(rowType);
Objects.requireNonNull(clickhouseTableSchema);
if (projectionFields == null) {
projectionFields = getDefaultProjectionFields();
}

JdbcRowConverter valueRowConverter = new JdbcRowConverter(
rowType, clickhouseTableSchema, projectionFields);
rowType, clickhouseTableSchema, getDefaultProjectionFields());
if (primaryKeys == null || primaryKeys.length == 0) {
// INSERT: writer all events when primary-keys is empty
return createInsertBufferedExecutor(table, rowType, valueRowConverter);
Expand Down Expand Up @@ -192,8 +188,8 @@ private static JdbcBatchStatementExecutor createSimpleExecutor(String sql,

private static SeaTunnelDataType[] getKeyTypes(int[] pkFields, SeaTunnelRowType rowType) {
return Arrays.stream(pkFields)
.mapToObj((IntFunction<SeaTunnelDataType>) index -> rowType.getFieldType(index))
.toArray(length -> new SeaTunnelDataType[length]);
.mapToObj((IntFunction<SeaTunnelDataType>) rowType::getFieldType)
.toArray(SeaTunnelDataType[]::new);
}

private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkFields) {
Expand All @@ -205,7 +201,7 @@ private static Function<SeaTunnelRow, SeaTunnelRow> createKeyExtractor(int[] pkF
SeaTunnelRow newRow = new SeaTunnelRow(fields);
newRow.setTableId(row.getTableId());
newRow.setRowKind(row.getRowKind());
return row;
return newRow;
Hisoka-X marked this conversation as resolved.
Show resolved Hide resolved
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static String quoteIdentifier(String identifier) {
public static String getInsertIntoStatement(String tableName,
String[] fieldNames) {
String columns = Arrays.stream(fieldNames)
.map(fieldName -> quoteIdentifier(fieldName))
.map(SqlUtils::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames)
.map(fieldName -> "?")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
Expand All @@ -48,7 +47,6 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
Expand Down Expand Up @@ -120,18 +118,7 @@ public void prepare(Config config) throws PrepareFailException {
table.getEngine(),
false, // we don't need to set splitMode in clickhouse file mode.
new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
List<String> fields;
if (config.hasPath(FIELDS.key())) {
fields = config.getStringList(FIELDS.key());
// check if the fields exist in schema
for (String field : fields) {
if (!tableSchema.containsKey(field)) {
throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, "Field " + field + " does not exist in table " + config.getString(TABLE.key()));
}
}
} else {
fields = new ArrayList<>(tableSchema.keySet());
}
List<String> fields = new ArrayList<>(tableSchema.keySet());
Map<String, String> nodeUser = config.getObjectList(NODE_PASS.key()).stream()
.collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS),
configObject -> configObject.toConfig().hasPath(USERNAME.key()) ? configObject.toConfig().getString(USERNAME.key()) : "root"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
Expand All @@ -48,6 +47,6 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH)
.optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
.optional(COPY_METHOD, SHARDING_KEY, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,7 @@ private void compareResult() throws SQLException, IOException {
private Boolean compare(String sql) {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
return false;
}
return true;
return !resultSet.next();
} catch (SQLException e) {
throw new RuntimeException("result compare error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,6 @@ sink {
host = "clickhouse:8123"
database = "default"
table = "sink_table"
fields = [
"id",
"c_map",
"c_array_string",
"c_array_short",
"c_array_int",
"c_array_long",
"c_array_float",
"c_array_double",
"c_string",
"c_boolean",
"c_int8",
"c_int16",
"c_int32",
"c_int64",
"c_float32",
"c_float64",
"c_decimal",
"c_date",
"c_datetime",
"c_nullable",
"c_lowcardinality",
"c_nested.int",
"c_nested.double",
"c_nested.string",
"c_int128",
"c_uint128",
"c_int256",
"c_uint256",
"c_point",
"c_ring"
]
username = "default"
password = ""
}
Expand Down