Skip to content

Commit

Permalink
[Feature][Connector-V2][Clickhouse] Add clickhouse.config to the sour…
Browse files Browse the repository at this point in the history
…ce connector (apache#7143)

* The source connector of Clickhouse supports a custom clickhouse config

* UPDATE doc

* fmt

---------

Co-authored-by: Dongyeon <loustler@naverz-corp.com>
  • Loading branch information
2 people authored and hawk9821 committed Jul 13, 2024
1 parent b8995e6 commit 100d302
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 33 deletions.
22 changes: 13 additions & 9 deletions docs/en/connector-v2/source/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor

## Source Options

| Name | Type | Required | Default | Description |
|------------------|--------|----------|------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
| host | String | Yes | - | `ClickHouse` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as `"host1:8123,host2:8123"` . |
| database | String | Yes | - | The `ClickHouse` database. |
| sql | String | Yes | - | The query sql used to search data through Clickhouse server. |
| username | String | Yes | - | `ClickHouse` user username. |
| password | String | Yes | - | `ClickHouse` user password. |
| server_time_zone | String | No | ZoneId.systemDefault() | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
| Name | Type | Required | Default | Description |
|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| host | String | Yes | - | `ClickHouse` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as `"host1:8123,host2:8123"` . |
| database | String | Yes | - | The `ClickHouse` database. |
| sql | String | Yes | - | The query sql used to search data through Clickhouse server. |
| username | String | Yes | - | `ClickHouse` user username. |
| password | String | Yes | - | `ClickHouse` user password. |
| clickhouse.config | Map | No | - | In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc`. |
| server_time_zone | String | No | ZoneId.systemDefault() | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |

## How to Create a Clickhouse Data Synchronization Job

Expand All @@ -80,6 +81,9 @@ source {
password = "xxxxx"
server_time_zone = "UTC"
result_table_name = "test"
clickhouse.config = {
"socket_timeout": "300000"
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)
- [Connector-v2] [Redis] Use the scan command instead of the keys command, and support batchWrite in single mode (#7030, #7085)
- [Connector-V2] [Clickhouse] Add a new optional configuration `clickhouse.config` to the source connector of ClickHouse (#7143)

### Zeta(ST-Engine)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void prepare(Config config) throws PrepareFailException {
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
null,
null,
null);
} else {
nodes =
Expand All @@ -121,7 +122,8 @@ public void prepare(Config config) throws PrepareFailException {
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
config.getString(PASSWORD.key()),
null);
}

Properties clickhouseProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public void prepare(Config config) throws PrepareFailException {
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
config.getString(PASSWORD.key()),
null);

ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
Map<String, String> tableSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
Expand Down Expand Up @@ -96,13 +98,27 @@ public void prepare(Config config) throws PrepareFailException {
.build();

config = config.withFallback(ConfigFactory.parseMap(defaultConfig));

Map<String, String> customConfig = null;

if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
customConfig =
config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entrySet ->
entrySet.getValue().unwrapped().toString()));
}

servers =
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
config.getString(PASSWORD.key()),
customConfig);

sql = config.getString(SQL.key());
ClickHouseNode currentServer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
Expand All @@ -39,7 +40,10 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME, PASSWORD).build();
return OptionRule.builder()
.required(HOST, DATABASE, SQL, USERNAME, PASSWORD)
.optional(CLICKHOUSE_CONFIG)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;

import com.clickhouse.client.ClickHouseCredentials;
Expand All @@ -25,6 +26,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ClickhouseUtil {
Expand All @@ -34,30 +36,35 @@ public static List<ClickHouseNode> createNodes(
String database,
String serverTimeZone,
String username,
String password) {
String password,
Map<String, String> options) {
return Arrays.stream(nodeAddress.split(","))
.map(
address -> {
String[] nodeAndPort = address.split(":", 2);
if (StringUtils.isEmpty(username) && StringUtils.isEmpty(password)) {
return ClickHouseNode.builder()
.host(nodeAndPort[0])
.port(
ClickHouseProtocol.HTTP,
Integer.parseInt(nodeAndPort[1]))
.database(database)
.timeZone(serverTimeZone)
.build();
ClickHouseNode.Builder builder =
ClickHouseNode.builder()
.host(nodeAndPort[0])
.port(
ClickHouseProtocol.HTTP,
Integer.parseInt(nodeAndPort[1]))
.database(database)
.timeZone(serverTimeZone);
if (MapUtils.isNotEmpty(options)) {
for (Map.Entry<String, String> entry : options.entrySet()) {
builder = builder.addOption(entry.getKey(), entry.getValue());
}
}
return ClickHouseNode.builder()
.host(nodeAndPort[0])
.port(ClickHouseProtocol.HTTP, Integer.parseInt(nodeAndPort[1]))
.database(database)
.timeZone(serverTimeZone)
.credentials(
ClickHouseCredentials.fromUserAndPassword(
username, password))
.build();

if (StringUtils.isNotEmpty(username)
&& StringUtils.isNotEmpty(password)) {
builder =
builder.credentials(
ClickHouseCredentials.fromUserAndPassword(
username, password));
}

return builder.build();
})
.collect(Collectors.toList());
}
Expand Down

0 comments on commit 100d302

Please sign in to comment.