feat: metrics for read latency and logging (#31)
* feat: add new data record read metrics

Adds the following metrics:

- Time for the initial partition to go from created to scheduled state (in ms)
- Time from a data record's commit timestamp to its read time (in ms)
- Time from a data record's read time to its emitted time (in ms)

* feat: refine logging

Adds the partition token to all relevant log messages. Also adds a
temporary data record count metric.

* chore: spotless apply
thiagotnunes authored Jul 12, 2021
1 parent fce070f commit b9fbc97
Showing 17 changed files with 205 additions and 104 deletions.
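
All of the new metrics follow the same Apache Beam pattern seen in CdcMetrics below: a static Counter or Distribution is declared against a namespace class, then updated from inside a DoFn. A minimal sketch of that pattern, assuming only the Beam metrics API (the ExampleDoFn class and its metric name are illustrative, not part of this commit):

import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class ExampleDoFn extends DoFn<String, String> {

  // Declared once per metric, keyed by a namespace class and a metric name.
  private static final Distribution EXAMPLE_LATENCY_MS =
      Metrics.distribution(ExampleDoFn.class, "example_latency_ms");

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> receiver) {
    final Instant start = new Instant();
    // ... process the element ...
    // Report the elapsed wall-clock time in milliseconds for this element.
    EXAMPLE_LATENCY_MS.update(new Duration(start, new Instant()).getMillis());
    receiver.output(element);
  }
}

A Distribution aggregates min, max, mean, sum, and count over all reported values, which is why each latency below is reported per element rather than pre-aggregated.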
CdcMetrics.java
@@ -25,14 +25,17 @@
 public class CdcMetrics {
 
   public static final Counter PARTITIONS_DETECTED_COUNTER =
-      Metrics.counter(ReadChangeStream.class, "partitions_detected");
+      Metrics.counter(ReadChangeStream.class, "partitions_detected_count");
 
   public static final Counter PARTITION_SPLIT_COUNTER =
       Metrics.counter(ReadChangeStream.class, "partition_split_count");
 
   public static final Counter PARTITION_MERGE_COUNTER =
       Metrics.counter(ReadChangeStream.class, "partition_merge_count");
 
+  public static final Distribution INITIAL_PARTITION_CREATED_TO_SCHEDULED_MS =
+      Metrics.distribution(ReadChangeStream.class, "initial_partition_created_to_scheduled_ms");
+
   public static final Distribution PARTITION_CREATED_TO_SCHEDULED_MS =
       Metrics.distribution(ReadChangeStream.class, "partition_created_to_scheduled_ms");
 
@@ -41,6 +44,15 @@ public class CdcMetrics {
       Metrics.distribution(
           ReadChangeStream.class, "watermark_to_latest_record_commit_timestamp_ms");
 
+  public static final Distribution RECORD_COMMIT_TIMESTAMP_TO_READ_MS =
+      Metrics.distribution(ReadChangeStream.class, "record_commit_timestamp_to_read_ms");
+
+  public static final Distribution RECORD_READ_TO_EMITTED_MS =
+      Metrics.distribution(ReadChangeStream.class, "record_read_to_emitted_ms");
+
+  public static final Distribution RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS =
+      Metrics.distribution(ReadChangeStream.class, "record_commit_timestamp_to_emitted_ms");
+
   public static final Counter DATA_RECORD_COUNTER =
       Metrics.counter(ReadChangeStream.class, "data_record_count");
 }
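
Once a pipeline using these metrics has run, the distributions can be read back from the PipelineResult. A minimal sketch, assuming a result handle returned by Pipeline.run() and the ReadChangeStream namespace class used above (this helper is illustrative, not part of this commit):

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

static void logEmitLatency(PipelineResult result) {
  MetricQueryResults query =
      result
          .metrics()
          .queryMetrics(
              MetricsFilter.builder()
                  .addNameFilter(
                      MetricNameFilter.named(
                          ReadChangeStream.class, "record_commit_timestamp_to_emitted_ms"))
                  .build());
  for (MetricResult<DistributionResult> metric : query.getDistributions()) {
    // Attempted values are the most widely supported across runners.
    DistributionResult distribution = metric.getAttempted();
    System.out.println(
        "commit-to-emit ms: mean=" + distribution.getMean() + ", max=" + distribution.getMax());
  }
}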
DetectNewPartitionsDoFn.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.cdc;
 
+import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.INITIAL_PARTITION_CREATED_TO_SCHEDULED_MS;
 import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITIONS_DETECTED_COUNTER;
 import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.PARTITION_CREATED_TO_SCHEDULED_MS;
 
@@ -141,9 +142,15 @@ public ProcessContinuation processElement(
       return ProcessContinuation.stop();
     }
     PartitionMetadata metadata = buildPartitionMetadata(resultSet);
-    PARTITION_CREATED_TO_SCHEDULED_MS.update(
-        new Duration(metadata.getCreatedAt().toDate().getTime(), Instant.now().getMillis())
-            .getMillis());
+    if (InitialPartition.isInitialPartition(metadata.getPartitionToken())) {
+      INITIAL_PARTITION_CREATED_TO_SCHEDULED_MS.update(
+          new Duration(metadata.getCreatedAt().toDate().getTime(), Instant.now().getMillis())
+              .getMillis());
+    } else {
+      PARTITION_CREATED_TO_SCHEDULED_MS.update(
+          new Duration(metadata.getCreatedAt().toDate().getTime(), Instant.now().getMillis())
+              .getMillis());
+    }
 
     LOG.debug(
         String.format(
PostProcessingMetricsDoFn.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.cdc;
 
+import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.DATA_RECORD_COUNTER;
 import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS;
+import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.RECORD_COMMIT_TIMESTAMP_TO_READ_MS;
+import static org.apache.beam.sdk.io.gcp.spanner.cdc.CdcMetrics.RECORD_READ_TO_EMITTED_MS;
 
 import java.io.Serializable;
 import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord.Metadata;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -31,11 +35,17 @@ public class PostProcessingMetricsDoFn extends DoFn<DataChangeRecord, DataChange
   @ProcessElement
   public void processElement(
       @Element DataChangeRecord dataChangeRecord, OutputReceiver<DataChangeRecord> receiver) {
-    RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS.update(
-        new Duration(
-                new Instant(dataChangeRecord.getCommitTimestamp().toSqlTimestamp().getTime()),
-                new Instant())
-            .getMillis());
-    receiver.output(dataChangeRecord);
+    final Instant now = new Instant();
+    final Metadata metadata = dataChangeRecord.getMetadata();
+    final Instant readInstant = new Instant(metadata.getReadAt().toSqlTimestamp().getTime());
+    final Instant commitInstant =
+        new Instant(dataChangeRecord.getCommitTimestamp().toSqlTimestamp().getTime());
+
+    RECORD_COMMIT_TIMESTAMP_TO_READ_MS.update(new Duration(commitInstant, readInstant).getMillis());
+    RECORD_READ_TO_EMITTED_MS.update(new Duration(readInstant, now).getMillis());
+    RECORD_COMMIT_TIMESTAMP_TO_EMITTED_MS.update(new Duration(commitInstant, now).getMillis());
+    DATA_RECORD_COUNTER.inc();
+
+    receiver.outputWithTimestamp(dataChangeRecord, commitInstant);
   }
 }
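
For orientation, a post-processing DoFn like this is typically attached at the tail of the read transform, so that every emitted record updates the latency distributions. A sketch under that assumption (the attachMetrics helper and the records name are illustrative, not part of this commit):

import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Every record passing through updates the three latency distributions and the
// data record counter, then is re-emitted.
static PCollection<DataChangeRecord> attachMetrics(PCollection<DataChangeRecord> records) {
  return records.apply("PostProcessingMetrics", ParDo.of(new PostProcessingMetricsDoFn()));
}

The switch from output to outputWithTimestamp(dataChangeRecord, commitInstant) emits each record at its Spanner commit time, presumably so that downstream windowing and watermark handling align with commit timestamps.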
ReadChangeStreamPartitionDoFn.java
@@ -54,7 +54,6 @@
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
 // TODO: Add java docs
 @UnboundedPerElement
@@ -92,24 +91,33 @@ public ReadChangeStreamPartitionDoFn(
   }
 
   @GetInitialWatermarkEstimatorState
-  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+  public Instant getInitialWatermarkEstimatorState(
+      @Element PartitionMetadata partition, @Timestamp Instant currentElementTimestamp) {
+    LOG.info("[" + partition.getPartitionToken() + "] Get initial watermark estimator");
     return currentElementTimestamp;
   }
 
   @NewWatermarkEstimator
   public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
+      @Element PartitionMetadata partition,
       @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    LOG.info("[" + partition.getPartitionToken() + "] New watermark estimator");
     return new Manual(watermarkEstimatorState);
   }
 
   @GetInitialRestriction
-  public PartitionRestriction initialRestriction(@Element PartitionMetadata element) {
+  public PartitionRestriction initialRestriction(@Element PartitionMetadata partition) {
+    LOG.info("[" + partition.getPartitionToken() + "] Initial restriction");
     return new PartitionRestriction(
-        element.getStartTimestamp(), element.getEndTimestamp(), PartitionMode.QUERY_CHANGE_STREAM);
+        partition.getStartTimestamp(),
+        partition.getEndTimestamp(),
+        PartitionMode.QUERY_CHANGE_STREAM);
   }
 
   @NewTracker
-  public PartitionRestrictionTracker newTracker(@Restriction PartitionRestriction restriction) {
+  public PartitionRestrictionTracker newTracker(
+      @Element PartitionMetadata partition, @Restriction PartitionRestriction restriction) {
+    LOG.info("[" + partition.getPartitionToken() + "] New tracker");
     return new PartitionRestrictionTracker(restriction);
   }
 
@@ -142,14 +150,11 @@ public ProcessContinuation processElement(
       RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
       OutputReceiver<DataChangeRecord> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
-    MDC.put("partitionToken", partition.getPartitionToken());
-    LOG.info(
-        "Processing element "
-            + partition.getPartitionToken()
-            + " with restriction "
-            + tracker.currentRestriction());
+    final String token = partition.getPartitionToken();
+    LOG.info("[" + token + "] Processing element with restriction " + tracker.currentRestriction());
 
-    switch (tracker.currentRestriction().getMode()) {
+    final PartitionMode mode = tracker.currentRestriction().getMode();
+    switch (mode) {
       case QUERY_CHANGE_STREAM:
         return queryChangeStream(partition, tracker, receiver, watermarkEstimator);
       case WAIT_FOR_CHILD_PARTITIONS:
@@ -164,8 +169,8 @@ public ProcessContinuation processElement(
         return done(partition, tracker);
       default:
         // TODO: Verify what to do here
-        throw new IllegalArgumentException(
-            "Unknown mode " + tracker.currentRestriction().getMode());
+        LOG.error("[" + token + "] Unknown mode " + mode);
+        throw new IllegalArgumentException("Unknown mode " + mode);
     }
   }
 
@@ -212,9 +217,10 @@ private ProcessContinuation queryChangeStream(
       RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
       OutputReceiver<DataChangeRecord> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
+    final String token = partition.getPartitionToken();
     try (ResultSet resultSet =
         changeStreamDao.changeStreamQuery(
-            partition.getPartitionToken(),
+            token,
             tracker.currentRestriction().getStartTimestamp(),
             partition.isInclusiveStart(),
             partition.getEndTimestamp(),
@@ -224,27 +230,30 @@ private ProcessContinuation queryChangeStream(
       // TODO: Check what should we do if there is an error here
       final List<ChangeStreamRecord> records =
           changeStreamRecordMapper.toChangeStreamRecords(
-              partition.getPartitionToken(), resultSet.getCurrentRowAsStruct());
+              token, resultSet.getCurrentRowAsStruct());
       LOG.debug("Mapped records: " + records);
 
       Optional<ProcessContinuation> maybeContinuation;
       for (ChangeStreamRecord record : records) {
         if (record instanceof DataChangeRecord) {
           maybeContinuation =
               dataChangeRecordAction.run(
-                  (DataChangeRecord) record, tracker, receiver, watermarkEstimator);
+                  partition, (DataChangeRecord) record, tracker, receiver, watermarkEstimator);
         } else if (record instanceof HeartbeatRecord) {
           maybeContinuation =
-              heartbeatRecordAction.run((HeartbeatRecord) record, tracker, watermarkEstimator);
+              heartbeatRecordAction.run(
+                  partition, (HeartbeatRecord) record, tracker, watermarkEstimator);
         } else if (record instanceof ChildPartitionsRecord) {
           maybeContinuation =
               childPartitionsRecordAction.run(
-                  (ChildPartitionsRecord) record, partition, tracker, watermarkEstimator);
+                  partition, (ChildPartitionsRecord) record, tracker, watermarkEstimator);
         } else {
+          LOG.error("[" + token + "] Unknown record type " + record.getClass());
           // FIXME: Check what should we do if the record is unknown
           throw new IllegalArgumentException("Unknown record type " + record.getClass());
         }
         if (maybeContinuation.isPresent()) {
+          LOG.info("[" + token + "] Continuation present, returning " + maybeContinuation);
           return maybeContinuation.get();
         }
       }
ChildPartitionsRecordAction.java
@@ -49,22 +49,24 @@ public ChildPartitionsRecordAction(PartitionMetadataDao partitionMetadataDao) {
   }
 
   public Optional<ProcessContinuation> run(
-      ChildPartitionsRecord record,
       PartitionMetadata partition,
+      ChildPartitionsRecord record,
       RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
-    LOG.info("Processing child partition record " + record);
+    final String token = partition.getPartitionToken();
+    LOG.info("[" + token + "] Processing child partition record " + record);
 
     final Timestamp startTimestamp = record.getStartTimestamp();
     if (!tracker.tryClaim(PartitionPosition.queryChangeStream(startTimestamp))) {
-      LOG.info("Could not claim, stopping");
+      LOG.info(
+          "[" + token + "] Could not claim queryChangeStream(" + startTimestamp + "), stopping");
       return Optional.of(ProcessContinuation.stop());
     }
     watermarkEstimator.setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
 
     for (ChildPartition childPartition : record.getChildPartitions()) {
       if (isSplit(childPartition)) {
-        LOG.info("Processing child partition split event");
+        LOG.info("[" + token + "] Processing child partition split event");
         PARTITION_SPLIT_COUNTER.inc();
 
         final PartitionMetadata row =
@@ -78,7 +80,7 @@ public Optional<ProcessContinuation> run(
         // TODO: Make sure this does not fail if the rows already exist
         partitionMetadataDao.insert(row);
       } else {
-        LOG.info("Processing child partition merge event");
+        LOG.info("[" + token + "] Processing child partition merge event");
         PARTITION_MERGE_COUNTER.inc();
 
         partitionMetadataDao.runInTransaction(
@@ -88,7 +90,11 @@ public Optional<ProcessContinuation> run(
                     childPartition.getParentTokens(), Collections.singletonList(FINISHED));
 
                 if (finishedParents == childPartition.getParentTokens().size() - 1) {
-                  LOG.info("All parents are finished, inserting child partition " + childPartition);
+                  LOG.info(
+                      "["
+                          + token
+                          + "] All parents are finished, inserting child partition "
+                          + childPartition);
                   transaction.insert(
                       toPartitionMetadata(
                           record.getStartTimestamp(),
@@ -97,7 +103,9 @@ public Optional<ProcessContinuation> run(
                           childPartition));
                 } else {
                   LOG.info(
-                      "At least one parent is not finished ("
+                      "["
+                          + token
+                          + "] At least one parent is not finished ("
                           + "finishedParents = "
                           + finishedParents
                           + ", "
@@ -111,7 +119,7 @@ public Optional<ProcessContinuation> run(
       }
     }
 
-    LOG.info("Child partitions action completed successfully");
+    LOG.info("[" + token + "] Child partitions action completed successfully");
    return Optional.empty();
  }
 
DataChangeRecordAction.java
@@ -20,6 +20,7 @@
 import com.google.cloud.Timestamp;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.cdc.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.cdc.model.PartitionMetadata;
 import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionPosition;
 import org.apache.beam.sdk.io.gcp.spanner.cdc.restriction.PartitionRestriction;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -35,22 +36,25 @@ public class DataChangeRecordAction {
   private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class);
 
   public Optional<ProcessContinuation> run(
+      PartitionMetadata partition,
       DataChangeRecord record,
       RestrictionTracker<PartitionRestriction, PartitionPosition> tracker,
       OutputReceiver<DataChangeRecord> outputReceiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
-    LOG.info("Processing data record for " + record.getPartitionToken());
+    final String token = partition.getPartitionToken();
+    LOG.info("[" + token + "] Processing data record " + record.getCommitTimestamp());
 
     final Timestamp commitTimestamp = record.getCommitTimestamp();
     if (!tracker.tryClaim(PartitionPosition.queryChangeStream(commitTimestamp))) {
-      LOG.info("Could not claim, stopping");
+      LOG.info(
+          "[" + token + "] Could not claim queryChangeStream(" + commitTimestamp + "), stopping");
      return Optional.of(ProcessContinuation.stop());
    }
     // TODO: Ask about this, do we need to output with timestamp?
     outputReceiver.output(record);
     watermarkEstimator.setWatermark(new Instant(commitTimestamp.toSqlTimestamp().getTime()));
 
-    LOG.info("Data record action completed successfully");
+    LOG.info("[" + token + "] Data record action completed successfully");
     return Optional.empty();
   }
 }
DeletePartitionAction.java
@@ -40,15 +40,16 @@ public DeletePartitionAction(PartitionMetadataDao partitionMetadataDao) {
   public Optional<ProcessContinuation> run(
       PartitionMetadata partition,
       RestrictionTracker<PartitionRestriction, PartitionPosition> tracker) {
-    LOG.info("Deleting partition " + partition.getPartitionToken());
+    final String token = partition.getPartitionToken();
+    LOG.info("[" + token + "] Deleting partition");
 
     if (!tracker.tryClaim(PartitionPosition.deletePartition())) {
-      LOG.info("Could not claim, stopping");
+      LOG.info("[" + token + "] Could not claim deletePartition(), stopping");
       return Optional.of(ProcessContinuation.stop());
     }
-    partitionMetadataDao.delete(partition.getPartitionToken());
+    partitionMetadataDao.delete(token);
 
-    LOG.info("Delete partition action completed successfully");
+    LOG.info("[" + token + "] Delete partition action completed successfully");
     return Optional.empty();
   }
 }
DonePartitionAction.java
@@ -32,14 +32,15 @@ public class DonePartitionAction {
   public ProcessContinuation run(
       PartitionMetadata partition,
       RestrictionTracker<PartitionRestriction, PartitionPosition> tracker) {
-    LOG.info("Marking partition " + partition.getPartitionToken() + " as done");
+    final String token = partition.getPartitionToken();
+    LOG.info("[" + token + "] Marking partition as done");
 
     if (!tracker.tryClaim(PartitionPosition.done())) {
-      LOG.info("Could not claim, stopping");
+      LOG.info("[" + token + "] Could not claim done(), stopping");
       return ProcessContinuation.stop();
     }
 
-    LOG.info("Done partition action completed successfully");
+    LOG.info("[" + token + "] Done partition action completed successfully");
     return ProcessContinuation.stop();
   }
 }
FinishPartitionAction.java
@@ -42,16 +42,17 @@ public FinishPartitionAction(PartitionMetadataDao partitionMetadataDao) {
   public Optional<ProcessContinuation> run(
       PartitionMetadata partition,
       RestrictionTracker<PartitionRestriction, PartitionPosition> tracker) {
-    LOG.info("Finishing partition " + partition.getPartitionToken());
+    final String token = partition.getPartitionToken();
+    LOG.info("[" + token + "] Finishing partition");
 
     if (!tracker.tryClaim(PartitionPosition.finishPartition())) {
-      LOG.info("Could not claim, stopping");
+      LOG.info("[" + token + "] Could not claim finishPartition(), stopping");
       return Optional.of(ProcessContinuation.stop());
     }
 
-    partitionMetadataDao.updateState(partition.getPartitionToken(), FINISHED);
+    partitionMetadataDao.updateState(token, FINISHED);
 
-    LOG.info("Finish partition action completed successfully");
+    LOG.info("[" + token + "] Finish partition action completed successfully");
     return Optional.empty();
   }
 }
(8 more changed files not shown)