feat: add user-specified partition metadata table DB + refactor (#32)
zoercai authored Jul 12, 2021
1 parent b9fbc97 commit 60a68bb
Showing 11 changed files with 229 additions and 200 deletions.
org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -19,16 +19,15 @@

import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.NameGenerator.generateMetadataTableName;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.NameGenerator.generatePartitionMetadataTableName;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import avro.shaded.com.google.common.base.Objects;
import com.google.auto.value.AutoValue;
import com.google.cloud.ServiceFactory;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.KeySet;
@@ -60,7 +59,6 @@
import org.apache.beam.sdk.io.gcp.spanner.cdc.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.cdc.actions.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord;
import org.apache.beam.sdk.metrics.Counter;
@@ -1282,6 +1280,8 @@ public abstract static class ReadChangeStream

abstract String getChangeStreamName();

abstract String getMetadataDatabase();

abstract Timestamp getInclusiveStartAt();

abstract @Nullable Timestamp getInclusiveEndAt();
@@ -1297,6 +1297,8 @@ abstract static class Builder {

abstract Builder setChangeStreamName(String changeStreamName);

abstract Builder setMetadataDatabase(String metadataDatabase);

abstract Builder setInclusiveStartAt(Timestamp inclusiveStartAt);

abstract Builder setInclusiveEndAt(Timestamp inclusiveEndAt);
@@ -1349,6 +1351,11 @@ public ReadChangeStream withChangeStreamName(String changeStreamName) {
return toBuilder().setChangeStreamName(changeStreamName).build();
}

/** Specifies the metadata database. */
public ReadChangeStream withMetadataDatabase(String metadataDatabase) {
return toBuilder().setMetadataDatabase(metadataDatabase).build();
}

/** Specifies the time that the change stream should be read from. */
public ReadChangeStream withInclusiveStartAt(Timestamp timestamp) {
return toBuilder().setInclusiveStartAt(timestamp).build();
@@ -1394,51 +1401,47 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
throw new IllegalArgumentException("Start time cannot be after end time.");
}

final SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(getSpannerConfig());
final DatabaseAdminClient databaseAdminClient = spannerAccessor.getDatabaseAdminClient();
final DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
final DatabaseId databaseId =
final DatabaseId changeStreamDatabaseId =
DatabaseId.of(
getSpannerConfig().getProjectId().get(),
getSpannerConfig().getInstanceId().get(),
getSpannerConfig().getDatabaseId().get());

final String partitionMetadataTableName = generateMetadataTableName(databaseId.getDatabase());

// FIXME: This should be removed and only the dao factory should be used
final PartitionMetadataDao partitionMetadataDao =
new PartitionMetadataDao(partitionMetadataTableName, databaseClient);

// TODO: See if we can remove the metadata table name from the source
final List<ChangeStreamSourceDescriptor> sources = new ArrayList<>();
sources.add(
ChangeStreamSourceDescriptor.of(
getChangeStreamName(),
partitionMetadataTableName,
getInclusiveStartAt(),
getInclusiveEndAt()));
// FIXME: This should come from the generated table name
final String partitionMetadataDatabaseId =
Objects.firstNonNull(getMetadataDatabase(), changeStreamDatabaseId.getDatabase());
final String partitionMetadataTableName =
generatePartitionMetadataTableName(partitionMetadataDatabaseId);

final SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
final SpannerConfig partitionMetadataSpannerConfig =
SpannerConfig.create()
.withProjectId(changeStreamSpannerConfig.getProjectId())
.withInstanceId(changeStreamSpannerConfig.getInstanceId())
.withDatabaseId(partitionMetadataDatabaseId)
.withCommitDeadline(changeStreamSpannerConfig.getCommitDeadline())
.withEmulatorHost(changeStreamSpannerConfig.getEmulatorHost())
.withMaxCumulativeBackoff(changeStreamSpannerConfig.getMaxCumulativeBackoff());
final DaoFactory daoFactory =
new DaoFactory(getChangeStreamName(), partitionMetadataTableName);
final MapperFactory mapperFactory = new MapperFactory();
final ActionFactory actionFactory = new ActionFactory();
// FIXME: We should use the DAOFactory here instead of passing in the table name
new DaoFactory(
changeStreamSpannerConfig,
getChangeStreamName(),
partitionMetadataSpannerConfig,
partitionMetadataTableName);

final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(getSpannerConfig(), partitionMetadataTableName);
new DetectNewPartitionsDoFn(daoFactory);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(
getSpannerConfig(), daoFactory, mapperFactory, actionFactory);
new ReadChangeStreamPartitionDoFn(daoFactory, new MapperFactory(), new ActionFactory());
final PostProcessingMetricsDoFn postProcessingMetricsDoFn = new PostProcessingMetricsDoFn();

// FIXME: Remove the partitionMetadataDAO as a parameter
// TODO: See if we can have a DAO for the admin operations
PipelineInitializer.initialize(
databaseAdminClient,
partitionMetadataDao,
databaseId,
partitionMetadataTableName,
daoFactory.getPartitionMetadataAdminDao(),
daoFactory.getPartitionMetadataDao(),
getInclusiveStartAt(),
getInclusiveEndAt());
final List<ChangeStreamSourceDescriptor> sources = new ArrayList<>();
sources.add(
ChangeStreamSourceDescriptor.of(
getChangeStreamName(), getInclusiveStartAt(), getInclusiveEndAt()));
return input
.apply("Generate change stream sources", Create.of(sources))
.apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn))
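For reference, a pipeline opts into the user-specified metadata database through the new withMetadataDatabase option. A minimal sketch, assuming the usual SpannerIO.readChangeStream() entry point; the project, instance, database, and stream names below are placeholders, not part of this diff:

SpannerConfig spannerConfig =
    SpannerConfig.create()
        .withProjectId("my-project") // hypothetical
        .withInstanceId("my-instance") // hypothetical
        .withDatabaseId("my-database"); // hypothetical

pipeline.apply(
    SpannerIO.readChangeStream()
        .withSpannerConfig(spannerConfig)
        .withChangeStreamName("my_change_stream") // hypothetical
        .withMetadataDatabase("my-metadata-database") // new in this commit
        .withInclusiveStartAt(Timestamp.now()));

When withMetadataDatabase is not set, the Objects.firstNonNull call above falls back to the change stream's own database.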
org/apache/beam/sdk/io/gcp/spanner/cdc/ChangeStreamSourceDescriptor.java
@@ -28,17 +28,15 @@
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public abstract class ChangeStreamSourceDescriptor implements Serializable {
abstract String getChangeStreamName();

abstract String getMetadataTableName();
abstract String getChangeStreamName();

abstract @Nullable Timestamp getStartAt();

abstract @Nullable Timestamp getEndAt();

public static ChangeStreamSourceDescriptor of(
String changeStreamName, String metadataTableName, Timestamp startAt, Timestamp endAt) {
return new AutoValue_ChangeStreamSourceDescriptor(
changeStreamName, metadataTableName, startAt, endAt);
String changeStreamName, Timestamp startAt, Timestamp endAt) {
return new AutoValue_ChangeStreamSourceDescriptor(changeStreamName, startAt, endAt);
}
}
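With the metadata table name dropped from the descriptor, constructing one now takes only the stream name and the time range. A hypothetical example (the stream name and the one-hour window are made up for illustration):

Timestamp startAt = Timestamp.now();
Timestamp endAt =
    Timestamp.ofTimeSecondsAndNanos(startAt.getSeconds() + 3600L, startAt.getNanos());
ChangeStreamSourceDescriptor descriptor =
    ChangeStreamSourceDescriptor.of("my_change_stream", startAt, endAt);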
org/apache/beam/sdk/io/gcp/spanner/cdc/DetectNewPartitionsDoFn.java
@@ -21,13 +21,11 @@
import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITIONS_DETECTED_COUNTER;
import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITION_CREATED_TO_SCHEDULED_MS;

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.cdc.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata.State;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
@@ -58,23 +56,19 @@
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class DetectNewPartitionsDoFn extends DoFn<ChangeStreamSourceDescriptor, PartitionMetadata> {

private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsDoFn.class);
private SpannerAccessor spannerAccessor;
private DatabaseClient databaseClient;
private SpannerConfig spannerConfig;
// TODO(hengfeng): Make this field configurable via constructor or spanner config.
private Duration resumeDuration = Duration.millis(100L);;
private String metadataTableName;
private PartitionMetadataDao partitionMetadataDao;
private Duration resumeDuration = Duration.millis(100L);
private final DaoFactory daoFactory;
private transient PartitionMetadataDao partitionMetadataDao;

public DetectNewPartitionsDoFn(SpannerConfig config, String metadataTableName) {
this.spannerConfig = config;
this.metadataTableName = metadataTableName;
public DetectNewPartitionsDoFn(DaoFactory daoFactory) {
this.daoFactory = daoFactory;
}

public DetectNewPartitionsDoFn(
SpannerConfig config, String metadataTableName, Duration resumeDuration) {
this(config, metadataTableName);
public DetectNewPartitionsDoFn(DaoFactory daoFactory, Duration resumeDuration) {
this(daoFactory);
this.resumeDuration = resumeDuration;
}

@@ -102,16 +96,8 @@ public OffsetRangeTracker restrictionTracker(
}

@Setup
public void setup() throws Exception {
this.spannerAccessor = SpannerAccessor.getOrCreate(this.spannerConfig);
this.databaseClient = spannerAccessor.getDatabaseClient();
this.partitionMetadataDao =
new PartitionMetadataDao(this.metadataTableName, this.databaseClient);
}

@Teardown
public void teardown() throws Exception {
this.spannerAccessor.close();
public void setup() {
this.partitionMetadataDao = daoFactory.getPartitionMetadataDao();
}

@ProcessElement
@@ -120,16 +106,10 @@ public ProcessContinuation processElement(
RestrictionTracker<OffsetRange, Long> tracker,
WatermarkEstimator watermarkEstimator,
OutputReceiver<PartitionMetadata> receiver) {

Instant start = Instant.now();

LOG.debug("Calling process element:" + start);

// Find all records where their states are CREATED.
// TODO(hengfeng): move this to DAO.
String query =
String.format("SELECT * FROM `%s` WHERE State = 'CREATED'", this.metadataTableName);
try (ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query))) {
try (ResultSet resultSet = partitionMetadataDao.getPartitionsInState(State.CREATED)) {
long currentIndex = tracker.currentRestriction().getFrom();

// Output the records.
@@ -151,7 +131,6 @@ public ProcessContinuation processElement(
new Duration(metadata.getCreatedAt().toDate().getTime(), Instant.now().getMillis())
.getMillis());
}

LOG.debug(
String.format(
"Get partition metadata currentIndex:%d meta:%s", currentIndex, metadata));
@@ -163,41 +142,18 @@ public ProcessContinuation processElement(
LOG.info("Scheduling partition: " + metadata);
receiver.output(metadata);

// TODO(hengfeng): investigate if we can move this to DAO.
this.databaseClient
.readWriteTransaction()
.run(
transaction -> {
// Update the record to SCHEDULED.
// TODO(hengfeng): use mutations instead.
Statement updateStatement =
Statement.newBuilder(
String.format(
"UPDATE `%s` "
+ "SET State = 'SCHEDULED' "
+ "WHERE PartitionToken = @PartitionToken",
this.metadataTableName))
.bind("PartitionToken")
.to(metadata.getPartitionToken())
.build();
transaction.executeUpdate(updateStatement);
LOG.debug("Updated the record:" + metadata.getPartitionToken());
return null;
});
partitionMetadataDao.updateState(metadata.getPartitionToken(), State.SCHEDULED);
LOG.debug("Updated the record:" + metadata.getPartitionToken());
}
}

// If there are no partitions in the table, we should stop this SDF
// function.
// TODO(hengfeng): move this query to DAO.
query = String.format("SELECT COUNT(*) FROM `%s`", this.metadataTableName);
try (ResultSet resultSet = this.databaseClient.singleUse().executeQuery(Statement.of(query))) {
if (resultSet.next() && resultSet.getLong(0) == 0) {
if (!tracker.tryClaim(tracker.currentRestriction().getTo() - 1)) {
LOG.warn("Failed to claim the end of range in DetectNewPartitionsDoFn.");
}
return ProcessContinuation.stop();
// If there are no partitions in the table, we should stop this SDF function.
long numOfPartitions = partitionMetadataDao.countPartitions();
if (numOfPartitions == 0) {
if (!tracker.tryClaim(tracker.currentRestriction().getTo() - 1)) {
LOG.warn("Failed to claim the end of range in DetectNewPartitionsDoFn.");
}
return ProcessContinuation.stop();
}
return ProcessContinuation.resume().withResumeDelay(resumeDuration);
}
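The DoFn now delegates all metadata-table access to PartitionMetadataDao. The DAO itself is not rendered in this portion of the diff; what follows is a hypothetical sketch of the three methods called above, reconstructed from their call sites and from the inline SQL this commit removed (the tableName and databaseClient fields are assumptions):

private final String tableName; // assumed field
private final DatabaseClient databaseClient; // assumed field

public ResultSet getPartitionsInState(State state) {
  Statement statement =
      Statement.newBuilder(
              String.format("SELECT * FROM `%s` WHERE State = @state", tableName))
          .bind("state")
          .to(state.toString())
          .build();
  // The caller closes the ResultSet (see the try-with-resources above).
  return databaseClient.singleUse().executeQuery(statement);
}

public void updateState(String partitionToken, State state) {
  databaseClient
      .readWriteTransaction()
      .run(
          transaction -> {
            transaction.executeUpdate(
                Statement.newBuilder(
                        String.format(
                            "UPDATE `%s` SET State = @state"
                                + " WHERE PartitionToken = @partitionToken",
                            tableName))
                    .bind("state")
                    .to(state.toString())
                    .bind("partitionToken")
                    .to(partitionToken)
                    .build());
            return null;
          });
}

public long countPartitions() {
  try (ResultSet resultSet =
      databaseClient
          .singleUse()
          .executeQuery(
              Statement.of(String.format("SELECT COUNT(*) FROM `%s`", tableName)))) {
    return resultSet.next() ? resultSet.getLong(0) : 0L;
  }
}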
org/apache/beam/sdk/io/gcp/spanner/cdc/NameGenerator.java
@@ -25,7 +25,7 @@ public class NameGenerator {
private static final String METADATA_TABLE_NAME_FORMAT = "CDC_Partitions_%s_%s";

// TODO: Add java docs
public static String generateMetadataTableName(String databaseId) {
public static String generatePartitionMetadataTableName(String databaseId) {
// Maximum Spanner table name length is 128 characters.
// There are 16 characters in the name format.
// Maximum Spanner database ID length is 30 characters.
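The helper's body is not shown in this hunk. A hypothetical sketch consistent with the format string and the length arithmetic in the comments, assuming the second placeholder is filled with a random UUID whose hyphens are replaced (Spanner table names may not contain '-'):

import java.util.UUID;

public static String generatePartitionMetadataTableName(String databaseId) {
  // 16 format characters + a database ID of at most 30 characters + a
  // 36-character UUID keeps the name well under the 128-character limit.
  return String.format(METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
      .replaceAll("-", "_");
}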
