Skip to content

Commit

Permalink
[Connector][JDBC]Support Redshift sink and source (#3615)
Browse files Browse the repository at this point in the history
* [Connector][JDBC]Support Redshift sink and source
  • Loading branch information
CalvinKirs authored Dec 6, 2022
1 parent 0b1bdb7 commit 8d9d863
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ there are some reference value for params above.
| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 |
| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |

## Example

Expand Down Expand Up @@ -207,3 +208,4 @@ sink {
- [Feature] Support Sqlite JDBC Sink ([3089](https://github.com/apache/incubator-seatunnel/pull/3089))
- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3378](https://github.com/apache/incubator-seatunnel/issues/3378))
- [Feature] Support Doris JDBC Sink
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
2 changes: 2 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ there are some reference value for params above.
| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc |
| doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |

## Example

Expand Down Expand Up @@ -164,3 +165,4 @@ parallel:
- [Feature] Support Teradata JDBC Source ([3362](https://github.com/apache/incubator-seatunnel/pull/3362))
- [Feature] Support JDBC Fetch Size Config ([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
- [Feature] Support Doris JDBC Source
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
11 changes: 11 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<sqlite.version>3.39.3.0</sqlite.version>
<tablestore.version>5.13.9</tablestore.version>
<teradata.version>17.20.00.12</teradata.version>
<redshift.version>2.1.0.9</redshift.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -107,6 +108,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.amazon.redshift</groupId>
<artifactId>redshift-jdbc42</artifactId>
<version>${redshift.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -160,6 +167,10 @@
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
</dependency>
<dependency>
<groupId>com.amazon.redshift</groupId>
<artifactId>redshift-jdbc42</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.util.Optional;

public class RedshiftDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Redshift";
}

@Override
public JdbcRowConverter getRowConverter() {
return new RedshiftJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new RedshiftTypeMapper();
}

@Override
public Optional<String> getUpsertStatement(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

@AutoService(JdbcDialectFactory.class)
public class RedshiftDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:redshift:");
}

@Override
public JdbcDialect create() {
return new RedshiftDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

public class RedshiftJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return "Redshift";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

@Slf4j
public class RedshiftTypeMapper implements JdbcDialectTypeMapper {

/* ============================ data types ===================== */
private static final String REDSHIFT_SMALLINT = "SMALLINT";
private static final String REDSHIFT_INT2 = "INT2";
private static final String REDSHIFT_INTEGER = "INTEGER";
private static final String REDSHIFT_INT = "INT";
private static final String REDSHIFT_INT4 = "INT4";
private static final String REDSHIFT_BIGINT = "BIGINT";
private static final String REDSHIFT_INT8 = "INT8";

private static final String REDSHIFT_DECIMAL = "DECIMAL";
private static final String REDSHIFT_NUMERIC = "NUMERIC";
private static final String REDSHIFT_REAL = "REAL";
private static final String REDSHIFT_FLOAT4 = "FLOAT4";
private static final String REDSHIFT_DOUBLE_PRECISION = "DOUBLE PRECISION";
private static final String REDSHIFT_FLOAT8 = "FLOAT8";
private static final String REDSHIFT_FLOAT = "FLOAT";

private static final String REDSHIFT_BOOLEAN = "BOOLEAN";
private static final String REDSHIFT_BOOL = "BOOL";

private static final String REDSHIFT_CHAR = "CHAR";
private static final String REDSHIFT_CHARACTER = "CHARACTER";
private static final String REDSHIFT_NCHAR = "NCHAR";
private static final String REDSHIFT_BPCHAR = "BPCHAR";

private static final String REDSHIFT_VARCHAR = "VARCHAR";
private static final String REDSHIFT_CHARACTER_VARYING = "CHARACTER VARYING";
private static final String REDSHIFT_NVARCHAR = "NVARCHAR";
private static final String REDSHIFT_TEXT = "TEXT";

private static final String REDSHIFT_DATE = "DATE";
/*FIXME*/

private static final String REDSHIFT_GEOMETRY = "GEOMETRY";
private static final String REDSHIFT_OID = "OID";
private static final String REDSHIFT_SUPER = "SUPER";

private static final String REDSHIFT_TIME = "TIME";
private static final String REDSHIFT_TIME_WITH_TIME_ZONE = "TIME WITH TIME ZONE";

private static final String REDSHIFT_TIMETZ = "TIMETZ";
private static final String REDSHIFT_TIMESTAMP = "TIMESTAMP";
private static final String REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE = "TIMESTAMP WITHOUT TIME ZONE";

private static final String REDSHIFT_TIMESTAMPTZ = "TIMESTAMPTZ";

@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String redshiftType = metadata.getColumnTypeName(colIndex).toUpperCase();
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (redshiftType) {
case REDSHIFT_SMALLINT:
case REDSHIFT_INT2:
return BasicType.SHORT_TYPE;
case REDSHIFT_INTEGER:
case REDSHIFT_INT:
case REDSHIFT_INT4:
return BasicType.INT_TYPE;
case REDSHIFT_BIGINT:
case REDSHIFT_INT8:
case REDSHIFT_OID:
return BasicType.LONG_TYPE;
case REDSHIFT_DECIMAL:
case REDSHIFT_NUMERIC:
return new DecimalType(precision, scale);
case REDSHIFT_REAL:
case REDSHIFT_FLOAT4:
return BasicType.FLOAT_TYPE;
case REDSHIFT_DOUBLE_PRECISION:
case REDSHIFT_FLOAT8:
case REDSHIFT_FLOAT:
return BasicType.DOUBLE_TYPE;
case REDSHIFT_BOOLEAN:
case REDSHIFT_BOOL:
return BasicType.BOOLEAN_TYPE;
case REDSHIFT_CHAR:
case REDSHIFT_CHARACTER:
case REDSHIFT_NCHAR:
case REDSHIFT_BPCHAR:
case REDSHIFT_VARCHAR:
case REDSHIFT_CHARACTER_VARYING:
case REDSHIFT_NVARCHAR:
case REDSHIFT_TEXT:
case REDSHIFT_SUPER:
return BasicType.STRING_TYPE;
case REDSHIFT_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case REDSHIFT_GEOMETRY:
return PrimitiveByteArrayType.INSTANCE;
case REDSHIFT_TIME:
case REDSHIFT_TIME_WITH_TIME_ZONE:
case REDSHIFT_TIMETZ:
return LocalTimeType.LOCAL_TIME_TYPE;
case REDSHIFT_TIMESTAMP:
case REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE:
case REDSHIFT_TIMESTAMPTZ:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support REDSHIFT type '%s' on column '%s' yet.",
redshiftType, jdbcColumnName));
}
}
}

0 comments on commit 8d9d863

Please sign in to comment.