From e9b24f61a2aef8ad1e58676f8f343826b861ab92 Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Wed, 14 Jul 2021 15:57:56 +1000 Subject: [PATCH] feat: implement getProgress for ReadChangeStreamPartitionDoFn (#33) * feat: add timestamps to all restriction modes This is necessary to track progress. * feat: add progress checker for spanner read dofn Adds progress checker for read change stream partition dofn. This provides a progress estimate for the function completion. * feat: add stopped mode to partition restriction This will help us calculate the progress left for a stopped restriction. * chore: spotless apply --- .../cdc/ReadChangeStreamPartitionDoFn.java | 6 +- .../cdc/restriction/PartitionRestriction.java | 52 ++- .../PartitionRestrictionProgressChecker.java | 91 ++++ .../PartitionRestrictionSplitter.java | 17 +- .../PartitionRestrictionTracker.java | 16 +- .../PartitionRestrictionClaimerTest.java | 17 +- ...rtitionRestrictionProgressCheckerTest.java | 424 ++++++++++++++++++ .../PartitionRestrictionSplitterTest.java | 17 +- .../restriction/PartitionRestrictionTest.java | 40 +- .../PartitionRestrictionTrackerTest.java | 10 +- 10 files changed, 638 insertions(+), 52 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressChecker.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressCheckerTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java index f64b887772b92..18e00b7408210 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/ReadChangeStreamPartitionDoFn.java @@ -93,10 +93,8 @@ public ManualWatermarkEstimator newWatermarkEstimator( @GetInitialRestriction public PartitionRestriction initialRestriction(@Element PartitionMetadata partition) { LOG.info("[" + partition.getPartitionToken() + "] Initial restriction"); - return new PartitionRestriction( - partition.getStartTimestamp(), - partition.getEndTimestamp(), - PartitionMode.QUERY_CHANGE_STREAM); + return PartitionRestriction.queryChangeStream( + partition.getStartTimestamp(), partition.getEndTimestamp()); } @NewTracker diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java index e3d00598f57eb..ee4619f205931 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestriction.java @@ -37,41 +37,54 @@ public class PartitionRestriction implements Serializable { private final Timestamp startTimestamp; private final Timestamp endTimestamp; private final PartitionMode mode; + private final PartitionMode stoppedMode; public static PartitionRestriction queryChangeStream( Timestamp startTimestamp, Timestamp endTimestamp) { - return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM); + return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM, null); } - public static PartitionRestriction waitForChildPartitions() { - return new PartitionRestriction(null, null, WAIT_FOR_CHILD_PARTITIONS); + public static PartitionRestriction waitForChildPartitions( + Timestamp startTimestamp, Timestamp endTimestamp) { + 
return new PartitionRestriction(startTimestamp, endTimestamp, WAIT_FOR_CHILD_PARTITIONS, null); } - public static PartitionRestriction finishPartition() { - return new PartitionRestriction(null, null, FINISH_PARTITION); + public static PartitionRestriction finishPartition( + Timestamp startTimestamp, Timestamp endTimestamp) { + return new PartitionRestriction(startTimestamp, endTimestamp, FINISH_PARTITION, null); } - public static PartitionRestriction waitForParentPartitions() { - return new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS); + public static PartitionRestriction waitForParentPartitions( + Timestamp startTimestamp, Timestamp endTimestamp) { + return new PartitionRestriction(startTimestamp, endTimestamp, WAIT_FOR_PARENT_PARTITIONS, null); } - public static PartitionRestriction deletePartition() { - return new PartitionRestriction(null, null, DELETE_PARTITION); + public static PartitionRestriction deletePartition( + Timestamp startTimestamp, Timestamp endTimestamp) { + return new PartitionRestriction(startTimestamp, endTimestamp, DELETE_PARTITION, null); } - public static PartitionRestriction done() { - return new PartitionRestriction(null, null, DONE); + public static PartitionRestriction done(Timestamp startTimestamp, Timestamp endTimestamp) { + return new PartitionRestriction(startTimestamp, endTimestamp, DONE, null); } - public static PartitionRestriction stop() { - return new PartitionRestriction(null, null, STOP); + public static PartitionRestriction stop(PartitionRestriction restriction) { + return new PartitionRestriction( + restriction.getStartTimestamp(), + restriction.getEndTimestamp(), + STOP, + restriction.getMode()); } public PartitionRestriction( - Timestamp startTimestamp, Timestamp endTimestamp, PartitionMode mode) { + Timestamp startTimestamp, + Timestamp endTimestamp, + PartitionMode mode, + PartitionMode stoppedMode) { this.startTimestamp = startTimestamp; this.endTimestamp = endTimestamp; this.mode = mode; + 
this.stoppedMode = stoppedMode; } public Timestamp getStartTimestamp() { @@ -86,6 +99,10 @@ public PartitionMode getMode() { return mode; } + public PartitionMode getStoppedMode() { + return stoppedMode; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -97,12 +114,13 @@ public boolean equals(Object o) { PartitionRestriction that = (PartitionRestriction) o; return Objects.equals(startTimestamp, that.startTimestamp) && Objects.equals(endTimestamp, that.endTimestamp) - && mode == that.mode; + && mode == that.mode + && stoppedMode == that.stoppedMode; } @Override public int hashCode() { - return Objects.hash(startTimestamp, endTimestamp, mode); + return Objects.hash(startTimestamp, endTimestamp, mode, stoppedMode); } @Override @@ -114,6 +132,8 @@ public String toString() { + endTimestamp + ", mode=" + mode + + ", stoppedMode=" + + stoppedMode + '}'; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressChecker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressChecker.java new file mode 100644 index 0000000000000..5da7d66714ebd --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressChecker.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.cdc.restriction; + +import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampToMicros; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.DELETE_PARTITION; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.DONE; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.FINISH_PARTITION; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.QUERY_CHANGE_STREAM; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.STOP; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.WAIT_FOR_CHILD_PARTITIONS; +import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.WAIT_FOR_PARENT_PARTITIONS; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; + +public class PartitionRestrictionProgressChecker { + + private static final BigDecimal TOTAL_MODE_TRANSITIONS = BigDecimal.valueOf(5L); + + /** + * Indicates how many mode transitions have been completed for the current mode. The transitions + * are as follows: * (1) QUERY_CHANGE_STREAM, (2) WAIT_FOR_CHILD_PARTITIONS, (3) FINISH_PARTITION, + * (4) WAIT_FOR_PARENT_PARTITIONS, (5) DELETE_PARTITION, (6) DONE. + * + *

<p>This is used to calculate the units of work left, meaning that 1 transition = 1 unit of + * work. + */ + private final Map modeToTransitionsCompleted; + + public PartitionRestrictionProgressChecker() { + modeToTransitionsCompleted = new HashMap<>(); + modeToTransitionsCompleted.put(QUERY_CHANGE_STREAM, BigDecimal.valueOf(0L)); + modeToTransitionsCompleted.put(WAIT_FOR_CHILD_PARTITIONS, BigDecimal.valueOf(1L)); + modeToTransitionsCompleted.put(FINISH_PARTITION, BigDecimal.valueOf(2L)); + modeToTransitionsCompleted.put(WAIT_FOR_PARENT_PARTITIONS, BigDecimal.valueOf(3L)); + modeToTransitionsCompleted.put(DELETE_PARTITION, BigDecimal.valueOf(4L)); + modeToTransitionsCompleted.put(DONE, BigDecimal.valueOf(5L)); + } + + public Progress getProgress( + PartitionRestriction restriction, PartitionPosition lastClaimedPosition) { + final PartitionMode currentMode = + Optional.ofNullable(lastClaimedPosition) + .map(PartitionPosition::getMode) + .orElse( + restriction.getMode() == STOP + ? restriction.getStoppedMode() + : restriction.getMode()); + final BigDecimal transitionsCompleted = + modeToTransitionsCompleted.getOrDefault(currentMode, BigDecimal.ZERO); + + final BigDecimal startTimestampAsMicros = timestampToMicros(restriction.getStartTimestamp()); + final BigDecimal endTimestampAsMicros = timestampToMicros(restriction.getEndTimestamp()); + final BigDecimal currentTimestampAsMicros = + Optional.ofNullable(lastClaimedPosition) + .flatMap(PartitionPosition::getTimestamp) + .map(TimestampConverter::timestampToMicros) + .orElse( + currentMode == QUERY_CHANGE_STREAM ? 
startTimestampAsMicros : endTimestampAsMicros); + + final BigDecimal workCompleted = + currentTimestampAsMicros.subtract(startTimestampAsMicros).add(transitionsCompleted); + final BigDecimal workLeft = + endTimestampAsMicros + .subtract(startTimestampAsMicros) + .add(TOTAL_MODE_TRANSITIONS) + .subtract(workCompleted); + + return Progress.from(workCompleted.doubleValue(), workLeft.doubleValue()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java index 44340b486c04e..848602988584c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitter.java @@ -52,6 +52,9 @@ public SplitResult trySplit( "%s mode must specify a timestamp (no value sent)", positionMode); + final Timestamp startTimestamp = restriction.getStartTimestamp(); + final Timestamp endTimestamp = restriction.getEndTimestamp(); + SplitResult splitResult = null; switch (positionMode) { case QUERY_CHANGE_STREAM: @@ -63,12 +66,14 @@ public SplitResult trySplit( // restriction gets scheduled before the primary. 
splitResult = SplitResult.of( - PartitionRestriction.stop(), PartitionRestriction.waitForChildPartitions()); + PartitionRestriction.stop(restriction), + PartitionRestriction.waitForChildPartitions(startTimestamp, endTimestamp)); break; case FINISH_PARTITION: splitResult = SplitResult.of( - PartitionRestriction.stop(), PartitionRestriction.waitForParentPartitions()); + PartitionRestriction.stop(restriction), + PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp)); break; case WAIT_FOR_PARENT_PARTITIONS: // If we need to split the wait for parent partitions, we remain at the same mode. That is @@ -76,10 +81,14 @@ public SplitResult trySplit( // restriction gets scheduled before the primary. splitResult = SplitResult.of( - PartitionRestriction.stop(), PartitionRestriction.waitForParentPartitions()); + PartitionRestriction.stop(restriction), + PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp)); break; case DELETE_PARTITION: - splitResult = SplitResult.of(PartitionRestriction.stop(), PartitionRestriction.done()); + splitResult = + SplitResult.of( + PartitionRestriction.stop(restriction), + PartitionRestriction.done(startTimestamp, endTimestamp)); break; case DONE: return null; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTracker.java index 2897ce8f3b835..7b14e4278c5d4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTracker.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.QUERY_CHANGE_STREAM; import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; @@ -27,11 +28,12 @@ // TODO: Add java docs // TODO: Implement duration waiting for returning false on try claim public class PartitionRestrictionTracker - extends RestrictionTracker { + extends RestrictionTracker implements HasProgress { private final PartitionRestrictionSplitter splitter; private final PartitionRestrictionClaimer claimer; private final PartitionRestrictionSplitChecker splitChecker; + private final PartitionRestrictionProgressChecker progressChecker; private PartitionRestriction restriction; private PartitionPosition lastClaimedPosition; private boolean isSplitAllowed; @@ -41,19 +43,22 @@ public PartitionRestrictionTracker(PartitionRestriction restriction) { restriction, new PartitionRestrictionSplitter(), new PartitionRestrictionClaimer(), - new PartitionRestrictionSplitChecker()); + new PartitionRestrictionSplitChecker(), + new PartitionRestrictionProgressChecker()); } PartitionRestrictionTracker( PartitionRestriction restriction, PartitionRestrictionSplitter splitter, PartitionRestrictionClaimer claimer, - PartitionRestrictionSplitChecker splitChecker) { + PartitionRestrictionSplitChecker splitChecker, + PartitionRestrictionProgressChecker progressChecker) { this.splitter = splitter; this.claimer = claimer; this.splitChecker = splitChecker; this.restriction = restriction; this.isSplitAllowed = restriction.getMode() != QUERY_CHANGE_STREAM; + this.progressChecker = progressChecker; } @Override @@ -78,6 +83,11 @@ public boolean tryClaim(PartitionPosition position) { return canClaim; } + @Override + public Progress getProgress() { + return progressChecker.getProgress(restriction, 
lastClaimedPosition); + } + @Override public PartitionRestriction currentRestriction() { return restriction; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java index 9edf3cc3eca25..65611b4af7209 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionClaimerTest.java @@ -207,21 +207,24 @@ public void run() { } private PartitionRestriction partitionRestrictionFrom(PartitionMode mode) { + final Timestamp startTimestamp = Timestamp.MIN_VALUE; + final Timestamp endTimestamp = Timestamp.MAX_VALUE; switch (mode) { case QUERY_CHANGE_STREAM: - return PartitionRestriction.queryChangeStream(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE); + return PartitionRestriction.queryChangeStream(startTimestamp, endTimestamp); case WAIT_FOR_CHILD_PARTITIONS: - return PartitionRestriction.waitForChildPartitions(); + return PartitionRestriction.waitForChildPartitions(startTimestamp, endTimestamp); case FINISH_PARTITION: - return PartitionRestriction.finishPartition(); + return PartitionRestriction.finishPartition(startTimestamp, endTimestamp); case WAIT_FOR_PARENT_PARTITIONS: - return PartitionRestriction.waitForParentPartitions(); + return PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp); case DELETE_PARTITION: - return PartitionRestriction.deletePartition(); + return PartitionRestriction.deletePartition(startTimestamp, endTimestamp); case DONE: - return PartitionRestriction.done(); + return PartitionRestriction.done(startTimestamp, endTimestamp); case STOP: - return PartitionRestriction.stop(); + return 
PartitionRestriction.stop( + PartitionRestriction.queryChangeStream(startTimestamp, endTimestamp)); default: throw new IllegalArgumentException("Unknown mode " + mode); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressCheckerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressCheckerTest.java new file mode 100644 index 0000000000000..7edf3913f1ae4 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionProgressCheckerTest.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner.cdc.restriction; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.Timestamp; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; +import org.junit.Before; +import org.junit.Test; + +public class PartitionRestrictionProgressCheckerTest { + + private PartitionRestrictionProgressChecker progressChecker; + + @Before + public void setUp() { + progressChecker = new PartitionRestrictionProgressChecker(); + } + + // ------------------------ + // QUERY_CHANGE_STREAM mode + @Test + public void testRestrictionQueryChangeStreamAndLastClaimedPositionNull() { + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(0D, 55D), progress); + } + + @Test + public void testRestrictionQueryChangeStreamAndLastClaimedPositionQueryChangeStream() { + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = + PartitionPosition.queryChangeStream(Timestamp.ofTimeMicroseconds(30L)); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(20D, 35D), progress); + } + + @Test + public void testRestrictionQueryChangeStreamAndLastClaimedPositionEndOfQueryChangeStream() { + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = + PartitionPosition.queryChangeStream(Timestamp.ofTimeMicroseconds(60L)); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(50D, 5D), progress); + } + + @Test + public void 
testRestrictionQueryChangeStreamAndLastClaimedPositionWaitForChildPartitions() { + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.waitForChildPartitions(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(51D, 4D), progress); + } + + @Test + public void testRestrictionQueryChangeStreamAndLastClaimedPositionFinishPartition() { + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.finishPartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(52D, 3D), progress); + } + + @Test + public void testRestrictionQueryChangeStreamAndLastClaimedPositionWaitForParentPartitions() { + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.waitForParentPartitions(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(53D, 2D), progress); + } + + @Test + public void testRestrictionQueryChangeStreamAndLastClaimedPositionDeletePartition() { + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.deletePartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(54D, 1D), progress); + } + + @Test + public void testRestrictionQueryChangeStreamAndLastClaimedPositionDone() { + final PartitionRestriction 
restriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.done(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(55D, 0D), progress); + } + + // ------------------------------ + // WAIT_FOR_CHILD_PARTITIONS mode + @Test + public void testRestrictionWaitForChildPartitionsAndLastClaimedPositionNull() { + final PartitionRestriction restriction = + PartitionRestriction.waitForChildPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(51D, 4D), progress); + } + + @Test + public void testRestrictionWaitForChildPartitionsAndLastClaimedPositionWaitForChildPartitions() { + final PartitionRestriction restriction = + PartitionRestriction.waitForChildPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.waitForChildPartitions(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(51D, 4D), progress); + } + + @Test + public void testRestrictionWaitForChildPartitionsAndLastClaimedPositionFinishPartition() { + final PartitionRestriction restriction = + PartitionRestriction.waitForChildPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.finishPartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(52D, 3D), progress); + } + + @Test + public void testRestrictionWaitForChildPartitionsAndLastClaimedPositionWaitForParentPartitions() { + final PartitionRestriction restriction = + PartitionRestriction.waitForChildPartitions( + Timestamp.ofTimeMicroseconds(10L), 
Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.waitForParentPartitions(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(53D, 2D), progress); + } + + @Test + public void testRestrictionWaitForChildPartitionsAndLastClaimedPositionDeletePartition() { + final PartitionRestriction restriction = + PartitionRestriction.waitForChildPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.deletePartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(54D, 1D), progress); + } + + @Test + public void testRestrictionWaitForChildPartitionsAndLastClaimedPositionDone() { + final PartitionRestriction restriction = + PartitionRestriction.waitForChildPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.done(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(55D, 0D), progress); + } + + // ------------------------ + // FINISH_PARTITION mode + @Test + public void testRestrictionFinishPartitionAndLastClaimedPositionFinishPartition() { + final PartitionRestriction restriction = + PartitionRestriction.finishPartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.finishPartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(52D, 3D), progress); + } + + @Test + public void testRestrictionFinishPartitionAndLastClaimedPositionWaitForParentPartitions() { + final PartitionRestriction restriction = + PartitionRestriction.finishPartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition 
position = PartitionPosition.waitForParentPartitions(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(53D, 2D), progress); + } + + @Test + public void testRestrictionFinishPartitionAndLastClaimedPositionDeletePartition() { + final PartitionRestriction restriction = + PartitionRestriction.finishPartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.deletePartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(54D, 1D), progress); + } + + @Test + public void testRestrictionFinishPartitionAndLastClaimedPositionDone() { + final PartitionRestriction restriction = + PartitionRestriction.finishPartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.done(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(55D, 0D), progress); + } + + // ------------------------------- + // WAIT_FOR_PARENT_PARTITIONS mode + @Test + public void + testRestrictionWaitForParentPartitionsAndLastClaimedPositionWaitForParentPartitions() { + final PartitionRestriction restriction = + PartitionRestriction.waitForParentPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.waitForParentPartitions(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(53D, 2D), progress); + } + + @Test + public void testRestrictionWaitForParentPartitionsAndLastClaimedPositionDeletePartition() { + final PartitionRestriction restriction = + PartitionRestriction.waitForParentPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = 
PartitionPosition.deletePartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(54D, 1D), progress); + } + + @Test + public void testRestrictionWaitForParentPartitionsAndLastClaimedPositionDone() { + final PartitionRestriction restriction = + PartitionRestriction.waitForParentPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.done(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(55D, 0D), progress); + } + + // ------------------------ + // DELETE_PARTITION mode + @Test + public void testRestrictionDeletePartitionAndLastClaimedPositionDeletePartition() { + final PartitionRestriction restriction = + PartitionRestriction.deletePartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.deletePartition(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(54D, 1D), progress); + } + + @Test + public void testRestrictionDeletePartitionAndLastClaimedPositionDone() { + final PartitionRestriction restriction = + PartitionRestriction.deletePartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.done(); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(55D, 0D), progress); + } + + // ------------------------ + // DONE mode + @Test + public void testRestrictionDoneAndLastClaimedPositionDone() { + final PartitionRestriction restriction = + PartitionRestriction.done( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionPosition position = PartitionPosition.done(); + + final Progress progress = progressChecker.getProgress(restriction, 
position); + + assertEquals(Progress.from(55D, 0D), progress); + } + + // ------------------------ + // STOP mode + @Test + public void testRestrictionStopQueryChangeStreamAndLastClaimedPositionNull() { + final PartitionRestriction stoppedRestriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionRestriction restriction = PartitionRestriction.stop(stoppedRestriction); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(0D, 55D), progress); + } + + @Test + public void testRestrictionStopQueryChangeStreamAndLastClaimedPositionQueryChangeStream() { + final PartitionRestriction stoppedRestriction = + PartitionRestriction.queryChangeStream( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionRestriction restriction = PartitionRestriction.stop(stoppedRestriction); + final PartitionPosition position = + PartitionPosition.queryChangeStream(Timestamp.ofTimeMicroseconds(30L)); + + final Progress progress = progressChecker.getProgress(restriction, position); + + assertEquals(Progress.from(20D, 35D), progress); + } + + @Test + public void testRestrictionStopWaitForChildPartitionsAndLastClaimedPositionNull() { + final PartitionRestriction stoppedRestriction = + PartitionRestriction.waitForChildPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionRestriction restriction = PartitionRestriction.stop(stoppedRestriction); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(51D, 4D), progress); + } + + @Test + public void testRestrictionStopFinishPartitionAndLastClaimedPositionNull() { + final PartitionRestriction stoppedRestriction = + PartitionRestriction.finishPartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionRestriction restriction = 
PartitionRestriction.stop(stoppedRestriction); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(52D, 3D), progress); + } + + @Test + public void testRestrictionStopWaitForParentPartitionsAndLastClaimedPositionNull() { + final PartitionRestriction stoppedRestriction = + PartitionRestriction.waitForParentPartitions( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionRestriction restriction = PartitionRestriction.stop(stoppedRestriction); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(53D, 2D), progress); + } + + @Test + public void testRestrictionStopDeleteAndLastClaimedPositionNull() { + final PartitionRestriction stoppedRestriction = + PartitionRestriction.deletePartition( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionRestriction restriction = PartitionRestriction.stop(stoppedRestriction); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(54D, 1D), progress); + } + + @Test + public void testRestrictionStopDoneAndLastClaimedPositionNull() { + final PartitionRestriction stoppedRestriction = + PartitionRestriction.done( + Timestamp.ofTimeMicroseconds(10L), Timestamp.ofTimeMicroseconds(60L)); + final PartitionRestriction restriction = PartitionRestriction.stop(stoppedRestriction); + + final Progress progress = progressChecker.getProgress(restriction, null); + + assertEquals(Progress.from(55D, 0D), progress); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java index 3fd6e242d8fd3..c914faf0beee4 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionSplitterTest.java @@ -118,7 +118,9 @@ public void testWaitForChildPartitions() { splitter.trySplit(0D, true, position, restriction); assertEquals( - SplitResult.of(PartitionRestriction.stop(), PartitionRestriction.waitForChildPartitions()), + SplitResult.of( + PartitionRestriction.stop(restriction), + PartitionRestriction.waitForChildPartitions(startTimestamp, endTimestamp)), splitResult); } @@ -130,7 +132,9 @@ public void testFinishPartition() { splitter.trySplit(0D, true, position, restriction); assertEquals( - SplitResult.of(PartitionRestriction.stop(), PartitionRestriction.waitForParentPartitions()), + SplitResult.of( + PartitionRestriction.stop(restriction), + PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp)), splitResult); } @@ -142,7 +146,9 @@ public void testWaitForParentPartitions() { splitter.trySplit(0D, true, position, restriction); assertEquals( - SplitResult.of(PartitionRestriction.stop(), PartitionRestriction.waitForParentPartitions()), + SplitResult.of( + PartitionRestriction.stop(restriction), + PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp)), splitResult); } @@ -154,7 +160,10 @@ public void testDeletePartition() { splitter.trySplit(0D, true, position, restriction); assertEquals( - SplitResult.of(PartitionRestriction.stop(), PartitionRestriction.done()), splitResult); + SplitResult.of( + PartitionRestriction.stop(restriction), + PartitionRestriction.done(startTimestamp, endTimestamp)), + splitResult); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java index 8a854e3888e17..d11b4e4676de5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTest.java @@ -27,52 +27,68 @@ import static org.junit.Assert.assertEquals; import com.google.cloud.Timestamp; +import org.junit.Before; import org.junit.Test; public class PartitionRestrictionTest { + private Timestamp startTimestamp; + private Timestamp endTimestamp; + + @Before + public void setUp() throws Exception { + startTimestamp = Timestamp.MIN_VALUE; + endTimestamp = Timestamp.MAX_VALUE; + } + @Test public void testQueryChangeStreamRestriction() { assertEquals( - PartitionRestriction.queryChangeStream(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE), - new PartitionRestriction(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE, QUERY_CHANGE_STREAM)); + PartitionRestriction.queryChangeStream(startTimestamp, endTimestamp), + new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM, null)); } @Test public void testWaitForChildPartitionsRestriction() { assertEquals( - PartitionRestriction.waitForChildPartitions(), - new PartitionRestriction(null, null, WAIT_FOR_CHILD_PARTITIONS)); + PartitionRestriction.waitForChildPartitions(startTimestamp, endTimestamp), + new PartitionRestriction(startTimestamp, endTimestamp, WAIT_FOR_CHILD_PARTITIONS, null)); } @Test public void testFinishPartitionRestriction() { assertEquals( - PartitionRestriction.finishPartition(), - new PartitionRestriction(null, null, FINISH_PARTITION)); + PartitionRestriction.finishPartition(startTimestamp, endTimestamp), + new PartitionRestriction(startTimestamp, endTimestamp, FINISH_PARTITION, null)); } @Test public void testWaitForParentPartitionsRestriction() { 
assertEquals( - PartitionRestriction.waitForParentPartitions(), - new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS)); + PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp), + new PartitionRestriction(startTimestamp, endTimestamp, WAIT_FOR_PARENT_PARTITIONS, null)); } @Test public void testDeletePartitionRestriction() { assertEquals( - PartitionRestriction.deletePartition(), - new PartitionRestriction(null, null, DELETE_PARTITION)); + PartitionRestriction.deletePartition(startTimestamp, endTimestamp), + new PartitionRestriction(startTimestamp, endTimestamp, DELETE_PARTITION, null)); } @Test public void testDoneRestriction() { - assertEquals(PartitionRestriction.done(), new PartitionRestriction(null, null, DONE)); + assertEquals( + PartitionRestriction.done(startTimestamp, endTimestamp), + new PartitionRestriction(startTimestamp, endTimestamp, DONE, null)); } @Test public void testStopRestriction() { - assertEquals(PartitionRestriction.stop(), new PartitionRestriction(null, null, STOP)); + final PartitionRestriction restriction = + PartitionRestriction.queryChangeStream(startTimestamp, endTimestamp); + assertEquals( + PartitionRestriction.stop(restriction), + new PartitionRestriction(startTimestamp, endTimestamp, STOP, QUERY_CHANGE_STREAM)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java index 7f6c172bf6fb4..fd0d25d328398 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/cdc/restriction/PartitionRestrictionTrackerTest.java @@ -41,6 +41,7 @@ public class PartitionRestrictionTrackerTest { private 
PartitionRestrictionSplitter splitter; private PartitionRestrictionClaimer claimer; private PartitionRestrictionSplitChecker splitChecker; + private PartitionRestrictionProgressChecker progressChecker; private PartitionRestrictionTracker tracker; @Before @@ -49,7 +50,10 @@ public void setUp() { splitter = mock(PartitionRestrictionSplitter.class); claimer = mock(PartitionRestrictionClaimer.class); splitChecker = mock(PartitionRestrictionSplitChecker.class); - tracker = new PartitionRestrictionTracker(restriction, splitter, claimer, splitChecker); + progressChecker = mock(PartitionRestrictionProgressChecker.class); + tracker = + new PartitionRestrictionTracker( + restriction, splitter, claimer, splitChecker, progressChecker); } @Test @@ -65,7 +69,9 @@ public void testIsSplitAllowedQueryChangeStreamInitialization() { public void testIsSplitAllowedNonQueryChangeStreamInitialization() { Arrays.stream(PartitionMode.values()) .filter(mode -> mode != QUERY_CHANGE_STREAM) - .map(mode -> new PartitionRestrictionTracker(new PartitionRestriction(null, null, mode))) + .map( + mode -> + new PartitionRestrictionTracker(new PartitionRestriction(null, null, mode, null))) .forEach( tracker -> assertTrue(