Skip to content

Commit

Permalink
Revert "[flink] Sync the table with same name in different db (apache…
Browse files Browse the repository at this point in the history
…#1743)"

This reverts commit 28bd193.
  • Loading branch information
yuzelin committed Aug 9, 2023
1 parent 28bd193 commit 15db061
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 69 deletions.
6 changes: 3 additions & 3 deletions docs/content/how-to/cdc-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ under database `source_db`. The command to submit the job looks like:
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4 \
--including-tables 'source_db.product|source_db.user|source_db.address'
--including-tables 'product|user|address'
```
At a later point we would like the job to also synchronize tables [order, custom],
Expand All @@ -222,7 +222,7 @@ The command to recover from previous snapshot and add new tables to synchronize
--catalog-conf metastore=hive \
--catalog-conf uri=thrift://hive-metastore:9083 \
--table-conf bucket=4 \
--including-tables 'source_db.product|source_db.user|source_db.address|source_db.order|source_db.custom'
--including-tables 'product|user|address|order|custom'
```
{{< hint info >}}
Expand All @@ -249,7 +249,7 @@ synchronize all the `db.+.tbl.+` into tables `test_db.tbl1`, `test_db.tbl2` ...
--table-conf bucket=4 \
--table-conf changelog-producer=input \
--table-conf sink.parallelism=4 \
--including-tables 'db.+.tbl.+'
--including-tables 'tbl.+'
```
By setting database-name to a regular expression, the synchronization job will capture all tables under matched databases
Expand Down
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/mysql_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
</tr>
<tr>
<td><h5>--including-tables</h5></td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'db1.a|db2.b|db2.c'.Regular expression is supported, for example, specifying "--including-tables db1.test|db2.paimon.*" means to synchronize table 'db1.test' and all tables start with 'db2.paimon'.</td>
<td>It is used to specify which source tables are to be synchronized. You must use '|' to separate multiple tables.Because '|' is a special character, a comma is required, for example: 'a|b|c'.Regular expression is supported, for example, specifying "--including-tables test|paimon.*" means to synchronize table 'test' and all tables start with 'paimon'.</td>
</tr>
<tr>
<td><h5>--excluding-tables</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void setRawEvent(String rawEvent) {
try {
root = objectMapper.readValue(rawEvent, JsonNode.class);
payload = root.get("payload");
currentTable = getDatabaseName() + "." + payload.get("source").get("table").asText();
currentTable = payload.get("source").get("table").asText();
shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -164,8 +164,7 @@ public void setRawEvent(String rawEvent) {

@Override
public String parseTableName() {
String tableName = payload.get("source").get("table").asText();
return tableNameConverter.convert(Identifier.create(getDatabaseName(), tableName));
return tableNameConverter.convert(Identifier.create(getDatabaseName(), currentTable));
}

private boolean isSchemaChange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,7 @@ private Predicate<MySqlSchema> monitorTablePredication() {
schema.identifier());
return false;
}
return shouldMonitorTable(
schema.identifier().getDatabaseName() + "." + schema.tableName());
return shouldMonitorTable(schema.tableName());
};
}

Expand Down Expand Up @@ -410,8 +409,14 @@ private String buildTableList(List<Identifier> excludedTables) {

// a table can be monitored only when its name meets the including pattern and doesn't
// be excluded by excluding pattern at the same time
String includingPattern =
String.format(
"%s%s(%s)",
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
separatorRex,
includingTables);
if (excludedTables.isEmpty()) {
return includingTables;
return includingPattern;
}

String excludingPattern =
Expand All @@ -425,7 +430,7 @@ private String buildTableList(List<Identifier> excludedTables) {
+ t.getObjectName()))
.collect(Collectors.joining("|"));
excludingPattern = "?!" + excludingPattern;
return String.format("(%s)(%s)", excludingPattern, includingTables);
return String.format("(%s)(%s)", excludingPattern, includingPattern);
}

throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " + mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ private void testTableAffixImpl(Statement statement) throws Exception {
public void testIncludingTables() throws Exception {
includingAndExcludingTablesImpl(
"paimon_sync_database_including",
"paimon_sync_database_including.flink|paimon_sync_database_including.paimon.+",
"flink|paimon.+",
null,
Arrays.asList("flink", "paimon_1", "paimon_2"),
Collections.singletonList("ignored"));
Expand All @@ -575,67 +575,18 @@ public void testExcludingTables() throws Exception {
includingAndExcludingTablesImpl(
"paimon_sync_database_excluding",
null,
"paimon_sync_database_excluding.flink|paimon_sync_database_excluding.paimon.+",
"flink|paimon.+",
Collections.singletonList("sync"),
Arrays.asList("flink", "paimon_1", "paimon_2"));
}

@Test
@Timeout(60)
public void testSameTableNameInDifferentDatabase() throws Exception {
String databaseName = "paimon_sync_database_excluding|paimon_sync_database_including";
String includingTables = "paimon_sync_database_including.paimon_1";
String excludingTables = "paimon_sync_database_excluding.paimon_1";

Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", databaseName);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.enableCheckpointing(1000);
env.setRestartStrategy(RestartStrategies.noRestart());

Map<String, String> tableConfig = getBasicTableConfig();
MySqlSyncDatabaseAction action =
new MySqlSyncDatabaseAction(
mySqlConfig,
warehouse,
database,
false,
true,
null,
null,
includingTables,
excludingTables,
Collections.emptyMap(),
tableConfig,
DIVIDED);
action.build(env);
JobClient client = env.executeAsync();
waitJobRunning(client);

try (Statement statement = getStatement()) {
statement.executeUpdate(
"INSERT INTO paimon_sync_database_including.paimon_1 VALUES (1),(2)");
statement.executeUpdate(
"INSERT INTO paimon_sync_database_excluding.paimon_1 VALUES (3),(4)");

FileStoreTable table = getFileStoreTable("paimon_1");
RowType rowType =
RowType.of(new DataType[] {DataTypes.INT().notNull()}, new String[] {"k"});
List<String> primaryKeys = Collections.singletonList("k");
// only the paimon_sync_database_including.paimon_1 is synchronized
waitForResult(Arrays.asList("+I[1]", "+I[2]"), table, rowType, primaryKeys);
}
}

@Test
@Timeout(60)
public void testIncludingAndExcludingTables() throws Exception {
includingAndExcludingTablesImpl(
"paimon_sync_database_in_excluding",
"paimon_sync_database_in_excluding.flink|paimon_sync_database_in_excluding.paimon.+",
"paimon_sync_database_in_excluding.paimon_1",
"flink|paimon.+",
"paimon_1",
Arrays.asList("flink", "paimon_2"),
Arrays.asList("paimon_1", "test"));
}
Expand Down Expand Up @@ -805,8 +756,8 @@ public void testAddIgnoredTable() throws Exception {
true,
null,
null,
mySqlDatabase + ".t.+",
mySqlDatabase + "..*a$",
"t.+",
".*a$",
Collections.emptyMap(),
tableConfig,
COMBINED);
Expand Down Expand Up @@ -1106,7 +1057,7 @@ private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
true,
null,
null,
databaseName + ".t.+",
"t.+",
null,
catalogConfig,
tableConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public void testActionRunResult() throws Exception {
false,
null,
null,
".*shard_.*\\.t.+|.*shard_.*\\.s.+",
".*shard_.*\\.ta|.*shard_.*\\.sa",
"t.+|s.+",
"ta|sa",
Collections.emptyMap(),
tableConfig,
mode);
Expand Down

0 comments on commit 15db061

Please sign in to comment.