Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Connector][JDBC]Support Redshift sink and source #3615

Merged
merged 8 commits into from
Dec 6, 2022
2 changes: 2 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ there are some reference value for params above.
| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 |
| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |

## Example

Expand Down Expand Up @@ -207,3 +208,4 @@ sink {
- [Feature] Support Sqlite JDBC Sink ([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
- [Feature] Support Doris JDBC Sink
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
2 changes: 2 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ there are some reference value for params above.
| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc |
| doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |

## Example

Expand Down Expand Up @@ -164,3 +165,4 @@ parallel:
- [Feature] Support Teradata JDBC Source ([3362](https://github.com/apache/incubator-seatunnel/pull/3362))
- [Feature] Support JDBC Fetch Size Config ([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
- [Feature] Support Doris JDBC Source
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
11 changes: 11 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<sqlite.version>3.39.3.0</sqlite.version>
<tablestore.version>5.13.9</tablestore.version>
<teradata.version>17.20.00.12</teradata.version>
<redshift.version>2.1.0.9</redshift.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -107,6 +108,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.amazon.redshift</groupId>
<artifactId>redshift-jdbc42</artifactId>
<version>${redshift.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -160,6 +167,10 @@
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
</dependency>
<dependency>
<groupId>com.amazon.redshift</groupId>
<artifactId>redshift-jdbc42</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.util.Optional;

public class RedshiftDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Redshift";
}

@Override
public JdbcRowConverter getRowConverter() {
return new RedshiftJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new RedshiftTypeMapper();
}

@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

@AutoService(JdbcDialectFactory.class)
public class RedshiftDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:redshift:");
}

@Override
public JdbcDialect create() {
return new RedshiftDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

public class RedshiftJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return "Redshift";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

@Slf4j
public class RedshiftTypeMapper implements JdbcDialectTypeMapper {

/* ============================ data types ===================== */
private static final String REDSHIFT_SMALLINT = "SMALLINT";
private static final String REDSHIFT_INT2 = "INT2";
private static final String REDSHIFT_INTEGER = "INTEGER";
private static final String REDSHIFT_INT = "INT";
private static final String REDSHIFT_INT4 = "INT4";
private static final String REDSHIFT_BIGINT = "BIGINT";
private static final String REDSHIFT_INT8 = "INT8";

private static final String REDSHIFT_DECIMAL = "DECIMAL";
private static final String REDSHIFT_NUMERIC = "NUMERIC";
private static final String REDSHIFT_REAL = "REAL";
private static final String REDSHIFT_FLOAT4 = "FLOAT4";
private static final String REDSHIFT_DOUBLE_PRECISION = "DOUBLE PRECISION";
private static final String REDSHIFT_FLOAT8 = "FLOAT8";
private static final String REDSHIFT_FLOAT = "FLOAT";

private static final String REDSHIFT_BOOLEAN = "BOOLEAN";
private static final String REDSHIFT_BOOL = "BOOL";

private static final String REDSHIFT_CHAR = "CHAR";
private static final String REDSHIFT_CHARACTER = "CHARACTER";
private static final String REDSHIFT_NCHAR = "NCHAR";
private static final String REDSHIFT_BPCHAR = "BPCHAR";

private static final String REDSHIFT_VARCHAR = "VARCHAR";
private static final String REDSHIFT_CHARACTER_VARYING = "CHARACTER VARYING";
private static final String REDSHIFT_NVARCHAR = "NVARCHAR";
private static final String REDSHIFT_TEXT = "TEXT";

private static final String REDSHIFT_DATE = "DATE";
/*FIXME*/

private static final String REDSHIFT_GEOMETRY = "GEOMETRY";
private static final String REDSHIFT_OID = "OID";
private static final String REDSHIFT_SUPER = "SUPER";

private static final String REDSHIFT_TIME = "TIME";
private static final String REDSHIFT_TIME_WITH_TIME_ZONE = "TIME WITH TIME ZONE";

private static final String REDSHIFT_TIMETZ = "TIMETZ";
private static final String REDSHIFT_TIMESTAMP = "TIMESTAMP";
private static final String REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE = "TIMESTAMP WITHOUT TIME ZONE";

private static final String REDSHIFT_TIMESTAMPTZ = "TIMESTAMPTZ";

@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String redshiftType = metadata.getColumnTypeName(colIndex).toUpperCase();
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (redshiftType) {
case REDSHIFT_SMALLINT:
case REDSHIFT_INT2:
return BasicType.SHORT_TYPE;
case REDSHIFT_INTEGER:
case REDSHIFT_INT:
case REDSHIFT_INT4:
return BasicType.INT_TYPE;
case REDSHIFT_BIGINT:
case REDSHIFT_INT8:
case REDSHIFT_OID:
return BasicType.LONG_TYPE;
case REDSHIFT_DECIMAL:
case REDSHIFT_NUMERIC:
return new DecimalType(precision, scale);
case REDSHIFT_REAL:
case REDSHIFT_FLOAT4:
return BasicType.FLOAT_TYPE;
case REDSHIFT_DOUBLE_PRECISION:
case REDSHIFT_FLOAT8:
case REDSHIFT_FLOAT:
return BasicType.DOUBLE_TYPE;
case REDSHIFT_BOOLEAN:
case REDSHIFT_BOOL:
return BasicType.BOOLEAN_TYPE;
case REDSHIFT_CHAR:
case REDSHIFT_CHARACTER:
case REDSHIFT_NCHAR:
case REDSHIFT_BPCHAR:
case REDSHIFT_VARCHAR:
case REDSHIFT_CHARACTER_VARYING:
case REDSHIFT_NVARCHAR:
case REDSHIFT_TEXT:
case REDSHIFT_SUPER:
return BasicType.STRING_TYPE;
case REDSHIFT_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case REDSHIFT_GEOMETRY:
return PrimitiveByteArrayType.INSTANCE;
case REDSHIFT_TIME:
case REDSHIFT_TIME_WITH_TIME_ZONE:
case REDSHIFT_TIMETZ:
return LocalTimeType.LOCAL_TIME_TYPE;
case REDSHIFT_TIMESTAMP:
case REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE:
case REDSHIFT_TIMESTAMPTZ:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
EricJoy2048 marked this conversation as resolved.
Show resolved Hide resolved
String.format(
"Doesn't support REDSHIFT type '%s' on column '%s' yet.",
redshiftType, jdbcColumnName));
}
}
}