From 7db9346a96a53c13abd444e06b12520f1df5537c Mon Sep 17 00:00:00 2001
From: chenzy15 <chenzy15@ziroom.com>
Date: Thu, 20 Jul 2023 20:26:38 +0800
Subject: [PATCH] [Hotfix]Fix mongodb cdc e2e instability

---
 .../cdc/mongodb/config/MongodbSourceOptions.java     |  5 +++--
 .../src/test/java/mongodb/MongodbCDCIT.java          | 12 ++++++++++--
 .../src/test/resources/mongodbcdc_to_mysql.conf      |  6 +-----
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
index df73772e071..dac939777f7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
@@ -19,6 +19,7 @@
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
@@ -234,7 +235,7 @@ public class MongodbSourceOptions extends SourceOptions {
                     .withDescription(
                             "Decides if the table options contains Debezium client properties that start with prefix 'debezium'.");
 
-    public static final Option<StartupMode> STARTUP_MODE =
+    public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
             Options.key(SourceOptions.STARTUP_MODE_KEY)
                     .singleChoice(
                             StartupMode.class,
@@ -245,7 +246,7 @@ public class MongodbSourceOptions extends SourceOptions {
                             "Optional startup mode for CDC source, valid enumerations are "
                                     + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
 
-    public static final Option<StopMode> STOP_MODE =
+    public static final SingleChoiceOption<StopMode> STOP_MODE =
             Options.key(SourceOptions.STOP_MODE_KEY)
                     .singleChoice(StopMode.class, Collections.singletonList(StopMode.NEVER))
                     .defaultValue(StopMode.NEVER)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index dd7f985f176..c01b36ef188 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -34,7 +34,9 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
@@ -78,7 +80,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
 
     // ----------------------------------------------------------------------------
     // mysql
-    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_HOST = "mysql_e2e";
 
     private static final String MYSQL_USER_NAME = "st_user";
 
@@ -104,8 +106,10 @@ private static MySqlContainer createMySqlContainer() {
         mySqlContainer.withDatabaseName(MYSQL_DATABASE);
         mySqlContainer.withUsername(MYSQL_USER_NAME);
         mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
+        mySqlContainer.withLogConsumer(
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image")));
         // For local test use
-        // mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
+        mySqlContainer.setPortBindings(Collections.singletonList("3310:3306"));
         return mySqlContainer;
     }
 
@@ -134,6 +138,9 @@ public void startUp() {
         mongodbContainer = new MongoDBContainer(NETWORK);
         // For local test use
         mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
+        mongodbContainer.withLogConsumer(
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mongodb-Docker-Image")));
+
         Startables.deepStart(Stream.of(mongodbContainer)).join();
         mongodbContainer.executeCommandFileInSeparateDatabase(MONGODB_DATABASE);
         initConnection();
@@ -213,6 +220,7 @@ private List<List<Object>> querySql() {
                 for (int i = 1; i <= columnCount; i++) {
                     objects.add(resultSet.getObject(i));
                 }
+                log.info("Print mysql sink data:" + objects);
                 result.add(objects);
             }
             return result;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
index 7e4a492390b..12846c6a0c2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
 
 env {
   # You can set engine configuration here
@@ -45,11 +42,10 @@ source {
 
 sink {
   jdbc {
-    url = "jdbc:mysql://mysql_cdc_e2e:3306?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8"
+    url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
     driver = "com.mysql.cj.jdbc.Driver"
     user = "st_user"
     password = "seatunnel"
-
     generate_sink_sql = true
     # You need to configure both database and table
     database = mongodb_cdc