Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE][TOOLS-CDC]:add flink Command line support #105

Closed
wants to merge 6 commits into from

Conversation

xxsc0529
Copy link
Contributor

Summary

When synchronizing data to OceanBase, the flink CLI is supported. close #101

Solution Description

Define program entries, receive parameters, and parse them. Use Oceanbase Catalog to obtain the information required by Flink and create the corresponding sink。Here's how it works.
`CREATE TABLE test_history (
itemid BIGINT NOT NULL,
clock INT DEFAULT 0 NOT NULL,
value DOUBLE PRECISION DEFAULT 0.0000 NOT NULL,
ns INT DEFAULT 0 NOT NULL,
PRIMARY KEY (itemid, clock, ns)
) ;

CREATE TABLE test_history_uint (
itemid BIGINT NOT NULL,
clock INT DEFAULT 0 NOT NULL,
value DECIMAL(20,0) DEFAULT 0 NOT NULL,
ns INT DEFAULT 0 NOT NULL,
PRIMARY KEY (itemid, clock, ns)
) ;

CREATE TABLE test_history_str (
itemid BIGINT NOT NULL,
clock INT DEFAULT 0 NOT NULL,
value VARCHAR(255) DEFAULT '' NOT NULL,
ns INT DEFAULT 0 NOT NULL,
PRIMARY KEY (itemid, clock, ns)
);The command line is as follows./bin/flink run
-Dexecution.checkpointing.interval=10s
-Dparallelism.default=1
-c com.oceanbase.connector.flink.tools.cdc.CdcTools
lib/flink-connector-oceanbase-tools-cdc-1.4-SNAPSHOT.jar
mysql-sync-database
--database test_db
--mysql-conf hostname=xxxx
--mysql-conf port=3306
--mysql-conf username=root
--mysql-conf password=xxxx
--mysql-conf database-name=test_db
--including-tables "tbl1|test.*"
--sink-conf username=xxxx@xxxx
--sink-conf password=xxxx
--sink-conf url=jdbc:mysql://xxxx:xxxx
--sink-conf sink.label-prefix=label
--table-conf replication_num=1`
The sink-config configuration is the configuration of the OceanBase database

@whhe whhe self-requested a review November 12, 2024 10:28
}
sb.append(";");

System.out.println("Generated DDL: " + sb);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use logger but not System.out, check it in other files as well.


return new DataChangeRecord(tableInfo, type, values);
} catch (IOException e) {
log.error("Failed to parse rowData JSON: {}", rowDataStr, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not throw the exception out?

package com.oceanbase.connector.flink.tools.catalog;

public class FieldSchema {
private String name;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what scenarios will these fields be modified? How about make these fields final and remove all setters?

sb.append(", ");
}

public static String identifier(String name) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use OceanBaseDialect.quoteIdentifier

import java.util.Map;

public class TableSchema {
public static final String DORIS_TABLE_REGEX = "^[a-zA-Z][a-zA-Z0-9-_]*$";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated field.

private List<String> distributeKeys = new ArrayList<>();
private Map<String, String> properties = new HashMap<>();

private Integer tableBuckets;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is tableBuckets used for?

public abstract String convertToOceanBaseType(
String fieldType, Integer precision, Integer scale);

public abstract String getCdcTableName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it is not necessary while there is already a method getTableIdentifier here.

boolean createTableOnly = params.has(DatabaseSyncConfig.CREATE_TABLE_ONLY);
boolean ignoreDefaultValue = params.has(DatabaseSyncConfig.IGNORE_DEFAULT_VALUE);
boolean ignoreIncompatible = params.has(DatabaseSyncConfig.IGNORE_INCOMPATIBLE);
boolean singleSink = params.has(DatabaseSyncConfig.SINGLE_SINK);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is it used for?


public class DatabaseSyncConfig {

public static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a short description for every config key.

@whhe
Copy link
Member

whhe commented Nov 28, 2024

There should be at least a test case for mysql cdc, please add one on the github workflow.

@whhe whhe requested a review from yuanoOo November 28, 2024 10:00
Copy link
Member

@yuanoOo yuanoOo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution, I left some comments.

return data -> Date.valueOf(LocalDate.ofEpochDay((int) data));
case TIME_WITHOUT_TIME_ZONE:
return data -> Time.valueOf(LocalTime.ofNanoOfDay((int) data * 1_000_000L));
case TIMESTAMP_WITHOUT_TIME_ZONE:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not support LocalZonedTimestampType, TimestampType?

return data -> ((TimestampData) data).toTimestamp();
case DECIMAL:
return data -> ((DecimalData) data).toBigDecimal();
case ARRAY:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not also support convert Map, Row type to string

}

@Override
protected SerializationRuntimeConverter createNotNullConverter(LogicalType type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better add a test for this method, covering all types.

public class OceanBaseJsonSerializationSchema extends AbstractRecordSerializationSchema<String> {

private static final long serialVersionUID = 1L;
private static final Logger log =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is best to use uppercase letters to be consistent with other places.

return tableSchema;
}

private static List<String> buildDistributeKeys(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does DistributeKeys mean in oceanbase, oceanbase only has index keys and partition keys.

buildColumn(sb, field, keys.contains(entry.getKey()));
}

sb = sb.deleteCharAt(sb.length() - 1); // 删除最后一个逗号
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use Chinese.


/** OceanBase System Operate. */
@Public
public class OceanBaseSystem implements Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to change the name, it's confusing.

public static final String FLOAT = "FLOAT";
public static final String DOUBLE = "DOUBLE";
public static final String DECIMAL = "DECIMAL";
public static final String DECIMAL_V3 = "DECIMALV3";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean? ob does not support this type.

public static final String BIGINT = "BIGINT";
public static final String LARGEINT = "LARGEINT";
// largeint is bigint unsigned in information_schema.COLUMNS
public static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does ob support this type?

public static final String DECIMAL = "DECIMAL";
public static final String DECIMAL_V3 = "DECIMALV3";
public static final String DATE = "DATE";
public static final String DATE_V2 = "DATEV2";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this

public static final String DATE = "DATE";
public static final String DATE_V2 = "DATEV2";
public static final String DATETIME = "DATETIME";
public static final String DATETIME_V2 = "DATETIMEV2";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this

public static final String VARCHAR = "VARCHAR";
public static final String STRING = "STRING";
public static final String HLL = "HLL";
public static final String BITMAP = "BITMAP";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And these three

public static final int MAX_CHAR_SIZE = 255;

/** Max size of varchar type of OceanBase. */
public static final int MAX_VARCHAR_SIZE = 65533;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember it was 262144

public class OceanBaseTypeMapper {

/** Max size of char type of OceanBase. */
public static final int MAX_CHAR_SIZE = 255;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember it was 256

@xxsc0529 xxsc0529 closed this by deleting the head repository Dec 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature]: Add command line tools to support Flink Jdbc Connector as source
3 participants