Skip to content

Commit

Permalink
fix: fix child record processing (#28)
Browse files Browse the repository at this point in the history
We used to wait for children to be scheduled on every child partition
record received. The problem with that is that a child was not
scheduled, it could result in resuming the function with the mode
WAIT_FOR_CHILD_PARTITION. When resumed, the function would just wait for
the children and start finishing the partition. Because of this, any
records after the first child partition record for the same stream would
be in fact ignored.

This PR changes the logic, so that the waiting for child partitions to
be scheduled is only done when the change stream ends (meaning all child
partition records have been processed).
  • Loading branch information
thiagotnunes authored Jul 8, 2021
1 parent 424d031 commit fce070f
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
@GetInitialRestriction
public PartitionRestriction initialRestriction(@Element PartitionMetadata element) {
return new PartitionRestriction(
element.getStartTimestamp(),
element.getEndTimestamp(),
PartitionMode.QUERY_CHANGE_STREAM,
null);
element.getStartTimestamp(), element.getEndTimestamp(), PartitionMode.QUERY_CHANGE_STREAM);
}

@NewTracker
Expand All @@ -134,8 +131,7 @@ public void setup() {
this.dataChangeRecordAction = actionFactory.dataChangeRecordAction();
this.heartbeatRecordAction = actionFactory.heartbeatRecordAction();
this.childPartitionsRecordAction =
actionFactory.childPartitionsRecordAction(
partitionMetadataDao, waitForChildPartitionsAction);
actionFactory.childPartitionsRecordAction(partitionMetadataDao);
}

// TODO: Close DAOs on teardown
Expand Down Expand Up @@ -179,33 +175,35 @@ public ProcessContinuation processElement(
* The states will be stored in the {@link PartitionRestriction} and claimed through the
* {@link PartitionPosition}.
*
* HEARTBEAT RECORD
* DATA CHANGE RECORD
* |---------------------|
* v |
* +---------------------+ |
* | QUERY_CHANGE_STREAM |----------|
* +---------------------+
* |
* CHILD_PARTITION_RECORD | NO MORE RECORDS
* |----------------------------------------------|
* v v
* +---------------------------+ +------------------+
* | WAIT_FOR_CHILD_PARTITIONS |---------------------->| FINISH_PARTITION |
* +---------------------------+ +------------------+
* |--------------------|
* v
* +----------------------------+
* | WAIT_FOR_PARENT_PARTITIONS |
* +----------------------------+
* v
* +------------------+
* | DELETE_PARTITION |
* +------------------+
* v
* +------+
* | DONE |
* +------+
* HEARTBEAT RECORD
* DATA CHANGE RECORD
* CHILD PARTITION RECORD
* |---------------------|
* v |
* +---------------------+ |
* | QUERY_CHANGE_STREAM |----------|
* +---------------------+
* | NO MORE RECORDS
* v
* +---------------------------+
* | WAIT_FOR_CHILD_PARTITIONS |
* +---------------------------+
* v
* +------------------+
* | FINISH_PARTITION |
* +------------------+
* v
* +----------------------------+
* | WAIT_FOR_PARENT_PARTITIONS |
* +----------------------------+
* v
* +------------------+
* | DELETE_PARTITION |
* +------------------+
* v
* +------+
* | DONE |
* +------+
*
*/
// spotless:on
Expand Down Expand Up @@ -252,17 +250,15 @@ private ProcessContinuation queryChangeStream(
}
}

return finishPartition(partition, tracker);
return waitForChildPartitions(partition, tracker);
}
}

