feat(spannerio): add ReadChangeStreamPartitionDoFn (#10)
* feat: skeleton implementation of read fn

Adds skeleton implementation of the ReadChangeStreamPartitionDoFn with a
simple test.

* feat: adds initial impl for read component

Adds initial implementation for read change stream partition component,
along with an initial test and a test plan.

* test: adds unit test for record mapping

Adds unit test for mapping a struct to a data changes record.

* refactor: refactor tests

* feat: set the correct bounds for consuming SDF

Initialises the offset range based on the start and end timestamp of the
partition record.
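
For illustration, the initial restriction can be derived roughly like this (a sketch assuming an OffsetRange over commit-timestamp micros; accessor names are approximate):

    // In ReadChangeStreamPartitionDoFn (sketch only):
    @GetInitialRestriction
    public OffsetRange initialRestriction(@Element PartitionMetadata partition) {
      // Claimable positions are the partition's commit timestamps, expressed in micros.
      long from = toMicros(partition.getStartTimestamp());
      long to = partition.getEndTimestamp() == null
          ? Long.MAX_VALUE
          : toMicros(partition.getEndTimestamp());
      return new OffsetRange(from, to);
    }

    private static long toMicros(com.google.cloud.Timestamp ts) {
      return ts.getSeconds() * 1_000_000L + ts.getNanos() / 1_000L;
    }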

* feat: process heartbeat records

Adds heartbeat record processing to the read change stream partition
dofn. The only thing necessary here is to update the watermark.
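
Roughly, the heartbeat action is (a sketch; the record accessor and the action signature are approximate):

    public Optional<ProcessContinuation> run(
        HeartbeatRecord record,
        RestrictionTracker<OffsetRange, Long> tracker,
        ManualWatermarkEstimator<Instant> watermarkEstimator) {
      com.google.cloud.Timestamp commitTimestamp = record.getTimestamp(); // assumed accessor
      if (!tracker.tryClaim(toMicros(commitTimestamp))) {                 // toMicros as in the earlier sketch
        return Optional.of(ProcessContinuation.stop());
      }
      // The only side effect of a heartbeat is advancing the watermark.
      watermarkEstimator.setWatermark(new Instant(commitTimestamp.toSqlTimestamp().getTime()));
      return Optional.empty();
    }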

* chore: add package info to dao and mapper packages

* feat: add simple implementation on child partition

Adds the first implementation for child partitions, which just updates
the watermark based on the start timestamp of the record.
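
At this stage the child partition handling is just (a sketch; accessor names are approximate):

    // First pass: only advance the watermark to the child partition record's start timestamp.
    watermarkEstimator.setWatermark(
        new Instant(record.getStartTimestamp().toSqlTimestamp().getTime()));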

* feat: updates metadata table on child split

* feat: add change stream restriction tracker

Creates custom implementation of change stream restriction tracker which
tracks the timestamp position as well as the mode that should be
executed.
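
The restriction carries both pieces of state, roughly (a sketch; class, field and enum names are approximate, not the final API):

    public class PartitionRestriction implements Serializable {
      public enum Mode {
        QUERY_CHANGE_STREAM,
        WAIT_FOR_CHILD_PARTITIONS,
        WAIT_FOR_PARENT_PARTITIONS,
        FINISH_PARTITION,
        DELETE_PARTITION,
        DONE
      }

      private final com.google.cloud.Timestamp startTimestamp; // position being claimed
      private final Mode mode;                                 // phase to execute on resume

      public PartitionRestriction(com.google.cloud.Timestamp startTimestamp, Mode mode) {
        this.startTimestamp = startTimestamp;
        this.mode = mode;
      }

      public com.google.cloud.Timestamp getStartTimestamp() { return startTimestamp; }
      public Mode getMode() { return mode; }
    }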

* refactor: add dao impls for change streams

Adds method to run change stream queries and update the partition
metadata table within a transaction.
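
The metadata update follows the usual client transaction pattern, roughly (a sketch; table and column names are illustrative):

    // Sketch: update a partition's state inside a read-write transaction.
    public void updateState(String partitionToken, String state) {
      databaseClient
          .readWriteTransaction()
          .run(transaction -> {
            transaction.executeUpdate(
                Statement.newBuilder(
                        "UPDATE " + tableName
                            + " SET State = @state WHERE PartitionToken = @token")
                    .bind("state").to(state)
                    .bind("token").to(partitionToken)
                    .build());
            return null;
          });
    }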

* test: narrows unit tests for change stream fn

Unit tests only the process element method from the dofn. This way we
can verify updates to the restriction tracker and watermark estimator.

* test: verify tracker / watermark in tests

Validates that the restriction tracker and watermark were used to update
the correct state in the change streams unit tests.
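
For example (a sketch using Mockito; the fixture names and processElement signature are approximate):

    @Test
    public void testProcessElementAdvancesWatermark() {
      when(tracker.tryClaim(anyLong())).thenReturn(true);

      ProcessContinuation continuation =
          doFn.processElement(partition, tracker, receiver, watermarkEstimator);

      verify(tracker).tryClaim(anyLong());
      verify(watermarkEstimator).setWatermark(any(Instant.class));
      // plus an assertion on the returned ProcessContinuation
    }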

* feat: add DELETE_PARTITION mode and refactor test

* test: complete unit test for child partition split

Adds assertions for waiting for children, waiting for parents and
deleting the current partition.

* feat: adds possible state transition

From partition query to wait for parents, since in all cases we will
need to wait for parents before terminating.

* feat: update termination of data / heartbeat record

Waits for parents to be deleted before terminating after a data /
heartbeat record.
On termination, marks itself as finished and deletes itself from the
metadata table.
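
The termination path is roughly (a sketch; the action and DAO helpers are approximate):

    // Sketch: after the change stream query for this partition is exhausted.
    waitForParentPartitionsAction.run(partition, tracker);               // parents must be deleted first
    partitionMetadataDao.updateState(partition.getPartitionToken(), State.FINISHED);
    partitionMetadataDao.delete(partition.getPartitionToken());          // remove own metadata row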

* feat: moves the finishing of a partition state

To after a result set is consumed. This was not the case for the child
partitions, which finished before waiting for the children.

* refactor: refactors split case

* chore: removes unused file

* refactor: decompose read cdc dofn

Decomposes the read change stream partition DoFn into several actions.

* feat: process child partitions independently

Processes each record within a child partition record independently.

* feat: implement merge all parents finished

Implements child partition record case for merge when all parents are
finished.
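
In rough terms (a sketch; the DAO helper is hypothetical):

    // On a merge, the merged child is only scheduled once all of its parents
    // have finished, so the child partition row is inserted exactly once.
    boolean allParentsFinished = parentTokens.stream()
        .map(partitionMetadataDao::getPartitionState)                  // hypothetical helper
        .allMatch(state -> state == PartitionMetadata.State.FINISHED);
    if (allParentsFinished) {
      partitionMetadataDao.insert(mergedChildMetadata);
    }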

* test: add test for merge one parent not finished

* test: refactor tests for read dofn

* tests: test termination case in read dofn

* feat: execute based on mode in read dofn

Does different things based on the partition mode that is currently
saved in the restriction.
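
Conceptually (a sketch; the mode names follow the ones mentioned in these commits but are otherwise approximate):

    // Resume from whatever mode was checkpointed in the restriction.
    switch (tracker.currentRestriction().getMode()) {
      case QUERY_CHANGE_STREAM:
        return queryChangeStreamAction.run(partition, tracker, receiver, watermarkEstimator);
      case WAIT_FOR_CHILD_PARTITIONS:
        return waitForChildPartitionsAction.run(partition, tracker);
      case WAIT_FOR_PARENT_PARTITIONS:
        return waitForParentPartitionsAction.run(partition, tracker);
      case FINISH_PARTITION:
        return finishPartitionAction.run(partition, tracker);
      case DELETE_PARTITION:
        return deletePartitionAction.run(partition, tracker);
      default:
        throw new IllegalStateException(
            "Unknown mode " + tracker.currentRestriction().getMode());
    }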

* refactor: organize partition metadata dao

* refactor: refactors tests

* feat: implements change stream dao

* fix: fix restriction tracker to update correctly

The restriction tracker was not updating the restriction on each
tryClaim call. This commit fixes this behaviour.
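
The fix amounts to writing the claimed position back into the restriction, roughly (a sketch; the position type and method names are approximate):

    @Override
    public boolean tryClaim(com.google.cloud.Timestamp position) {
      if (position.compareTo(restriction.getEndTimestamp()) >= 0) {
        return false; // nothing past the end of the range can be claimed
      }
      // Keep the tracked restriction in sync with every successful claim.
      this.restriction = restriction.withStartTimestamp(position);
      return true;
    }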

* refactor: minor refactor in read dofn

* refactor: undo modifications to SpannerConfig

* chore: removes solved TODOs

* chore: fixes build violations

* docs: adds a few comments / todos

* feat: handles initial partition

Handles the initial partition which should be mapped to null in the
change stream query.

* fix: fixes the record mapping

Fixes the record mapping according to what is being returned by the
change stream query.

* feat: parameterises change stream query

Uses named params and parameterises the change stream query.
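
A sketch of the query construction (the change stream TVF syntax and parameter names are illustrative; it also shows the NULL token used for the initial partition described above):

    Statement statement =
        Statement.newBuilder(
                "SELECT * FROM READ_" + changeStreamName
                    + "(@startTimestamp, @endTimestamp, @partitionToken, @heartbeatMillis)")
            .bind("startTimestamp").to(startTimestamp)
            .bind("endTimestamp").to(endTimestamp)
            .bind("partitionToken").to(isInitialPartition ? null : partitionToken)
            .bind("heartbeatMillis").to(heartbeatMillis)
            .build();
    try (ResultSet resultSet = databaseClient.singleUse().executeQuery(statement)) {
      while (resultSet.next()) {
        // map each row to a change stream record...
      }
    }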

* docs: adds state graph for dofn

Adds documentation for the state graph on the Read Change Stream
Partition DoFn.

* docs: add TODOs / FIXMEs for missing functionality

* Remove linting (#14)

* feat: adds it test for read dofn

Here we had to change several things:

- We had to fix the restriction tracker checkDone() method so that it no
  longer requires the DONE partition mode. This is because the method is
  also called when we resume from a previous mode.
- We had to fix the heartbeat parameter for the change stream dao. It is
  now specified as millis instead of seconds.
- We added debug logging to all the actions.
- We fixed a bug in the restriction tracker allowing the mode change
  from wait_for_child_partitions to finish_partition (it was missing).

Co-authored-by: Zoe <zoc@google.com>
thiagotnunes and zoercai authored Jun 21, 2021
1 parent 11e3c9e commit 89c13b8
Showing 49 changed files with 4,105 additions and 303 deletions.
SpannerIO.java
@@ -57,8 +57,13 @@
import org.apache.beam.sdk.io.gcp.spanner.cdc.ChangeStreamSourceDescriptor;
import org.apache.beam.sdk.io.gcp.spanner.cdc.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.cdc.PipelineInitializer;
import org.apache.beam.sdk.io.gcp.spanner.cdc.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.cdc.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangesRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
@@ -1272,7 +1277,7 @@ static byte[] encode(MutationGroup g) {

@AutoValue
public abstract static class ReadChangeStream
extends PTransform<PBegin, PCollection<PartitionMetadata>> {
extends PTransform<PBegin, PCollection<DataChangesRecord>> {

abstract SpannerConfig getSpannerConfig();

@@ -1284,9 +1289,6 @@ public abstract static class ReadChangeStream

abstract @Nullable Deserializer getDeserializer();

// TODO(hengfeng): Remove this when we can write the real IT test for the connector.
abstract @Nullable String getTestMetadataTable();

abstract Builder toBuilder();

@AutoValue.Builder
@@ -1302,8 +1304,6 @@ abstract static class Builder {

abstract Builder setDeserializer(Deserializer deserializer);

abstract Builder setTestMetadataTable(String metadataTable);

abstract ReadChangeStream build();
}

@@ -1360,11 +1360,6 @@ public ReadChangeStream withExclusiveEndAt(Timestamp timestamp) {
return toBuilder().setExclusiveEndAt(timestamp).build();
}

/** Specifies the change stream metadata table name for testing purpose. */
public ReadChangeStream withTestMetadataTable(String metadataTable) {
return toBuilder().setTestMetadataTable(metadataTable).build();
}

/**
* Specifies the class to be used to transform the data records read from the change stream into
* Java objects or other serial formats.
@@ -1374,7 +1369,7 @@ public ReadChangeStream withDeserializer(Deserializer deserializer) {
}

@Override
public PCollection<PartitionMetadata> expand(PBegin input) {
public PCollection<DataChangesRecord> expand(PBegin input) {
checkArgument(
getSpannerConfig() != null,
"SpannerIO.readChangeStream() requires the spanner config to be set.");
@@ -1400,47 +1395,62 @@ && getInclusiveStartAt().toSqlTimestamp().after(getExclusiveEndAt().toSqlTimesta
throw new IllegalArgumentException("Start time cannot be after end time.");
}

SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(getSpannerConfig());
DatabaseAdminClient databaseAdminClient = spannerAccessor.getDatabaseAdminClient();
DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
Database changeStreamsDb =
databaseAdminClient.getDatabase(
getSpannerConfig().getInstanceId().get(), getSpannerConfig().getDatabaseId().get());

DatabaseId databaseId =
final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(getSpannerConfig());
final DatabaseAdminClient databaseAdminClient = spannerAccessor.getDatabaseAdminClient();
final DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
final DatabaseId databaseId =
DatabaseId.of(
getSpannerConfig().getProjectId().get(),
getSpannerConfig().getInstanceId().get(),
getSpannerConfig().getDatabaseId().get());
String partitionMetadataTableName = generateMetadataTableName(databaseId.getDatabase());
if (!getTestMetadataTable().isEmpty()) {
partitionMetadataTableName = getTestMetadataTable();
}

PartitionMetadataDao partitionMetadataDao =
new PartitionMetadataDao(databaseClient, partitionMetadataTableName);
PipelineInitializer.initialize(
databaseAdminClient,
partitionMetadataDao,
databaseId,
getInclusiveStartAt(),
getExclusiveEndAt());
final String partitionMetadataTableName = generateMetadataTableName(databaseId.getDatabase());

List<ChangeStreamSourceDescriptor> sources = new ArrayList<>();
// FIXME: This should be removed and only the dao factory should be used
final PartitionMetadataDao partitionMetadataDao =
new PartitionMetadataDao(partitionMetadataTableName, databaseClient);

// TODO: See if we can remove the metadata table name from the source
final List<ChangeStreamSourceDescriptor> sources = new ArrayList<>();
sources.add(
ChangeStreamSourceDescriptor.of(
getChangeStreamName(),
partitionMetadataTableName,
getInclusiveStartAt(),
getExclusiveEndAt()));

// FIXME: This should come from the generated table name
final DaoFactory daoFactory = new DaoFactory(
getChangeStreamName(),
partitionMetadataTableName
);
final MapperFactory mapperFactory = new MapperFactory();
final ActionFactory actionFactory = new ActionFactory();
// FIXME: We should use the DAOFactory here instead of passing in the table name
final DetectNewPartitionsDoFn detectNewPartitionsDoFn = new DetectNewPartitionsDoFn(
getSpannerConfig(),
partitionMetadataTableName);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(
getSpannerConfig(),
daoFactory,
mapperFactory,
actionFactory);

// FIXME: Remove the partitionMetadataDAO as a parameter
// TODO: See if we can have a DAO for the admin operations
PipelineInitializer.initialize(
databaseAdminClient,
partitionMetadataDao,
databaseId,
partitionMetadataTableName,
getInclusiveStartAt(),
getExclusiveEndAt());
return input
.apply("Generate change stream sources", Create.of(sources))
.apply(
"Detect new partitions",
ParDo.of(
new DetectNewPartitionsDoFn(getSpannerConfig(), partitionMetadataTableName)));
// .apply(new ReadPartitionChangeStreamDoFn());
.apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
.apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn));

// TODO: We need to perform cleanup after everything has terminated (delete metadata table)
}
}

DetectNewPartitionsDoFn.java
@@ -73,17 +73,42 @@ public DetectNewPartitionsDoFn(
this.resumeDuration = resumeDuration;
}

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
return currentElementTimestamp;
}

@NewWatermarkEstimator
public WatermarkEstimator<Instant> newWatermarkEstimator(
@WatermarkEstimatorState Instant watermarkEstimatorState) {
return new MonotonicallyIncreasing(watermarkEstimatorState);
}

@GetInitialRestriction
public OffsetRange initialRestriction(@Element ChangeStreamSourceDescriptor inputElement) {
return new OffsetRange(0, Long.MAX_VALUE);
}

// TODO: Remove @Element parameter if it is not needed
@NewTracker
public OffsetRangeTracker restrictionTracker(
@Element ChangeStreamSourceDescriptor inputElement, @Restriction OffsetRange restriction) {
return new OffsetRangeTracker(new OffsetRange(restriction.getFrom(), Long.MAX_VALUE));
}

@Setup
public void setup() throws Exception {
this.spannerAccessor = SpannerAccessor.getOrCreate(this.spannerConfig);
this.databaseClient = spannerAccessor.getDatabaseClient();
this.partitionMetadataDao =
new PartitionMetadataDao(this.metadataTableName, this.databaseClient);
}

@Teardown
public void teardown() throws Exception {
this.spannerAccessor.close();
}

@ProcessElement
public ProcessContinuation processElement(
@Element ChangeStreamSourceDescriptor inputElement,
@@ -93,7 +118,7 @@ public ProcessContinuation processElement(

Instant start = Instant.now();

LOG.info("Calling process element:" + start);
LOG.debug("Calling process element:" + start);

// Find all records where their states are CREATED.
// TODO(hengfeng): move this to DAO.
@@ -106,18 +131,18 @@ public ProcessContinuation processElement(
// Output the records.
while (resultSet.next()) {
// TODO(hengfeng): change the log level in this file.
LOG.info("Reading record currentIndex:" + currentIndex);
LOG.debug("Reading record currentIndex:" + currentIndex);
if (!tracker.tryClaim(currentIndex)) {
return ProcessContinuation.stop();
}
PartitionMetadata metadata = buildPartitionMetadata(resultSet);
LOG.info(
LOG.debug(
String.format("Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));

currentIndex++;

Instant now = Instant.now();
LOG.info("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now);
LOG.debug("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now);
receiver.output(metadata);

// TODO(hengfeng): investigate if we can move this to DAO.
@@ -138,7 +163,7 @@ public ProcessContinuation processElement(
.to(metadata.getPartitionToken())
.build();
transaction.executeUpdate(updateStatement);
LOG.info("Updated the record:" + metadata.getPartitionToken());
LOG.debug("Updated the record:" + metadata.getPartitionToken());
return null;
});
}
@@ -147,30 +172,6 @@ public ProcessContinuation processElement(
return ProcessContinuation.resume().withResumeDelay(resumeDuration);
}

@Setup
public void setup() throws Exception {
this.spannerAccessor = SpannerAccessor.getOrCreate(this.spannerConfig);
this.databaseClient = spannerAccessor.getDatabaseClient();
this.partitionMetadataDao =
new PartitionMetadataDao(this.databaseClient, this.metadataTableName);
}

@Teardown
public void teardown() throws Exception {
this.spannerAccessor.close();
}

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
return currentElementTimestamp;
}

@NewWatermarkEstimator
public WatermarkEstimator<Instant> newWatermarkEstimator(
@WatermarkEstimatorState Instant watermarkEstimatorState) {
return new MonotonicallyIncreasing(watermarkEstimatorState);
}

private PartitionMetadata buildPartitionMetadata(ResultSet resultSet) {
return new PartitionMetadata(
resultSet.getString(PartitionMetadataDao.COLUMN_PARTITION_TOKEN),
@@ -181,7 +182,7 @@ private PartitionMetadata buildPartitionMetadata(ResultSet resultSet) {
? resultSet.getTimestamp(PartitionMetadataDao.COLUMN_END_TIMESTAMP)
: null,
resultSet.getBoolean(PartitionMetadataDao.COLUMN_INCLUSIVE_END),
resultSet.getLong(PartitionMetadataDao.COLUMN_HEARTBEAT_SECONDS),
resultSet.getLong(PartitionMetadataDao.COLUMN_HEARTBEAT_MILLIS),
PartitionMetadata.State.valueOf(resultSet.getString(PartitionMetadataDao.COLUMN_STATE)),
resultSet.getTimestamp(PartitionMetadataDao.COLUMN_CREATED_AT),
resultSet.getTimestamp(PartitionMetadataDao.COLUMN_UPDATED_AT));
NameGenerator.java
@@ -19,10 +19,12 @@

import java.util.UUID;

// TODO: Add java docs
public class NameGenerator {

private static final String METADATA_TABLE_NAME_FORMAT = "CDC_Partitions_%s_%s";

// TODO: Add java docs
public static String generateMetadataTableName(String databaseId) {
// Maximum Spanner table name length is 128 characters.
// There are 16 characters in the name format.
PipelineInitializer.java
@@ -19,7 +19,7 @@

import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_CREATED_AT;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_END_TIMESTAMP;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_HEARTBEAT_SECONDS;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_HEARTBEAT_MILLIS;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_INCLUSIVE_END;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_INCLUSIVE_START;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_PARENT_TOKEN;
@@ -34,7 +34,6 @@
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Value;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
@@ -46,17 +45,19 @@

public class PipelineInitializer {

private static final String DEFAULT_PARENT_PARTITION_TOKEN = "Parent0";
public static final String DEFAULT_PARENT_PARTITION_TOKEN = "Parent0";
private static final ImmutableList<String> DEFAULT_PARENT_TOKENS = ImmutableList.of();
private static final long DEFAULT_HEARTBEAT_SECONDS = 1;
private static final long DEFAULT_HEARTBEAT_MILLIS = 1000;

// TODO: See if we can get away with not passing in the database id, but the generated table name instead
public static void initialize(
DatabaseAdminClient databaseAdminClient,
PartitionMetadataDao partitionMetadataDao,
DatabaseId id,
String partitionMetadataTableName,
Timestamp inclusiveStartAt,
@Nullable Timestamp exclusiveEndAt) {
createMetadataTable(databaseAdminClient, id, partitionMetadataDao.getTableName());
createMetadataTable(databaseAdminClient, id, partitionMetadataTableName);
createFakeParentPartition(partitionMetadataDao, inclusiveStartAt, exclusiveEndAt);
}

@@ -78,7 +79,7 @@ private static void createMetadataTable(
+ " TIMESTAMP,"
+ COLUMN_INCLUSIVE_END
+ " BOOL,"
+ COLUMN_HEARTBEAT_SECONDS
+ COLUMN_HEARTBEAT_MILLIS
+ " INT64 NOT NULL,"
+ COLUMN_STATE
+ " STRING(MAX) NOT NULL,"
@@ -110,17 +111,14 @@ private static void createFakeParentPartition(
PartitionMetadataDao partitionMetadataDao,
Timestamp inclusiveStartAt,
@Nullable Timestamp exclusiveEndAt) {
PartitionMetadata parentPartition =
PartitionMetadata.newBuilder()
.setPartitionToken(DEFAULT_PARENT_PARTITION_TOKEN)
.setParentTokens(DEFAULT_PARENT_TOKENS)
.setStartTimestamp(inclusiveStartAt)
.setEndTimestamp(exclusiveEndAt)
.setHeartbeatSeconds(DEFAULT_HEARTBEAT_SECONDS)
.setState(State.CREATED)
.setCreatedAt(Value.COMMIT_TIMESTAMP)
.setUpdatedAt(Value.COMMIT_TIMESTAMP)
.build();
PartitionMetadata parentPartition = PartitionMetadata.newBuilder()
.setPartitionToken(DEFAULT_PARENT_PARTITION_TOKEN)
.setParentTokens(DEFAULT_PARENT_TOKENS)
.setStartTimestamp(inclusiveStartAt)
.setEndTimestamp(exclusiveEndAt)
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
.setState(State.CREATED)
.build();
partitionMetadataDao.insert(parentPartition);
}
}