Skip to content

Commit

Permalink
[Hotfix] Fix DEFAULT TABLE problem (#6352)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored Apr 1, 2024
1 parent fd2a57d commit cdb1856
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 15 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
<version>31</version>
<relativePath />
</parent>

<groupId>org.apache.seatunnel</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ public static List<CatalogTable> getCatalogTables(
return optionalCatalog
.map(
c -> {
long startTime = System.currentTimeMillis();
try (Catalog catalog = c) {
long startTime = System.currentTimeMillis();
catalog.open();
List<CatalogTable> catalogTables =
catalog.getTables(readonlyConfig);
log.info(
String.format(
"Get catalog tables, cost time: %d",
"Get catalog tables, cost time: %d ms",
System.currentTimeMillis() - startTime));
if (catalogTables.isEmpty()) {
throw new SeaTunnelException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,16 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta
// 2. If a query is configured but does not contain a WHERE clause and tablePath is
// configured , use TABLE STATUS.
// 3. If a query is configured with a WHERE clause, or a query statement is configured but
// tablePath is not, use COUNT(*).
// tablePath is TablePath.DEFAULT, use COUNT(*).

boolean useTableStats =
StringUtils.isBlank(table.getQuery())
|| (!table.getQuery().toLowerCase().contains("where")
&& table.getTablePath() != null);
&& table.getTablePath() != null
&& !TablePath.DEFAULT
.getFullName()
.equals(table.getTablePath().getFullName()));

if (useTableStats) {
// The statement used to get approximate row count which is less
// accurate than COUNT(*), but is more efficient for large table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,16 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta
// 2. If a query is configured but does not contain a WHERE clause and tablePath is
// configured, use TABLE STATUS.
// 3. If a query is configured with a WHERE clause, or a query statement is configured but
// tablePath is not, use COUNT(*).
// tablePath is TablePath.DEFAULT, use COUNT(*).

boolean useTableStats =
StringUtils.isBlank(table.getQuery())
|| (!table.getQuery().toLowerCase().contains("where")
&& table.getTablePath() != null);
&& table.getTablePath() != null
&& !TablePath.DEFAULT
.getFullName()
.equals(table.getTablePath().getFullName()));

if (useTableStats) {
TablePath tablePath = table.getTablePath();
String analyzeTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,15 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta
// 2. If a query is configured but does not contain a WHERE clause and tablePath is
// configured, use TABLE STATUS.
// 3. If a query is configured with a WHERE clause, or a query statement is configured but
// tablePath is not, use COUNT(*).
// tablePath is TablePath.DEFAULT, use COUNT(*).

boolean useTableStats =
StringUtils.isBlank(table.getQuery())
|| (!table.getQuery().toLowerCase().contains("where")
&& table.getTablePath() != null);
&& table.getTablePath() != null
&& !TablePath.DEFAULT
.getFullName()
.equals(table.getTablePath().getFullName()));
if (useTableStats) {
String rowCountQuery =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,16 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta
// 2. If a query is configured but does not contain a WHERE clause and tablePath is
// configured, use TABLE STATUS.
// 3. If a query is configured with a WHERE clause, or a query statement is configured but
// tablePath is not, use COUNT(*).
// tablePath is TablePath.DEFAULT, use COUNT(*).

boolean useTableStats =
StringUtils.isBlank(table.getQuery())
|| (!table.getQuery().toLowerCase().contains("where")
&& table.getTablePath() != null);
&& table.getTablePath() != null
&& !TablePath.DEFAULT
.getFullName()
.equals(table.getTablePath().getFullName()));

if (useTableStats) {
TablePath tablePath = table.getTablePath();
try (Statement stmt = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ public class AmazondynamodbIT extends TestSuiteBase implements TestResource {

@TestTemplate
public void testAmazondynamodb(TestContainer container) throws Exception {
assertHasData(SOURCE_TABLE);
Container.ExecResult execResult = container.executeJob(AMAZONDYNAMODB_JOB_CONFIG);
Assertions.assertEquals(0, execResult.getExitCode());
assertHasData();
assertHasData(SOURCE_TABLE);
assertHasData(SINK_TABLE);
compareResult();
clearSinkTable();
}
Expand Down Expand Up @@ -168,10 +170,10 @@ private void clearSinkTable() {
createTable(dynamoDbClient, SINK_TABLE);
}

private void assertHasData() {
ScanResponse scan =
dynamoDbClient.scan(ScanRequest.builder().tableName(SINK_TABLE).build());
Assertions.assertTrue(scan.hasItems(), "sink table is empty.");
private void assertHasData(String tableName) {
ScanResponse scan = dynamoDbClient.scan(ScanRequest.builder().tableName(tableName).build());
Assertions.assertTrue(
!scan.items().isEmpty(), String.format("table %s is empty.", tableName));
}

private void compareResult() {
Expand Down

0 comments on commit cdb1856

Please sign in to comment.