
[Docs][Connector-V2][StarRocks] Reconstruct the StarRocks connector document #5132

Merged 3 commits on Aug 14, 2023
docs/en/connector-v2/sink/StarRocks.md (119 additions & 98 deletions)
# StarRocks

> StarRocks sink connector

## Support These Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

## Description

Used to send data to StarRocks. Both streaming and batch modes are supported.
Internally, the StarRocks sink connector buffers rows and imports them into StarRocks in batches via Stream Load.

## Sink Options

| Name                        | Type    | Required | Default         | Description                                                                                                                                                                             |
|-----------------------------|---------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| nodeUrls                    | list    | yes      | -               | `StarRocks` cluster address, in the format `["fe_ip:fe_http_port", ...]`                                                                                                                  |
| base-url                    | string  | yes      | -               | The JDBC URL, e.g. `jdbc:mysql://localhost:9030/`, `jdbc:mysql://localhost:9030`, or `jdbc:mysql://localhost:9030/db`                                                                     |
| username                    | string  | yes      | -               | `StarRocks` user name                                                                                                                                                                     |
| password                    | string  | yes      | -               | `StarRocks` user password                                                                                                                                                                 |
| database                    | string  | yes      | -               | The name of the StarRocks database                                                                                                                                                        |
| table                       | string  | no       | -               | The name of the StarRocks table. If not set, the name of the upstream table is used                                                                                                       |
| labelPrefix                 | string  | no       | -               | The prefix of the StarRocks Stream Load label                                                                                                                                             |
| batch_max_rows              | long    | no       | 1024            | For batch writing: when the buffer reaches `batch_max_rows` rows or `batch_max_bytes` bytes, or `batch_interval_ms` milliseconds have elapsed, the buffered data is flushed to StarRocks  |
| batch_max_bytes             | int     | no       | 5 * 1024 * 1024 | See `batch_max_rows`                                                                                                                                                                      |
| batch_interval_ms           | int     | no       | -               | See `batch_max_rows`                                                                                                                                                                      |
| max_retries                 | int     | no       | -               | The number of retries when a flush fails                                                                                                                                                  |
| retry_backoff_multiplier_ms | int     | no       | -               | Used as a multiplier for generating the next backoff delay                                                                                                                               |
| max_retry_backoff_ms        | int     | no       | -               | The maximum amount of time to wait before retrying a request to `StarRocks`                                                                                                               |
| enable_upsert_delete        | boolean | no       | false           | Whether to enable upsert/delete; only the Primary Key model is supported                                                                                                                  |
| save_mode_create_template   | string  | no       | see below       | See below                                                                                                                                                                                 |
| starrocks.config            | map     | no       | -               | Parameters for the Stream Load `data_desc`                                                                                                                                                |
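
For example, the flush and retry options can be combined as follows (a sketch with illustrative values, not recommended defaults):

```hocon
sink {
  StarRocks {
    // Connection options (nodeUrls, base-url, username, password, database, table) omitted.

    // The buffer is flushed to StarRocks when any one of these thresholds is reached first:
    batch_max_rows    = 5000       // rows in the buffer
    batch_max_bytes   = 10485760   // bytes in the buffer (10 MiB)
    batch_interval_ms = 1000       // milliseconds since the last flush

    // A failed flush is retried up to max_retries times; retry_backoff_multiplier_ms is
    // used to derive the next backoff delay, which is capped at max_retry_backoff_ms.
    max_retries                 = 3
    retry_backoff_multiplier_ms = 500
    max_retry_backoff_ms        = 5000
  }
}
```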

### save_mode_create_template

We use templates to automatically create StarRocks tables; the connector generates the corresponding create-table statement based on the type and schema of the upstream data.

You can use the following placeholders:

- database: Used to get the database in the upstream schema
- table_name: Used to get the table name in the upstream schema
- rowtype_fields: Used to get all the fields in the upstream schema; they are automatically mapped to the field description of StarRocks
- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)
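
For example, a custom template for a Primary Key model table might look like the following sketch (illustrative only; it uses the placeholders described above, and the ENGINE/PROPERTIES clauses should be adapted to your cluster):

```hocon
sink {
  StarRocks {
    // Connection options omitted.

    // Sketch of a custom create-table template; assumes a Primary Key model table.
    save_mode_create_template = """
      CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
        ${rowtype_primary_key},
        ${rowtype_fields}
      ) ENGINE = OLAP
      PRIMARY KEY (${rowtype_primary_key})
      DISTRIBUTED BY HASH (${rowtype_primary_key})
      PROPERTIES (
        "replication_num" = "1"
      )
    """
  }
}
```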

## Data Type Mapping

| StarRocks Data type | SeaTunnel Data type |
|---------------------|---------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| DATE | STRING |
| TIME | STRING |
| DATETIME | STRING |
| STRING | STRING |
| ARRAY | STRING |
| MAP | STRING |
| BYTES | STRING |

#### Supported import data formats

The supported formats include CSV and JSON.

## Task Example

### Simple:

> The following example writes multiple data types to StarRocks; users need to create the corresponding table downstream in advance (a sketch of such a table follows the config below)

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}
```
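
For reference, a downstream table matching the schema above might look like the following sketch (hypothetical DDL; per the data type mapping table, complex types such as MAP, ARRAY, and BYTES are exchanged as STRING):

```sql
-- Hypothetical target table for the FakeSource schema above (Duplicate Key model).
CREATE TABLE IF NOT EXISTS `test`.`e2e_table_sink` (
  `c_int`       INT,
  `c_string`    STRING,
  `c_boolean`   BOOLEAN,
  `c_tinyint`   TINYINT,
  `c_smallint`  SMALLINT,
  `c_bigint`    BIGINT,
  `c_float`     FLOAT,
  `c_double`    DOUBLE,
  `c_decimal`   DECIMAL(16, 1),
  `c_date`      DATE,
  `c_timestamp` DATETIME,
  `c_map`       STRING,   -- MAP is exchanged as STRING
  `c_array`     STRING,   -- ARRAY is exchanged as STRING
  `c_bytes`     STRING    -- BYTES is exchanged as STRING
) ENGINE = OLAP
DUPLICATE KEY (`c_int`)
DISTRIBUTED BY HASH (`c_int`)
PROPERTIES (
  "replication_num" = "1"
);
```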

### Support write cdc changelog event(INSERT/UPDATE/DELETE)
> **Reviewer (Member):** Can you add an example for the save_mode_create_template option?

```hocon
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    ...

    // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model.
    enable_upsert_delete = true
  }
}
```

### Use JSON format to import data

```hocon
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}
```

### Use CSV format to import data

```hocon
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "CSV"
      column_separator = "\\x01"
      row_delimiter = "\\x02"
    }
  }
}
```
Expand All @@ -206,4 +228,3 @@ sink {
- Add StarRocks Sink Connector

### Next version

- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/seatunnel/pull/3865)