diff --git a/pom.xml b/pom.xml index 6c0bb7e719b..4d4a3731ae6 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ org.apache apache 31 + org.apache.seatunnel diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index 6f2b6adeb25..b268fe612e8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -115,14 +115,14 @@ public static List getCatalogTables( return optionalCatalog .map( c -> { - long startTime = System.currentTimeMillis(); try (Catalog catalog = c) { + long startTime = System.currentTimeMillis(); catalog.open(); List catalogTables = catalog.getTables(readonlyConfig); log.info( String.format( - "Get catalog tables, cost time: %d", + "Get catalog tables, cost time: %d ms", System.currentTimeMillis() - startTime)); if (catalogTables.isEmpty()) { throw new SeaTunnelException( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index a6632a58732..5527417e916 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -174,12 +174,16 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured , use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); + if (useTableStats) { // The statement used to get approximate row count which is less // accurate than COUNT(*), but is more efficient for large table. diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index 1cfeb8d7056..8dedc6dfc19 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -184,12 +184,16 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured, use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); + if (useTableStats) { TablePath tablePath = table.getTablePath(); String analyzeTable = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index d1bf6257ec5..51c5eb67d21 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -155,12 +155,15 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured, use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); if (useTableStats) { String rowCountQuery = String.format( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java index 8826e1fdc9e..87e7418966d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -165,12 +165,16 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta // 2. If a query is configured but does not contain a WHERE clause and tablePath is // configured, use TABLE STATUS. // 3. If a query is configured with a WHERE clause, or a query statement is configured but - // tablePath is not, use COUNT(*). + // tablePath is TablePath.DEFAULT, use COUNT(*). boolean useTableStats = StringUtils.isBlank(table.getQuery()) || (!table.getQuery().toLowerCase().contains("where") - && table.getTablePath() != null); + && table.getTablePath() != null + && !TablePath.DEFAULT + .getFullName() + .equals(table.getTablePath().getFullName())); + if (useTableStats) { TablePath tablePath = table.getTablePath(); try (Statement stmt = connection.createStatement()) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java index a7a9e3f7024..5ae72f6d956 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-amazondynamodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/amazondynamodb/AmazondynamodbIT.java @@ -94,9 +94,11 @@ public class AmazondynamodbIT extends TestSuiteBase implements TestResource { @TestTemplate public void testAmazondynamodb(TestContainer container) throws Exception { + assertHasData(SOURCE_TABLE); Container.ExecResult execResult = container.executeJob(AMAZONDYNAMODB_JOB_CONFIG); Assertions.assertEquals(0, execResult.getExitCode()); - assertHasData(); + assertHasData(SOURCE_TABLE); + assertHasData(SINK_TABLE); compareResult(); clearSinkTable(); } @@ -168,10 +170,10 @@ private void clearSinkTable() { createTable(dynamoDbClient, SINK_TABLE); } - private void assertHasData() { - ScanResponse scan = - dynamoDbClient.scan(ScanRequest.builder().tableName(SINK_TABLE).build()); - Assertions.assertTrue(scan.hasItems(), "sink table is empty."); + private void assertHasData(String tableName) { + ScanResponse scan = dynamoDbClient.scan(ScanRequest.builder().tableName(tableName).build()); + Assertions.assertTrue( + !scan.items().isEmpty(), String.format("table %s is empty.", tableName)); } private void compareResult() {