private ProcessContinuation waitForChildPartitions(
PartitionMetadata partition,
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker) {
final Long childPartitionsToWaitFor =
tracker.currentRestriction().getChildPartitionsToWaitFor();
return waitForChildPartitionsAction
.run(partition, tracker, childPartitionsToWaitFor)
.run(partition, tracker)
.orElseGet(() -> finishPartition(partition, tracker));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ public synchronized HeartbeatRecordAction heartbeatRecordAction() {

// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized ChildPartitionsRecordAction childPartitionsRecordAction(
PartitionMetadataDao partitionMetadataDao,
WaitForChildPartitionsAction waitForChildPartitionsAction) {
PartitionMetadataDao partitionMetadataDao) {
if (childPartitionsRecordActionInstance == null) {
childPartitionsRecordActionInstance =
new ChildPartitionsRecordAction(partitionMetadataDao, waitForChildPartitionsAction);
childPartitionsRecordActionInstance = new ChildPartitionsRecordAction(partitionMetadataDao);
}
return childPartitionsRecordActionInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,9 @@ public class ChildPartitionsRecordAction {

private static final Logger LOG = LoggerFactory.getLogger(ChildPartitionsRecordAction.class);
private final PartitionMetadataDao partitionMetadataDao;
private final WaitForChildPartitionsAction waitForChildPartitionsAction;

public ChildPartitionsRecordAction(
PartitionMetadataDao partitionMetadataDao,
WaitForChildPartitionsAction waitForChildPartitionsAction) {
public ChildPartitionsRecordAction(PartitionMetadataDao partitionMetadataDao) {
this.partitionMetadataDao = partitionMetadataDao;
this.waitForChildPartitionsAction = waitForChildPartitionsAction;
}

public Optional<ProcessContinuation> run(
Expand Down Expand Up @@ -116,8 +112,7 @@ public Optional<ProcessContinuation> run(
}

LOG.info("Child partitions action completed successfully");
// Needs to hold the watermark until all my children have finished
return waitForChildPartitionsAction.run(partition, tracker, record.getChildPartitions().size());
return Optional.empty();
}

private boolean isSplit(ChildPartition childPartition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc.actions;

import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.FINISHED;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.SCHEDULED;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State.CREATED;

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
Expand All @@ -47,29 +46,23 @@ public WaitForChildPartitionsAction(

public Optional<ProcessContinuation> run(
PartitionMetadata partition,
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
long childPartitionsToWaitFor) {
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker) {
LOG.info("Waiting for child partitions for " + partition.getPartitionToken());

if (!tracker.tryClaim(PartitionPosition.waitForChildPartitions(childPartitionsToWaitFor))) {
if (!tracker.tryClaim(PartitionPosition.waitForChildPartitions())) {
LOG.info("Could not claim, stopping");
return Optional.of(ProcessContinuation.stop());
}
long numberOfFinishedChildren =
long numberOfUnscheduledChildren =
partitionMetadataDao.countChildPartitionsInStates(
partition.getPartitionToken(), Arrays.asList(SCHEDULED, FINISHED));
LOG.info(
"Number of finished children is "
+ numberOfFinishedChildren
+ " and expected children to wait for is "
+ childPartitionsToWaitFor);
if (numberOfFinishedChildren < childPartitionsToWaitFor) {
partition.getPartitionToken(), Collections.singletonList(CREATED));
LOG.info("Number of unscheduled children is " + numberOfUnscheduledChildren);
if (numberOfUnscheduledChildren > 0) {
LOG.info(
"Resuming, not all children are scheduled / finished (only "
+ numberOfFinishedChildren
+ " of "
+ childPartitionsToWaitFor
+ ")");
"Resuming, there are "
+ numberOfUnscheduledChildren
+ " unscheduled / not finished children");

return Optional.of(ProcessContinuation.resume().withResumeDelay(resumeDuration));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,53 +27,42 @@
public class PartitionPosition implements Serializable {

private static final long serialVersionUID = -9088898012221404492L;

private final Optional<Timestamp> maybeTimestamp;
private final PartitionMode mode;
private final Optional<Long> maybeChildPartitionsToWaitFor;

public static PartitionPosition queryChangeStream(Timestamp timestamp) {
return new PartitionPosition(
Optional.of(timestamp), PartitionMode.QUERY_CHANGE_STREAM, Optional.empty());
return new PartitionPosition(Optional.of(timestamp), PartitionMode.QUERY_CHANGE_STREAM);
}

public static PartitionPosition waitForChildPartitions(long childPartitionsToWaitFor) {
return new PartitionPosition(
Optional.empty(),
PartitionMode.WAIT_FOR_CHILD_PARTITIONS,
Optional.of(childPartitionsToWaitFor));
public static PartitionPosition waitForChildPartitions() {
return new PartitionPosition(Optional.empty(), PartitionMode.WAIT_FOR_CHILD_PARTITIONS);
}

public static PartitionPosition finishPartition() {
return new PartitionPosition(
Optional.empty(), PartitionMode.FINISH_PARTITION, Optional.empty());
return new PartitionPosition(Optional.empty(), PartitionMode.FINISH_PARTITION);
}

public static PartitionPosition waitForParentPartitions() {
return new PartitionPosition(
Optional.empty(), PartitionMode.WAIT_FOR_PARENT_PARTITIONS, Optional.empty());
return new PartitionPosition(Optional.empty(), PartitionMode.WAIT_FOR_PARENT_PARTITIONS);
}

public static PartitionPosition deletePartition() {
return new PartitionPosition(
Optional.empty(), PartitionMode.DELETE_PARTITION, Optional.empty());
return new PartitionPosition(Optional.empty(), PartitionMode.DELETE_PARTITION);
}

public static PartitionPosition done() {
return new PartitionPosition(Optional.empty(), PartitionMode.DONE, Optional.empty());
return new PartitionPosition(Optional.empty(), PartitionMode.DONE);
}

public static PartitionPosition stop() {
return new PartitionPosition(Optional.empty(), PartitionMode.STOP, Optional.empty());
return new PartitionPosition(Optional.empty(), PartitionMode.STOP);
}

@VisibleForTesting
protected PartitionPosition(
Optional<Timestamp> maybeTimestamp,
PartitionMode mode,
Optional<Long> maybeChildPartitionsToWaitFor) {
protected PartitionPosition(Optional<Timestamp> maybeTimestamp, PartitionMode mode) {
this.maybeTimestamp = maybeTimestamp;
this.mode = mode;
this.maybeChildPartitionsToWaitFor = maybeChildPartitionsToWaitFor;
}

public Optional<Timestamp> getTimestamp() {
Expand All @@ -84,10 +73,6 @@ public PartitionMode getMode() {
return mode;
}

public Optional<Long> getChildPartitionsToWaitFor() {
return maybeChildPartitionsToWaitFor;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -97,25 +82,16 @@ public boolean equals(Object o) {
return false;
}
PartitionPosition that = (PartitionPosition) o;
return Objects.equals(maybeTimestamp, that.maybeTimestamp)
&& mode == that.mode
&& Objects.equals(maybeChildPartitionsToWaitFor, that.maybeChildPartitionsToWaitFor);
return Objects.equals(maybeTimestamp, that.maybeTimestamp) && mode == that.mode;
}

@Override
public int hashCode() {
return Objects.hash(maybeTimestamp, mode, maybeChildPartitionsToWaitFor);
return Objects.hash(maybeTimestamp, mode);
}

@Override
public String toString() {
return "PartitionPosition{"
+ "maybeTimestamp="
+ maybeTimestamp
+ ", mode="
+ mode
+ ", maybeChildPartitionsToWaitFor="
+ maybeChildPartitionsToWaitFor
+ '}';
return "PartitionPosition{" + "maybeTimestamp=" + maybeTimestamp + ", mode=" + mode + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,47 +37,41 @@ public class PartitionRestriction implements Serializable {
private final Timestamp startTimestamp;
private final Timestamp endTimestamp;
private final PartitionMode mode;
private final Long childPartitionsToWaitFor;

public static PartitionRestriction queryChangeStream(
Timestamp startTimestamp, Timestamp endTimestamp) {
return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM, null);
return new PartitionRestriction(startTimestamp, endTimestamp, QUERY_CHANGE_STREAM);
}

public static PartitionRestriction waitForChildPartitions(Long childPartitionsToWaitFor) {
return new PartitionRestriction(
null, null, WAIT_FOR_CHILD_PARTITIONS, childPartitionsToWaitFor);
public static PartitionRestriction waitForChildPartitions() {
return new PartitionRestriction(null, null, WAIT_FOR_CHILD_PARTITIONS);
}

public static PartitionRestriction finishPartition() {
return new PartitionRestriction(null, null, FINISH_PARTITION, null);
return new PartitionRestriction(null, null, FINISH_PARTITION);
}

public static PartitionRestriction waitForParentPartitions() {
return new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS, null);
return new PartitionRestriction(null, null, WAIT_FOR_PARENT_PARTITIONS);
}

public static PartitionRestriction deletePartition() {
return new PartitionRestriction(null, null, DELETE_PARTITION, null);
return new PartitionRestriction(null, null, DELETE_PARTITION);
}

public static PartitionRestriction done() {
return new PartitionRestriction(null, null, DONE, null);
return new PartitionRestriction(null, null, DONE);
}

public static PartitionRestriction stop() {
return new PartitionRestriction(null, null, STOP, null);
return new PartitionRestriction(null, null, STOP);
}

public PartitionRestriction(
Timestamp startTimestamp,
Timestamp endTimestamp,
PartitionMode mode,
Long childPartitionsToWaitFor) {
Timestamp startTimestamp, Timestamp endTimestamp, PartitionMode mode) {
this.startTimestamp = startTimestamp;
this.endTimestamp = endTimestamp;
this.mode = mode;
this.childPartitionsToWaitFor = childPartitionsToWaitFor;
}

public Timestamp getStartTimestamp() {
Expand All @@ -92,10 +86,6 @@ public PartitionMode getMode() {
return mode;
}

public Long getChildPartitionsToWaitFor() {
return childPartitionsToWaitFor;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -107,13 +97,12 @@ public boolean equals(Object o) {
PartitionRestriction that = (PartitionRestriction) o;
return Objects.equals(startTimestamp, that.startTimestamp)
&& Objects.equals(endTimestamp, that.endTimestamp)
&& mode == that.mode
&& Objects.equals(childPartitionsToWaitFor, that.childPartitionsToWaitFor);
&& mode == that.mode;
}

@Override
public int hashCode() {
return Objects.hash(startTimestamp, endTimestamp, mode, childPartitionsToWaitFor);
return Objects.hash(startTimestamp, endTimestamp, mode);
}

@Override
Expand All @@ -125,8 +114,6 @@ public String toString() {
+ endTimestamp
+ ", mode="
+ mode
+ ", childPartitionsToWaitFor="
+ childPartitionsToWaitFor
+ '}';
}
}
Loading

0 comments on commit fce070f

Please sign in to comment.