Skip to content

Commit

Permalink
add e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y committed Jan 19, 2024
1 parent bdcca72 commit f2c2869
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {

private static final String SOURCE_TABLE_1 = "postgres_cdc_table_1";
private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2";
private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3";
private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1";
private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2";
private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3";

private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key";

Expand Down Expand Up @@ -373,6 +375,102 @@ public void testMultiTableWithRestore(TestContainer container)
}
}

    /**
     * E2E test: a streaming Postgres-CDC job is savepointed, a column is added to both the
     * source and sink tables, and the job is restored from the savepoint; after each stage
     * the sink table must converge to the source table.
     *
     * <p>Flow: submit the job asynchronously, await source/sink equality, extract the job id
     * from the server logs, savepoint, ALTER both tables (add {@code f_big}) and insert a new
     * source row, restore the job, await equality again.
     *
     * @param container the engine container the job is submitted to and restored on
     * @throws IOException if job submission, savepoint, or restore fails
     * @throws InterruptedException if waiting on the container is interrupted
     */
    @TestTemplate
    @DisabledOnContainer(
            value = {},
            type = {EngineType.SPARK, EngineType.FLINK},
            disabledReason = "Currently SPARK and FLINK do not support multi table")
    public void testAddFiledWithRestore(TestContainer container)
            throws IOException, InterruptedException {
        try {
            // Submit the streaming job asynchronously; executeJob blocks until the job ends,
            // so it must not run on the test thread.
            CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            return container.executeJob(
                                    "/postgrescdc_to_postgres_test_add_Filed.conf");
                        } catch (Exception e) {
                            log.error("Commit task exception :" + e.getMessage());
                            throw new RuntimeException(e);
                        }
                    });

            // stream stage: wait until the sink table content equals the source table content
            await().atMost(60000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertAll(
                                            () ->
                                                    Assertions.assertIterableEquals(
                                                            query(
                                                                    getQuerySQL(
                                                                            POSTGRESQL_SCHEMA,
                                                                            SOURCE_TABLE_3)),
                                                            query(
                                                                    getQuerySQL(
                                                                            POSTGRESQL_SCHEMA,
                                                                            SINK_TABLE_3)))));

            // Extract the job id from the server logs; it is required for savepoint/restore.
            Pattern jobIdPattern =
                    Pattern.compile(
                            ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*",
                            Pattern.DOTALL);
            Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
            String jobId;
            if (matcher.matches()) {
                jobId = matcher.group(1);
            } else {
                throw new RuntimeException("Can not find jobId");
            }

            // Stop the job with a savepoint so it can be restored below.
            Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());

            // add a field to source and sink tables, then insert new source table data
            addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
            addFieldsForTable(POSTGRESQL_SCHEMA, SINK_TABLE_3);
            insertSourceTableForAddFields(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);

            // Restore job from the savepoint (again off the test thread; restoreJob blocks).
            CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            container.restoreJob(
                                    "/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
                        } catch (Exception e) {
                            log.error("Commit task exception :" + e.getMessage());
                            throw new RuntimeException(e);
                        }
                        return null;
                    });

            // stream stage: source and sink must converge again after the restore
            await().atMost(60000, TimeUnit.MILLISECONDS)
                    .untilAsserted(
                            () ->
                                    Assertions.assertAll(
                                            () ->
                                                    Assertions.assertIterableEquals(
                                                            query(
                                                                    getQuerySQL(
                                                                            POSTGRESQL_SCHEMA,
                                                                            SOURCE_TABLE_3)),
                                                            query(
                                                                    getQuerySQL(
                                                                            POSTGRESQL_SCHEMA,
                                                                            SINK_TABLE_3)))));

            log.info("****************** container logs start ******************");
            String containerLogs = container.getServerLogs();
            log.info(containerLogs);
            // pg cdc logs contain ERROR entries, so this assertion is intentionally disabled
            // Assertions.assertFalse(containerLogs.contains("ERROR"));
            log.info("****************** container logs end ******************");
        } finally {
            // Clear related content to ensure that multiple operations are not affected
            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_3);
        }
    }

@TestTemplate
public void testPostgresCdcCheckDataWithNoPrimaryKey(TestContainer container) throws Exception {

Expand Down Expand Up @@ -541,6 +639,20 @@ private void executeSql(String sql) {
}
}

private void addFieldsForTable(String database, String tableName) {

executeSql("ALTER TABLE " + database + "." + tableName + " ADD COLUMN f_big BIGINT");
}

private void insertSourceTableForAddFields(String database, String tableName) {
executeSql(
"INSERT INTO "
+ database
+ "."
+ tableName
+ " VALUES (2, '2', 32767, 65535, 2147483647);");
}

private void upsertDeleteSourceTable(String database, String tableName) {

executeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,33 @@ CREATE TABLE full_types_no_primary_key
f_default_numeric NUMERIC
);

-- Source table for the add-field savepoint/restore e2e test;
-- the test later ALTERs it to add a fifth column (f_big BIGINT).
CREATE TABLE postgres_cdc_table_3
(
    id      INTEGER NOT NULL,
    f_bytea BYTEA,
    f_small SMALLINT,
    f_int   INTEGER,
    PRIMARY KEY (id)
);

-- Sink table for the add-field savepoint/restore e2e test; mirrors
-- postgres_cdc_table_3 and is ALTERed in lockstep by the test.
CREATE TABLE sink_postgres_cdc_table_3
(
    id      INTEGER NOT NULL,
    f_bytea BYTEA,
    f_small SMALLINT,
    f_int   INTEGER,
    PRIMARY KEY (id)
);

ALTER TABLE postgres_cdc_table_1
REPLICA IDENTITY FULL;

ALTER TABLE postgres_cdc_table_2
REPLICA IDENTITY FULL;

-- REPLICA IDENTITY FULL makes logical decoding emit the full old row image
-- for UPDATE/DELETE, which the CDC connector needs (same as the other tables).
ALTER TABLE postgres_cdc_table_3
    REPLICA IDENTITY FULL;

ALTER TABLE sink_postgres_cdc_table_1
REPLICA IDENTITY FULL;

Expand All @@ -170,6 +191,9 @@ VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
'2020-07-17', '18:00:22', 500);

-- Seed row for the initial snapshot stage of the add-field test
-- (columns: id, f_bytea, f_small, f_int).
INSERT INTO postgres_cdc_table_3
VALUES (1, '2', 32767, 65535);

INSERT INTO full_types_no_primary_key
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}

source {
  Postgres-CDC {
    # Fixed: the stream alias said "customers_mysql_cdc" (copy-paste leftover from the
    # MySQL variant of this config) although this is a Postgres-CDC pipeline. The alias is
    # internal to this file; the sink's source_table_name below is renamed to match.
    result_table_name = "customers_postgres_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.postgres_cdc_table_3"]
    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    decoding.plugin.name = "decoderbufs"
  }
}

transform {

}

sink {
  jdbc {
    # Must match the source's result_table_name above.
    source_table_name = "customers_postgres_cdc"
    url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"

    generate_sink_sql = true
    # You need to configure both database and table
    database = postgres_cdc
    table = inventory.sink_postgres_cdc_table_3
    primary_keys = ["id"]
  }
}

0 comments on commit f2c2869

Please sign in to comment.