diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/CdcMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/CdcMetrics.java index 5b6721dc135e2..4b1f45dbd3518 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/CdcMetrics.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/CdcMetrics.java @@ -25,7 +25,7 @@ public class CdcMetrics { public static final Counter PARTITIONS_DETECTED_COUNTER = - Metrics.counter(ReadChangeStream.class, "partitions_detected"); + Metrics.counter(ReadChangeStream.class, "partitions_detected_count"); public static final Counter PARTITION_SPLIT_COUNTER = Metrics.counter(ReadChangeStream.class, "partition_split_count"); @@ -33,6 +33,9 @@ public class CdcMetrics { public static final Counter PARTITION_MERGE_COUNTER = Metrics.counter(ReadChangeStream.class, "partition_merge_count"); + public static final Distribution INITIAL_PARTITION_CREATED_TO_SCHEDULED_MS = + Metrics.distribution(ReadChangeStream.class, "initial_partition_created_to_scheduled_ms"); + public static final Distribution PARTITION_CREATED_TO_SCHEDULED_MS = Metrics.distribution(ReadChangeStream.class, "partition_created_to_scheduled_ms"); @@ -41,6 +44,15 @@ public class CdcMetrics { Metrics.distribution( ReadChangeStream.class, "watermark_to_latest_record_commit_timestamp_ms"); + public static final Distribution RECORD_COMMIT_TIMESTAMP_TO_READ_MS = + Metrics.distribution(ReadChangeStream.class, "record_commit_timestamp_to_read_ms"); + + public static final Distribution RECORD_READ_TO_EMITTED_MS = + Metrics.distribution(ReadChangeStream.class, "record_read_to_emitted_ms"); + public static final Distribution RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS = Metrics.distribution(ReadChangeStream.class, "record_commit_timestamp_to_emitted_ms"); + + public static final Counter DATA_RECORD_COUNTER = + Metrics.counter(ReadChangeStream.class, "data_record_count"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java index 27b23cd7a78f5..f57d292a09e16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.spanner.cdc; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.INITIAL_PARTITION_CREATED_TO_SCHEDULED_MS; import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITIONS_DETECTED_COUNTER; import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITION_CREATED_TO_SCHEDULED_MS; @@ -141,9 +142,15 @@ public ProcessContinuation processElement( return ProcessContinuation.stop(); } PartitionMetadata metadata = buildPartitionMetadata(resultSet); - PARTITION_CREATED_TO_SCHEDULED_MS.update( - new Duration(metadata.getCreatedAt().toDate().getTime(), Instant.now().getMillis()) - .getMillis()); + if (InitialPartition.isInitialPartition(metadata.getPartitionToken())) { + INITIAL_PARTITION_CREATED_TO_SCHEDULED_MS.update( + new Duration(metadata.getCreatedAt().toDate().getTime(), Instant.now().getMillis()) + .getMillis()); + } else { + PARTITION_CREATED_TO_SCHEDULED_MS.update( + new Duration(metadata.getCreatedAt().toDate().getTime(), Instant.now().getMillis()) + .getMillis()); + } LOG.debug( String.format( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/PostProcessingMetricsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/PostProcessingMetricsDoFn.java index dada48584ff01..5db81f4cede37 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/PostProcessingMetricsDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/PostProcessingMetricsDoFn.java @@ -17,10 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.spanner.cdc; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.DATA_RECORD_COUNTER; import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.RECORD_COMMIT_TIMESTAMP_TO_READ_MS; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.RECORD_READ_TO_EMITTED_MS; import java.io.Serializable; import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord.Metadata; import org.apache.beam.sdk.transforms.DoFn; import org.joda.time.Duration; import org.joda.time.Instant; @@ -31,11 +35,17 @@ public class PostProcessingMetricsDoFn extends DoFn receiver) { - RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS.update( - new Duration( - new Instant(dataChangeRecord.getCommitTimestamp().toSqlTimestamp().getTime()), - new Instant()) - .getMillis()); - receiver.output(dataChangeRecord); + final Instant now = new Instant(); + final Metadata metadata = dataChangeRecord.getMetadata(); + final Instant readInstant = new Instant(metadata.getReadAt().toSqlTimestamp().getTime()); + final Instant commitInstant = + new Instant(dataChangeRecord.getCommitTimestamp().toSqlTimestamp().getTime()); + + RECORD_COMMIT_TIMESTAMP_TO_READ_MS.update(new Duration(commitInstant, readInstant).getMillis()); + RECORD_READ_TO_EMITTED_MS.update(new Duration(readInstant, now).getMillis()); + RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS.update(new Duration(commitInstant, now).getMillis()); + DATA_RECORD_COUNTER.inc(); + + receiver.outputWithTimestamp(dataChangeRecord, commitInstant); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java index fdd5e27e3e79e..5dd5744f429af 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java @@ -54,7 +54,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; // TODO: Add java docs @UnboundedPerElement @@ -92,24 +91,33 @@ public ReadChangeStreamPartitionDoFn( } @GetInitialWatermarkEstimatorState - public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) { + public Instant getInitialWatermarkEstimatorState( + @Element PartitionMetadata partition, @Timestamp Instant currentElementTimestamp) { + LOG.info("[" + partition.getPartitionToken() + "] Get initial watermark estimator"); return currentElementTimestamp; } @NewWatermarkEstimator public ManualWatermarkEstimator newWatermarkEstimator( + @Element PartitionMetadata partition, @WatermarkEstimatorState Instant watermarkEstimatorState) { + LOG.info("[" + partition.getPartitionToken() + "] New watermark estimator"); return new Manual(watermarkEstimatorState); } @GetInitialRestriction - public PartitionRestriction initialRestriction(@Element PartitionMetadata element) { + public PartitionRestriction initialRestriction(@Element PartitionMetadata partition) { + LOG.info("[" + partition.getPartitionToken() + "] Initial restriction"); return new PartitionRestriction( - element.getStartTimestamp(), element.getEndTimestamp(), PartitionMode.QUERY_CHANGE_STREAM); + partition.getStartTimestamp(), + partition.getEndTimestamp(), + PartitionMode.QUERY_CHANGE_STREAM); } @NewTracker - public PartitionRestrictionTracker newTracker(@Restriction PartitionRestriction restriction) { + public PartitionRestrictionTracker newTracker( + @Element PartitionMetadata partition, @Restriction PartitionRestriction restriction) { + LOG.info("[" + partition.getPartitionToken() + "] New tracker"); return new PartitionRestrictionTracker(restriction); } @@ -142,14 +150,11 @@ public ProcessContinuation processElement( RestrictionTracker tracker, OutputReceiver receiver, ManualWatermarkEstimator watermarkEstimator) { - MDC.put("partitionToken", partition.getPartitionToken()); - LOG.info( - "Processing element " - + partition.getPartitionToken() - + " with restriction " - + tracker.currentRestriction()); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Processing element with restriction " + tracker.currentRestriction()); - switch (tracker.currentRestriction().getMode()) { + final PartitionMode mode = tracker.currentRestriction().getMode(); + switch (mode) { case QUERY_CHANGE_STREAM: return queryChangeStream(partition, tracker, receiver, watermarkEstimator); case WAIT_FOR_CHILD_PARTITIONS: @@ -164,8 +169,8 @@ public ProcessContinuation processElement( return done(partition, tracker); default: // TODO: Verify what to do here - throw new IllegalArgumentException( - "Unknown mode " + tracker.currentRestriction().getMode()); + LOG.error("[" + token + "] Unknown mode " + mode); + throw new IllegalArgumentException("Unknown mode " + mode); } } @@ -212,9 +217,10 @@ private ProcessContinuation queryChangeStream( RestrictionTracker tracker, OutputReceiver receiver, ManualWatermarkEstimator watermarkEstimator) { + final String token = partition.getPartitionToken(); try (ResultSet resultSet = changeStreamDao.changeStreamQuery( - partition.getPartitionToken(), + token, tracker.currentRestriction().getStartTimestamp(), partition.isInclusiveStart(), partition.getEndTimestamp(), @@ -224,7 +230,7 @@ private ProcessContinuation queryChangeStream( // TODO: Check what should we do if there is an error here final List records = changeStreamRecordMapper.toChangeStreamRecords( - partition.getPartitionToken(), resultSet.getCurrentRowAsStruct()); + token, resultSet.getCurrentRowAsStruct()); LOG.debug("Mapped records: " + records); Optional maybeContinuation; @@ -232,19 +238,22 @@ private ProcessContinuation queryChangeStream( if (record instanceof DataChangeRecord) { maybeContinuation = dataChangeRecordAction.run( - (DataChangeRecord) record, tracker, receiver, watermarkEstimator); + partition, (DataChangeRecord) record, tracker, receiver, watermarkEstimator); } else if (record instanceof HeartbeatRecord) { maybeContinuation = - heartbeatRecordAction.run((HeartbeatRecord) record, tracker, watermarkEstimator); + heartbeatRecordAction.run( + partition, (HeartbeatRecord) record, tracker, watermarkEstimator); } else if (record instanceof ChildPartitionsRecord) { maybeContinuation = childPartitionsRecordAction.run( - (ChildPartitionsRecord) record, partition, tracker, watermarkEstimator); + partition, (ChildPartitionsRecord) record, tracker, watermarkEstimator); } else { + LOG.error("[" + token + "] Unknown record type " + record.getClass()); // FIXME: Check what should we do if the record is unknown throw new IllegalArgumentException("Unknown record type " + record.getClass()); } if (maybeContinuation.isPresent()) { + LOG.info("[" + token + "] Continuation present, returning " + maybeContinuation); return maybeContinuation.get(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java index ac053c2d2e4f2..2cfc9b1f796c9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java @@ -49,22 +49,24 @@ public ChildPartitionsRecordAction(PartitionMetadataDao partitionMetadataDao) { } public Optional run( - ChildPartitionsRecord record, PartitionMetadata partition, + ChildPartitionsRecord record, RestrictionTracker tracker, ManualWatermarkEstimator watermarkEstimator) { - LOG.info("Processing child partition record " + record); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Processing child partition record " + record); final Timestamp startTimestamp = record.getStartTimestamp(); if (!tracker.tryClaim(PartitionPosition.queryChangeStream(startTimestamp))) { - LOG.info("Could not claim, stopping"); + LOG.info( + "[" + token + "] Could not claim queryChangeStream(" + startTimestamp + "), stopping"); return Optional.of(ProcessContinuation.stop()); } watermarkEstimator.setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); for (ChildPartition childPartition : record.getChildPartitions()) { if (isSplit(childPartition)) { - LOG.info("Processing child partition split event"); + LOG.info("[" + token + "] Processing child partition split event"); PARTITION_SPLIT_COUNTER.inc(); final PartitionMetadata row = @@ -78,7 +80,7 @@ public Optional run( // TODO: Make sure this does not fail if the rows already exist partitionMetadataDao.insert(row); } else { - LOG.info("Processing child partition merge event"); + LOG.info("[" + token + "] Processing child partition merge event"); PARTITION_MERGE_COUNTER.inc(); partitionMetadataDao.runInTransaction( @@ -88,7 +90,11 @@ public Optional run( childPartition.getParentTokens(), Collections.singletonList(FINISHED)); if (finishedParents == childPartition.getParentTokens().size() - 1) { - LOG.info("All parents are finished, inserting child partition " + childPartition); + LOG.info( + "[" + + token + + "] All parents are finished, inserting child partition " + + childPartition); transaction.insert( toPartitionMetadata( record.getStartTimestamp(), @@ -97,7 +103,9 @@ public Optional run( childPartition)); } else { LOG.info( - "At least one parent is not finished (" + "[" + + token + + "] At least one parent is not finished (" + "finishedParents = " + finishedParents + ", " @@ -111,7 +119,7 @@ public Optional run( } } - LOG.info("Child partitions action completed successfully"); + LOG.info("[" + token + "] Child partitions action completed successfully"); return Optional.empty(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordAction.java index 38495a91e16aa..e4e5091360400 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordAction.java @@ -20,6 +20,7 @@ import com.google.cloud.Timestamp; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionPosition; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionRestriction; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -35,22 +36,25 @@ public class DataChangeRecordAction { private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class); public Optional run( + PartitionMetadata partition, DataChangeRecord record, RestrictionTracker tracker, OutputReceiver outputReceiver, ManualWatermarkEstimator watermarkEstimator) { - LOG.info("Processing data record for " + record.getPartitionToken()); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Processing data record " + record.getCommitTimestamp()); final Timestamp commitTimestamp = record.getCommitTimestamp(); if (!tracker.tryClaim(PartitionPosition.queryChangeStream(commitTimestamp))) { - LOG.info("Could not claim, stopping"); + LOG.info( + "[" + token + "] Could not claim queryChangeStream(" + commitTimestamp + "), stopping"); return Optional.of(ProcessContinuation.stop()); } // TODO: Ask about this, do we need to output with timestamp? outputReceiver.output(record); watermarkEstimator.setWatermark(new Instant(commitTimestamp.toSqlTimestamp().getTime())); - LOG.info("Data record action completed successfully"); + LOG.info("[" + token + "] Data record action completed successfully"); return Optional.empty(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DeletePartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DeletePartitionAction.java index ddf91dd61cb4c..5e87282ed8af3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DeletePartitionAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DeletePartitionAction.java @@ -40,15 +40,16 @@ public DeletePartitionAction(PartitionMetadataDao partitionMetadataDao) { public Optional run( PartitionMetadata partition, RestrictionTracker tracker) { - LOG.info("Deleting partition " + partition.getPartitionToken()); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Deleting partition"); if (!tracker.tryClaim(PartitionPosition.deletePartition())) { - LOG.info("Could not claim, stopping"); + LOG.info("[" + token + "] Could not claim deletePartition(), stopping"); return Optional.of(ProcessContinuation.stop()); } - partitionMetadataDao.delete(partition.getPartitionToken()); + partitionMetadataDao.delete(token); - LOG.info("Delete partition action completed successfully"); + LOG.info("[" + token + "] Delete partition action completed successfully"); return Optional.empty(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DonePartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DonePartitionAction.java index 9b11d35a87683..e2da4e41927d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DonePartitionAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DonePartitionAction.java @@ -32,14 +32,15 @@ public class DonePartitionAction { public ProcessContinuation run( PartitionMetadata partition, RestrictionTracker tracker) { - LOG.info("Marking partition " + partition.getPartitionToken() + " as done"); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Marking partition as done"); if (!tracker.tryClaim(PartitionPosition.done())) { - LOG.info("Could not claim, stopping"); + LOG.info("[" + token + "] Could not claim done(), stopping"); return ProcessContinuation.stop(); } - LOG.info("Done partition action completed successfully"); + LOG.info("[" + token + "] Done partition action completed successfully"); return ProcessContinuation.stop(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/FinishPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/FinishPartitionAction.java index 4dc26da2bf5b3..83efd6ed36674 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/FinishPartitionAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/FinishPartitionAction.java @@ -42,16 +42,17 @@ public FinishPartitionAction(PartitionMetadataDao partitionMetadataDao) { public Optional run( PartitionMetadata partition, RestrictionTracker tracker) { - LOG.info("Finishing partition " + partition.getPartitionToken()); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Finishing partition"); if (!tracker.tryClaim(PartitionPosition.finishPartition())) { - LOG.info("Could not claim, stopping"); + LOG.info("[" + token + "] Could not claim finishPartition(), stopping"); return Optional.of(ProcessContinuation.stop()); } - partitionMetadataDao.updateState(partition.getPartitionToken(), FINISHED); + partitionMetadataDao.updateState(token, FINISHED); - LOG.info("Finish partition action completed successfully"); + LOG.info("[" + token + "] Finish partition action completed successfully"); return Optional.empty(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordAction.java index 2311a46a62596..f6e44ef59fc10 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordAction.java @@ -20,6 +20,7 @@ import com.google.cloud.Timestamp; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.cdc.model.HeartbeatRecord; +import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionPosition; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionRestriction; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -34,19 +35,21 @@ public class HeartbeatRecordAction { private static final Logger LOG = LoggerFactory.getLogger(HeartbeatRecordAction.class); public Optional run( + PartitionMetadata partition, HeartbeatRecord record, RestrictionTracker tracker, ManualWatermarkEstimator watermarkEstimator) { - LOG.info("Processing heartbeat record " + record); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Processing heartbeat record " + record); final Timestamp timestamp = record.getTimestamp(); if (!tracker.tryClaim(PartitionPosition.queryChangeStream(timestamp))) { - LOG.info("Could not claim, stopping"); + LOG.info("[" + token + "] Could not claim queryChangeStream(" + timestamp + "), stopping"); return Optional.of(ProcessContinuation.stop()); } watermarkEstimator.setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); - LOG.info("Heartbeat record action completed successfully"); + LOG.info("[" + token + "] Heartbeat record action completed successfully"); return Optional.empty(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java index 8bd98c0d3803e..4b40ca04e7cf4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java @@ -47,26 +47,29 @@ public WaitForChildPartitionsAction( public Optional run( PartitionMetadata partition, RestrictionTracker tracker) { - LOG.info("Waiting for child partitions for " + partition.getPartitionToken()); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Waiting for child partitions"); if (!tracker.tryClaim(PartitionPosition.waitForChildPartitions())) { - LOG.info("Could not claim, stopping"); + LOG.info("[" + token + "] Could not claim waitForChildPartitions(), stopping"); return Optional.of(ProcessContinuation.stop()); } long numberOfUnscheduledChildren = partitionMetadataDao.countChildPartitionsInStates( - partition.getPartitionToken(), Collections.singletonList(CREATED)); - LOG.info("Number of unscheduled children is " + numberOfUnscheduledChildren); + token, Collections.singletonList(CREATED)); + LOG.info("[" + token + "] Number of unscheduled children is " + numberOfUnscheduledChildren); if (numberOfUnscheduledChildren > 0) { LOG.info( - "Resuming, there are " + "[" + + token + + " ] Resuming, there are " + numberOfUnscheduledChildren + " unscheduled / not finished children"); return Optional.of(ProcessContinuation.resume().withResumeDelay(resumeDuration)); } - LOG.info("Wait for child partitions action completed successfully"); + LOG.info("[" + token + "] Wait for child partitions action completed successfully"); return Optional.empty(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForParentPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForParentPartitionsAction.java index fea8ec3aee92d..6e04842dda8ab 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForParentPartitionsAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForParentPartitionsAction.java @@ -44,21 +44,26 @@ public WaitForParentPartitionsAction( public Optional run( PartitionMetadata partition, RestrictionTracker tracker) { - LOG.info("Waiting for parent partitions of " + partition.getPartitionToken()); + final String token = partition.getPartitionToken(); + LOG.info("[" + token + "] Waiting for parent partitions"); if (!tracker.tryClaim(PartitionPosition.waitForParentPartitions())) { - LOG.info("Could not claim, stopping"); + LOG.info("[" + token + "] Could not claim waitForParentPartitions(), stopping"); return Optional.of(ProcessContinuation.stop()); } - long numberOfExistingParents = - partitionMetadataDao.countExistingParents(partition.getPartitionToken()); - LOG.info("Number of existing parents is " + numberOfExistingParents); + long numberOfExistingParents = partitionMetadataDao.countExistingParents(token); + LOG.info("[" + token + "] Number of existing parents is " + numberOfExistingParents); if (numberOfExistingParents > 0) { - LOG.info("Parents still exist (" + numberOfExistingParents + " parents), resuming"); + LOG.info( + "[" + + token + + "] Parents still exist (" + + numberOfExistingParents + + " parents), resuming"); return Optional.of(ProcessContinuation.resume().withResumeDelay(resumeDuration)); } - LOG.info("Wait for parent partitions action completed successfully"); + LOG.info("[" + token + "] Wait for parent partitions action completed successfully"); return Optional.empty(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/model/DataChangeRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/model/DataChangeRecord.java index 499b128949cc1..957f3e7fea1b1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/model/DataChangeRecord.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/model/DataChangeRecord.java @@ -45,6 +45,7 @@ public class DataChangeRecord implements ChangeStreamRecord { private ValueCaptureType valueCaptureType; private long numberOfRecordsInTransaction; private long numberOfPartitionsInTransaction; + private Metadata metadata; /** Default constructor for serialization only. */ private DataChangeRecord() {} @@ -74,6 +75,7 @@ public DataChangeRecord( this.valueCaptureType = valueCaptureType; this.numberOfRecordsInTransaction = numberOfRecordsInTransaction; this.numberOfPartitionsInTransaction = numberOfPartitionsInTransaction; + this.metadata = new Metadata(Timestamp.now()); } public String getPartitionToken() { @@ -124,6 +126,10 @@ public long getNumberOfPartitionsInTransaction() { return numberOfPartitionsInTransaction; } + public Metadata getMetadata() { + return metadata; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -195,6 +201,31 @@ public String toString() { + numberOfRecordsInTransaction + ", numberOfPartitionsInTransaction=" + numberOfPartitionsInTransaction + + ", metadata=" + + metadata + '}'; } + + @DefaultCoder(AvroCoder.class) + public static class Metadata { + + @AvroEncode(using = TimestampEncoding.class) + private Timestamp readAt; + + /** Default constructor for serialization only. */ + private Metadata() {} + + public Metadata(Timestamp readAt) { + this.readAt = readAt; + } + + public Timestamp getReadAt() { + return readAt; + } + + @Override + public String toString() { + return "Metadata{" + "readAt=" + readAt + '}'; + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java index a41c2e2c94dfd..38ecf5ede28f1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java @@ -177,10 +177,10 @@ public void testQueryChangeStreamModeWithDataChangeRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(PARTITION_TOKEN, rowAsStruct)) .thenReturn(Arrays.asList(record1, record2)); when(dataChangeRecordAction.run( - record1, restrictionTracker, outputReceiver, watermarkEstimator)) + partition, record1, restrictionTracker, outputReceiver, watermarkEstimator)) .thenReturn(Optional.empty()); when(dataChangeRecordAction.run( - record2, restrictionTracker, outputReceiver, watermarkEstimator)) + partition, record2, restrictionTracker, outputReceiver, watermarkEstimator)) .thenReturn(Optional.of(ProcessContinuation.stop())); final ProcessContinuation result = @@ -188,11 +188,11 @@ public void testQueryChangeStreamModeWithDataChangeRecord() { assertEquals(ProcessContinuation.stop(), result); verify(dataChangeRecordAction) - .run(record1, restrictionTracker, outputReceiver, watermarkEstimator); + .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator); verify(dataChangeRecordAction) - .run(record2, restrictionTracker, outputReceiver, watermarkEstimator); + .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); @@ -221,19 +221,19 @@ public void testQueryChangeStreamModeWithHeartbeatRecord() { when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct); when(changeStreamRecordMapper.toChangeStreamRecords(PARTITION_TOKEN, rowAsStruct)) .thenReturn(Arrays.asList(record1, record2)); - when(heartbeatRecordAction.run(record1, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run(partition, record1, restrictionTracker, watermarkEstimator)) .thenReturn(Optional.empty()); - when(heartbeatRecordAction.run(record2, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run(partition, record2, restrictionTracker, watermarkEstimator)) .thenReturn(Optional.of(ProcessContinuation.stop())); final ProcessContinuation result = doFn.processElement(partition, restrictionTracker, outputReceiver, watermarkEstimator); assertEquals(ProcessContinuation.stop(), result); - verify(heartbeatRecordAction).run(record1, restrictionTracker, watermarkEstimator); - verify(heartbeatRecordAction).run(record2, restrictionTracker, watermarkEstimator); + verify(heartbeatRecordAction).run(partition, record1, restrictionTracker, watermarkEstimator); + verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); @@ -263,10 +263,10 @@ public void testQueryChangeStreamModeWithChildPartitionsRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(PARTITION_TOKEN, rowAsStruct)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - record1, partition, restrictionTracker, watermarkEstimator)) + partition, record1, restrictionTracker, watermarkEstimator)) .thenReturn(Optional.empty()); when(childPartitionsRecordAction.run( - record2, partition, restrictionTracker, watermarkEstimator)) + partition, record2, restrictionTracker, watermarkEstimator)) .thenReturn(Optional.of(ProcessContinuation.resume())); final ProcessContinuation result = @@ -274,12 +274,12 @@ public void testQueryChangeStreamModeWithChildPartitionsRecord() { assertEquals(ProcessContinuation.resume(), result); verify(childPartitionsRecordAction) - .run(record1, partition, restrictionTracker, watermarkEstimator); + .run(partition, record1, restrictionTracker, watermarkEstimator); verify(childPartitionsRecordAction) - .run(record2, partition, restrictionTracker, watermarkEstimator); + .run(partition, record2, restrictionTracker, watermarkEstimator); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); @@ -313,8 +313,8 @@ public void testQueryChangeStreamModeWithStreamFinished() { verify(deletePartitionAction).run(partition, restrictionTracker); verify(donePartitionAction).run(partition, restrictionTracker); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); } @@ -330,8 +330,8 @@ public void testWaitForChildPartitionsMode() { assertEquals(ProcessContinuation.stop(), result); verify(waitForChildPartitionsAction).run(partition, restrictionTracker); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); @@ -352,8 +352,8 @@ public void testFinishPartitionMode() { assertEquals(ProcessContinuation.stop(), result); verify(finishPartitionAction).run(partition, restrictionTracker); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); @@ -374,8 +374,8 @@ public void testWaitForParentPartitionsMode() { assertEquals(ProcessContinuation.stop(), result); verify(waitForParentPartitionsAction).run(partition, restrictionTracker); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); @@ -396,8 +396,8 @@ public void testDeletePartitionMode() { assertEquals(ProcessContinuation.stop(), result); verify(deletePartitionAction).run(partition, restrictionTracker); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); @@ -417,8 +417,8 @@ public void testDoneMode() { assertEquals(ProcessContinuation.stop(), result); verify(donePartitionAction).run(partition, restrictionTracker); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java index aa2295c81ef48..4e72a843147eb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java @@ -82,7 +82,7 @@ public void testRestrictionClaimedAndIsSplitCase() { when(tracker.tryClaim(PartitionPosition.queryChangeStream(startTimestamp))).thenReturn(true); final Optional maybeContinuation = - action.run(record, partition, tracker, watermarkEstimator); + action.run(partition, record, tracker, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -136,7 +136,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndAllParentsFinished() { .thenReturn(1L); final Optional maybeContinuation = - action.run(record, partition, tracker, watermarkEstimator); + action.run(partition, record, tracker, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -178,7 +178,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndAtLeastOneParentIsNotFinished .thenReturn(0L); final Optional maybeContinuation = - action.run(record, partition, tracker, watermarkEstimator); + action.run(partition, record, tracker, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -200,7 +200,7 @@ public void testRestrictionNotClaimed() { when(tracker.tryClaim(PartitionPosition.queryChangeStream(startTimestamp))).thenReturn(false); final Optional maybeContinuation = - action.run(record, partition, tracker, watermarkEstimator); + action.run(partition, record, tracker, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordActionTest.java index f7e13ec291021..532046f6103df 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/DataChangeRecordActionTest.java @@ -27,6 +27,7 @@ import com.google.cloud.Timestamp; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord; +import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionPosition; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionRestriction; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -40,6 +41,7 @@ public class DataChangeRecordActionTest { private DataChangeRecordAction action; + private PartitionMetadata partition; private RestrictionTracker tracker; private OutputReceiver outputReceiver; private ManualWatermarkEstimator watermarkEstimator; @@ -47,6 +49,7 @@ public class DataChangeRecordActionTest { @Before public void setUp() { action = new DataChangeRecordAction(); + partition = mock(PartitionMetadata.class); tracker = mock(RestrictionTracker.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); @@ -60,7 +63,7 @@ public void testRestrictionClaimed() { when(tracker.tryClaim(PartitionPosition.queryChangeStream(timestamp))).thenReturn(true); final Optional maybeContinuation = - action.run(record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, outputReceiver, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(outputReceiver).output(record); @@ -75,7 +78,7 @@ public void testRestrictionNotClaimed() { when(tracker.tryClaim(PartitionPosition.queryChangeStream(timestamp))).thenReturn(false); final Optional maybeContinuation = - action.run(record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, outputReceiver, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(outputReceiver, never()).output(any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordActionTest.java index d014df6ef54cd..fbceba971bc15 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/HeartbeatRecordActionTest.java @@ -27,6 +27,7 @@ import com.google.cloud.Timestamp; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.cdc.model.HeartbeatRecord; +import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionPosition; import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionRestriction; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -39,12 +40,14 @@ public class HeartbeatRecordActionTest { private HeartbeatRecordAction action; - private ManualWatermarkEstimator watermarkEstimator; + private PartitionMetadata partition; private RestrictionTracker tracker; + private ManualWatermarkEstimator watermarkEstimator; @Before public void setUp() { action = new HeartbeatRecordAction(); + partition = mock(PartitionMetadata.class); tracker = mock(RestrictionTracker.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -56,7 +59,7 @@ public void testRestrictionClaimed() { when(tracker.tryClaim(PartitionPosition.queryChangeStream(timestamp))).thenReturn(true); final Optional maybeContinuation = - action.run(new HeartbeatRecord(timestamp), tracker, watermarkEstimator); + action.run(partition, new HeartbeatRecord(timestamp), tracker, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); @@ -69,7 +72,7 @@ public void testRestrictionNotClaimed() { when(tracker.tryClaim(PartitionPosition.queryChangeStream(timestamp))).thenReturn(false); final Optional maybeContinuation = - action.run(new HeartbeatRecord(timestamp), tracker, watermarkEstimator); + action.run(partition, new HeartbeatRecord(timestamp), tracker, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any());