Merge branch 'apache:dev' into fix-flink-2pc-runmode-set
zhilinli123 authored Aug 6, 2023
2 parents 0e358ba + d952cea commit de5937d
Showing 16 changed files with 648 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitattributes
@@ -0,0 +1 @@
*.sh text eol=lf
200 changes: 200 additions & 0 deletions docs/en/connector-v2/sink/PostgreSql.md

Large diffs are not rendered by default.

158 changes: 158 additions & 0 deletions docs/en/connector-v2/source/PostgreSQL.md
@@ -0,0 +1,158 @@
# PostgreSQL

> JDBC PostgreSQL Source Connector
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [x] [support user-defined split](../../concept/connector-v2-features.md)

> Supports query SQL and can achieve the projection effect (only the columns selected in the query are read).

## Description

Read data from an external data source through JDBC.

## Supported DataSource Info

| Datasource | Supported Versions | Driver | Url | Maven |
|------------|----------------------------------------------------------------------|-----------------------|---------------------------------------|---------------------------------------------------------------------------|
| PostgreSQL | Different dependency versions have different driver classes. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
| PostgreSQL | Use this if you need to manipulate the GEOMETRY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |

## Database Dependency

> Please download the driver jar listed under 'Maven' above and copy it into the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory.<br/>
> For example, for the PostgreSQL datasource: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/<br/>
> If you want to manipulate the GEOMETRY type in PostgreSQL, add both postgresql-xxx.jar and postgis-jdbc-xxx.jar to $SEATUNNEL_HOME/plugins/jdbc/lib/.

## Data Type Mapping

| PostgreSQL Data Type | SeaTunnel Data Type |
|-----------------------------------------------------------------------|---------------------|
| BOOL | BOOLEAN |
| _BOOL | ARRAY&lt;BOOLEAN&gt; |
| BYTEA | BYTES |
| _BYTEA | ARRAY&lt;TINYINT&gt; |
| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL | INT |
| _INT2<br/>_INT4 | ARRAY&lt;INT&gt; |
| INT8<br/>BIGSERIAL | BIGINT |
| _INT8 | ARRAY&lt;BIGINT&gt; |
| FLOAT4 | FLOAT |
| _FLOAT4 | ARRAY&lt;FLOAT&gt; |
| FLOAT8 | DOUBLE |
| _FLOAT8 | ARRAY&lt;DOUBLE&gt; |
| NUMERIC (column precision > 0) | DECIMAL(precision, scale), taken from the column's declared precision and scale |
| NUMERIC (column precision not greater than 0) | DECIMAL(38, 18) |
| BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY | STRING |
| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT | ARRAY&lt;STRING&gt; |
| TIMESTAMP | TIMESTAMP |
| TIME | TIME |
| DATE | DATE |
| OTHER DATA TYPES | NOT SUPPORTED YET |

## Options

| Name | Type | Required | Default | Description |
|-------------------------------|------------|----------|-----------------|-------------|
| url | String | Yes | - | The URL of the JDBC connection, e.g. jdbc:postgresql://localhost:5432/test |
| driver | String | Yes | - | The JDBC class name used to connect to the remote data source; for PostgreSQL the value is `org.postgresql.Driver`. |
| user | String | No | - | Connection instance user name |
| password | String | No | - | Connection instance password |
| query | String | Yes | - | Query statement |
| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete |
| partition_column | String | No | - | The column used to partition the data for parallel reads. Only numeric columns are supported, and only one column can be configured. |
| partition_lower_bound | BigDecimal | No | - | The minimum value of partition_column for the scan. If not set, SeaTunnel queries the database for the minimum value. |
| partition_upper_bound | BigDecimal | No | - | The maximum value of partition_column for the scan. If not set, SeaTunnel queries the database for the maximum value. |
| partition_num | Int | No | job parallelism | The number of partitions; only positive integers are supported. Defaults to the job parallelism. |
| fetch_size | Int | No | 0 | For queries that return a large number of rows, you can configure the row fetch size used in the query to improve performance by reducing the number of database hits required to satisfy the selection criteria. Zero means use the JDBC driver's default value. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |

### Tips

> If partition_column is not set, the job runs with a single concurrency; if partition_column is set, the job is executed in parallel according to the task parallelism.

## Task Example

### Simple:

> This example queries 16 rows (all fields) from the `source` table in your `test` database and prints them to the console. Because no partition_column is configured, it runs with a single concurrency. You can also specify which fields to query, as shown in the projection sketch after this example.

```
# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "root"
    password = "test"
    query = "select * from source limit 16"
  }
}

transform {
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {}
}
```
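If you only need some of the columns, the projection note under Key Features applies directly to the `query` option. Below is a minimal sketch; the column names `id` and `name` are illustrative assumptions about your `source` table.

```
source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "root"
    password = "test"
    # Only the selected columns are read and passed downstream (column projection).
    # "id" and "name" are placeholder column names.
    query = "select id, name from source limit 16"
  }
}
```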

### Parallel:

> Reads your query table in parallel using the shard column you configure. Use this if you want to read the whole table in parallel.

```
source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "root"
    password = "test"
    query = "select * from source"
    partition_column = "id"
    partition_num = 5
  }
}
```

### Parallel Boundary:

> It is more efficient to read within specified upper and lower bounds: SeaTunnel scans your data source only between the boundaries you configure.

```
source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "root"
    password = "test"
    query = "select * from source"
    partition_column = "id"
    # The name of the result table registered for downstream plugins
    result_table_name = "jdbc"
    partition_lower_bound = 1
    partition_upper_bound = 50
    partition_num = 5
  }
}
```
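For large result sets you can combine a partitioned read with the `fetch_size` option described above. This is a minimal sketch; the value 1024 is an illustrative assumption, not a recommendation.

```
source {
  Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "root"
    password = "test"
    query = "select * from source"
    partition_column = "id"
    partition_num = 5
    # Fetch rows from the server in batches instead of materializing the whole result set at once.
    fetch_size = 1024
  }
}
```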

2 changes: 1 addition & 1 deletion docs/en/start-v2/kubernetes/kubernetes.mdx
@@ -203,7 +203,7 @@ spec:
- key: seatunnel.streaming.conf
path: seatunnel.streaming.conf
job:
jarURI: local:///opt/seatunnel/starter/seatunnel-flink-starter.jar
jarURI: local:///opt/seatunnel/starter/seatunnel-flink-13-starter.jar
entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
args: ["--config", "/data/seatunnel.streaming.conf"]
parallelism: 2
4 changes: 2 additions & 2 deletions plugin-mapping.properties
@@ -47,8 +47,8 @@ seatunnel.source.LocalFile = connector-file-local
seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.OssFile = connector-file-oss
seatunnel.sink.OssFile = connector-file-oss
seatunnel.source.OssJindoFile = connector-file-oss-jindo
seatunnel.sink.OssJindoFile = connector-file-oss-jindo
seatunnel.source.OssJindoFile = connector-file-jindo-oss
seatunnel.sink.OssJindoFile = connector-file-jindo-oss
seatunnel.source.CosFile = connector-file-cos
seatunnel.sink.CosFile = connector-file-cos
seatunnel.source.Pulsar = connector-pulsar
38 changes: 0 additions & 38 deletions pom.xml
@@ -141,11 +141,6 @@
<awaitility.version>4.2.0</awaitility.version>
<e2e.dependency.skip>true</e2e.dependency.skip>

<!-- Imap storage dependency package -->
<hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
<json-smart.version>2.4.7</json-smart.version>
<hadoop-aws.version>3.1.4</hadoop-aws.version>
<netty-buffer.version>4.1.60.Final</netty-buffer.version>
</properties>

<dependencyManagement>
@@ -452,39 +447,6 @@
<scope>provided</scope>
</dependency>

<!-- Imap storage dependency package -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aliyun</artifactId>
<version>${hadoop-aliyun.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
<version>${json-smart.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop-aws.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-buffer.version}</version>
</dependency>

</dependencies>
</dependencyManagement>

1 change: 1 addition & 0 deletions release-note.md
@@ -183,6 +183,7 @@
- [Docs] Fix markdown syntax (#4426)
- [Docs] Fix Kafka Doc Error Config Key "kafka." (#4427)
- [Docs] Add Transform to Quick Start v2 (#4436)
- [Docs] Fix Dockerfile and seatunnel-flink.yaml in Set Up with Kubernetes (#4788)
- [Docs] Fix Mysql sink format doc (#4800)
- [Docs] Add the generate sink sql parameter for the jdbc sink document (#4797)
- [Docs] Add the generate sink sql parameter And example (#4769)
48 changes: 48 additions & 0 deletions DorisSinkFactory.java (package org.apache.seatunnel.connectors.doris.config)
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.doris.config;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;

import com.google.auto.service.AutoService;

@AutoService(Factory.class)
public class DorisSinkFactory implements TableSinkFactory {

public static final String IDENTIFIER = "Doris";

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(
DorisConfig.FENODES,
DorisConfig.USERNAME,
DorisConfig.PASSWORD,
DorisConfig.SINK_LABEL_PREFIX,
DorisConfig.DORIS_SINK_CONFIG_PREFIX)
.optional(DorisConfig.SINK_ENABLE_2PC, DorisConfig.SINK_ENABLE_DELETE)
.build();
}
}
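For context, the OptionRule above requires front-end nodes, credentials, a label prefix, and a Doris sink config block, with two-phase commit and delete support optional. The following is a hedged sketch of a matching sink configuration; the key names (fenodes, username, password, sink.label-prefix, doris.config, sink.enable-2pc) and all values are assumptions for illustration, not taken from this commit.

```
sink {
  Doris {
    # Assumed option keys corresponding to the constants in DorisConfig; values are placeholders.
    fenodes = "doris-fe-host:8030"
    username = "root"
    password = "your_password"
    sink.label-prefix = "seatunnel-job"
    doris.config = {
      format = "json"
      read_json_by_line = "true"
    }
    # Optional, per the optional() options above.
    sink.enable-2pc = "true"
  }
}
```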