diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 00000000000..dfdb8b771ce
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1 @@
+*.sh text eol=lf
diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md
new file mode 100644
index 00000000000..f7d6469b60f
--- /dev/null
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -0,0 +1,200 @@
+# PostgreSql
+
+> JDBC PostgreSql Sink Connector
+
+## Support Those Engines
+
+> Spark
+> Flink
+> SeaTunnel Zeta
+
+## Key Features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+> XA transactions are used to ensure `exactly-once` semantics, so `exactly-once` is only supported for databases that
+> support XA transactions. You can set `is_exactly_once=true` to enable it.
+
+## Description
+
+Write data through JDBC. Supports batch mode and streaming mode, concurrent writing, and exactly-once
+semantics (using the XA transaction guarantee).
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Driver | Url | Maven |
+|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
+| PostgreSQL | Different dependency versions have different driver classes.  | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL.    | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
+
+## Database Dependency
+
+> Please download the driver jar listed under 'Maven' and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example, for the PostgreSQL datasource: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+> If you want to manipulate the GEOMETRY type in PostgreSQL, add postgresql-xxx.jar and postgis-jdbc-xxx.jar to $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+| PostgreSQL Data type                                                 | SeaTunnel Data type                                                                                                                            |
+|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL                                                                 | BOOLEAN                                                                                                                                        |
+| _BOOL                                                                | ARRAY&lt;BOOLEAN&gt;                                                                                                                           |
+| BYTEA                                                                | BYTES                                                                                                                                          |
+| _BYTEA                                                               | ARRAY&lt;TINYINT&gt;                                                                                                                           |
+| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL                             | INT                                                                                                                                            |
+| _INT2<br/>_INT4                                                      | ARRAY&lt;INT&gt;                                                                                                                               |
+| INT8<br/>BIGSERIAL                                                   | BIGINT                                                                                                                                         |
+| _INT8                                                                | ARRAY&lt;BIGINT&gt;                                                                                                                            |
+| FLOAT4                                                               | FLOAT                                                                                                                                          |
+| _FLOAT4                                                              | ARRAY&lt;FLOAT&gt;                                                                                                                             |
+| FLOAT8                                                               | DOUBLE                                                                                                                                         |
+| _FLOAT8                                                              | ARRAY&lt;DOUBLE&gt;                                                                                                                            |
+| NUMERIC(precision > 0)                                               | DECIMAL(precision, scale), where precision is the designated column's column size and scale is its number of digits right of the decimal point |
+| NUMERIC(precision < 0)                                               | DECIMAL(38, 18)                                                                                                                                |
+| BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY | STRING                                                                                                                                         |
+| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT                        | ARRAY&lt;STRING&gt;                                                                                                                            |
+| TIMESTAMP                                                            | TIMESTAMP                                                                                                                                      |
+| TIME                                                                 | TIME                                                                                                                                           |
+| DATE                                                                 | DATE                                                                                                                                           |
+| OTHER DATA TYPES                                                     | NOT SUPPORTED YET                                                                                                                              |
+
+## Options
+
+| Name                                       | Type    | Required | Default | Description                                                                                                                                                                                                                                                      |
+|--------------------------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                                        | String  | Yes      | -       | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/test                                                                                                                                                                          |
+| driver                                     | String  | Yes      | -       | The jdbc class name used to connect to the remote data source; if you use PostgreSQL the value is `org.postgresql.Driver`.                                                                                                                                      |
+| user                                       | String  | No       | -       | Connection instance user name                                                                                                                                                                                                                                   |
+| password                                   | String  | No       | -       | Connection instance password                                                                                                                                                                                                                                    |
+| query                                      | String  | No       | -       | Use this SQL to write upstream input data to the database, e.g. `INSERT ...`. `query` has the higher priority                                                                                                                                                   |
+| database                                   | String  | No       | -       | Use this `database` and the `table-name` to auto-generate SQL and write upstream input data to the database. This option is mutually exclusive with `query` and has a higher priority.                                                                          |
+| table                                      | String  | No       | -       | Use `database` and this `table-name` to auto-generate SQL and write upstream input data to the database. This option is mutually exclusive with `query` and has a higher priority.                                                                              |
+| primary_keys                               | Array   | No       | -       | This option is used to support operations such as `insert`, `delete`, and `update` when the SQL is auto-generated.                                                                                                                                              |
+| support_upsert_by_query_primary_key_exist  | Boolean | No       | false   | Choose to use INSERT and UPDATE SQL to process update events (INSERT, UPDATE_AFTER) based on whether the queried primary key exists. This configuration is only used when the database does not support upsert syntax. **Note**: this method has low performance |
+| connection_check_timeout_sec               | Int     | No       | 30      | The time in seconds to wait for the database operation used to validate the connection to complete.                                                                                                                                                             |
+| max_retries                                | Int     | No       | 0       | The number of retries to submit a failed batch (executeBatch)                                                                                                                                                                                                   |
+| batch_size                                 | Int     | No       | 1000    | For batch writing, when the number of buffered records reaches `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database                                                                                                 |
+| batch_interval_ms                          | Int     | No       | 1000    | For batch writing, when the number of buffered records reaches `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database                                                                                                 |
+| is_exactly_once                            | Boolean | No       | false   | Whether to enable exactly-once semantics, which will use XA transactions. If enabled, you need to set `xa_data_source_class_name`.                                                                                                                              |
+| generate_sink_sql                          | Boolean | No       | false   | Generate SQL statements based on the database table you want to write to.                                                                                                                                                                                       |
+| xa_data_source_class_name                  | String  | No       | -       | The XA data source class name of the database driver; for example, for PostgreSQL it is `org.postgresql.xa.PGXADataSource`. Please refer to the appendix for other data sources                                                                                 |
+| max_commit_attempts                        | Int     | No       | 3       | The number of retries for transaction commit failures                                                                                                                                                                                                           |
+| transaction_timeout_sec                    | Int     | No       | -1      | The timeout after the transaction is opened; the default is -1 (never time out). Note that setting a timeout may affect exactly-once semantics                                                                                                                  |
+| auto_commit                                | Boolean | No       | true    | Automatic transaction commit is enabled by default                                                                                                                                                                                                              |
+| common-options                             |         | No       | -       | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details                                                                                                                                                             |
+
+### Tips
+
+> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to the JDBC sink. FakeSource generates a total of 16 rows of data (row.num=16), each row having two fields, name (string type) and age (int type). The final target table, test_table, will likewise contain 16 rows of data. Before running this job, you need to create the database test and the table test_table in your PostgreSQL. If you have not yet installed and deployed SeaTunnel, follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy it. Then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
+
+```
+# Defining the runtime environment
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ parallelism = 1
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform-v2
+}
+
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = root
+ password = 123456
+ query = "insert into test_table(name,age) values(?,?)"
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
+```
+
+### Generate Sink SQL
+
+> In this example you do not need to write complex SQL statements; just configure the database name and table name, and the insert statement will be generated for you automatically
+
+```
+sink {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = org.postgresql.Driver
+ user = root
+ password = 123456
+
+ generate_sink_sql = true
+ database = test
+ table = "public.test_table"
+ }
+}
+```
+
+### Exactly-once:
+
+> For scenarios that require accurate writes, we guarantee exactly-once semantics
+
+```
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+
+ max_retries = 0
+ user = root
+ password = 123456
+ query = "insert into test_table(name,age) values(?,?)"
+
+ is_exactly_once = "true"
+
+ xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+ }
+}
+```
+
+### CDC(Change Data Capture) Event
+
+> CDC change data is also supported. In this case, you need to configure `database`, `table`, and `primary_keys`.
+
+```
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = root
+ password = 123456
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = test
+ table = sink_table
+ primary_keys = ["id","name"]
+ }
+}
+```
+
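+> If your PostgreSQL does not support native upsert syntax, a sketch like the following (a hypothetical variant of the CDC job above, not one of the original examples) processes update events with query-based INSERT/UPDATE via `support_upsert_by_query_primary_key_exist`; note the lower performance:
+
+```
+sink {
+ jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = root
+ password = 123456
+
+ generate_sink_sql = true
+ database = test
+ table = sink_table
+ primary_keys = ["id","name"]
+ # Query the primary key first, then issue INSERT or UPDATE accordingly
+ support_upsert_by_query_primary_key_exist = true
+ }
+}
+```
+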
diff --git a/docs/en/connector-v2/source/PostgreSQL.md b/docs/en/connector-v2/source/PostgreSQL.md
new file mode 100644
index 00000000000..3f9e13d2e64
--- /dev/null
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -0,0 +1,158 @@
+# PostgreSQL
+
+> JDBC PostgreSQL Source Connector
+
+## Support Those Engines
+
+> Spark
+> Flink
+> SeaTunnel Zeta
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL and can achieve the projection effect.
+
+## Description
+
+Read external data source data through JDBC.
+
+## Supported DataSource Info
+
+| Datasource | Supported versions | Driver | Url | Maven |
+|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
+| PostgreSQL | Different dependency versions have different driver classes.  | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL.    | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
+
+## Database Dependency
+
+> Please download the driver jar listed under 'Maven' and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example, for the PostgreSQL datasource: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+> If you want to manipulate the GEOMETRY type in PostgreSQL, add postgresql-xxx.jar and postgis-jdbc-xxx.jar to $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+| PostgreSQL Data type                                                 | SeaTunnel Data type                                                                                                                            |
+|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL                                                                 | BOOLEAN                                                                                                                                        |
+| _BOOL                                                                | ARRAY&lt;BOOLEAN&gt;                                                                                                                           |
+| BYTEA                                                                | BYTES                                                                                                                                          |
+| _BYTEA                                                               | ARRAY&lt;TINYINT&gt;                                                                                                                           |
+| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL                             | INT                                                                                                                                            |
+| _INT2<br/>_INT4                                                      | ARRAY&lt;INT&gt;                                                                                                                               |
+| INT8<br/>BIGSERIAL                                                   | BIGINT                                                                                                                                         |
+| _INT8                                                                | ARRAY&lt;BIGINT&gt;                                                                                                                            |
+| FLOAT4                                                               | FLOAT                                                                                                                                          |
+| _FLOAT4                                                              | ARRAY&lt;FLOAT&gt;                                                                                                                             |
+| FLOAT8                                                               | DOUBLE                                                                                                                                         |
+| _FLOAT8                                                              | ARRAY&lt;DOUBLE&gt;                                                                                                                            |
+| NUMERIC(precision > 0)                                               | DECIMAL(precision, scale), where precision is the designated column's column size and scale is its number of digits right of the decimal point |
+| NUMERIC(precision < 0)                                               | DECIMAL(38, 18)                                                                                                                                |
+| BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY | STRING                                                                                                                                         |
+| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT                        | ARRAY&lt;STRING&gt;                                                                                                                            |
+| TIMESTAMP                                                            | TIMESTAMP                                                                                                                                      |
+| TIME                                                                 | TIME                                                                                                                                           |
+| DATE                                                                 | DATE                                                                                                                                           |
+| OTHER DATA TYPES                                                     | NOT SUPPORTED YET                                                                                                                              |
+
+## Options
+
+| Name                         | Type       | Required | Default         | Description                                                                                                                                                                                                                                                  |
+|------------------------------|------------|----------|-----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                          | String     | Yes      | -               | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/test                                                                                                                                                                       |
+| driver                       | String     | Yes      | -               | The jdbc class name used to connect to the remote data source; if you use PostgreSQL the value is `org.postgresql.Driver`.                                                                                                                                   |
+| user                         | String     | No       | -               | Connection instance user name                                                                                                                                                                                                                                |
+| password                     | String     | No       | -               | Connection instance password                                                                                                                                                                                                                                 |
+| query                        | String     | Yes      | -               | Query statement                                                                                                                                                                                                                                              |
+| connection_check_timeout_sec | Int        | No       | 30              | The time in seconds to wait for the database operation used to validate the connection to complete                                                                                                                                                          |
+| partition_column             | String     | No       | -               | The column name used for parallel partitioning; only a numeric-type primary key is supported, and only one column can be configured.                                                                                                                         |
+| partition_lower_bound        | BigDecimal | No       | -               | The partition_column min value for the scan; if not set, SeaTunnel will query the database for the min value.                                                                                                                                                |
+| partition_upper_bound        | BigDecimal | No       | -               | The partition_column max value for the scan; if not set, SeaTunnel will query the database for the max value.                                                                                                                                                |
+| partition_num                | Int        | No       | job parallelism | The number of partitions; only positive integers are supported. The default value is the job parallelism                                                                                                                                                     |
+| fetch_size                   | Int        | No       | 0               | For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number of database hits required to satisfy the selection criteria. Zero means use the JDBC default value. |
+| common-options               |            | No       | -               | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details                                                                                                                                                      |
+
+### Tips
+
+> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+> This example queries 16 rows of data from the 'source' table of your 'test' database with single parallelism, and queries all of its fields. You can also specify which fields to query, outputting the final results to the console.
+
+```
+# Defining the runtime environment
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from source limit 16"
+ }
+}
+
+transform {
+ # please go to https://seatunnel.apache.org/docs/transform-v2/sql
+}
+
+sink {
+ Console {}
+}
+```
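+
+> Column projection is achieved through the select list of `query`. As a sketch (assuming the same `source` table as above), reading only the `name` and `age` fields would look like:
+
+```
+source {
+ Jdbc {
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "test"
+ # Only the projected columns are read and emitted downstream
+ query = "select name, age from source limit 16"
+ }
+}
+```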
+
+### Parallel:
+
+> Read your query table in parallel with the shard field and shard data you configured. You can do this if you want to read the whole table
+
+```
+source{
+ jdbc{
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from source"
+ partition_column= "id"
+ partition_num = 5
+ }
+}
+```
+
+### Parallel Boundary:
+
+> It is more efficient to specify the data within the upper and lower bounds of the query; SeaTunnel reads your data source according to the upper and lower boundaries you configured
+
+```
+source{
+ jdbc{
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = "org.postgresql.Driver"
+ user = "root"
+ password = "test"
+ query = "select * from source"
+ partition_column= "id"
+
+ # The name of the table returned
+ result_table_name = "jdbc"
+ partition_lower_bound = 1
+ partition_upper_bound = 50
+ partition_num = 5
+ }
+}
+```
+
diff --git a/docs/en/start-v2/kubernetes/kubernetes.mdx b/docs/en/start-v2/kubernetes/kubernetes.mdx
index 32555d2410d..6ba479aa4ff 100644
--- a/docs/en/start-v2/kubernetes/kubernetes.mdx
+++ b/docs/en/start-v2/kubernetes/kubernetes.mdx
@@ -203,7 +203,7 @@ spec:
- key: seatunnel.streaming.conf
path: seatunnel.streaming.conf
job:
- jarURI: local:///opt/seatunnel/starter/seatunnel-flink-starter.jar
+ jarURI: local:///opt/seatunnel/starter/seatunnel-flink-13-starter.jar
entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
args: ["--config", "/data/seatunnel.streaming.conf"]
parallelism: 2
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index a1c4e40fbb0..551da2c7cec 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -47,8 +47,8 @@ seatunnel.source.LocalFile = connector-file-local
seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.OssFile = connector-file-oss
seatunnel.sink.OssFile = connector-file-oss
-seatunnel.source.OssJindoFile = connector-file-oss-jindo
-seatunnel.sink.OssJindoFile = connector-file-oss-jindo
+seatunnel.source.OssJindoFile = connector-file-jindo-oss
+seatunnel.sink.OssJindoFile = connector-file-jindo-oss
seatunnel.source.CosFile = connector-file-cos
seatunnel.sink.CosFile = connector-file-cos
seatunnel.source.Pulsar = connector-pulsar
diff --git a/pom.xml b/pom.xml
index 7dce624be31..3d619644952 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,11 +141,6 @@
4.2.0
true
-
-        <hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
-        <json-smart.version>2.4.7</json-smart.version>
-        <hadoop-aws.version>3.1.4</hadoop-aws.version>
-        <netty-buffer.version>4.1.60.Final</netty-buffer.version>
@@ -452,39 +447,6 @@
                 <scope>provided</scope>
-
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-aliyun</artifactId>
-                <version>${hadoop-aliyun.version}</version>
-                <scope>provided</scope>
-                <exclusions>
-                    <exclusion>
-                        <groupId>net.minidev</groupId>
-                        <artifactId>json-smart</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-
-            <dependency>
-                <groupId>net.minidev</groupId>
-                <artifactId>json-smart</artifactId>
-                <version>${json-smart.version}</version>
-            </dependency>
-
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-aws</artifactId>
-                <version>${hadoop-aws.version}</version>
-                <scope>provided</scope>
-            </dependency>
-
-            <dependency>
-                <groupId>io.netty</groupId>
-                <artifactId>netty-buffer</artifactId>
-                <version>${netty-buffer.version}</version>
-            </dependency>
diff --git a/release-note.md b/release-note.md
index b542b35a814..9ade9c61430 100644
--- a/release-note.md
+++ b/release-note.md
@@ -183,6 +183,7 @@
- [Docs] Fix markdown syntax (#4426)
- [Docs] Fix Kafka Doc Error Config Key "kafka." (#4427)
- [Docs] Add Transform to Quick Start v2 (#4436)
+- [Docs] Fix Dockerfile and seatunnel-flink.yaml in Set Up with Kubernetes (#4788)
- [Docs] Fix Mysql sink format doc (#4800)
- [Docs] Add the generate sink sql parameter for the jdbc sink document (#4797)
- [Docs] Add the generate sink sql parameter And example (#4769)
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
new file mode 100644
index 00000000000..1e08da60a95
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.config;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class DorisSinkFactory implements TableSinkFactory {
+
+ public static final String IDENTIFIER = "Doris";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ DorisConfig.FENODES,
+ DorisConfig.USERNAME,
+ DorisConfig.PASSWORD,
+ DorisConfig.SINK_LABEL_PREFIX,
+ DorisConfig.DORIS_SINK_CONFIG_PREFIX)
+ .optional(DorisConfig.SINK_ENABLE_2PC, DorisConfig.SINK_ENABLE_DELETE)
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
new file mode 100644
index 00000000000..7c9f08dfb71
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisDataTypeConvertor.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.datatype;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.apache.commons.collections4.MapUtils;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+@AutoService(DataTypeConvertor.class)
+public class DorisDataTypeConvertor implements DataTypeConvertor<String> {
+
+ public static final String NULL = "NULL";
+ public static final String BOOLEAN = "BOOLEAN";
+ public static final String TINYINT = "TINYINT";
+ public static final String SMALLINT = "SMALLINT";
+ public static final String INT = "INT";
+ public static final String BIGINT = "BIGINT";
+ public static final String FLOAT = "FLOAT";
+ public static final String DOUBLE = "DOUBLE";
+ public static final String DECIMAL = "DECIMAL";
+ public static final String DATE = "DATE";
+ public static final String DATETIME = "DATETIME";
+ public static final String CHAR = "CHAR";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String BINARY = "BINARY";
+ public static final String VARBINARY = "VARBINARY";
+ public static final String ARRAY = "ARRAY";
+ public static final String MAP = "MAP";
+ public static final String STRUCT = "STRUCT";
+ public static final String UNION = "UNION";
+ public static final String INTERVAL = "INTERVAL";
+ public static final String TIMESTAMP = "TIMESTAMP";
+ public static final String YEAR = "YEAR";
+ public static final String GEOMETRY = "GEOMETRY";
+ public static final String IP = "IP";
+
+ public static final String PRECISION = "precision";
+ public static final String SCALE = "scale";
+
+ public static final Integer DEFAULT_PRECISION = 10;
+
+ public static final Integer DEFAULT_SCALE = 0;
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ checkNotNull(connectorDataType, "connectorDataType can not be null");
+ Map<String, Object> dataTypeProperties;
+ switch (connectorDataType.toUpperCase(Locale.ROOT)) {
+ case DECIMAL:
+ // parse precision and scale
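+ // a type string such as "DECIMAL(10,2)" carries precision = 10 and scale = 2,
+ // "DECIMAL(10)" carries only the precision, and a bare "DECIMAL" keeps the defaults (10, 0)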
+ int left = connectorDataType.indexOf("(");
+ int right = connectorDataType.indexOf(")");
+ int precision = DEFAULT_PRECISION;
+ int scale = DEFAULT_SCALE;
+ if (left != -1 && right != -1) {
+ String[] precisionAndScale =
+ connectorDataType.substring(left + 1, right).split(",");
+ if (precisionAndScale.length == 2) {
+ precision = Integer.parseInt(precisionAndScale[0]);
+ scale = Integer.parseInt(precisionAndScale[1]);
+ } else if (precisionAndScale.length == 1) {
+ precision = Integer.parseInt(precisionAndScale[0]);
+ }
+ }
+ dataTypeProperties = ImmutableMap.of(PRECISION, precision, SCALE, scale);
+ break;
+ default:
+ dataTypeProperties = Collections.emptyMap();
+ break;
+ }
+ return toSeaTunnelType(connectorDataType, dataTypeProperties);
+ }
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(
+ String connectorDataType, Map<String, Object> dataTypeProperties)
+ throws DataTypeConvertException {
+ checkNotNull(connectorDataType, "connectorDataType can not be null");
+ int precision;
+ int scale;
+ switch (connectorDataType.toUpperCase(Locale.ROOT)) {
+ case NULL:
+ return BasicType.VOID_TYPE;
+ case BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case TINYINT:
+ return BasicType.BYTE_TYPE;
+ case SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case INT:
+ case YEAR:
+ return BasicType.INT_TYPE;
+ case BIGINT:
+ return BasicType.LONG_TYPE;
+ case FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case TIMESTAMP:
+ case DATETIME:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case CHAR:
+ case VARCHAR:
+ return BasicType.STRING_TYPE;
+ case BINARY:
+ case VARBINARY:
+ case GEOMETRY:
+ return PrimitiveByteArrayType.INSTANCE;
+ case DECIMAL:
+ precision = MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
+ scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
+ return new DecimalType(precision, scale);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support DORIS type '%s'' yet.", connectorDataType));
+ }
+ }
+
+ @Override
+ public String toConnectorType(
+ SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+ throws DataTypeConvertException {
+ checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+ SqlType sqlType = seaTunnelDataType.getSqlType();
+ // todo: verify
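+ // MAP/ROW/STRING/NULL all fall back to VARCHAR, and TIME is widened to TIMESTAMP,
+ // since this mapping has no direct Doris counterpart for those SeaTunnel types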
+ switch (sqlType) {
+ case ARRAY:
+ return ARRAY;
+ case MAP:
+ case ROW:
+ case STRING:
+ case NULL:
+ return VARCHAR;
+ case BOOLEAN:
+ return BOOLEAN;
+ case TINYINT:
+ return TINYINT;
+ case SMALLINT:
+ return SMALLINT;
+ case INT:
+ return INT;
+ case BIGINT:
+ return BIGINT;
+ case FLOAT:
+ return FLOAT;
+ case DOUBLE:
+ return DOUBLE;
+ case DECIMAL:
+ return DECIMAL;
+ case BYTES:
+ return BINARY;
+ case DATE:
+ return DATE;
+ case TIME:
+ case TIMESTAMP:
+ return TIMESTAMP;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support Doris type '%s'' yet.", sqlType));
+ }
+ }
+
+ @Override
+ public String getIdentity() {
+ return "Doris";
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 5d415d90c18..8f6fe7c8827 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -30,7 +30,7 @@
     <name>SeaTunnel : Connectors V2 : Paimon</name>
-        <paimon.version>0.4-SNAPSHOT</paimon.version>
+        <paimon.version>0.4.0-incubating</paimon.version>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 3a1b5a4177e..de11af1e173 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -178,6 +178,7 @@
io.netty:netty-buffer:jar
io.netty:netty-common:jar
+
${artifact.file.name}
/lib
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 9f28f6fdbb0..bf3169e4c80 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -131,7 +131,7 @@ private Set searchPluginJars() {
private MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
- jobFilePath, idGenerator, jobConfig, commonPluginJars);
+ jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}
private LogicalDagGenerator getLogicalDagGenerator() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index d81de1702ed..09bae74f5a2 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -63,12 +63,16 @@
public class JobConfigParser {
private static final ILogger LOGGER = Logger.getLogger(JobConfigParser.class);
private IdGenerator idGenerator;
-
+ private boolean isStartWithSavePoint;
 private List<URL> commonPluginJars;
- public JobConfigParser(@NonNull IdGenerator idGenerator, @NonNull List<URL> commonPluginJars) {
+ public JobConfigParser(
+ @NonNull IdGenerator idGenerator,
+ @NonNull List<URL> commonPluginJars,
+ boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.commonPluginJars = commonPluginJars;
+ this.isStartWithSavePoint = isStartWithSavePoint;
}
public Tuple2 parseSource(
@@ -190,7 +194,9 @@ public Tuple2 parseTransform(
sink.prepare(config);
sink.setJobContext(jobConfig.getJobContext());
sink.setTypeInfo(rowType);
- handleSaveMode(sink);
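+ // a job restored from a savepoint skips save mode handling, leaving the existing sink data untouched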
+ if (!isStartWithSavePoint) {
+ handleSaveMode(sink);
+ }
final String actionName =
createSinkActionName(0, tuple.getLeft().getPluginName(), getTableName(config));
final SinkAction action =
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 09027a2a248..86c0f3c94f5 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -107,23 +107,27 @@ public class MultipleTableJobConfigParser {
private final ReadonlyConfig envOptions;
private final JobConfigParser fallbackParser;
+ private final boolean isStartWithSavePoint;
public MultipleTableJobConfigParser(
String jobDefineFilePath, IdGenerator idGenerator, JobConfig jobConfig) {
- this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList());
+ this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList(), false);
}
public MultipleTableJobConfigParser(
String jobDefineFilePath,
IdGenerator idGenerator,
JobConfig jobConfig,
- List<URL> commonPluginJars) {
+ List<URL> commonPluginJars,
+ boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
this.commonPluginJars = commonPluginJars;
+ this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath));
this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
- this.fallbackParser = new JobConfigParser(idGenerator, commonPluginJars);
+ this.fallbackParser =
+ new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint);
}
 public ImmutablePair<List<Action>, Set<URL>> parse() {
@@ -607,7 +611,9 @@ private static T findLast(LinkedHashMap, T> map) {
sink,
factoryUrls,
actionConfig);
- handleSaveMode(sink);
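+ // save mode handling is skipped on savepoint restore to avoid re-initializing the sink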
+ if (!isStartWithSavePoint) {
+ handleSaveMode(sink);
+ }
sinkAction.setParallelism(parallelism);
return sinkAction;
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
index c39ddda99c8..a742fe39a0b 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/pom.xml
@@ -30,6 +30,14 @@
     <artifactId>imap-storage-file</artifactId>
     <name>SeaTunnel : Engine : Storage : IMap Storage Plugins : File</name>
+
+    <properties>
+        <hadoop-aliyun.version>3.0.0</hadoop-aliyun.version>
+        <json-smart.version>2.4.7</json-smart.version>
+        <hadoop-aws.version>3.1.4</hadoop-aws.version>
+        <netty-buffer.version>4.1.60.Final</netty-buffer.version>
+    </properties>
+
             <groupId>org.apache.seatunnel</groupId>
@@ -64,24 +72,25 @@
             <artifactId>awaitility</artifactId>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aliyun</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>net.minidev</groupId>
-                    <artifactId>json-smart</artifactId>
-                </exclusion>
-            </exclusions>
+            <version>${hadoop-aliyun.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
+            <version>${hadoop-aws.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-buffer</artifactId>
+            <version>${netty-buffer.version}</version>
             <scope>provided</scope>
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 70bbd1c0df5..11e5c57b8af 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -34,7 +34,4 @@ j2objc-annotations-1.1.jar
jsr305-1.3.9.jar
jsr305-3.0.0.jar
jsr305-3.0.2.jar
-listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
-accessors-smart-2.4.7.jar
-asm-9.1.jar
-json-smart-2.4.7.jar
+listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
\ No newline at end of file