diff --git a/.github/workflows/build_main.yml b/.github/workflows/build_main.yml new file mode 100644 index 00000000..2bbfdb1e --- /dev/null +++ b/.github/workflows/build_main.yml @@ -0,0 +1,34 @@ +name: Build Main + +on: + push: + branches: + - main + +jobs: + build: + name: Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'zulu' + cache: 'maven' + - name: Maven Install + run: mvn clean install -DskipTests=true + + test-jdbc: + name: Test (flink-connector-oceanbase) + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'zulu' + cache: 'maven' + - name: Maven Test + run: mvn verify -pl :flink-connector-oceanbase diff --git a/.github/workflows/build_pr.yml b/.github/workflows/build_pr.yml new file mode 100644 index 00000000..2334f03f --- /dev/null +++ b/.github/workflows/build_pr.yml @@ -0,0 +1,36 @@ +name: Build PR + +on: + pull_request: + paths-ignore: + - 'docs/**' + - '**.md' + - '.*' + +jobs: + build: + name: Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'zulu' + cache: 'maven' + - name: Maven Install + run: mvn clean install -DskipTests=true + + test-jdbc: + name: Test (flink-connector-oceanbase) + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'zulu' + cache: 'maven' + - name: Maven Test + run: mvn verify -pl :flink-connector-oceanbase diff --git a/.github/workflows/maven_build_main.yml b/.github/workflows/maven_build_main.yml deleted file mode 100644 index a0a1fec2..00000000 --- a/.github/workflows/maven_build_main.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: Maven Build on Main Branch - -on: - push: - branches: - - main - -jobs: - build: - name: Maven Build - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-java@v3 - with: - java-version: '8' - distribution: 'zulu' - cache: 'maven' - - name: Build with Maven - run: mvn clean install diff --git a/.github/workflows/maven_build_pr.yml b/.github/workflows/maven_build_pr.yml deleted file mode 100644 index f9003b42..00000000 --- a/.github/workflows/maven_build_pr.yml +++ /dev/null @@ -1,22 +0,0 @@ -name: Maven Build on Pull Request - -on: - pull_request: - paths-ignore: - - 'docs/**' - - '**.md' - - '.*' - -jobs: - build: - name: Maven Build - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-java@v3 - with: - java-version: '8' - distribution: 'zulu' - cache: 'maven' - - name: Build with Maven - run: mvn clean install diff --git a/README.md b/README.md index 00dc83aa..d3ad5b30 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ English | [简体中文](README_CN.md) -[![Build Status](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/maven_build_main.yml/badge.svg?branch=main)](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/maven_build_main.yml) +[![Build Status](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/build_main.yml/badge.svg?branch=main)](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/build_main.yml) [![Release](https://img.shields.io/github/release/oceanbase/flink-connector-oceanbase.svg)](https://github.com/oceanbase/flink-connector-oceanbase/releases) [![License](https://img.shields.io/badge/license-Mulan%20PSL%20v2-green.svg)](LICENSE) diff --git a/README_CN.md b/README_CN.md index 7f0bfc1d..90f32995 100644 --- a/README_CN.md +++ b/README_CN.md @@ -2,7 +2,7 @@ [English](README.md) | 简体中文 -[![Build Status](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/maven_build_main.yml/badge.svg?branch=main)](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/maven_build_main.yml) +[![Build Status](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/build_main.yml/badge.svg?branch=main)](https://github.com/oceanbase/flink-connector-oceanbase/actions/workflows/build_main.yml) [![Release](https://img.shields.io/github/release/oceanbase/flink-connector-oceanbase.svg)](https://github.com/oceanbase/flink-connector-oceanbase/releases) [![License](https://img.shields.io/badge/license-Mulan%20PSL%20v2-green.svg)](LICENSE) diff --git a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseContainer.java b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseContainer.java new file mode 100644 index 00000000..d2422295 --- /dev/null +++ b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/OceanBaseContainer.java @@ -0,0 +1,109 @@ +package com.oceanbase.connector.flink; + +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; + +public class OceanBaseContainer extends JdbcDatabaseContainer { + + public static final String DOCKER_IMAGE_NAME = "oceanbase/oceanbase-ce"; + + private static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse(DOCKER_IMAGE_NAME); + + private static final Integer SQL_PORT = 2881; + private static final Integer RPC_PORT = 2882; + + private static final String DEFAULT_USERNAME = "root"; + private static final String DEFAULT_PASSWORD = ""; + private static final String DEFAULT_TENANT_NAME = "test"; + private static final String DEFAULT_DATABASE_NAME = "test"; + + private String tenantName = DEFAULT_TENANT_NAME; + + public OceanBaseContainer(String dockerImageName) { + this(DockerImageName.parse(dockerImageName)); + } + + public OceanBaseContainer(DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + + this.waitStrategy = + Wait.forLogMessage(".*boot success!.*", 1) + .withStartupTimeout(Duration.ofMinutes(5)); + + addExposedPorts(SQL_PORT, RPC_PORT); + } + + @Override + public String getDriverClassName() { + return "com.oceanbase.jdbc.Driver"; + } + + public Integer getSqlPort() { + return getActualPort(SQL_PORT); + } + + public Integer getActualPort(int port) { + return "host".equals(getNetworkMode()) ? port : getMappedPort(port); + } + + @Override + public String getJdbcUrl() { + return getJdbcUrl(DEFAULT_DATABASE_NAME); + } + + public String getJdbcUrl(String databaseName) { + String additionalUrlParams = constructUrlParameters("?", "&"); + return "jdbc:oceanbase://" + + getHost() + + ":" + + getSqlPort() + + "/" + + databaseName + + additionalUrlParams; + } + + public OceanBaseContainer withTenant(String tenantName) { + this.tenantName = tenantName; + return this; + } + + public String getTenantName() { + return tenantName; + } + + @Override + public String getDatabaseName() { + return DEFAULT_DATABASE_NAME; + } + + @Override + public String getUsername() { + return DEFAULT_USERNAME + "@" + tenantName; + } + + @Override + public String getPassword() { + return DEFAULT_PASSWORD; + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } + + @Override + protected void waitUntilContainerStarted() { + getWaitStrategy().waitUntilReady(this); + } + + @Override + protected void configure() { + withEnv("MODE", "slim"); + withEnv("OB_TENANT_NAME", tenantName); + } +} diff --git a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/sink/OceanBaseSinkTest.java b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/sink/OceanBaseSinkTest.java index 7e8100b2..bb819644 100644 --- a/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/sink/OceanBaseSinkTest.java +++ b/flink-connector-oceanbase/src/test/java/com/oceanbase/connector/flink/sink/OceanBaseSinkTest.java @@ -15,32 +15,47 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.TestLogger; -import com.oceanbase.connector.flink.dialect.OceanBaseDialect; -import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect; -import org.junit.Ignore; +import com.oceanbase.connector.flink.OceanBaseContainer; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Statement; -import java.util.concurrent.ThreadLocalRandom; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; -@Ignore -public class OceanBaseSinkTest { +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class OceanBaseSinkTest extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(OceanBaseSinkTest.class); - private static final OceanBaseDialect DIALECT = new OceanBaseMySQLDialect(); + private OceanBaseContainer container; + + @Before + public void before() { + container = + new OceanBaseContainer(OceanBaseContainer.DOCKER_IMAGE_NAME) + .withCopyFileToContainer( + MountableFile.forClasspathResource("ddl/init.sql"), + "/root/boot/init.d/init.sql") + .withLogConsumer(new Slf4jLogConsumer(LOG)); - private static final String JDBC_URL = ""; - private static final String CLUSTER_NAME = ""; - private static final String TENANT_NAME = ""; - private static final String USERNAME = ""; - private static final String PASSWORD = ""; + Startables.deepStart(container).join(); + } @Test public void testSink() throws Exception { @@ -50,92 +65,92 @@ public void testSink() throws Exception { StreamTableEnvironment.create( execEnv, EnvironmentSettings.newInstance().inStreamingMode().build()); - String schemaName = "test"; - String tableName = "user"; - - String createTableSql = - "CREATE TABLE %s (" - + "id bigint(10) primary key," - + "name varchar(20)," - + "age int(10)," - + "height double," - + "birthday date)" - + "PARTITION BY HASH(id) " - + "PARTITIONS 4"; - - String fullTableName = DIALECT.getFullTableName(schemaName, tableName); - - try (Connection connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD); - Statement statement = connection.createStatement()) { - statement.execute(String.format("DROP TABLE IF EXISTS %s", fullTableName)); - statement.execute(String.format(createTableSql, fullTableName)); - } + String tableName = "products"; tEnv.executeSql( String.format( "CREATE TEMPORARY TABLE target (" - + " id BIGINT," - + " name STRING," - + " age INT," - + " height DOUBLE," - + " birthday DATE," - + " PRIMARY KEY (id) NOT ENFORCED" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(20, 10)," + + " PRIMARY KEY (`id`) NOT ENFORCED" + ") with (" + " 'connector'='oceanbase'," + " 'url'='%s'," - + " 'cluster-name'='%s'," - + " 'tenant-name'='%s'," + " 'schema-name'='%s'," + " 'table-name'='%s'," + " 'username'='%s'," + " 'password'='%s'," + " 'compatible-mode'='mysql'," - + " 'connection-pool-properties'='druid.initialSize=4;druid.maxActive=20;'," - + " 'partition.enabled'='true'," - + " 'partition.number'='4'" + + " 'connection-pool-properties'='druid.initialSize=4;druid.maxActive=20;'" + ");", - JDBC_URL, - CLUSTER_NAME, - TENANT_NAME, - schemaName, + container.getJdbcUrl(), + container.getDatabaseName(), tableName, - USERNAME, - PASSWORD)); + container.getUsername(), + container.getPassword())); - StringBuilder sb = new StringBuilder("insert into target values "); - for (int i = 0; i < 100; i++) { - if (i != 0) { - sb.append(","); - } - sb.append( - String.format( - "(%d, '%s', %d, %f, DATE '2023-%02d-%02d')", - i, - "name" + i, - ThreadLocalRandom.current().nextInt(18, 60), - ThreadLocalRandom.current().nextDouble(1.5, 2.0), - ThreadLocalRandom.current().nextInt(1, 6), - ThreadLocalRandom.current().nextInt(1, 28))); - } - tEnv.executeSql(sb.toString()).await(); + tEnv.executeSql( + "INSERT INTO target " + + "VALUES (101, 'scooter', 'Small 2-wheel scooter', 3.14)," + + " (102, 'car battery', '12V car battery', 8.1)," + + " (103, '12-pack drill bits', '12-pack of drill bits with sizes ranging from #40 to #3', 0.8)," + + " (104, 'hammer', '12oz carpenter''s hammer', 0.75)," + + " (105, 'hammer', '14oz carpenter''s hammer', 0.875)," + + " (106, 'hammer', '16oz carpenter''s hammer', 1.0)," + + " (107, 'rocks', 'box of assorted rocks', 5.3)," + + " (108, 'jacket', 'water resistent black wind breaker', 0.1)," + + " (109, 'spare tire', '24 inch spare tire', 22.2);") + .await(); + + List expected = + Arrays.asList( + "101,scooter,Small 2-wheel scooter,3.1400000000", + "102,car battery,12V car battery,8.1000000000", + "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8000000000", + "104,hammer,12oz carpenter's hammer,0.7500000000", + "105,hammer,14oz carpenter's hammer,0.8750000000", + "106,hammer,16oz carpenter's hammer,1.0000000000", + "107,rocks,box of assorted rocks,5.3000000000", + "108,jacket,water resistent black wind breaker,0.1000000000", + "109,spare tire,24 inch spare tire,22.2000000000"); - try (Connection connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD); + List actual = new ArrayList<>(); + try (Connection connection = + DriverManager.getConnection( + container.getJdbcUrl(), + container.getUsername(), + container.getPassword()); Statement statement = connection.createStatement()) { - ResultSet rs = statement.executeQuery(String.format("SELECT * FROM %s", fullTableName)); + ResultSet rs = statement.executeQuery("SELECT * FROM products"); ResultSetMetaData metaData = rs.getMetaData(); - int count = 0; + while (rs.next()) { - sb = new StringBuilder("Row ").append(count++).append(": { "); + StringBuilder sb = new StringBuilder(); for (int i = 0; i < metaData.getColumnCount(); i++) { if (i != 0) { - sb.append(", "); + sb.append(","); } - sb.append(metaData.getColumnName(i + 1)) - .append(": ") - .append(rs.getObject(i + 1)); + sb.append(rs.getObject(i + 1)); } - LOG.info(sb.append("}").toString()); + actual.add(sb.toString()); } } + + assertEqualsInAnyOrder(expected, actual); + } + + public static void assertEqualsInAnyOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEqualsInOrder( + expected.stream().sorted().collect(Collectors.toList()), + actual.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List expected, List actual) { + assertTrue(expected != null && actual != null); + assertEquals(expected.size(), actual.size()); + assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); } } diff --git a/flink-connector-oceanbase/src/test/resources/ddl/init.sql b/flink-connector-oceanbase/src/test/resources/ddl/init.sql new file mode 100644 index 00000000..c0741141 --- /dev/null +++ b/flink-connector-oceanbase/src/test/resources/ddl/init.sql @@ -0,0 +1,9 @@ +use test; + +CREATE TABLE products +( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight DECIMAL(20, 10) +); diff --git a/pom.xml b/pom.xml index fceaf5ee..dc39dc98 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,19 @@ See the Mulan PSL v2 for more details. ${flink.version} test + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + org.testcontainers + jdbc + 1.19.0 + test +