diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java index 63df28f0cf7f9..fdd5e27e3e79e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java @@ -105,10 +105,7 @@ public ManualWatermarkEstimator newWatermarkEstimator( @GetInitialRestriction public PartitionRestriction initialRestriction(@Element PartitionMetadata element) { return new PartitionRestriction( - element.getStartTimestamp(), - element.getEndTimestamp(), - PartitionMode.QUERY_CHANGE_STREAM, - null); + element.getStartTimestamp(), element.getEndTimestamp(), PartitionMode.QUERY_CHANGE_STREAM); } @NewTracker @@ -134,8 +131,7 @@ public void setup() { this.dataChangeRecordAction = actionFactory.dataChangeRecordAction(); this.heartbeatRecordAction = actionFactory.heartbeatRecordAction(); this.childPartitionsRecordAction = - actionFactory.childPartitionsRecordAction( - partitionMetadataDao, waitForChildPartitionsAction); + actionFactory.childPartitionsRecordAction(partitionMetadataDao); } // TODO: Close DAOs on teardown @@ -179,33 +175,35 @@ public ProcessContinuation processElement( * The states will be stored in the {@link PartitionRestriction} and claimed through the * {@link PartitionPosition}. * - * HEARTBEAT RECORD - * DATA CHANGE RECORD - * |---------------------| - * v | - * +---------------------+ | - * | QUERY_CHANGE_STREAM |----------| - * +---------------------+ - * | - * CHILD_PARTITION_RECORD | NO MORE RECORDS - * |----------------------------------------------| - * v v - * +---------------------------+ +------------------+ - * | WAIT_FOR_CHILD_PARTITIONS |---------------------->| FINISH_PARTITION | - * +---------------------------+ +------------------+ - * |--------------------| - * v - * +----------------------------+ - * | WAIT_FOR_PARENT_PARTITIONS | - * +----------------------------+ - * v - * +------------------+ - * | DELETE_PARTITION | - * +------------------+ - * v - * +------+ - * | DONE | - * +------+ + * HEARTBEAT RECORD + * DATA CHANGE RECORD + * CHILD PARTITION RECORD + * |---------------------| + * v | + * +---------------------+ | + * | QUERY_CHANGE_STREAM |----------| + * +---------------------+ + * | NO MORE RECORDS + * v + * +---------------------------+ + * | WAIT_FOR_CHILD_PARTITIONS | + * +---------------------------+ + * v + * +------------------+ + * | FINISH_PARTITION | + * +------------------+ + * v + * +----------------------------+ + * | WAIT_FOR_PARENT_PARTITIONS | + * +----------------------------+ + * v + * +------------------+ + * | DELETE_PARTITION | + * +------------------+ + * v + * +------+ + * | DONE | + * +------+ * */ // spotless:on @@ -252,17 +250,15 @@ private ProcessContinuation queryChangeStream( } } - return finishPartition(partition, tracker); + return waitForChildPartitions(partition, tracker); } } private ProcessContinuation waitForChildPartitions( PartitionMetadata partition, RestrictionTracker tracker) { - final Long childPartitionsToWaitFor = - tracker.currentRestriction().getChildPartitionsToWaitFor(); return waitForChildPartitionsAction - .run(partition, tracker, childPartitionsToWaitFor) + .run(partition, tracker) .orElseGet(() -> finishPartition(partition, tracker)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ActionFactory.java index 0bc0081aac456..ef2feeeefec93 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ActionFactory.java @@ -52,11 +52,9 @@ public synchronized HeartbeatRecordAction heartbeatRecordAction() { // TODO: See if synchronized is a bottleneck and refactor if so public synchronized ChildPartitionsRecordAction childPartitionsRecordAction( - PartitionMetadataDao partitionMetadataDao, - WaitForChildPartitionsAction waitForChildPartitionsAction) { + PartitionMetadataDao partitionMetadataDao) { if (childPartitionsRecordActionInstance == null) { - childPartitionsRecordActionInstance = - new ChildPartitionsRecordAction(partitionMetadataDao, waitForChildPartitionsAction); + childPartitionsRecordActionInstance = new ChildPartitionsRecordAction(partitionMetadataDao); } return childPartitionsRecordActionInstance; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java index 0bad2183a2fde..ac053c2d2e4f2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordAction.java @@ -43,13 +43,9 @@ public class ChildPartitionsRecordAction { private static final Logger LOG = LoggerFactory.getLogger(ChildPartitionsRecordAction.class); private final PartitionMetadataDao partitionMetadataDao; - private final WaitForChildPartitionsAction waitForChildPartitionsAction; - public ChildPartitionsRecordAction( - PartitionMetadataDao partitionMetadataDao, - WaitForChildPartitionsAction waitForChildPartitionsAction) { + public ChildPartitionsRecordAction(PartitionMetadataDao partitionMetadataDao) { this.partitionMetadataDao = partitionMetadataDao; - this.waitForChildPartitionsAction = waitForChildPartitionsAction; } public Optional run( @@ -116,8 +112,7 @@ public Optional run( } LOG.info("Child partitions action completed successfully"); - // Needs to hold the watermark until all my children have finished - return waitForChildPartitionsAction.run(partition, tracker, record.getChildPartitions().size()); + return Optional.empty(); } private boolean isSplit(ChildPartition childPartition) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java index 71dd64f757a14..8bd98c0d3803e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsAction.java @@ -17,10 +17,9 @@ */ package org.apache.beam.sdk.io.gcp.spanner.cdc.actions; -import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.FINISHED; -import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.SCHEDULED; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.CREATED; -import java.util.Arrays; +import java.util.Collections; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata; @@ -47,29 +46,23 @@ public WaitForChildPartitionsAction( public Optional run( PartitionMetadata partition, - RestrictionTracker tracker, - long childPartitionsToWaitFor) { + RestrictionTracker tracker) { LOG.info("Waiting for child partitions for " + partition.getPartitionToken()); - if (!tracker.tryClaim(PartitionPosition.waitForChildPartitions(childPartitionsToWaitFor))) { + if (!tracker.tryClaim(PartitionPosition.waitForChildPartitions())) { LOG.info("Could not claim, stopping"); return Optional.of(ProcessContinuation.stop()); } - long numberOfFinishedChildren = + long numberOfUnscheduledChildren = partitionMetadataDao.countChildPartitionsInStates( - partition.getPartitionToken(), Arrays.asList(SCHEDULED, FINISHED)); - LOG.info( - "Number of finished children is " - + numberOfFinishedChildren - + " and expected children to wait for is " - + childPartitionsToWaitFor); - if (numberOfFinishedChildren < childPartitionsToWaitFor) { + partition.getPartitionToken(), Collections.singletonList(CREATED)); + LOG.info("Number of unscheduled children is " + numberOfUnscheduledChildren); + if (numberOfUnscheduledChildren > 0) { LOG.info( - "Resuming, not all children are scheduled / finished (only " - + numberOfFinishedChildren - + " of " - + childPartitionsToWaitFor - + ")"); + "Resuming, there are " + + numberOfUnscheduledChildren + + " unscheduled / not finished children"); + return Optional.of(ProcessContinuation.resume().withResumeDelay(resumeDuration)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPosition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPosition.java index eb8b7729e4db6..26c4a202e04b5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPosition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPosition.java @@ -27,53 +27,42 @@ public class PartitionPosition implements Serializable { private static final long serialVersionUID = -9088898012221404492L; + private final Optional maybeTimestamp; private final PartitionMode mode; - private final Optional maybeChildPartitionsToWaitFor; public static PartitionPosition queryChangeStream(Timestamp timestamp) { - return new PartitionPosition( - Optional.of(timestamp), PartitionMode.QUERY_CHANGE_STREAM, Optional.empty()); + return new PartitionPosition(Optional.of(timestamp), PartitionMode.QUERY_CHANGE_STREAM); } - public static PartitionPosition waitForChildPartitions(long childPartitionsToWaitFor) { - return new PartitionPosition( - Optional.empty(), - PartitionMode.WAIT_FOR_CHILD_PARTITIONS, - Optional.of(childPartitionsToWaitFor)); + public static PartitionPosition waitForChildPartitions() { + return new PartitionPosition(Optional.empty(), PartitionMode.WAIT_FOR_CHILD_PARTITIONS); } public static PartitionPosition finishPartition() { - return new PartitionPosition( - Optional.empty(), PartitionMode.FINISH_PARTITION, Optional.empty()); + return new PartitionPosition(Optional.empty(), PartitionMode.FINISH_PARTITION); } public static PartitionPosition waitForParentPartitions() { - return new PartitionPosition( - Optional.empty(), PartitionMode.WAIT_FOR_PARENT_PARTITIONS, Optional.empty()); + return new PartitionPosition(Optional.empty(), PartitionMode.WAIT_FOR_PARENT_PARTITIONS); } public static PartitionPosition deletePartition() { - return new PartitionPosition( - Optional.empty(), PartitionMode.DELETE_PARTITION, Optional.empty()); + return new PartitionPosition(Optional.empty(), PartitionMode.DELETE_PARTITION); } public static PartitionPosition done() { - return new PartitionPosition(Optional.empty(), PartitionMode.DONE, Optional.empty()); + return new PartitionPosition(Optional.empty(), PartitionMode.DONE); } public static PartitionPosition stop() { - return new PartitionPosition(Optional.empty(), PartitionMode.STOP, Optional.empty()); + return new PartitionPosition(Optional.empty(), PartitionMode.STOP); } @VisibleForTesting - protected PartitionPosition( - Optional maybeTimestamp, - PartitionMode mode, - Optional maybeChildPartitionsToWaitFor) { + protected PartitionPosition(Optional maybeTimestamp, PartitionMode mode) { this.maybeTimestamp = maybeTimestamp; this.mode = mode; - this.maybeChildPartitionsToWaitFor = maybeChildPartitionsToWaitFor; } public Optional getTimestamp() { @@ -84,10 +73,6 @@ public PartitionMode getMode() { return mode; } - public Optional getChildPartitionsToWaitFor() { - return maybeChildPartitionsToWaitFor; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -97,25 +82,16 @@ public boolean equals(Object o) { return false; } PartitionPosition that = (PartitionPosition) o; - return Objects.equals(maybeTimestamp, that.maybeTimestamp) - && mode == that.mode - && Objects.equals(maybeChildPartitionsToWaitFor, that.maybeChildPartitionsToWaitFor); + return Objects.equals(maybeTimestamp, that.maybeTimestamp) && mode == that.mode; } @Override public int hashCode() { - return Objects.hash(maybeTimestamp, mode, maybeChildPartitionsToWaitFor); + return Objects.hash(maybeTimestamp, mode); } @Override public String toString() { - return "PartitionPosition{" - + "maybeTimestamp=" - + maybeTimestamp - + ", mode=" - + mode - + ", maybeChildPartitionsToWaitFor=" - + maybeChildPartitionsToWaitFor - + '}'; + return "PartitionPosition{" + "maybeTimestamp=" + maybeTimestamp + ", mode=" + mode + '}'; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java index 8a5bc3eace89d..e3d00598f57eb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java @@ -37,47 +37,41 @@ public class PartitionRestriction implements Serializable { private final Timestamp startTimestamp; private final Timestamp endTimestamp; private final PartitionMode mode; - private final Long childPartitionsToWaitFor; public static PartitionRestriction queryChangeStream( Timestamp startTimestamp, Timestamp endTimestamp) { - return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM, null); + return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM); } - public static PartitionRestriction waitForChildPartitions(Long childPartitionsToWaitFor) { - return new PartitionRestriction( - null, null, WAIT_FOR_CHILD_PARTITIONS, childPartitionsToWaitFor); + public static PartitionRestriction waitForChildPartitions() { + return new PartitionRestriction(null, null, WAIT_FOR_CHILD_PARTITIONS); } public static PartitionRestriction finishPartition() { - return new PartitionRestriction(null, null, FINISH_PARTITION, null); + return new PartitionRestriction(null, null, FINISH_PARTITION); } public static PartitionRestriction waitForParentPartitions() { - return new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS, null); + return new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS); } public static PartitionRestriction deletePartition() { - return new PartitionRestriction(null, null, DELETE_PARTITION, null); + return new PartitionRestriction(null, null, DELETE_PARTITION); } public static PartitionRestriction done() { - return new PartitionRestriction(null, null, DONE, null); + return new PartitionRestriction(null, null, DONE); } public static PartitionRestriction stop() { - return new PartitionRestriction(null, null, STOP, null); + return new PartitionRestriction(null, null, STOP); } public PartitionRestriction( - Timestamp startTimestamp, - Timestamp endTimestamp, - PartitionMode mode, - Long childPartitionsToWaitFor) { + Timestamp startTimestamp, Timestamp endTimestamp, PartitionMode mode) { this.startTimestamp = startTimestamp; this.endTimestamp = endTimestamp; this.mode = mode; - this.childPartitionsToWaitFor = childPartitionsToWaitFor; } public Timestamp getStartTimestamp() { @@ -92,10 +86,6 @@ public PartitionMode getMode() { return mode; } - public Long getChildPartitionsToWaitFor() { - return childPartitionsToWaitFor; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -107,13 +97,12 @@ public boolean equals(Object o) { PartitionRestriction that = (PartitionRestriction) o; return Objects.equals(startTimestamp, that.startTimestamp) && Objects.equals(endTimestamp, that.endTimestamp) - && mode == that.mode - && Objects.equals(childPartitionsToWaitFor, that.childPartitionsToWaitFor); + && mode == that.mode; } @Override public int hashCode() { - return Objects.hash(startTimestamp, endTimestamp, mode, childPartitionsToWaitFor); + return Objects.hash(startTimestamp, endTimestamp, mode); } @Override @@ -125,8 +114,6 @@ public String toString() { + endTimestamp + ", mode=" + mode - + ", childPartitionsToWaitFor=" - + childPartitionsToWaitFor + '}'; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimer.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimer.java index 977f5db876987..a4c2f5aef4237 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimer.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimer.java @@ -88,14 +88,6 @@ public boolean tryClaim( toMode != QUERY_CHANGE_STREAM || position.getTimestamp().isPresent(), "%s mode must specify a timestamp (no value sent)", toMode); - checkArgument( - toMode != WAIT_FOR_CHILD_PARTITIONS - || position.getChildPartitionsToWaitFor().orElse(0L) > 0, - "%s mode must specify positive child partitions to wait for (value sent was %s)", - toMode, - position.getChildPartitionsToWaitFor()); - // TODO: Check if the position wait child partitions to wait for == lastClaimedPosition wait - // child partitions to wait for boolean tryClaimResult; switch (toMode) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java index 10b205c0c1fd4..44340b486c04e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java @@ -20,7 +20,6 @@ import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampFromMicros; import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampToMicros; import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.QUERY_CHANGE_STREAM; -import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.WAIT_FOR_CHILD_PARTITIONS; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import com.google.cloud.Timestamp; @@ -52,11 +51,6 @@ public SplitResult trySplit( positionMode != QUERY_CHANGE_STREAM || lastClaimedPosition.getTimestamp().isPresent(), "%s mode must specify a timestamp (no value sent)", positionMode); - checkArgument( - positionMode != WAIT_FOR_CHILD_PARTITIONS - || lastClaimedPosition.getChildPartitionsToWaitFor().isPresent(), - "%s mode must specify the number of child partitions to wait for (no value sent)", - positionMode); SplitResult splitResult = null; switch (positionMode) { @@ -64,15 +58,12 @@ public SplitResult trySplit( splitResult = splitQueryChangeStream(fractionOfRemainder, restriction, lastClaimedPosition); break; case WAIT_FOR_CHILD_PARTITIONS: - final Long childPartitionsToWaitFor = - lastClaimedPosition.getChildPartitionsToWaitFor().get(); // If we need to split the wait for child partitions, we remain at the same mode. That is // because the primary restriction might resume and it might so happen that the residual // restriction gets scheduled before the primary. splitResult = SplitResult.of( - PartitionRestriction.stop(), - PartitionRestriction.waitForChildPartitions(childPartitionsToWaitFor)); + PartitionRestriction.stop(), PartitionRestriction.waitForChildPartitions()); break; case FINISH_PARTITION: splitResult = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java index f1a1318601099..a41c2e2c94dfd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFnTest.java @@ -20,7 +20,6 @@ import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -152,8 +151,7 @@ public void setUp() { when(actionFactory.donePartitionAction()).thenReturn(donePartitionAction); when(actionFactory.dataChangeRecordAction()).thenReturn(dataChangeRecordAction); when(actionFactory.heartbeatRecordAction()).thenReturn(heartbeatRecordAction); - when(actionFactory.childPartitionsRecordAction( - partitionMetadataDao, waitForChildPartitionsAction)) + when(actionFactory.childPartitionsRecordAction(partitionMetadataDao)) .thenReturn(childPartitionsRecordAction); doFn.setup(); @@ -196,7 +194,7 @@ public void testQueryChangeStreamModeWithDataChangeRecord() { verify(heartbeatRecordAction, never()).run(any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); + verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); verify(deletePartitionAction, never()).run(any(), any()); @@ -237,7 +235,7 @@ public void testQueryChangeStreamModeWithHeartbeatRecord() { verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); + verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); verify(deletePartitionAction, never()).run(any(), any()); @@ -282,7 +280,7 @@ public void testQueryChangeStreamModeWithChildPartitionsRecord() { verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); + verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); verify(deletePartitionAction, never()).run(any(), any()); @@ -309,6 +307,7 @@ public void testQueryChangeStreamModeWithStreamFinished() { doFn.processElement(partition, restrictionTracker, outputReceiver, watermarkEstimator); assertEquals(ProcessContinuation.stop(), result); + verify(waitForChildPartitionsAction).run(partition, restrictionTracker); verify(finishPartitionAction).run(partition, restrictionTracker); verify(waitForParentPartitionsAction).run(partition, restrictionTracker); verify(deletePartitionAction).run(partition, restrictionTracker); @@ -317,23 +316,19 @@ public void testQueryChangeStreamModeWithStreamFinished() { verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); } @Test public void testWaitForChildPartitionsMode() { - final long childPartitionsToWaitFor = 10L; when(restriction.getMode()).thenReturn(PartitionMode.WAIT_FOR_CHILD_PARTITIONS); - when(restriction.getChildPartitionsToWaitFor()).thenReturn(childPartitionsToWaitFor); - when(waitForChildPartitionsAction.run(any(), any(), anyLong())) + when(waitForChildPartitionsAction.run(any(), any())) .thenReturn(Optional.of(ProcessContinuation.stop())); final ProcessContinuation result = doFn.processElement(partition, restrictionTracker, outputReceiver, watermarkEstimator); assertEquals(ProcessContinuation.stop(), result); - verify(waitForChildPartitionsAction) - .run(partition, restrictionTracker, childPartitionsToWaitFor); + verify(waitForChildPartitionsAction).run(partition, restrictionTracker); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any()); @@ -360,7 +355,7 @@ public void testFinishPartitionMode() { verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); + verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); verify(deletePartitionAction, never()).run(any(), any()); verify(donePartitionAction, never()).run(any(), any()); @@ -382,7 +377,7 @@ public void testWaitForParentPartitionsMode() { verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); + verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(deletePartitionAction, never()).run(any(), any()); verify(donePartitionAction, never()).run(any(), any()); @@ -404,7 +399,7 @@ public void testDeletePartitionMode() { verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); + verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); verify(donePartitionAction, never()).run(any(), any()); @@ -425,7 +420,7 @@ public void testDoneMode() { verify(dataChangeRecordAction, never()).run(any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); - verify(waitForChildPartitionsAction, never()).run(any(), any(), anyLong()); + verify(waitForChildPartitionsAction, never()).run(any(), any()); verify(finishPartitionAction, never()).run(any(), any()); verify(waitForParentPartitionsAction, never()).run(any(), any()); verify(deletePartitionAction, never()).run(any(), any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java index 4d86135a7597c..aa2295c81ef48 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/ChildPartitionsRecordActionTest.java @@ -51,7 +51,6 @@ public class ChildPartitionsRecordActionTest { private PartitionMetadataDao dao; - private WaitForChildPartitionsAction waitForChildPartitionsAction; private ChildPartitionsRecordAction action; private RestrictionTracker tracker; private ManualWatermarkEstimator watermarkEstimator; @@ -59,8 +58,7 @@ public class ChildPartitionsRecordActionTest { @Before public void setUp() { dao = mock(PartitionMetadataDao.class); - waitForChildPartitionsAction = mock(WaitForChildPartitionsAction.class); - action = new ChildPartitionsRecordAction(dao, waitForChildPartitionsAction); + action = new ChildPartitionsRecordAction(dao); tracker = mock(RestrictionTracker.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -82,7 +80,6 @@ public void testRestrictionClaimedAndIsSplitCase() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(tracker.tryClaim(PartitionPosition.queryChangeStream(startTimestamp))).thenReturn(true); - when(waitForChildPartitionsAction.run(partition, tracker, 2)).thenReturn(Optional.empty()); final Optional maybeContinuation = action.run(record, partition, tracker, watermarkEstimator); @@ -137,7 +134,6 @@ public void testRestrictionClaimedAndIsMergeCaseAndAllParentsFinished() { .thenAnswer(new TestTransactionAnswer(transaction)); when(transaction.countPartitionsInStates(parentTokens, Collections.singletonList(FINISHED))) .thenReturn(1L); - when(waitForChildPartitionsAction.run(partition, tracker, 1)).thenReturn(Optional.empty()); final Optional maybeContinuation = action.run(record, partition, tracker, watermarkEstimator); @@ -180,7 +176,6 @@ public void testRestrictionClaimedAndIsMergeCaseAndAtLeastOneParentIsNotFinished .thenAnswer(new TestTransactionAnswer(transaction)); when(transaction.countPartitionsInStates(parentTokens, Collections.singletonList(FINISHED))) .thenReturn(0L); - when(waitForChildPartitionsAction.run(partition, tracker, 1)).thenReturn(Optional.empty()); final Optional maybeContinuation = action.run(record, partition, tracker, watermarkEstimator); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsActionTest.java index 45e7c724b572f..d3a8ca5e6afec 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/actions/WaitForChildPartitionsActionTest.java @@ -17,13 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.spanner.cdc.actions; -import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.FINISHED; -import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.SCHEDULED; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.CREATED; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.Arrays; +import java.util.Collections; import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao; import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata; @@ -41,7 +40,6 @@ public class WaitForChildPartitionsActionTest { private PartitionMetadataDao dao; private Duration resumeDuration; private RestrictionTracker tracker; - private Long childPartitionsToWaitFor; @Before public void setUp() { @@ -49,21 +47,18 @@ public void setUp() { resumeDuration = Duration.millis(100L); action = new WaitForChildPartitionsAction(dao, resumeDuration); tracker = mock(RestrictionTracker.class); - childPartitionsToWaitFor = 10L; } @Test public void testRestrictionClaimedAndChildrenFinished() { final String partitionToken = "partitionToken"; final PartitionMetadata partition = mock(PartitionMetadata.class); - when(tracker.tryClaim(PartitionPosition.waitForChildPartitions(childPartitionsToWaitFor))) - .thenReturn(true); + when(tracker.tryClaim(PartitionPosition.waitForChildPartitions())).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(dao.countChildPartitionsInStates(partitionToken, Arrays.asList(SCHEDULED, FINISHED))) - .thenReturn(10L); + when(dao.countChildPartitionsInStates(partitionToken, Collections.singletonList(CREATED))) + .thenReturn(0L); - final Optional maybeContinuation = - action.run(partition, tracker, childPartitionsToWaitFor); + final Optional maybeContinuation = action.run(partition, tracker); assertEquals(Optional.empty(), maybeContinuation); } @@ -72,14 +67,12 @@ public void testRestrictionClaimedAndChildrenFinished() { public void testRestrictionClaimedAndChildrenNotFinished() { final String partitionToken = "partitionToken"; final PartitionMetadata partition = mock(PartitionMetadata.class); - when(tracker.tryClaim(PartitionPosition.waitForChildPartitions(childPartitionsToWaitFor))) - .thenReturn(true); + when(tracker.tryClaim(PartitionPosition.waitForChildPartitions())).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(dao.countChildPartitionsInStates(partitionToken, Arrays.asList(SCHEDULED, FINISHED))) - .thenReturn(9L); + when(dao.countChildPartitionsInStates(partitionToken, Collections.singletonList(CREATED))) + .thenReturn(1L); - final Optional maybeContinuation = - action.run(partition, tracker, childPartitionsToWaitFor); + final Optional maybeContinuation = action.run(partition, tracker); assertEquals( Optional.of(ProcessContinuation.resume().withResumeDelay(resumeDuration)), @@ -89,11 +82,9 @@ public void testRestrictionClaimedAndChildrenNotFinished() { @Test public void testRestrictionNotClaimed() { final PartitionMetadata partition = mock(PartitionMetadata.class); - when(tracker.tryClaim(PartitionPosition.waitForChildPartitions(childPartitionsToWaitFor))) - .thenReturn(false); + when(tracker.tryClaim(PartitionPosition.waitForChildPartitions())).thenReturn(false); - final Optional maybeContinuation = - action.run(partition, tracker, childPartitionsToWaitFor); + final Optional maybeContinuation = action.run(partition, tracker); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPositionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPositionTest.java index 97ddaa3915263..c7249623b85e0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPositionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionPositionTest.java @@ -43,47 +43,45 @@ public void setUp() { @Test public void testPositionQueryChangeStream() { assertEquals( - new PartitionPosition(Optional.of(timestamp), QUERY_CHANGE_STREAM, Optional.empty()), + new PartitionPosition(Optional.of(timestamp), QUERY_CHANGE_STREAM), PartitionPosition.queryChangeStream(timestamp)); } @Test public void testPositionWaitForChildPartitions() { assertEquals( - new PartitionPosition(Optional.empty(), WAIT_FOR_CHILD_PARTITIONS, Optional.of(20L)), - PartitionPosition.waitForChildPartitions(20L)); + new PartitionPosition(Optional.empty(), WAIT_FOR_CHILD_PARTITIONS), + PartitionPosition.waitForChildPartitions()); } @Test public void testPositionFinishPartition() { assertEquals( - new PartitionPosition(Optional.empty(), FINISH_PARTITION, Optional.empty()), + new PartitionPosition(Optional.empty(), FINISH_PARTITION), PartitionPosition.finishPartition()); } @Test public void testPositionWaitForParentPartitions() { assertEquals( - new PartitionPosition(Optional.empty(), WAIT_FOR_PARENT_PARTITIONS, Optional.empty()), + new PartitionPosition(Optional.empty(), WAIT_FOR_PARENT_PARTITIONS), PartitionPosition.waitForParentPartitions()); } @Test public void testPositionDeletePartition() { assertEquals( - new PartitionPosition(Optional.empty(), DELETE_PARTITION, Optional.empty()), + new PartitionPosition(Optional.empty(), DELETE_PARTITION), PartitionPosition.deletePartition()); } @Test public void testPositionDone() { - assertEquals( - new PartitionPosition(Optional.empty(), DONE, Optional.empty()), PartitionPosition.done()); + assertEquals(new PartitionPosition(Optional.empty(), DONE), PartitionPosition.done()); } @Test public void testPositionStop() { - assertEquals( - new PartitionPosition(Optional.empty(), STOP, Optional.empty()), PartitionPosition.stop()); + assertEquals(new PartitionPosition(Optional.empty(), STOP), PartitionPosition.stop()); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java index ececbc0e8caa5..9edf3cc3eca25 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java @@ -98,15 +98,7 @@ public void testQueryChangeStreamWithoutTimestamp() { claimer.tryClaim( PartitionRestriction.queryChangeStream(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE), PartitionPosition.queryChangeStream(Timestamp.MIN_VALUE), - new PartitionPosition(Optional.empty(), QUERY_CHANGE_STREAM, Optional.empty())); - } - - @Test(expected = IllegalArgumentException.class) - public void testWaitForChildPartitionsWithoutChildPartitionsToWaitFor() { - claimer.tryClaim( - PartitionRestriction.queryChangeStream(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE), - PartitionPosition.queryChangeStream(Timestamp.MIN_VALUE), - new PartitionPosition(Optional.empty(), WAIT_FOR_CHILD_PARTITIONS, Optional.empty())); + new PartitionPosition(Optional.empty(), QUERY_CHANGE_STREAM)); } @Test @@ -219,7 +211,7 @@ private PartitionRestriction partitionRestrictionFrom(PartitionMode mode) { case QUERY_CHANGE_STREAM: return PartitionRestriction.queryChangeStream(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE); case WAIT_FOR_CHILD_PARTITIONS: - return PartitionRestriction.waitForChildPartitions(10L); + return PartitionRestriction.waitForChildPartitions(); case FINISH_PARTITION: return PartitionRestriction.finishPartition(); case WAIT_FOR_PARENT_PARTITIONS: @@ -240,7 +232,7 @@ private PartitionPosition partitionPositionFrom(PartitionMode mode) { case QUERY_CHANGE_STREAM: return PartitionPosition.queryChangeStream(Timestamp.ofTimeSecondsAndNanos(1L, 0)); case WAIT_FOR_CHILD_PARTITIONS: - return PartitionPosition.waitForChildPartitions(10L); + return PartitionPosition.waitForChildPartitions(); case FINISH_PARTITION: return PartitionPosition.finishPartition(); case WAIT_FOR_PARENT_PARTITIONS: diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitCheckerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitCheckerTest.java index 819c31c72daf0..a1af5809170bf 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitCheckerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitCheckerTest.java @@ -69,7 +69,7 @@ public void testQueryChangeStreamNullLastAttemptedPosition() { public void testQueryChangeStreamNoPreviousTimestampOnLastAttemptedPosition() { final boolean isSplitAllowed = splitChecker.isSplitAllowed( - new PartitionPosition(Optional.empty(), QUERY_CHANGE_STREAM, Optional.empty()), + new PartitionPosition(Optional.empty(), QUERY_CHANGE_STREAM), PartitionPosition.queryChangeStream(Timestamp.ofTimeSecondsAndNanos(11L, 29))); assertFalse( @@ -81,7 +81,7 @@ public void testWaitForChildPartitions() { final boolean isSplitAllowed = splitChecker.isSplitAllowed( PartitionPosition.queryChangeStream(Timestamp.MAX_VALUE), - PartitionPosition.waitForChildPartitions(10L)); + PartitionPosition.waitForChildPartitions()); assertTrue(isSplitAllowed); } @@ -90,7 +90,7 @@ public void testWaitForChildPartitions() { public void testFinishPartition() { final boolean isSplitAllowed = splitChecker.isSplitAllowed( - PartitionPosition.waitForChildPartitions(10L), PartitionPosition.finishPartition()); + PartitionPosition.waitForChildPartitions(), PartitionPosition.finishPartition()); assertTrue(isSplitAllowed); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java index 27b26cd949fc0..3fd6e242d8fd3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.gcp.spanner.cdc.restriction; import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.QUERY_CHANGE_STREAM; -import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.WAIT_FOR_CHILD_PARTITIONS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -106,33 +105,23 @@ public void testQueryChangeStreamGreaterThanEndTimestamp() { @Test(expected = IllegalArgumentException.class) public void testQueryChangeStreamWithoutTimestamp() { - final PartitionPosition position = - new PartitionPosition(Optional.empty(), QUERY_CHANGE_STREAM, Optional.empty()); + final PartitionPosition position = new PartitionPosition(Optional.empty(), QUERY_CHANGE_STREAM); splitter.trySplit(0D, true, position, restriction); } @Test public void testWaitForChildPartitions() { - final PartitionPosition position = PartitionPosition.waitForChildPartitions(10L); + final PartitionPosition position = PartitionPosition.waitForChildPartitions(); final SplitResult splitResult = splitter.trySplit(0D, true, position, restriction); assertEquals( - SplitResult.of( - PartitionRestriction.stop(), PartitionRestriction.waitForChildPartitions(10L)), + SplitResult.of(PartitionRestriction.stop(), PartitionRestriction.waitForChildPartitions()), splitResult); } - @Test(expected = IllegalArgumentException.class) - public void testWaitForChildPartitionsWithoutTimestamp() { - final PartitionPosition position = - new PartitionPosition(Optional.empty(), WAIT_FOR_CHILD_PARTITIONS, Optional.empty()); - - splitter.trySplit(0D, true, position, restriction); - } - @Test public void testFinishPartition() { final PartitionPosition position = PartitionPosition.finishPartition(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java index bbe413709a057..8a854e3888e17 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java @@ -35,45 +35,44 @@ public class PartitionRestrictionTest { public void testQueryChangeStreamRestriction() { assertEquals( PartitionRestriction.queryChangeStream(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE), - new PartitionRestriction( - Timestamp.MIN_VALUE, Timestamp.MAX_VALUE, QUERY_CHANGE_STREAM, null)); + new PartitionRestriction(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE, QUERY_CHANGE_STREAM)); } @Test public void testWaitForChildPartitionsRestriction() { assertEquals( - PartitionRestriction.waitForChildPartitions(10L), - new PartitionRestriction(null, null, WAIT_FOR_CHILD_PARTITIONS, 10L)); + PartitionRestriction.waitForChildPartitions(), + new PartitionRestriction(null, null, WAIT_FOR_CHILD_PARTITIONS)); } @Test public void testFinishPartitionRestriction() { assertEquals( PartitionRestriction.finishPartition(), - new PartitionRestriction(null, null, FINISH_PARTITION, null)); + new PartitionRestriction(null, null, FINISH_PARTITION)); } @Test public void testWaitForParentPartitionsRestriction() { assertEquals( PartitionRestriction.waitForParentPartitions(), - new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS, null)); + new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS)); } @Test public void testDeletePartitionRestriction() { assertEquals( PartitionRestriction.deletePartition(), - new PartitionRestriction(null, null, DELETE_PARTITION, null)); + new PartitionRestriction(null, null, DELETE_PARTITION)); } @Test public void testDoneRestriction() { - assertEquals(PartitionRestriction.done(), new PartitionRestriction(null, null, DONE, null)); + assertEquals(PartitionRestriction.done(), new PartitionRestriction(null, null, DONE)); } @Test public void testStopRestriction() { - assertEquals(PartitionRestriction.stop(), new PartitionRestriction(null, null, STOP, null)); + assertEquals(PartitionRestriction.stop(), new PartitionRestriction(null, null, STOP)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java index bf48cb1154965..7f6c172bf6fb4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java @@ -65,9 +65,7 @@ public void testIsSplitAllowedQueryChangeStreamInitialization() { public void testIsSplitAllowedNonQueryChangeStreamInitialization() { Arrays.stream(PartitionMode.values()) .filter(mode -> mode != QUERY_CHANGE_STREAM) - .map( - mode -> - new PartitionRestrictionTracker(new PartitionRestriction(null, null, mode, null))) + .map(mode -> new PartitionRestrictionTracker(new PartitionRestriction(null, null, mode))) .forEach( tracker -> assertTrue(