Skip to content

Commit

Permalink
[Improve][e2e] Improved e2e start sleep (apache#2677)
Browse files Browse the repository at this point in the history
* [Improve][e2e] Improved e2e start sleep

* update

* update

* fix 2675
  • Loading branch information
hailin0 authored and MRYOG committed Sep 16, 2022
1 parent 9b27a2d commit 62a08aa
Show file tree
Hide file tree
Showing 17 changed files with 89 additions and 47 deletions.
5 changes: 5 additions & 0 deletions seatunnel-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<version>${junit4.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ public void startIoTDBContainer() throws Exception {
// wait for IoTDB fully start
session = createSession();
given().ignoreExceptions()
.await()
.atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> session.open());
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> session.open());
initIoTDBTimeseries();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
Expand All @@ -39,6 +41,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class FakeSourceToJdbcIT extends FlinkContainer {
Expand All @@ -54,9 +57,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
}

private void initializeJdbcTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

Expand Down Expand Up @@ -76,9 +76,11 @@ public void startGreenplumContainer() throws ClassNotFoundException, SQLExceptio
// wait for Greenplum fully start
Class.forName(GREENPLUM_DRIVER);
given().ignoreExceptions()
.await()
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcConnection());
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcConnection());
initializeJdbcTable();
batchInsertData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import org.junit.jupiter.api.AfterEach;
Expand All @@ -37,6 +39,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class JdbcSourceToConsoleIT extends FlinkContainer {
Expand All @@ -52,9 +55,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
batchInsertData();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand All @@ -36,7 +37,6 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
Expand Down Expand Up @@ -77,7 +77,8 @@ public void startMongoContainer() {
Startables.deepStart(Stream.of(mongodbContainer)).join();
log.info("Mongodb container started");
Awaitility.given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initConnection);
this.generateTestData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.e2e.flink.v2.redis;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

Expand Down Expand Up @@ -56,9 +56,11 @@ public void startRedisContainer() {
Startables.deepStart(Stream.of(redisContainer)).join();
log.info("Redis container started");
given().ignoreExceptions()
.await()
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initJedis);
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initJedis);
this.generateTestData();
}

Expand Down
5 changes: 0 additions & 5 deletions seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,5 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.flink.clickhouse;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
Expand All @@ -39,6 +41,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class FakeSourceToClickhouseIT extends FlinkContainer {
Expand All @@ -59,9 +62,13 @@ public void startClickhouseContainer() throws InterruptedException {
Startables.deepStart(Stream.of(clickhouseServer)).join();
LOGGER.info("Clickhouse container started");
// wait for clickhouse fully start
Thread.sleep(5000L);
dataSource = createDatasource();
initializeClickhouseTable();
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeClickhouseTable());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ public void startIoTDBContainer() throws Exception {
// wait for IoTDB fully start
session = createSession();
given().ignoreExceptions()
.await()
.atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> session.open());
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> session.open());
initIoTDBTimeseries();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.e2e.spark.v2.jdbc;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.spark.SparkContainer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.e2e.spark.v2.jdbc;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.spark.SparkContainer;

Expand Down Expand Up @@ -76,9 +76,11 @@ public void startGreenplumContainer() throws ClassNotFoundException, SQLExceptio
// wait for Greenplum fully start
Class.forName(GREENPLUM_DRIVER);
given().ignoreExceptions()
.await()
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcConnection());
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcConnection());
initializeJdbcTable();
batchInsertData();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.e2e.spark.v2.redis;

import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.spark.SparkContainer;

Expand Down Expand Up @@ -56,9 +56,11 @@ public void startRedisContainer() {
Startables.deepStart(Stream.of(redisContainer)).join();
log.info("Redis container started");
given().ignoreExceptions()
.await()
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initJedis);
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(180, TimeUnit.SECONDS)
.untilAsserted(this::initJedis);
this.generateTestData();
}

Expand Down
6 changes: 0 additions & 6 deletions seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.spark.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.spark.SparkContainer;

import com.google.common.collect.Lists;
Expand All @@ -39,6 +41,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class FakeSourceToJdbcIT extends SparkContainer {
Expand All @@ -54,9 +57,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
}

private void initializeJdbcTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.spark.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.spark.SparkContainer;

import com.google.common.collect.Lists;
Expand All @@ -38,6 +40,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class JdbcSourceToConsoleIT extends SparkContainer {
Expand All @@ -54,9 +57,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
psl.setPortBindings(Lists.newArrayList("33306:3306"));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
batchInsertData();
}

Expand Down

0 comments on commit 62a08aa

Please sign in to comment.