Skip to content

Commit

Permalink
feat: implement getProgress for ReadChangeStreamPartitionDoFn (#33)
Browse files Browse the repository at this point in the history
* feat: add timestamps to all restriction modes

This is necessary to track progress.

* feat: add progress checker for spanner read dofn

Adds a progress checker for the read change stream partition DoFn. It
provides an estimate of how much of the function's work has been completed.

* feat: add stopped mode to partition restriction

This will help us calculate the progress left for a stopped restriction.

* chore: spotless apply
  • Loading branch information
thiagotnunes authored Jul 14, 2021
1 parent 73d2c3c commit e9b24f6
Show file tree
Hide file tree
Showing 10 changed files with 638 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
@GetInitialRestriction
public PartitionRestriction initialRestriction(@Element PartitionMetadata partition) {
LOG.info("[" + partition.getPartitionToken() + "] Initial restriction");
return new PartitionRestriction(
partition.getStartTimestamp(),
partition.getEndTimestamp(),
PartitionMode.QUERY_CHANGE_STREAM);
return PartitionRestriction.queryChangeStream(
partition.getStartTimestamp(), partition.getEndTimestamp());
}

@NewTracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,41 +37,54 @@ public class PartitionRestriction implements Serializable {
private final Timestamp startTimestamp;
private final Timestamp endTimestamp;
private final PartitionMode mode;
private final PartitionMode stoppedMode;

public static PartitionRestriction queryChangeStream(
Timestamp startTimestamp, Timestamp endTimestamp) {
return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM);
return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM, null);
}

