diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java index 6795263fa71df..a27a90d9a7be0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java @@ -124,51 +124,62 @@ public ProcessContinuation processElement( // TODO(hengfeng): move this to DAO. String query = String.format("SELECT * FROM `%s` WHERE State = 'CREATED'", this.metadataTableName); - ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query)); - - long currentIndex = tracker.currentRestriction().getFrom(); + try (ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query))) { + long currentIndex = tracker.currentRestriction().getFrom(); + + // Output the records. + while (resultSet.next()) { + // TODO(hengfeng): change the log level in this file. + LOG.debug("Reading record currentIndex:" + currentIndex); + if (!tracker.tryClaim(currentIndex)) { + return ProcessContinuation.stop(); + } + PartitionMetadata metadata = buildPartitionMetadata(resultSet); + LOG.debug( + String.format("Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata)); + + currentIndex++; + + Instant now = Instant.now(); + LOG.debug("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now); + receiver.output(metadata); + + // TODO(hengfeng): investigate if we can move this to DAO. + this.databaseClient + .readWriteTransaction() + .run( + transaction -> { + // Update the record to SCHEDULED. + // TODO(hengfeng): use mutations instead. + Statement updateStatement = + Statement.newBuilder( + String.format( + "UPDATE `%s` " + + "SET State = 'SCHEDULED' " + + "WHERE PartitionToken = @PartitionToken", + this.metadataTableName)) + .bind("PartitionToken") + .to(metadata.getPartitionToken()) + .build(); + transaction.executeUpdate(updateStatement); + LOG.debug("Updated the record:" + metadata.getPartitionToken()); + return null; + }); + } + } - // Output the records. - while (resultSet.next()) { - // TODO(hengfeng): change the log level in this file. - LOG.debug("Reading record currentIndex:" + currentIndex); - if (!tracker.tryClaim(currentIndex)) { + // If there are no partitions in the table, we should stop this SDF + // function. + // TODO(hengfeng): move this query to DAO. + query = String.format("SELECT COUNT(*) FROM `%s`", this.metadataTableName); + try (ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query))) { + if (resultSet.next() && resultSet.getLong(0) == 0) { + if (!tracker.tryClaim(Long.MAX_VALUE)) { + LOG.warning("Failed to claim the end of range in DetectNewPartitionsDoFn."); + } return ProcessContinuation.stop(); } - PartitionMetadata metadata = buildPartitionMetadata(resultSet); - LOG.debug( - String.format("Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata)); - - currentIndex++; - - Instant now = Instant.now(); - LOG.debug("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now); - receiver.output(metadata); - - // TODO(hengfeng): investigate if we can move this to DAO. - this.databaseClient - .readWriteTransaction() - .run( - transaction -> { - // Update the record to SCHEDULED. - // TODO(hengfeng): use mutations instead. - Statement updateStatement = - Statement.newBuilder( - String.format( - "UPDATE `%s` " - + "SET State = 'SCHEDULED' " - + "WHERE PartitionToken = @PartitionToken", - this.metadataTableName)) - .bind("PartitionToken") - .to(metadata.getPartitionToken()) - .build(); - transaction.executeUpdate(updateStatement); - LOG.debug("Updated the record:" + metadata.getPartitionToken()); - return null; - }); } - - // TODO(hengfeng): investigate how we can terminate this function. return ProcessContinuation.resume().withResumeDelay(resumeDuration); }