Improve autoscaler throughput estimates and account for heartbeats
The throughput estimates are improved by storing the estimates per partition (in StreamProgress) rather than per DoFn instance. This required some refactoring of ThroughputEstimator/SizeEstimator. The estimator now reports throughput (bytes per second) over the last ReadChangeStream request.
jackdingilian committed Jun 8, 2023
1 parent 95ecb31 commit e896b63
Showing 19 changed files with 623 additions and 420 deletions.
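
In practice this means each run() of the partition reader creates a fresh BytesThroughputEstimator at the start of its ReadChangeStream request, updates it with the encoded size of every element it outputs (heartbeats included), and stores the resulting bytes-per-second figure in the StreamProgress it claims. A minimal sketch of that per-request estimate, with simplified names and the element size passed in directly rather than computed by a coder-backed CoderSizeEstimator as in the real code:

```java
// Sketch only, not the Beam class: a per-request bytes/second estimate of the
// kind stored per partition in StreamProgress.
import java.math.BigDecimal;
import java.math.MathContext;
import org.joda.time.Instant;

class RequestThroughputSketch {
  private final Instant requestStart; // when the current ReadChangeStream request began
  private BigDecimal totalBytes = BigDecimal.ZERO;

  RequestThroughputSketch(Instant requestStart) {
    this.requestStart = requestStart;
  }

  // Called for every emitted element: data change or heartbeat.
  void update(long encodedSizeBytes) {
    totalBytes = totalBytes.add(BigDecimal.valueOf(encodedSizeBytes));
  }

  // Bytes per second since the request started; this is the value a
  // StreamProgress would carry alongside its continuation token and watermark.
  BigDecimal get(Instant now) {
    long elapsedSeconds = Math.max(1L, (now.getMillis() - requestStart.getMillis()) / 1000);
    return totalBytes.divide(BigDecimal.valueOf(elapsedSeconds), MathContext.DECIMAL128);
  }
}
```

The actual BytesThroughputEstimator in this commit takes a CoderSizeEstimator and the request start time in its constructor (see ReadChangeStreamPartitionAction below) and measures element sizes through the output coder.
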
BigtableIO.java
@@ -29,6 +29,7 @@
import com.google.bigtable.v2.RowFilter;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -50,10 +51,10 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.FilterForMutationDoFn;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.InitializeDoFn;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
@@ -2068,22 +2069,20 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);

PCollection<KV<ByteString, ChangeStreamMutation>> output =
PCollection<KV<ByteString, ChangeStreamRecord>> readChangeStreamOutput =
input
.apply(Impulse.create())
.apply("Initialize", ParDo.of(initializeDoFn))
.apply("DetectNewPartition", ParDo.of(detectNewPartitionsDoFn))
.apply("ReadChangeStreamPartition", ParDo.of(readChangeStreamPartitionDoFn));

Coder<KV<ByteString, ChangeStreamMutation>> outputCoder = output.getCoder();
SizeEstimator<KV<ByteString, ChangeStreamMutation>> sizeEstimator =
new SizeEstimator<>(outputCoder);
BytesThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator =
new BytesThroughputEstimator<>(
ReadChangeStreamPartitionDoFn.THROUGHPUT_ESTIMATION_WINDOW_SECONDS, sizeEstimator);
readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);
Coder<KV<ByteString, ChangeStreamRecord>> outputCoder = readChangeStreamOutput.getCoder();
CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator =
new CoderSizeEstimator<>(outputCoder);
readChangeStreamPartitionDoFn.setSizeEstimator(sizeEstimator);

return output;
return readChangeStreamOutput.apply(
"FilterForMutation", ParDo.of(new FilterForMutationDoFn()));
}

@AutoValue.Builder
ActionFactory.java
@@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -56,11 +56,9 @@ public class ActionFactory implements Serializable {
*
* @return singleton instance of the {@link ChangeStreamAction}
*/
public synchronized ChangeStreamAction changeStreamAction(
ChangeStreamMetrics metrics,
ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator) {
public synchronized ChangeStreamAction changeStreamAction(ChangeStreamMetrics metrics) {
if (changeStreamAction == null) {
changeStreamAction = new ChangeStreamAction(metrics, throughputEstimator);
changeStreamAction = new ChangeStreamAction(metrics);
}
return changeStreamAction;
}
@@ -145,11 +143,17 @@ public synchronized ReadChangeStreamPartitionAction readChangeStreamPartitionAct
ChangeStreamDao changeStreamDao,
ChangeStreamMetrics metrics,
ChangeStreamAction changeStreamAction,
Duration heartbeatDuration) {
Duration heartbeatDuration,
SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator) {
if (readChangeStreamPartitionAction == null) {
readChangeStreamPartitionAction =
new ReadChangeStreamPartitionAction(
metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration);
metadataTableDao,
changeStreamDao,
metrics,
changeStreamAction,
heartbeatDuration,
sizeEstimator);
}
return readChangeStreamPartitionAction;
}
ChangeStreamAction.java
@@ -31,7 +31,7 @@
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
@@ -46,20 +46,15 @@
@Internal
public class ChangeStreamAction {
private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamAction.class);

private final ChangeStreamMetrics metrics;
private final ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator;

/**
* Constructs ChangeStreamAction to process individual ChangeStreamRecord.
*
* @param metrics record beam metrics.
*/
public ChangeStreamAction(
ChangeStreamMetrics metrics,
ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator) {
public ChangeStreamAction(ChangeStreamMetrics metrics) {
this.metrics = metrics;
this.throughputEstimator = throughputEstimator;
}

/**
@@ -111,14 +106,27 @@ public Optional<DoFn.ProcessContinuation> run(
PartitionRecord partitionRecord,
ChangeStreamRecord record,
RestrictionTracker<StreamProgress, StreamProgress> tracker,
DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator,
BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator,
boolean shouldDebug) {
if (record instanceof Heartbeat) {
Heartbeat heartbeat = (Heartbeat) record;
final Instant watermark = toJodaTime(heartbeat.getEstimatedLowWatermark());

// These will be filtered so the key doesn't really matter but the most logical thing to
// key a heartbeat by is the partition it corresponds to.
ByteString heartbeatKey =
Range.ByteStringRange.serializeToByteString(partitionRecord.getPartition());
KV<ByteString, ChangeStreamRecord> outputRecord = KV.of(heartbeatKey, heartbeat);
throughputEstimator.update(Instant.now(), outputRecord);
StreamProgress streamProgress =
new StreamProgress(heartbeat.getChangeStreamContinuationToken(), watermark);
new StreamProgress(
heartbeat.getChangeStreamContinuationToken(),
watermark,
throughputEstimator.get(),
Instant.now(),
true);
watermarkEstimator.setWatermark(watermark);

if (shouldDebug) {
@@ -142,6 +150,15 @@
return Optional.of(DoFn.ProcessContinuation.stop());
}
metrics.incHeartbeatCount();
// We output heartbeats so that they are factored into throughput and can be used to
// autoscale. These will be filtered in a downstream step and never returned to users. This is
// to prevent the autoscaler from scaling down when we have large tables with no throughput but
// we need enough workers to keep up with heartbeats.

// We are outputting elements with timestamp of 0 to prevent reliance on event time. This
// limits the ability to window on commit time of any data changes. It is still possible to
// window on processing time.
receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
} else if (record instanceof CloseStream) {
CloseStream closeStream = (CloseStream) record;
StreamProgress streamProgress = new StreamProgress(closeStream);
@@ -185,7 +202,17 @@ public Optional<DoFn.ProcessContinuation> run(
partitionRecord.getPartition().getStart(),
partitionRecord.getPartition().getEnd()),
changeStreamMutation.getToken());
StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, watermark);

KV<ByteString, ChangeStreamRecord> outputRecord =
KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
throughputEstimator.update(Instant.now(), outputRecord);
StreamProgress streamProgress =
new StreamProgress(
changeStreamContinuationToken,
watermark,
throughputEstimator.get(),
Instant.now(),
false);
// If the tracker fails to claim the streamProgress, it most likely means the runner initiated
// a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding
// runner initiated checkpoints.
@@ -206,9 +233,6 @@
metrics.updateProcessingDelayFromCommitTimestamp(
Instant.now().getMillis() - delay.getMillis());

KV<ByteString, ChangeStreamMutation> outputRecord =
KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
throughputEstimator.update(Instant.now(), outputRecord);
// We are outputting elements with timestamp of 0 to prevent reliance on event time. This
// limits the ability to window on commit time of any data changes. It is still possible to
// window on processing time.
ReadChangeStreamPartitionAction.java
@@ -25,7 +25,6 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.common.Status;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.CloseStream;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
@@ -40,6 +39,8 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
@@ -66,18 +67,21 @@ public class ReadChangeStreamPartitionAction {
private final ChangeStreamMetrics metrics;
private final ChangeStreamAction changeStreamAction;
private final Duration heartbeatDuration;
private final SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;

public ReadChangeStreamPartitionAction(
MetadataTableDao metadataTableDao,
ChangeStreamDao changeStreamDao,
ChangeStreamMetrics metrics,
ChangeStreamAction changeStreamAction,
Duration heartbeatDuration) {
Duration heartbeatDuration,
SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator) {
this.metadataTableDao = metadataTableDao;
this.changeStreamDao = changeStreamDao;
this.metrics = metrics;
this.changeStreamAction = changeStreamAction;
this.heartbeatDuration = heartbeatDuration;
this.sizeEstimator = sizeEstimator;
}

/**
@@ -131,12 +135,14 @@ public ReadChangeStreamPartitionAction(
public ProcessContinuation run(
PartitionRecord partitionRecord,
RestrictionTracker<StreamProgress, StreamProgress> tracker,
OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator)
throws IOException {
// Watermark being delayed beyond 5 minutes signals a possible problem.
boolean shouldDebug =
watermarkEstimator.getState().plus(Duration.standardMinutes(5)).isBeforeNow();
BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator =
new BytesThroughputEstimator<>(sizeEstimator, Instant.now());

if (shouldDebug) {
LOG.info(
@@ -298,7 +304,13 @@ public ProcessContinuation run(
for (ChangeStreamRecord record : stream) {
Optional<ProcessContinuation> result =
changeStreamAction.run(
partitionRecord, record, tracker, receiver, watermarkEstimator, shouldDebug);
partitionRecord,
record,
tracker,
receiver,
watermarkEstimator,
throughputEstimator,
shouldDebug);
// changeStreamAction will usually return Optional.empty() except for when a checkpoint
// (either runner or pipeline initiated) is required.
if (result.isPresent()) {
FilterForMutationDoFn.java (new file)
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;

import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class FilterForMutationDoFn
extends DoFn<KV<ByteString, ChangeStreamRecord>, KV<ByteString, ChangeStreamMutation>>
implements Serializable {

@ProcessElement
public void processElement(
@Element KV<ByteString, ChangeStreamRecord> changeStreamRecordKV,
OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver) {
ChangeStreamRecord inputRecord = changeStreamRecordKV.getValue();
if (inputRecord instanceof ChangeStreamMutation) {
receiver.output(KV.of(changeStreamRecordKV.getKey(), (ChangeStreamMutation) inputRecord));
}
}
}
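
Not shown in the hunks above is the consumer of the stored estimate: the backlog figure the runner's autoscaler reads from the splittable DoFn's size estimate. One plausible shape of that calculation, with hypothetical names (estimateBacklogBytes and the two timestamps are illustrative, not the commit's actual API):

```java
import java.math.BigDecimal;
import org.joda.time.Instant;

// Illustrative only: turn a partition's stored throughput into a backlog estimate.
// Because heartbeats are counted toward throughput (see ChangeStreamAction above),
// a large table that is currently idle still reports enough backlog to keep the
// workers needed to keep up with its heartbeats.
final class BacklogSketch {
  static BigDecimal estimateBacklogBytes(
      BigDecimal bytesPerSecond, Instant lastRunTimestamp, Instant now) {
    long secondsSinceLastRun =
        Math.max(0L, (now.getMillis() - lastRunTimestamp.getMillis()) / 1000);
    return bytesPerSecond.multiply(BigDecimal.valueOf(secondsSinceLastRun));
  }
}
```

In Beam this kind of value is typically surfaced through the DoFn's @GetSize method; the exact wiring in ReadChangeStreamPartitionDoFn is among the 14 changed files not expanded above.
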