public static PartitionRestriction waitForChildPartitions() {
return new PartitionRestriction(null, null, WAIT_FOR_CHILD_PARTITIONS);
public static PartitionRestriction waitForChildPartitions(
Timestamp startTimestamp, Timestamp endTimestamp) {
return new PartitionRestriction(startTimestamp, endTimestamp, WAIT_FOR_CHILD_PARTITIONS, null);
}

public static PartitionRestriction finishPartition() {
return new PartitionRestriction(null, null, FINISH_PARTITION);
public static PartitionRestriction finishPartition(
Timestamp startTimestamp, Timestamp endTimestamp) {
return new PartitionRestriction(startTimestamp, endTimestamp, FINISH_PARTITION, null);
}

public static PartitionRestriction waitForParentPartitions() {
return new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS);
public static PartitionRestriction waitForParentPartitions(
Timestamp startTimestamp, Timestamp endTimestamp) {
return new PartitionRestriction(startTimestamp, endTimestamp, WAIT_FOR_PARENT_PARTITIONS, null);
}

public static PartitionRestriction deletePartition() {
return new PartitionRestriction(null, null, DELETE_PARTITION);
public static PartitionRestriction deletePartition(
Timestamp startTimestamp, Timestamp endTimestamp) {
return new PartitionRestriction(startTimestamp, endTimestamp, DELETE_PARTITION, null);
}

public static PartitionRestriction done() {
return new PartitionRestriction(null, null, DONE);
public static PartitionRestriction done(Timestamp startTimestamp, Timestamp endTimestamp) {
return new PartitionRestriction(startTimestamp, endTimestamp, DONE, null);
}

public static PartitionRestriction stop() {
return new PartitionRestriction(null, null, STOP);
public static PartitionRestriction stop(PartitionRestriction restriction) {
return new PartitionRestriction(
restriction.getStartTimestamp(),
restriction.getEndTimestamp(),
STOP,
restriction.getMode());
}

public PartitionRestriction(
Timestamp startTimestamp, Timestamp endTimestamp, PartitionMode mode) {
Timestamp startTimestamp,
Timestamp endTimestamp,
PartitionMode mode,
PartitionMode stoppedMode) {
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
this.mode = mode;
this.stoppedMode = stoppedMode;
}

public Timestamp getStartTimestamp() {
Expand All @@ -86,6 +99,10 @@ public PartitionMode getMode() {
return mode;
}

/**
 * Returns the stopped mode recorded on this restriction.
 *
 * <p>{@link #stop(PartitionRestriction)} fills this with the mode of the restriction being
 * stopped; the other static factory methods of this class pass {@code null}, so callers must be
 * prepared for a {@code null} result.
 *
 * @return the mode held before the restriction was stopped, or {@code null} if none was recorded
 */
public PartitionMode getStoppedMode() {
return stoppedMode;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -97,12 +114,13 @@ public boolean equals(Object o) {
PartitionRestriction that = (PartitionRestriction) o;
return Objects.equals(startTimestamp, that.startTimestamp)
&& Objects.equals(endTimestamp, that.endTimestamp)
&& mode == that.mode;
&& mode == that.mode
&& stoppedMode == that.stoppedMode;
}

@Override
public int hashCode() {
return Objects.hash(startTimestamp, endTimestamp, mode);
return Objects.hash(startTimestamp, endTimestamp, mode, stoppedMode);
}

@Override
Expand All @@ -114,6 +132,8 @@ public String toString() {
+ endTimestamp
+ ", mode="
+ mode
+ ", stoppedMode="
+ stoppedMode
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc.restriction;

import static org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter.timestampToMicros;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.DELETE_PARTITION;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.DONE;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.FINISH_PARTITION;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.QUERY_CHANGE_STREAM;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.STOP;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.WAIT_FOR_CHILD_PARTITIONS;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.WAIT_FOR_PARENT_PARTITIONS;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.cdc.TimestampConverter;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;

/**
 * Estimates the work completed and the work remaining for a {@link PartitionRestriction}.
 *
 * <p>Work is expressed as the number of microseconds between the restriction's start timestamp
 * and the current timestamp, plus one extra unit for every mode transition already completed.
 */
public class PartitionRestrictionProgressChecker {

  /** Number of transitions needed to go from QUERY_CHANGE_STREAM all the way to DONE. */
  private static final BigDecimal TOTAL_MODE_TRANSITIONS = BigDecimal.valueOf(5L);

  /**
   * Indicates how many mode transitions have been completed for the current mode. The modes are
   * visited in the following order: (1) QUERY_CHANGE_STREAM, (2) WAIT_FOR_CHILD_PARTITIONS, (3)
   * FINISH_PARTITION, (4) WAIT_FOR_PARENT_PARTITIONS, (5) DELETE_PARTITION, (6) DONE.
   *
   * <p>This is used to calculate the units of work left, meaning that 1 transition = 1 unit of
   * work.
   */
  private final Map<PartitionMode, BigDecimal> modeToTransitionsCompleted;

  /** Creates a checker with the transition count of every mode pre-registered. */
  public PartitionRestrictionProgressChecker() {
    modeToTransitionsCompleted = new HashMap<>();
    modeToTransitionsCompleted.put(QUERY_CHANGE_STREAM, BigDecimal.valueOf(0L));
    modeToTransitionsCompleted.put(WAIT_FOR_CHILD_PARTITIONS, BigDecimal.valueOf(1L));
    modeToTransitionsCompleted.put(FINISH_PARTITION, BigDecimal.valueOf(2L));
    modeToTransitionsCompleted.put(WAIT_FOR_PARENT_PARTITIONS, BigDecimal.valueOf(3L));
    modeToTransitionsCompleted.put(DELETE_PARTITION, BigDecimal.valueOf(4L));
    modeToTransitionsCompleted.put(DONE, BigDecimal.valueOf(5L));
  }

  /**
   * Computes the progress of the given restriction.
   *
   * <p>The current mode is taken from the last claimed position when there is one. Otherwise it is
   * the restriction's own mode, or its stopped mode when the restriction is in the STOP mode. The
   * current timestamp is the one carried by the last claimed position; when absent, it defaults to
   * the start timestamp for QUERY_CHANGE_STREAM and to the end timestamp for every other mode.
   *
   * <p>Both returned values are clamped so they are never negative, even if the claimed timestamp
   * lies outside of the restriction's range or the range itself is inverted. For timestamps
   * inside the range the result is {@code (current - start) + transitionsCompleted} completed and
   * {@code (end - start) + 5 - completed} remaining.
   *
   * @param restriction the restriction being processed; must not be null
   * @param lastClaimedPosition the last successfully claimed position, or null if nothing has been
   *     claimed yet
   * @return the completed / remaining work estimate
   */
  public Progress getProgress(
      PartitionRestriction restriction, PartitionPosition lastClaimedPosition) {
    final PartitionMode restrictionMode =
        restriction.getMode() == STOP ? restriction.getStoppedMode() : restriction.getMode();
    final PartitionMode currentMode =
        Optional.ofNullable(lastClaimedPosition)
            .map(PartitionPosition::getMode)
            .orElse(restrictionMode);
    // Modes without a registered transition count (e.g. a null stopped mode) count as zero.
    final BigDecimal transitionsCompleted =
        modeToTransitionsCompleted.getOrDefault(currentMode, BigDecimal.ZERO);

    final BigDecimal startTimestampAsMicros = timestampToMicros(restriction.getStartTimestamp());
    final BigDecimal endTimestampAsMicros = timestampToMicros(restriction.getEndTimestamp());
    final BigDecimal currentTimestampAsMicros =
        Optional.ofNullable(lastClaimedPosition)
            .flatMap(PartitionPosition::getTimestamp)
            .map(TimestampConverter::timestampToMicros)
            .orElse(
                currentMode == QUERY_CHANGE_STREAM ? startTimestampAsMicros : endTimestampAsMicros);

    // Progress.from requires non-negative arguments, so the time span and the elapsed time are
    // clamped: an inverted range counts as empty and the elapsed time is kept within the range.
    final BigDecimal totalMicros =
        endTimestampAsMicros.subtract(startTimestampAsMicros).max(BigDecimal.ZERO);
    final BigDecimal elapsedMicros =
        currentTimestampAsMicros
            .subtract(startTimestampAsMicros)
            .max(BigDecimal.ZERO)
            .min(totalMicros);

    final BigDecimal workCompleted = elapsedMicros.add(transitionsCompleted);
    final BigDecimal workLeft =
        totalMicros.add(TOTAL_MODE_TRANSITIONS).subtract(workCompleted);

    return Progress.from(workCompleted.doubleValue(), workLeft.doubleValue());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public SplitResult<PartitionRestriction> trySplit(
"%s mode must specify a timestamp (no value sent)",
positionMode);

final Timestamp startTimestamp = restriction.getStartTimestamp();
final Timestamp endTimestamp = restriction.getEndTimestamp();

SplitResult<PartitionRestriction> splitResult = null;
switch (positionMode) {
case QUERY_CHANGE_STREAM:
Expand All @@ -63,23 +66,29 @@ public SplitResult<PartitionRestriction> trySplit(
// restriction gets scheduled before the primary.
splitResult =
SplitResult.of(
PartitionRestriction.stop(), PartitionRestriction.waitForChildPartitions());
PartitionRestriction.stop(restriction),
PartitionRestriction.waitForChildPartitions(startTimestamp, endTimestamp));
break;
case FINISH_PARTITION:
splitResult =
SplitResult.of(
PartitionRestriction.stop(), PartitionRestriction.waitForParentPartitions());
PartitionRestriction.stop(restriction),
PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp));
break;
case WAIT_FOR_PARENT_PARTITIONS:
// If we need to split the wait for parent partitions, we remain at the same mode. That is
// because the primary restriction might resume and it might so happen that the residual
// restriction gets scheduled before the primary.
splitResult =
SplitResult.of(
PartitionRestriction.stop(), PartitionRestriction.waitForParentPartitions());
PartitionRestriction.stop(restriction),
PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp));
break;
case DELETE_PARTITION:
splitResult = SplitResult.of(PartitionRestriction.stop(), PartitionRestriction.done());
splitResult =
SplitResult.of(
PartitionRestriction.stop(restriction),
PartitionRestriction.done(startTimestamp, endTimestamp));
break;
case DONE:
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@
import static org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode.QUERY_CHANGE_STREAM;

import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

// TODO: Add java docs
// TODO: Implement duration waiting for returning false on try claim
public class PartitionRestrictionTracker
extends RestrictionTracker<PartitionRestriction, PartitionPosition> {
extends RestrictionTracker<PartitionRestriction, PartitionPosition> implements HasProgress {

private final PartitionRestrictionSplitter splitter;
private final PartitionRestrictionClaimer claimer;
private final PartitionRestrictionSplitChecker splitChecker;
private final PartitionRestrictionProgressChecker progressChecker;
private PartitionRestriction restriction;
private PartitionPosition lastClaimedPosition;
private boolean isSplitAllowed;
Expand All @@ -41,19 +43,22 @@ public PartitionRestrictionTracker(PartitionRestriction restriction) {
restriction,
new PartitionRestrictionSplitter(),
new PartitionRestrictionClaimer(),
new PartitionRestrictionSplitChecker());
new PartitionRestrictionSplitChecker(),
new PartitionRestrictionProgressChecker());
}

PartitionRestrictionTracker(
PartitionRestriction restriction,
PartitionRestrictionSplitter splitter,
PartitionRestrictionClaimer claimer,
PartitionRestrictionSplitChecker splitChecker) {
PartitionRestrictionSplitChecker splitChecker,
PartitionRestrictionProgressChecker progressChecker) {
this.splitter = splitter;
this.claimer = claimer;
this.splitChecker = splitChecker;
this.restriction = restriction;
this.isSplitAllowed = restriction.getMode() != QUERY_CHANGE_STREAM;
this.progressChecker = progressChecker;
}

@Override
Expand All @@ -78,6 +83,11 @@ public boolean tryClaim(PartitionPosition position) {
return canClaim;
}

/**
 * Reports the progress of this tracker by delegating to the progress checker, passing it the
 * current restriction and the last claimed position held by this tracker.
 *
 * @return the progress computed by the progress checker
 */
@Override
public Progress getProgress() {
return progressChecker.getProgress(restriction, lastClaimedPosition);
}

@Override
public PartitionRestriction currentRestriction() {
return restriction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,21 +207,24 @@ public void run() {
}

private PartitionRestriction partitionRestrictionFrom(PartitionMode mode) {
final Timestamp startTimestamp = Timestamp.MIN_VALUE;
final Timestamp endTimestamp = Timestamp.MAX_VALUE;
switch (mode) {
case QUERY_CHANGE_STREAM:
return PartitionRestriction.queryChangeStream(Timestamp.MIN_VALUE, Timestamp.MAX_VALUE);
return PartitionRestriction.queryChangeStream(startTimestamp, endTimestamp);
case WAIT_FOR_CHILD_PARTITIONS:
return PartitionRestriction.waitForChildPartitions();
return PartitionRestriction.waitForChildPartitions(startTimestamp, endTimestamp);
case FINISH_PARTITION:
return PartitionRestriction.finishPartition();
return PartitionRestriction.finishPartition(startTimestamp, endTimestamp);
case WAIT_FOR_PARENT_PARTITIONS:
return PartitionRestriction.waitForParentPartitions();
return PartitionRestriction.waitForParentPartitions(startTimestamp, endTimestamp);
case DELETE_PARTITION:
return PartitionRestriction.deletePartition();
return PartitionRestriction.deletePartition(startTimestamp, endTimestamp);
case DONE:
return PartitionRestriction.done();
return PartitionRestriction.done(startTimestamp, endTimestamp);
case STOP:
return PartitionRestriction.stop();
return PartitionRestriction.stop(
PartitionRestriction.queryChangeStream(startTimestamp, endTimestamp));
default:
throw new IllegalArgumentException("Unknown mode " + mode);
}
Expand Down
Loading

0 comments on commit e9b24f6

Please sign in to comment.