Skip to content

Commit

Permalink
refactor: QueryChangeStreamAction (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
zoercai authored Jul 13, 2021
1 parent 1b0163f commit 73d2c3c
Show file tree
Hide file tree
Showing 5 changed files with 528 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,23 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc;

import com.google.cloud.spanner.ResultSet;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.ChildPartitionsRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.DataChangeRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.DeletePartitionAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.DonePartitionAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.FinishPartitionAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.HeartbeatRecordAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.QueryChangeStreamAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.WaitForChildPartitionsAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.WaitForParentPartitionsAction;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.cdc.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.ChangeStreamRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionMode;
import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionPosition;
Expand All @@ -65,16 +60,12 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
private final DaoFactory daoFactory;
private final MapperFactory mapperFactory;
private final ActionFactory actionFactory;
private transient ChangeStreamRecordMapper changeStreamRecordMapper;
private transient ChangeStreamDao changeStreamDao;

private transient QueryChangeStreamAction queryChangeStreamAction;
private transient WaitForChildPartitionsAction waitForChildPartitionsAction;
private transient FinishPartitionAction finishPartitionAction;
private transient WaitForParentPartitionsAction waitForParentPartitionsAction;
private transient DeletePartitionAction deletePartitionAction;
private transient DataChangeRecordAction dataChangeRecordAction;
private transient HeartbeatRecordAction heartbeatRecordAction;
private transient ChildPartitionsRecordAction childPartitionsRecordAction;
private transient DonePartitionAction donePartitionAction;

public ReadChangeStreamPartitionDoFn(
Expand Down Expand Up @@ -118,53 +109,28 @@ public PartitionRestrictionTracker newTracker(
@Setup
public void setup() {
final PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
this.changeStreamDao = daoFactory.getChangeStreamDao();
this.changeStreamRecordMapper = mapperFactory.changeStreamRecordMapper();
ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
ChangeStreamRecordMapper changeStreamRecordMapper = mapperFactory.changeStreamRecordMapper();

DataChangeRecordAction dataChangeRecordAction = actionFactory.dataChangeRecordAction();
HeartbeatRecordAction heartbeatRecordAction = actionFactory.heartbeatRecordAction();
ChildPartitionsRecordAction childPartitionsRecordAction =
actionFactory.childPartitionsRecordAction(partitionMetadataDao);

this.queryChangeStreamAction =
actionFactory.queryChangeStreamAction(
changeStreamDao,
changeStreamRecordMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction);
this.waitForChildPartitionsAction =
actionFactory.waitForChildPartitionsAction(partitionMetadataDao, Duration.millis(100));
this.finishPartitionAction = actionFactory.finishPartitionAction(partitionMetadataDao);
this.waitForParentPartitionsAction =
actionFactory.waitForParentPartitionsAction(partitionMetadataDao, Duration.millis(100));
this.deletePartitionAction = actionFactory.deletePartitionAction(partitionMetadataDao);
this.donePartitionAction = actionFactory.donePartitionAction();

this.dataChangeRecordAction = actionFactory.dataChangeRecordAction();
this.heartbeatRecordAction = actionFactory.heartbeatRecordAction();
this.childPartitionsRecordAction =
actionFactory.childPartitionsRecordAction(partitionMetadataDao);
}

// TODO: Close DAOs on teardown

@ProcessElement
public ProcessContinuation processElement(
@Element PartitionMetadata partition,
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
OutputReceiver<DataChangeRecord> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {
final String token = partition.getPartitionToken();
LOG.info("[" + token + "] Processing element with restriction " + tracker.currentRestriction());

final PartitionMode mode = tracker.currentRestriction().getMode();
switch (mode) {
case QUERY_CHANGE_STREAM:
return queryChangeStream(partition, tracker, receiver, watermarkEstimator);
case WAIT_FOR_CHILD_PARTITIONS:
return waitForChildPartitions(partition, tracker);
case FINISH_PARTITION:
return finishPartition(partition, tracker);
case WAIT_FOR_PARENT_PARTITIONS:
return waitForParentPartitions(partition, tracker);
case DELETE_PARTITION:
return deletePartition(partition, tracker);
case DONE:
return done(partition, tracker);
default:
// TODO: Verify what to do here
LOG.error("[" + token + "] Unknown mode " + mode);
throw new IllegalArgumentException("Unknown mode " + mode);
}
}

// spotless:off
Expand Down Expand Up @@ -205,57 +171,47 @@ public ProcessContinuation processElement(
*
*/
// spotless:on
private ProcessContinuation queryChangeStream(
PartitionMetadata partition,
// TODO: Close DAOs on teardown
@ProcessElement
public ProcessContinuation processElement(
@Element PartitionMetadata partition,
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
OutputReceiver<DataChangeRecord> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {
final String token = partition.getPartitionToken();
try (ResultSet resultSet =
changeStreamDao.changeStreamQuery(
token,
tracker.currentRestriction().getStartTimestamp(),
partition.isInclusiveStart(),
partition.getEndTimestamp(),
partition.isInclusiveEnd(),
partition.getHeartbeatMillis())) {
while (resultSet.next()) {
// TODO: Check what should we do if there is an error here
final List<ChangeStreamRecord> records =
changeStreamRecordMapper.toChangeStreamRecords(
token, resultSet.getCurrentRowAsStruct());
LOG.debug("Mapped records: " + records);

Optional<ProcessContinuation> maybeContinuation;
for (ChangeStreamRecord record : records) {
if (record instanceof DataChangeRecord) {
maybeContinuation =
dataChangeRecordAction.run(
partition, (DataChangeRecord) record, tracker, receiver, watermarkEstimator);
} else if (record instanceof HeartbeatRecord) {
maybeContinuation =
heartbeatRecordAction.run(
partition, (HeartbeatRecord) record, tracker, watermarkEstimator);
} else if (record instanceof ChildPartitionsRecord) {
maybeContinuation =
childPartitionsRecordAction.run(
partition, (ChildPartitionsRecord) record, tracker, watermarkEstimator);
} else {
LOG.error("[" + token + "] Unknown record type " + record.getClass());
// FIXME: Check what should we do if the record is unknown
throw new IllegalArgumentException("Unknown record type " + record.getClass());
}
if (maybeContinuation.isPresent()) {
LOG.info("[" + token + "] Continuation present, returning " + maybeContinuation);
return maybeContinuation.get();
}
}
}
LOG.info("[" + token + "] Processing element with restriction " + tracker.currentRestriction());

return waitForChildPartitions(partition, tracker);
final PartitionMode mode = tracker.currentRestriction().getMode();
switch (mode) {
case QUERY_CHANGE_STREAM:
return queryChangeStream(partition, tracker, receiver, watermarkEstimator);
case WAIT_FOR_CHILD_PARTITIONS:
return waitForChildPartitions(partition, tracker);
case FINISH_PARTITION:
return finishPartition(partition, tracker);
case WAIT_FOR_PARENT_PARTITIONS:
return waitForParentPartitions(partition, tracker);
case DELETE_PARTITION:
return deletePartition(partition, tracker);
case DONE:
return done(partition, tracker);
default:
// TODO: Verify what to do here
LOG.error("[" + token + "] Unknown mode " + mode);
throw new IllegalArgumentException("Unknown mode " + mode);
}
}

private ProcessContinuation queryChangeStream(
PartitionMetadata partition,
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
OutputReceiver<DataChangeRecord> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {
return queryChangeStreamAction
.run(partition, tracker, receiver, watermarkEstimator)
.orElseGet(() -> waitForChildPartitions(partition, tracker));
}

private ProcessContinuation waitForChildPartitions(
PartitionMetadata partition,
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.beam.sdk.io.gcp.spanner.cdc.actions;

import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.mapper.ChangeStreamRecordMapper;
import org.joda.time.Duration;

// TODO: Add java docs
Expand All @@ -29,6 +31,7 @@ public class ActionFactory implements Serializable {
private static HeartbeatRecordAction heartbeatRecordActionInstance;
private static ChildPartitionsRecordAction childPartitionsRecordActionInstance;
private static FinishPartitionAction finishPartitionActionInstance;
private static QueryChangeStreamAction queryChangeStreamActionInstance;
private static WaitForChildPartitionsAction waitForChildPartitionsActionInstance;
private static WaitForParentPartitionsAction waitForParentPartitionsActionInstance;
private static DeletePartitionAction deletePartitionActionInstance;
Expand Down Expand Up @@ -78,6 +81,25 @@ public synchronized WaitForChildPartitionsAction waitForChildPartitionsAction(
return waitForChildPartitionsActionInstance;
}

// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized QueryChangeStreamAction queryChangeStreamAction(
ChangeStreamDao changeStreamDao,
ChangeStreamRecordMapper changeStreamRecordMapper,
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
ChildPartitionsRecordAction childPartitionsRecordAction) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
changeStreamDao,
changeStreamRecordMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction);
}
return queryChangeStreamActionInstance;
}

// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized WaitForParentPartitionsAction waitForParentPartitionsAction(
PartitionMetadataDao partitionMetadataDao, Duration resumeDuration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc.actions;

import com.google.cloud.spanner.ResultSet;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.ChangeStreamRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionPosition;
import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionRestriction;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO: Add java docs
public class QueryChangeStreamAction {

private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
private final ChangeStreamDao changeStreamDao;
private final ChangeStreamRecordMapper changeStreamRecordMapper;
private final DataChangeRecordAction dataChangeRecordAction;
private final HeartbeatRecordAction heartbeatRecordAction;
private final ChildPartitionsRecordAction childPartitionsRecordAction;

public QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
ChangeStreamRecordMapper changeStreamRecordMapper,
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
ChildPartitionsRecordAction childPartitionsRecordAction) {
this.changeStreamDao = changeStreamDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
this.dataChangeRecordAction = dataChangeRecordAction;
this.heartbeatRecordAction = heartbeatRecordAction;
this.childPartitionsRecordAction = childPartitionsRecordAction;
}

public Optional<ProcessContinuation> run(
PartitionMetadata partition,
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
OutputReceiver<DataChangeRecord> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {
final String token = partition.getPartitionToken();
try (ResultSet resultSet =
changeStreamDao.changeStreamQuery(
token,
tracker.currentRestriction().getStartTimestamp(),
partition.isInclusiveStart(),
partition.getEndTimestamp(),
partition.isInclusiveEnd(),
partition.getHeartbeatMillis())) {
while (resultSet.next()) {
// TODO: Check what should we do if there is an error here
final List<ChangeStreamRecord> records =
changeStreamRecordMapper.toChangeStreamRecords(
token, resultSet.getCurrentRowAsStruct());
LOG.debug("Mapped records: " + records);

Optional<ProcessContinuation> maybeContinuation;
for (ChangeStreamRecord record : records) {
if (record instanceof DataChangeRecord) {
maybeContinuation =
dataChangeRecordAction.run(
partition, (DataChangeRecord) record, tracker, receiver, watermarkEstimator);
} else if (record instanceof HeartbeatRecord) {
maybeContinuation =
heartbeatRecordAction.run(
partition, (HeartbeatRecord) record, tracker, watermarkEstimator);
} else if (record instanceof ChildPartitionsRecord) {
maybeContinuation =
childPartitionsRecordAction.run(
partition, (ChildPartitionsRecord) record, tracker, watermarkEstimator);
} else {
LOG.error("[" + token + "] Unknown record type " + record.getClass());
// FIXME: Check what should we do if the record is unknown
throw new IllegalArgumentException("Unknown record type " + record.getClass());
}
if (maybeContinuation.isPresent()) {
LOG.info("[" + token + "] Continuation present, returning " + maybeContinuation);
return maybeContinuation;
}
}
}
LOG.info("[" + token + "] = Query change stream action completed successfully");
return Optional.empty();
}
}
}
Loading

0 comments on commit 73d2c3c

Please sign in to comment.