feat: fixes issues with the current implementation and refactors (#17)
* feat: adds extra log message in detect dofn

* feat: adds token to log mdc in read dofn
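
A minimal sketch of the idea, assuming the SLF4J MDC API (org.slf4j.MDC): the partition token is put into the MDC while the element is processed, so every log line emitted for that partition carries the token. The try/finally cleanup shown here is an assumption about good practice, not necessarily part of this commit.

    // Sketch only: tag log output for this partition via the SLF4J MDC.
    MDC.put("partitionToken", partition.getPartitionToken());
    try {
      LOG.debug("Processing element " + partition.getPartitionToken());
      // ... read the change stream records for this partition ...
    } finally {
      // Assumed cleanup so the token does not leak into unrelated log lines.
      MDC.remove("partitionToken");
    }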

* refactor: list of parents as hash set

This should prevent duplicate parent tokens.
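
As a tiny illustration (the token values here are made up), Guava's Sets.newHashSet collapses repeated parent tokens that a List would keep:

    // Illustrative only: duplicate parent tokens are dropped by the set.
    Set<String> parents = Sets.newHashSet("parentA", "parentA", "parentB");
    // parents.size() == 2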

* fix: adds the initial partition as parent

Adds the initial (fake) partition as a parent of any of its children.
This is necessary to correctly apply wait for parents / wait for
children.
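
A sketch of how this plays out in the record mapper (mirroring the ChangeStreamRecordMapper change further down in this diff): when the record was read from the initial partition, its token is added to each child's parent set, so a parent row exists for the wait-for-parents / wait-for-children checks.

    // Sketch based on the ChangeStreamRecordMapper change below.
    HashSet<String> parentTokens =
        Sets.newHashSet(struct.getStringList("parent_partition_tokens"));
    if (partitionToken.equals(DEFAULT_PARENT_PARTITION_TOKEN)) {
      // Record the initial (fake) partition as a parent of its children.
      parentTokens.add(partitionToken);
    }
    ChildPartition child = new ChildPartition(struct.getString("token"), parentTokens);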

* refactor: makes factory methods use singletons

Makes the action factory, DAO factory, and mapper factory use singletons
in order to avoid duplicate instance creation. The added synchronization
might become a bottleneck in the future, so we will need to analyse the
latency here.
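
The pattern the factories adopt is lazy initialization behind a synchronized accessor, roughly as sketched here (the ActionFactory diff below shows the real methods); the TODO comments in the diff flag the synchronization as the latency concern to revisit.

    // Sketch of the lazily-initialized, synchronized accessor used by the factories.
    private static DataChangesRecordAction DATA_CHANGES_RECORD_ACTION_INSTANCE;

    public synchronized DataChangesRecordAction dataChangesRecordAction() {
      if (DATA_CHANGES_RECORD_ACTION_INSTANCE == null) {
        DATA_CHANGES_RECORD_ACTION_INSTANCE = new DataChangesRecordAction();
      }
      return DATA_CHANGES_RECORD_ACTION_INSTANCE;
    }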

* refactor: renames the column ParentToken

Renames it to the plural ParentTokens, since the column holds an array of parent tokens.
thiagotnunes authored Jun 23, 2021
1 parent 993b695 commit 7a1f632
Showing 15 changed files with 183 additions and 72 deletions.
@@ -20,6 +20,7 @@
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import com.google.common.collect.Sets;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
@@ -136,12 +137,14 @@ public ProcessContinuation processElement(
}
PartitionMetadata metadata = buildPartitionMetadata(resultSet);
LOG.debug(
String.format("Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));
String.format(
"Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));

currentIndex++;

Instant now = Instant.now();
LOG.debug("Read watermark:" + watermarkEstimator.currentWatermark() + " now:" + now);
LOG.debug("Scheduling partition: " + metadata);
receiver.output(metadata);

// TODO(hengfeng): investigate if we can move this to DAO.
@@ -175,7 +178,7 @@ public ProcessContinuation processElement(
try (ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query))) {
if (resultSet.next() && resultSet.getLong(0) == 0) {
if (!tracker.tryClaim(Long.MAX_VALUE)) {
LOG.warning("Failed to claim the end of range in DetectNewPartitionsDoFn.");
LOG.warn("Failed to claim the end of range in DetectNewPartitionsDoFn.");
}
return ProcessContinuation.stop();
}
@@ -186,7 +189,7 @@
private PartitionMetadata buildPartitionMetadata(ResultSet resultSet) {
return new PartitionMetadata(
resultSet.getString(PartitionMetadataDao.COLUMN_PARTITION_TOKEN),
resultSet.getStringList(PartitionMetadataDao.COLUMN_PARENT_TOKEN),
Sets.newHashSet(resultSet.getStringList(PartitionMetadataDao.COLUMN_PARENT_TOKENS)),
resultSet.getTimestamp(PartitionMetadataDao.COLUMN_START_TIMESTAMP),
resultSet.getBoolean(PartitionMetadataDao.COLUMN_INCLUSIVE_START),
!resultSet.isNull(PartitionMetadataDao.COLUMN_END_TIMESTAMP)
@@ -22,12 +22,13 @@
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_HEARTBEAT_MILLIS;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_INCLUSIVE_END;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_INCLUSIVE_START;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_PARENT_TOKEN;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_PARENT_TOKENS;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_PARTITION_TOKEN;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_START_TIMESTAMP;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_STATE;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao.COLUMN_UPDATED_AT;

import com.google.api.client.util.Sets;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseAdminClient;
@@ -36,17 +37,17 @@
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

public class PipelineInitializer {

public static final String DEFAULT_PARENT_PARTITION_TOKEN = "Parent0";
private static final ImmutableList<String> DEFAULT_PARENT_TOKENS = ImmutableList.of();
private static final HashSet<String> DEFAULT_PARENT_TOKENS = Sets.newHashSet();
private static final long DEFAULT_HEARTBEAT_MILLIS = 1000;

// TODO: See if we can get away with not passing in the database id, but the generated table name
@@ -70,7 +71,7 @@ private static void createMetadataTable(
+ " ("
+ COLUMN_PARTITION_TOKEN
+ " STRING(MAX) NOT NULL,"
+ COLUMN_PARENT_TOKEN
+ COLUMN_PARENT_TOKENS
+ " ARRAY<STRING(MAX)> NOT NULL,"
+ COLUMN_START_TIMESTAMP
+ " TIMESTAMP NOT NULL,"
@@ -54,6 +54,7 @@
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

// TODO: Add java docs
@UnboundedPerElement
@@ -142,6 +143,7 @@ public ProcessContinuation processElement(
RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
OutputReceiver<DataChangesRecord> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {
MDC.put("partitionToken", partition.getPartitionToken());
LOG.debug(
"Processing element "
+ partition.getPartitionToken()
@@ -25,40 +25,85 @@
public class ActionFactory implements Serializable {

private static final long serialVersionUID = -4060958761369602619L;
private static DataChangesRecordAction DATA_CHANGES_RECORD_ACTION_INSTANCE;
private static HeartbeatRecordAction HEARTBEAT_RECORD_ACTION_INSTANCE;
private static ChildPartitionsRecordAction CHILD_PARTITIONS_RECORD_ACTION_INSTANCE;
private static FinishPartitionAction FINISH_PARTITION_ACTION_INSTANCE;
private static WaitForChildPartitionsAction WAIT_FOR_CHILD_PARTITIONS_ACTION_INSTANCE;
private static WaitForParentPartitionsAction WAIT_FOR_PARENT_PARTITIONS_ACTION_INSTANCE;
private static DeletePartitionAction DELETE_PARTITION_ACTION_INSTANCE;
private static DonePartitionAction DONE_PARTITION_ACTION_INSTANCE;

public DataChangesRecordAction dataChangesRecordAction() {
return new DataChangesRecordAction();
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized DataChangesRecordAction dataChangesRecordAction() {
if (DATA_CHANGES_RECORD_ACTION_INSTANCE == null) {
DATA_CHANGES_RECORD_ACTION_INSTANCE = new DataChangesRecordAction();
}
return DATA_CHANGES_RECORD_ACTION_INSTANCE;
}

public HeartbeatRecordAction heartbeatRecordAction() {
return new HeartbeatRecordAction();
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized HeartbeatRecordAction heartbeatRecordAction() {
if (HEARTBEAT_RECORD_ACTION_INSTANCE == null) {
HEARTBEAT_RECORD_ACTION_INSTANCE = new HeartbeatRecordAction();
}
return HEARTBEAT_RECORD_ACTION_INSTANCE;
}

public ChildPartitionsRecordAction childPartitionsRecordAction(
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized ChildPartitionsRecordAction childPartitionsRecordAction(
PartitionMetadataDao partitionMetadataDao,
WaitForChildPartitionsAction waitForChildPartitionsAction) {
return new ChildPartitionsRecordAction(partitionMetadataDao, waitForChildPartitionsAction);
if (CHILD_PARTITIONS_RECORD_ACTION_INSTANCE == null) {
CHILD_PARTITIONS_RECORD_ACTION_INSTANCE =
new ChildPartitionsRecordAction(partitionMetadataDao, waitForChildPartitionsAction);
}
return CHILD_PARTITIONS_RECORD_ACTION_INSTANCE;
}

public FinishPartitionAction finishPartitionAction(PartitionMetadataDao partitionMetadataDao) {
return new FinishPartitionAction(partitionMetadataDao);
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized FinishPartitionAction finishPartitionAction(
PartitionMetadataDao partitionMetadataDao) {
if (FINISH_PARTITION_ACTION_INSTANCE == null) {
FINISH_PARTITION_ACTION_INSTANCE = new FinishPartitionAction(partitionMetadataDao);
}
return FINISH_PARTITION_ACTION_INSTANCE;
}

public WaitForChildPartitionsAction waitForChildPartitionsAction(
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized WaitForChildPartitionsAction waitForChildPartitionsAction(
PartitionMetadataDao partitionMetadataDao, Duration resumeDuration) {
return new WaitForChildPartitionsAction(partitionMetadataDao, resumeDuration);
if (WAIT_FOR_CHILD_PARTITIONS_ACTION_INSTANCE == null) {
WAIT_FOR_CHILD_PARTITIONS_ACTION_INSTANCE =
new WaitForChildPartitionsAction(partitionMetadataDao, resumeDuration);
}
return WAIT_FOR_CHILD_PARTITIONS_ACTION_INSTANCE;
}

public WaitForParentPartitionsAction waitForParentPartitionsAction(
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized WaitForParentPartitionsAction waitForParentPartitionsAction(
PartitionMetadataDao partitionMetadataDao, Duration resumeDuration) {
return new WaitForParentPartitionsAction(partitionMetadataDao, resumeDuration);
if (WAIT_FOR_PARENT_PARTITIONS_ACTION_INSTANCE == null) {
WAIT_FOR_PARENT_PARTITIONS_ACTION_INSTANCE =
new WaitForParentPartitionsAction(partitionMetadataDao, resumeDuration);
}
return WAIT_FOR_PARENT_PARTITIONS_ACTION_INSTANCE;
}

public DeletePartitionAction deletePartitionAction(PartitionMetadataDao partitionMetadataDao) {
return new DeletePartitionAction(partitionMetadataDao);
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized DeletePartitionAction deletePartitionAction(
PartitionMetadataDao partitionMetadataDao) {
if (DELETE_PARTITION_ACTION_INSTANCE == null) {
DELETE_PARTITION_ACTION_INSTANCE = new DeletePartitionAction(partitionMetadataDao);
}
return DELETE_PARTITION_ACTION_INSTANCE;
}

public DonePartitionAction donePartitionAction() {
return new DonePartitionAction();
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized DonePartitionAction donePartitionAction() {
if (DONE_PARTITION_ACTION_INSTANCE == null) {
DONE_PARTITION_ACTION_INSTANCE = new DonePartitionAction();
}
return DONE_PARTITION_ACTION_INSTANCE;
}
}
@@ -25,6 +25,8 @@
public class DaoFactory implements Serializable {

private static final long serialVersionUID = 7929063669009832487L;
private static PartitionMetadataDao PARTITION_METADATA_DAO_INSTANCE;
private static ChangeStreamDao CHANGE_STREAM_DAO_INSTANCE;

private final String changeStreamName;
private final String partitionMetadataTableName;
@@ -34,14 +36,24 @@ public DaoFactory(String changeStreamName, String partitionMetadataTableName) {
this.partitionMetadataTableName = partitionMetadataTableName;
}

public PartitionMetadataDao partitionMetadataDaoFrom(SpannerConfig spannerConfig) {
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized PartitionMetadataDao partitionMetadataDaoFrom(SpannerConfig spannerConfig) {
final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
return new PartitionMetadataDao(
this.partitionMetadataTableName, spannerAccessor.getDatabaseClient());
if (PARTITION_METADATA_DAO_INSTANCE == null) {
PARTITION_METADATA_DAO_INSTANCE =
new PartitionMetadataDao(
this.partitionMetadataTableName, spannerAccessor.getDatabaseClient());
}
return PARTITION_METADATA_DAO_INSTANCE;
}

public ChangeStreamDao changeStreamDaoFrom(SpannerConfig spannerConfig) {
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized ChangeStreamDao changeStreamDaoFrom(SpannerConfig spannerConfig) {
final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
return new ChangeStreamDao(this.changeStreamName, spannerAccessor.getDatabaseClient());
if (CHANGE_STREAM_DAO_INSTANCE == null) {
CHANGE_STREAM_DAO_INSTANCE =
new ChangeStreamDao(this.changeStreamName, spannerAccessor.getDatabaseClient());
}
return CHANGE_STREAM_DAO_INSTANCE;
}
}
@@ -25,6 +25,7 @@
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.Value;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
@@ -36,7 +37,7 @@ public class PartitionMetadataDao {

// Metadata table column names
public static final String COLUMN_PARTITION_TOKEN = "PartitionToken";
public static final String COLUMN_PARENT_TOKEN = "ParentToken";
public static final String COLUMN_PARENT_TOKENS = "ParentTokens";
public static final String COLUMN_START_TIMESTAMP = "StartTimestamp";
public static final String COLUMN_INCLUSIVE_START = "InclusiveStart";
public static final String COLUMN_END_TIMESTAMP = "EndTimestamp";
@@ -63,7 +64,7 @@ public long countChildPartitionsInStates(
+ " FROM "
+ tableName
+ " WHERE @partition IN UNNEST ("
+ COLUMN_PARENT_TOKEN
+ COLUMN_PARENT_TOKENS
+ ")"
+ " AND "
+ COLUMN_STATE
@@ -89,7 +90,7 @@ public long countExistingParents(String partitionToken) {
+ COLUMN_PARTITION_TOKEN
+ " IN UNNEST (("
+ " SELECT "
+ COLUMN_PARENT_TOKEN
+ COLUMN_PARENT_TOKENS
+ " FROM "
+ tableName
+ " WHERE "
@@ -155,7 +156,7 @@ public Void delete(String partitionToken) {
}

public long countPartitionsInStates(
List<String> partitionTokens, List<PartitionMetadata.State> states) {
Set<String> partitionTokens, List<PartitionMetadata.State> states) {
try (final ResultSet resultSet =
transaction.executeQuery(
Statement.newBuilder(
@@ -183,7 +184,7 @@ private Mutation createInsertMutationFrom(PartitionMetadata partitionMetadata) {
.set(COLUMN_PARTITION_TOKEN)
.to(partitionMetadata.getPartitionToken())
// FIXME: This should be a list of parents
.set(COLUMN_PARENT_TOKEN)
.set(COLUMN_PARENT_TOKENS)
.toStringArray(partitionMetadata.getParentTokens())
.set(COLUMN_START_TIMESTAMP)
.to(partitionMetadata.getStartTimestamp())
@@ -17,8 +17,12 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.cdc.mapper;

import static org.apache.beam.sdk.io.gcp.spanner.cdc.PipelineInitializer.DEFAULT_PARENT_PARTITION_TOKEN;

import com.google.cloud.spanner.Struct;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -62,7 +66,7 @@ private Stream<ChangeStreamRecord> toChangeStreamRecord(String partitionToken, Struct row) {
final Stream<ChildPartitionsRecord> childPartitionsRecords =
row.getStructList("child_partitions_record").stream()
.filter(this::isNonNullChildPartitionsRecord)
.map(this::toChildPartitionsRecord);
.map(struct -> toChildPartitionsRecord(partitionToken, struct));
return Stream.concat(
Stream.concat(dataChangeRecords, heartbeatRecords), childPartitionsRecords);
}
@@ -103,13 +107,13 @@ private HeartbeatRecord toHeartbeatRecord(Struct row) {
return new HeartbeatRecord(row.getTimestamp("timestamp"));
}

private ChildPartitionsRecord toChildPartitionsRecord(Struct row) {
private ChildPartitionsRecord toChildPartitionsRecord(String partitionToken, Struct row) {
return new ChildPartitionsRecord(
row.getTimestamp("start_timestamp"),
// FIXME: The spec has this as a String, but an int64 is returned
row.getLong("record_sequence") + "",
row.getStructList("child_partitions").stream()
.map(this::childPartitionFrom)
.map(struct -> childPartitionFrom(partitionToken, struct))
.collect(Collectors.toList()));
}

@@ -128,8 +132,12 @@ private Mod modFrom(Struct struct) {
return new Mod(keys, oldValues, newValues);
}

private ChildPartition childPartitionFrom(Struct struct) {
return new ChildPartition(
struct.getString("token"), struct.getStringList("parent_partition_tokens"));
private ChildPartition childPartitionFrom(String partitionToken, Struct struct) {
final HashSet<String> parentTokens =
Sets.newHashSet(struct.getStringList("parent_partition_tokens"));
if (partitionToken.equals(DEFAULT_PARENT_PARTITION_TOKEN)) {
parentTokens.add(partitionToken);
}
return new ChildPartition(struct.getString("token"), parentTokens);
}
}
@@ -24,8 +24,13 @@
public class MapperFactory implements Serializable {

private static final long serialVersionUID = -813434573067800902L;
private static ChangeStreamRecordMapper CHANGE_STREAM_RECORD_MAPPER_INSTANCE;

public ChangeStreamRecordMapper changeStreamRecordMapper() {
return new ChangeStreamRecordMapper(new Gson());
// TODO: See if synchronized is a bottleneck and refactor if so
public synchronized ChangeStreamRecordMapper changeStreamRecordMapper() {
if (CHANGE_STREAM_RECORD_MAPPER_INSTANCE == null) {
CHANGE_STREAM_RECORD_MAPPER_INSTANCE = new ChangeStreamRecordMapper(new Gson());
}
return CHANGE_STREAM_RECORD_MAPPER_INSTANCE;
}
}