Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE][TOOLS-CDC]: Add Flink command-line support #105

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions flink-connector-oceanbase-tools-cdc/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2024 OceanBase.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!-- Module: flink-connector-oceanbase-tools-cdc - CDC command-line tooling built on the OceanBase Flink connector. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-parent</artifactId>
<version>${revision}</version>
</parent>

<artifactId>flink-connector-oceanbase-tools-cdc</artifactId>
<packaging>jar</packaging>

<dependencies>
<!-- Core OceanBase sink connector from this repository; versioned with the project. -->
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Flink runtime classes; "provided" because the Flink distribution supplies them at run time. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Flink CDC source connectors (MySQL/Oracle/Postgres/SQL Server/Db2), all "provided".
     flink-shaded-guava is excluded from each, presumably to avoid clashing with the
     shaded Guava bundled in the Flink distribution - TODO confirm the exact conflict. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-db2-cdc</artifactId>
<version>${flink.sql.cdc.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Oracle JDBC driver used by the Oracle CDC source; "provided" so users supply the
     driver themselves (likely for licensing/distribution reasons - confirm). -->
<dependency>
<groupId>com.oracle.ojdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.connector.flink.connection;

import com.oceanbase.connector.flink.OceanBaseConnectorOptions;
import com.oceanbase.connector.flink.tools.catalog.TableSchema;
import com.oceanbase.connector.flink.utils.OceanBaseToolsJdbcUtils;

/**
 * Connection provider for the CDC tools module.
 *
 * <p>Extends {@link OceanBaseConnectionProvider} with schema-management helpers (database/table
 * existence checks and creation). Each helper delegates to {@link OceanBaseToolsJdbcUtils},
 * handing it a connection supplier backed by this provider.
 */
public class OceanBaseToolsConnectProvider extends OceanBaseConnectionProvider {

    public OceanBaseToolsConnectProvider(OceanBaseConnectorOptions options) {
        super(options);
    }

    /** Returns {@code true} if the given database already exists. */
    public boolean databaseExists(String database) {
        return OceanBaseToolsJdbcUtils.databaseExists(database, () -> getConnection());
    }

    /** Creates the given database. */
    public void createDatabase(String database) {
        OceanBaseToolsJdbcUtils.createDatabase(database, () -> getConnection());
    }

    /** Returns {@code true} if the given table already exists in the given database. */
    public boolean tableExists(String database, String table) {
        return OceanBaseToolsJdbcUtils.tableExists(database, table, () -> getConnection());
    }

    /** Creates a table from the given schema definition. */
    public void createTable(TableSchema schema) {
        OceanBaseToolsJdbcUtils.createTable(schema, () -> getConnection());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.connector.flink.table;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LogicalType;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Date;
import java.sql.Time;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OceanBaseJsonSerializationSchema extends AbstractRecordSerializationSchema<String> {

private static final long serialVersionUID = 1L;
private static final Logger log =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is best to use uppercase letters to be consistent with other places.

LoggerFactory.getLogger(OceanBaseJsonSerializationSchema.class);

private final TableInfo tableInfo;
private static final ObjectMapper objectMapper = new ObjectMapper();

public OceanBaseJsonSerializationSchema(TableInfo tableInfo) {
this.tableInfo = tableInfo;
}

@Override
public Record serialize(String rowDataStr) {
try {
JsonNode rowDataNode = objectMapper.readTree(rowDataStr);
DataChangeRecord.Type type;
String op = rowDataNode.path("op").asText();
if ("r".equals(op) || "c".equals(op)) {
type = DataChangeRecord.Type.UPSERT;
} else if ("d".equals(op)) {
type = DataChangeRecord.Type.DELETE;
} else {
throw new IllegalArgumentException("Unknown operation type: " + op);
}
int size = tableInfo.getFieldNames().size();
Object[] values = new Object[size];
for (int i = 0; i < size; i++) {
String fieldName = tableInfo.getFieldNames().get(i);
JsonNode fieldNode = rowDataNode.path("after").path(fieldName);
values[i] = objectMapper.convertValue(fieldNode, new TypeReference<Object>() {});
}

return new DataChangeRecord(tableInfo, type, values);
} catch (IOException e) {
log.error("Failed to parse rowData JSON: {}", rowDataStr, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throw the exception out?

return null;
}
}

@Override
protected SerializationRuntimeConverter createNotNullConverter(LogicalType type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better add a test for this method, covering all types.

switch (type.getTypeRoot()) {
case BOOLEAN:
case TINYINT:
case SMALLINT:
case INTEGER:
case INTERVAL_YEAR_MONTH:
case BIGINT:
case INTERVAL_DAY_TIME:
case FLOAT:
case DOUBLE:
case BINARY:
case VARBINARY:
return data -> data;
case CHAR:
case VARCHAR:
return Object::toString;
case DATE:
return data -> Date.valueOf(LocalDate.ofEpochDay((int) data));
case TIME_WITHOUT_TIME_ZONE:
return data -> Time.valueOf(LocalTime.ofNanoOfDay((int) data * 1_000_000L));
case TIMESTAMP_WITHOUT_TIME_ZONE:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not support LocalZonedTimestampType, TimestampType?

return data -> ((TimestampData) data).toTimestamp();
case DECIMAL:
return data -> ((DecimalData) data).toBigDecimal();
case ARRAY:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not also support convert Map, Row type to string

return data -> {
ArrayData arrayData = (ArrayData) data;
return IntStream.range(0, arrayData.size())
.mapToObj(i -> arrayData.getString(i).toString())
.collect(Collectors.joining(","));
};
default:
throw new UnsupportedOperationException("Unsupported type:" + type);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2024 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.connector.flink.tools.catalog;

/**
 * Mutable data holder describing one table column: name, SQL type string, optional default
 * value, comment, and nullability. Follows the JavaBean convention (no-arg constructor plus
 * getters/setters) with convenience constructors for the common cases.
 */
public class FieldSchema {

    private String name; // column name
    private String typeString; // SQL type, e.g. "VARCHAR(32)"
    private String defaultValue; // column default; null when absent
    private String comment; // column comment; null when absent
    private Boolean nullable; // whether the column accepts NULL

    /** No-arg constructor for bean-style population via setters. */
    public FieldSchema() {}

    /** Creates a field schema without a default value. */
    public FieldSchema(String name, String typeString, String comment, Boolean nullable) {
        this(name, typeString, null, comment, nullable);
    }

    /** Creates a fully specified field schema. */
    public FieldSchema(
            String name, String typeString, String defaultValue, String comment, Boolean nullable) {
        this.name = name;
        this.typeString = typeString;
        this.defaultValue = defaultValue;
        this.comment = comment;
        this.nullable = nullable;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getTypeString() {
        return typeString;
    }

    public void setTypeString(String typeString) {
        this.typeString = typeString;
    }

    public String getDefaultValue() {
        return defaultValue;
    }

    public void setDefaultValue(String defaultValue) {
        this.defaultValue = defaultValue;
    }

    public String getComment() {
        return comment;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }

    public Boolean getNullable() {
        return nullable;
    }

    public void setNullable(Boolean nullable) {
        this.nullable = nullable;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("FieldSchema{");
        sb.append("name='").append(name).append('\'');
        sb.append(", typeString='").append(typeString).append('\'');
        sb.append(", defaultValue='").append(defaultValue).append('\'');
        sb.append(", comment='").append(comment).append('\'');
        sb.append(", nullable='").append(nullable).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
Loading