Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][Connector-V2][Mongodbcdc] Fix mongodb cdc e2e instability #5128

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
Expand Down Expand Up @@ -234,7 +235,7 @@ public class MongodbSourceOptions extends SourceOptions {
.withDescription(
"Decides if the table options contains Debezium client properties that start with prefix 'debezium'.");

public static final Option<StartupMode> STARTUP_MODE =
public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
Options.key(SourceOptions.STARTUP_MODE_KEY)
.singleChoice(
StartupMode.class,
Expand All @@ -245,7 +246,7 @@ public class MongodbSourceOptions extends SourceOptions {
"Optional startup mode for CDC source, valid enumerations are "
+ "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");

public static final Option<StopMode> STOP_MODE =
public static final SingleChoiceOption<StopMode> STOP_MODE =
Options.key(SourceOptions.STOP_MODE_KEY)
.singleChoice(StopMode.class, Collections.singletonList(StopMode.NEVER))
.defaultValue(StopMode.NEVER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
Expand Down Expand Up @@ -78,7 +80,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {

// ----------------------------------------------------------------------------
// mysql
private static final String MYSQL_HOST = "mysql_cdc_e2e";
private static final String MYSQL_HOST = "mysql_e2e";

private static final String MYSQL_USER_NAME = "st_user";

Expand All @@ -104,8 +106,10 @@ private static MySqlContainer createMySqlContainer() {
mySqlContainer.withDatabaseName(MYSQL_DATABASE);
mySqlContainer.withUsername(MYSQL_USER_NAME);
mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
mySqlContainer.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image")));
// For local test use
// mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
mySqlContainer.setPortBindings(Collections.singletonList("3310:3306"));
return mySqlContainer;
}

Expand Down Expand Up @@ -134,6 +138,9 @@ public void startUp() {
mongodbContainer = new MongoDBContainer(NETWORK);
// For local test use
mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
mongodbContainer.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mongodb-Docker-Image")));

Startables.deepStart(Stream.of(mongodbContainer)).join();
mongodbContainer.executeCommandFileInSeparateDatabase(MONGODB_DATABASE);
initConnection();
Expand Down Expand Up @@ -213,6 +220,7 @@ private List<List<Object>> querySql() {
for (int i = 1; i <= columnCount; i++) {
objects.add(resultSet.getObject(i));
}
log.info("Print mysql sink data:" + objects);
result.add(objects);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set engine configuration here
Expand Down Expand Up @@ -45,11 +42,10 @@ source {

sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8"
url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"

generate_sink_sql = true
# You need to configure both database and table
database = mongodb_cdc
Expand Down