From e896b63dfd974c7e65050bbd3f19584ab716fa77 Mon Sep 17 00:00:00 2001
From: Jack Dingilian
Date: Fri, 12 May 2023 15:28:45 -0400
Subject: [PATCH] Improve autoscaler throughput estimates and account for
 heartbeats

The throughput estimates are improved by storing the estimates per
partition (in StreamProgress) rather than per DoFn instance. This
required some refactoring of ThroughputEstimator/SizeEstimator. It now
estimates the throughput (per second) of the last ReadChangeStream
request.
---
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java  |  21 ++-
 .../changestreams/action/ActionFactory.java   |  20 ++-
 .../action/ChangeStreamAction.java            |  50 ++++--
 .../ReadChangeStreamPartitionAction.java      |  20 ++-
 .../dofn/FilterForMutationDoFn.java           |  40 +++++
 .../dofn/ReadChangeStreamPartitionDoFn.java   |  79 ++++++---
 .../estimator/BytesThroughputEstimator.java   | 152 +++++-------------
 .../estimator/CoderSizeEstimator.java         |  71 ++++++++
 ...tEstimator.java => NullSizeEstimator.java} |  32 +---
 .../estimator/SizeEstimator.java              |  52 +-----
 .../estimator/ThroughputEstimator.java        |  46 ------
 .../restriction/StreamProgress.java           |  36 ++++-
 .../action/ChangeStreamActionTest.java        | 127 ++++++++++++---
 .../ReadChangeStreamPartitionActionTest.java  |  39 +++--
 .../dofn/FilterForMutationDoFnTest.java       |  71 ++++++++
 .../ReadChangeStreamPartitionDoFnTest.java    |  71 ++++++--
 .../BytesThroughputEstimatorTest.java         |  98 ++++------
 ...orTest.java => NullSizeEstimatorTest.java} |  14 +-
 ...ngeStreamPartitionProgressTrackerTest.java |   4 +-
 19 files changed, 623 insertions(+), 420 deletions(-)
 create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFn.java
 create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/CoderSizeEstimator.java
 rename sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/{NullThroughputEstimator.java => NullSizeEstimator.java} (59%)
 delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/ThroughputEstimator.java
 create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFnTest.java
 rename sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/{NullThroughputEstimatorTest.java => NullSizeEstimatorTest.java} (68%)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index fc8b5e78e8a05..319c31390217a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -29,6 +29,7 @@
 import com.google.bigtable.v2.RowFilter;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.KeyOffset;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -50,10 +51,10 @@
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.FilterForMutationDoFn;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.InitializeDoFn;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
@@ -2068,22 +2069,20 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
       ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
           new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);
 
-      PCollection<KV<ByteString, ChangeStreamMutation>> output =
+      PCollection<KV<ByteString, ChangeStreamRecord>> readChangeStreamOutput =
           input
               .apply(Impulse.create())
               .apply("Initialize", ParDo.of(initializeDoFn))
               .apply("DetectNewPartition", ParDo.of(detectNewPartitionsDoFn))
               .apply("ReadChangeStreamPartition", ParDo.of(readChangeStreamPartitionDoFn));
 
-      Coder<KV<ByteString, ChangeStreamMutation>> outputCoder = output.getCoder();
-      SizeEstimator<KV<ByteString, ChangeStreamMutation>> sizeEstimator =
-          new SizeEstimator<>(outputCoder);
-      BytesThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator =
-          new BytesThroughputEstimator<>(
-              ReadChangeStreamPartitionDoFn.THROUGHPUT_ESTIMATION_WINDOW_SECONDS, sizeEstimator);
-      readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);
+      Coder<KV<ByteString, ChangeStreamRecord>> outputCoder = readChangeStreamOutput.getCoder();
+      CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator =
+          new CoderSizeEstimator<>(outputCoder);
+      readChangeStreamPartitionDoFn.setSizeEstimator(sizeEstimator);
 
-      return output;
+      return readChangeStreamOutput.apply(
+          "FilterForMutation", ParDo.of(new FilterForMutationDoFn()));
     }
 
     @AutoValue.Builder
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
index 43623b5de160c..6aabcb081ad4e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;
 
-import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import javax.annotation.Nullable;
@@ -25,7 +25,7 @@
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
 import org.apache.beam.sdk.values.KV;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -56,11 +56,9 @@ public class ActionFactory implements Serializable {
    *
    * @return
singleton instance of the {@link ChangeStreamAction} */ - public synchronized ChangeStreamAction changeStreamAction( - ChangeStreamMetrics metrics, - ThroughputEstimator> throughputEstimator) { + public synchronized ChangeStreamAction changeStreamAction(ChangeStreamMetrics metrics) { if (changeStreamAction == null) { - changeStreamAction = new ChangeStreamAction(metrics, throughputEstimator); + changeStreamAction = new ChangeStreamAction(metrics); } return changeStreamAction; } @@ -145,11 +143,17 @@ public synchronized ReadChangeStreamPartitionAction readChangeStreamPartitionAct ChangeStreamDao changeStreamDao, ChangeStreamMetrics metrics, ChangeStreamAction changeStreamAction, - Duration heartbeatDuration) { + Duration heartbeatDuration, + SizeEstimator> sizeEstimator) { if (readChangeStreamPartitionAction == null) { readChangeStreamPartitionAction = new ReadChangeStreamPartitionAction( - metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration); + metadataTableDao, + changeStreamDao, + metrics, + changeStreamAction, + heartbeatDuration, + sizeEstimator); } return readChangeStreamPartitionAction; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java index 4c3f9c5a83353..3ab3b1973b973 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java @@ -31,7 +31,7 @@ import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; import org.apache.beam.sdk.transforms.DoFn; @@ -46,20 +46,15 @@ @Internal public class ChangeStreamAction { private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamAction.class); - private final ChangeStreamMetrics metrics; - private final ThroughputEstimator> throughputEstimator; /** * Constructs ChangeStreamAction to process individual ChangeStreamRecord. * * @param metrics record beam metrics. */ - public ChangeStreamAction( - ChangeStreamMetrics metrics, - ThroughputEstimator> throughputEstimator) { + public ChangeStreamAction(ChangeStreamMetrics metrics) { this.metrics = metrics; - this.throughputEstimator = throughputEstimator; } /** @@ -111,14 +106,27 @@ public Optional run( PartitionRecord partitionRecord, ChangeStreamRecord record, RestrictionTracker tracker, - DoFn.OutputReceiver> receiver, + DoFn.OutputReceiver> receiver, ManualWatermarkEstimator watermarkEstimator, + BytesThroughputEstimator> throughputEstimator, boolean shouldDebug) { if (record instanceof Heartbeat) { Heartbeat heartbeat = (Heartbeat) record; final Instant watermark = toJodaTime(heartbeat.getEstimatedLowWatermark()); + + // These will be filtered so the key doesn't really matter but the most logical thing to + // key a heartbeat by is the partition it corresponds to. 
+ ByteString heartbeatKey = + Range.ByteStringRange.serializeToByteString(partitionRecord.getPartition()); + KV outputRecord = KV.of(heartbeatKey, heartbeat); + throughputEstimator.update(Instant.now(), outputRecord); StreamProgress streamProgress = - new StreamProgress(heartbeat.getChangeStreamContinuationToken(), watermark); + new StreamProgress( + heartbeat.getChangeStreamContinuationToken(), + watermark, + throughputEstimator.get(), + Instant.now(), + true); watermarkEstimator.setWatermark(watermark); if (shouldDebug) { @@ -142,6 +150,15 @@ public Optional run( return Optional.of(DoFn.ProcessContinuation.stop()); } metrics.incHeartbeatCount(); + // We output heartbeats so that they are factored into throughput and can be used to + // autoscale. These will be filtered in a downstream step and never returned to users. This is + // to prevent autoscaler from scaling down when we have large tables with no throughput but + // we need enough workers to keep up with heartbeats. + + // We are outputting elements with timestamp of 0 to prevent reliance on event time. This + // limits the ability to window on commit time of any data changes. It is still possible to + // window on processing time. + receiver.outputWithTimestamp(outputRecord, Instant.EPOCH); } else if (record instanceof CloseStream) { CloseStream closeStream = (CloseStream) record; StreamProgress streamProgress = new StreamProgress(closeStream); @@ -185,7 +202,17 @@ public Optional run( partitionRecord.getPartition().getStart(), partitionRecord.getPartition().getEnd()), changeStreamMutation.getToken()); - StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, watermark); + + KV outputRecord = + KV.of(changeStreamMutation.getRowKey(), changeStreamMutation); + throughputEstimator.update(Instant.now(), outputRecord); + StreamProgress streamProgress = + new StreamProgress( + changeStreamContinuationToken, + watermark, + throughputEstimator.get(), + Instant.now(), + false); // If the tracker fail to claim the streamProgress, it most likely means the runner initiated // a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information regarding // runner initiated checkpoints. @@ -206,9 +233,6 @@ public Optional run( metrics.updateProcessingDelayFromCommitTimestamp( Instant.now().getMillis() - delay.getMillis()); - KV outputRecord = - KV.of(changeStreamMutation.getRowKey(), changeStreamMutation); - throughputEstimator.update(Instant.now(), outputRecord); // We are outputting elements with timestamp of 0 to prevent reliance on event time. This // limits the ability to window on commit time of any data changes. It is still possible to // window on processing time. 
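
A note on the arithmetic these ChangeStreamAction changes feed (see the BytesThroughputEstimator and ReadChangeStreamPartitionDoFn.getSize hunks further down): every output element's sampled byte size is accumulated per ReadChangeStream request, get() turns that total into a bytes-per-second rate stored in StreamProgress, and getSize() multiplies the stored rate by the partition's lag to report backlog. A minimal, self-contained sketch of that math; the class name and sample values are illustrative only, not part of the patch:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    // Sketch of the rate and backlog math used by the new estimator code.
    public class ThroughputMathSketch {
      public static void main(String[] args) {
        // One ReadChangeStream run: ~5 MB of (sampled) element bytes over 2.5 s.
        BigDecimal totalBytes = BigDecimal.valueOf(5_000_000);
        long runMillis = 2_500; // startTimestamp -> lastElementTimestamp
        BigDecimal bytesPerSecond =
            totalBytes
                .divide(BigDecimal.valueOf(Math.max(runMillis, 1)), 3, RoundingMode.DOWN)
                .multiply(BigDecimal.valueOf(1000)); // 2000000.000 bytes/s

        // Backlog = rate x lag. Mutations use watermark lag; heartbeats use time
        // since the last completed run because their watermark is server-delayed.
        long lagMillis = 30_000;
        BigDecimal backlogBytes =
            bytesPerSecond
                .multiply(BigDecimal.valueOf(lagMillis))
                .divide(BigDecimal.valueOf(1000), 3, RoundingMode.DOWN)
                .max(BigDecimal.ZERO); // clock skew can make lag negative; treat as caught up

        System.out.println(bytesPerSecond + " B/s; backlog " + backlogBytes + " B");
      }
    }
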
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java index 2270cbd08ac13..87273c9894d91 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java @@ -25,7 +25,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.bigtable.common.Status; import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; -import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.CloseStream; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; @@ -40,6 +39,8 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; @@ -66,18 +67,21 @@ public class ReadChangeStreamPartitionAction { private final ChangeStreamMetrics metrics; private final ChangeStreamAction changeStreamAction; private final Duration heartbeatDuration; + private final SizeEstimator> sizeEstimator; public ReadChangeStreamPartitionAction( MetadataTableDao metadataTableDao, ChangeStreamDao changeStreamDao, ChangeStreamMetrics metrics, ChangeStreamAction changeStreamAction, - Duration heartbeatDuration) { + Duration heartbeatDuration, + SizeEstimator> sizeEstimator) { this.metadataTableDao = metadataTableDao; this.changeStreamDao = changeStreamDao; this.metrics = metrics; this.changeStreamAction = changeStreamAction; this.heartbeatDuration = heartbeatDuration; + this.sizeEstimator = sizeEstimator; } /** @@ -131,12 +135,14 @@ public ReadChangeStreamPartitionAction( public ProcessContinuation run( PartitionRecord partitionRecord, RestrictionTracker tracker, - OutputReceiver> receiver, + OutputReceiver> receiver, ManualWatermarkEstimator watermarkEstimator) throws IOException { // Watermark being delayed beyond 5 minutes signals a possible problem. 
boolean shouldDebug = watermarkEstimator.getState().plus(Duration.standardMinutes(5)).isBeforeNow(); + BytesThroughputEstimator> throughputEstimator = + new BytesThroughputEstimator<>(sizeEstimator, Instant.now()); if (shouldDebug) { LOG.info( @@ -298,7 +304,13 @@ public ProcessContinuation run( for (ChangeStreamRecord record : stream) { Optional result = changeStreamAction.run( - partitionRecord, record, tracker, receiver, watermarkEstimator, shouldDebug); + partitionRecord, + record, + tracker, + receiver, + watermarkEstimator, + throughputEstimator, + shouldDebug); // changeStreamAction will usually return Optional.empty() except for when a checkpoint // (either runner or pipeline initiated) is required. if (result.isPresent()) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFn.java new file mode 100644 index 0000000000000..968e2ecddaa65 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFn.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn; + +import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; +import com.google.protobuf.ByteString; +import java.io.Serializable; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; + +public class FilterForMutationDoFn + extends DoFn, KV> + implements Serializable { + + @ProcessElement + public void processElement( + @Element KV changeStreamRecordKV, + OutputReceiver> receiver) { + ChangeStreamRecord inputRecord = changeStreamRecordKV.getValue(); + if (inputRecord instanceof ChangeStreamMutation) { + receiver.output(KV.of(changeStreamRecordKV.getKey(), (ChangeStreamMutation) inputRecord)); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index 9fcf9e12968f5..91210caa5972d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn; -import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.protobuf.ByteString; import java.io.IOException; import java.math.BigDecimal; +import java.math.RoundingMode; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction; @@ -29,9 +31,9 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.NullThroughputEstimator; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.NullSizeEstimator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; @@ -41,17 +43,18 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.joda.time.Duration; import 
org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // Allows for readChangeStreamPartitionAction setup -@SuppressWarnings("initialization.fields.uninitialized") +@SuppressWarnings({"initialization.fields.uninitialized", "dereference.of.nullable"}) @Internal @UnboundedPerElement public class ReadChangeStreamPartitionDoFn - extends DoFn> { + extends DoFn> { private static final long serialVersionUID = 4418739381635104479L; private static final BigDecimal MAX_DOUBLE = BigDecimal.valueOf(Double.MAX_VALUE); private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class); @@ -61,8 +64,7 @@ public class ReadChangeStreamPartitionDoFn private final DaoFactory daoFactory; private final ChangeStreamMetrics metrics; private final ActionFactory actionFactory; - private ThroughputEstimator> throughputEstimator; - + private SizeEstimator> sizeEstimator; private ReadChangeStreamPartitionAction readChangeStreamPartitionAction; public ReadChangeStreamPartitionDoFn( @@ -74,7 +76,7 @@ public ReadChangeStreamPartitionDoFn( this.daoFactory = daoFactory; this.metrics = metrics; this.actionFactory = actionFactory; - this.throughputEstimator = new NullThroughputEstimator<>(); + this.sizeEstimator = new NullSizeEstimator<>(); } @GetInitialWatermarkEstimatorState @@ -106,30 +108,55 @@ public double getSize(@Restriction StreamProgress streamProgress) { return 0d; } Instant lowWatermark = streamProgress.getEstimatedLowWatermark(); + BigDecimal estimatedThroughput = streamProgress.getThroughputEstimate(); + Instant lastRunTimestamp = streamProgress.getLastRunTimestamp(); // This should only be null if: // 1) We've failed to lock for the partition in which case we expect 0 throughput // 2) We've received a CloseStream in which case we won't process any more data for // this partition // 3) RCSP has just started and hasn't completed a checkpoint yet, in which case we can't // estimate throughput yet - if (lowWatermark == null) { + if (lowWatermark == null || estimatedThroughput == null || lastRunTimestamp == null) { return 0; } + String partition = ""; + if (streamProgress.getCurrentToken() != null) { + partition = + ByteStringRangeHelper.formatByteStringRange( + Preconditions.checkNotNull(streamProgress.getCurrentToken()).getPartition()); + } + + // Heartbeat lowWatermark takes up to a minute to update on the server. We don't want + // this to count against the backlog and prevent scaling down, so we estimate heartbeat backlog + // using the time we most recently processed a heartbeat. Otherwise, (for mutations) we use the + // watermark. + Duration processingTimeLag = + Duration.millis( + Instant.now().getMillis() - streamProgress.getLastRunTimestamp().getMillis()); Duration watermarkLag = Duration.millis(Instant.now().getMillis() - lowWatermark.getMillis()); - BigDecimal estimatedThroughput = BigDecimal.valueOf(throughputEstimator.get()); + long lagInMillis = + (streamProgress.isHeartbeat() ? processingTimeLag : watermarkLag).getMillis(); // Return the estimated bytes per second throughput multiplied by the amount of known work // outstanding (watermark lag). Cap at max double to avoid overflow. - final double estimatedSize = + double estimatedSize = estimatedThroughput - .multiply(BigDecimal.valueOf(watermarkLag.getStandardSeconds())) + .multiply(BigDecimal.valueOf(lagInMillis)) + .divide(BigDecimal.valueOf(1000), 3, RoundingMode.DOWN) .min(MAX_DOUBLE) + // Lag can be negative from clock skew. We treat that as caught up, so + // it should return zero. 
+ .max(BigDecimal.ZERO) .doubleValue(); + LOG.debug( - "Estimated size: throughputBytes: {} x watermarkLag {} = {}", + "Estimated size (per second): partition: {}, isHeartbeat: {}, throughputBytes: {} x watermarkLagMillis {} = {}, lastRun = {}", + partition, + streamProgress.isHeartbeat(), estimatedThroughput, - watermarkLag, - estimatedSize); + lagInMillis, + estimatedSize, + streamProgress.getLastRunTimestamp()); return estimatedSize; } @@ -137,18 +164,22 @@ public double getSize(@Restriction StreamProgress streamProgress) { public void setup() throws IOException { MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao(); ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao(); - ChangeStreamAction changeStreamAction = - actionFactory.changeStreamAction(this.metrics, this.throughputEstimator); + ChangeStreamAction changeStreamAction = actionFactory.changeStreamAction(this.metrics); readChangeStreamPartitionAction = actionFactory.readChangeStreamPartitionAction( - metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration); + metadataTableDao, + changeStreamDao, + metrics, + changeStreamAction, + heartbeatDuration, + sizeEstimator); } @ProcessElement public ProcessContinuation processElement( @Element PartitionRecord partitionRecord, RestrictionTracker tracker, - OutputReceiver> receiver, + OutputReceiver> receiver, ManualWatermarkEstimator watermarkEstimator) throws InterruptedException, IOException { return readChangeStreamPartitionAction.run( @@ -158,10 +189,10 @@ public ProcessContinuation processElement( /** * Sets the estimator to track throughput for each DoFn instance. * - * @param throughputEstimator an estimator to calculate DoFn instance level throughput + * @param sizeEstimator an estimator to calculate the size of records for throughput estimates */ - public void setThroughputEstimator( - BytesThroughputEstimator> throughputEstimator) { - this.throughputEstimator = throughputEstimator; + public void setSizeEstimator( + CoderSizeEstimator> sizeEstimator) { + this.sizeEstimator = sizeEstimator; } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java index d9fa19e87e1cb..ec7a71ba5d3ee 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java @@ -17,15 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator; -import java.io.Serializable; import java.math.BigDecimal; -import java.math.MathContext; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.Random; +import java.math.RoundingMode; import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.joda.time.Instant; /** @@ -42,64 +39,36 @@ * every element */ @Internal -public class BytesThroughputEstimator implements ThroughputEstimator { +public class BytesThroughputEstimator { - private static final long 
serialVersionUID = -1147130541208370666L; - private static final BigDecimal MAX_DOUBLE = BigDecimal.valueOf(Double.MAX_VALUE); + private static final long serialVersionUID = 6014227751984587954L; private static final int DEFAULT_SAMPLE_RATE = 50; - - /** Keeps track of how many bytes of throughput have been seen in a given timestamp. */ - private static class ThroughputEntry implements Serializable { - - private static final long serialVersionUID = 3752325891215855332L; - - private final Instant instant; - private BigDecimal bytes; - - public ThroughputEntry(Instant instant, long bytes) { - this.instant = instant; - this.bytes = BigDecimal.valueOf(bytes); - } - - public Instant getTimestamp() { - return instant; - } - - public long getSeconds() { - return TimestampConverter.toSeconds(instant); - } - - public BigDecimal getBytes() { - return bytes; - } - - public void addBytes(long bytesToAdd) { - bytes = bytes.add(BigDecimal.valueOf(bytesToAdd)); - } - } - - // The deque holds a number of windows in the past in order to calculate - // a rolling windowing throughput. - private final Deque deque; - // The number of seconds to be accounted for when calculating the throughput - private final int windowSizeSeconds; - // Estimates the size in bytes of throughput elements private final SizeEstimator sizeEstimator; private final int sampleRate; - private final Random random; + private long elementCount; + private BigDecimal currentElementSizeEstimate; + private final Instant startTimestamp; + private Instant lastElementTimestamp; + private BigDecimal totalThroughputEstimate; - public BytesThroughputEstimator(int windowSizeSeconds, SizeEstimator sizeEstimator) { - this(windowSizeSeconds, sizeEstimator, DEFAULT_SAMPLE_RATE); + public BytesThroughputEstimator( + SizeEstimator sizeEstimator, @Nullable Instant lastRunTimestamp) { + this( + sizeEstimator, + DEFAULT_SAMPLE_RATE, + lastRunTimestamp != null ? lastRunTimestamp : Instant.now()); } @VisibleForTesting public BytesThroughputEstimator( - int windowSizeSeconds, SizeEstimator sizeEstimator, int sampleRate) { - this.deque = new ArrayDeque<>(); - this.windowSizeSeconds = windowSizeSeconds; + SizeEstimator sizeEstimator, int sampleRate, Instant startTimestamp) { this.sizeEstimator = sizeEstimator; this.sampleRate = sampleRate; - this.random = new Random(); + this.startTimestamp = startTimestamp; + this.elementCount = 0; + this.currentElementSizeEstimate = BigDecimal.ZERO; + lastElementTimestamp = this.startTimestamp; + totalThroughputEstimate = BigDecimal.ZERO; } /** @@ -108,66 +77,31 @@ public BytesThroughputEstimator( * @param timeOfRecords the committed timestamp of the records * @param element the element to estimate the byte size of */ - @SuppressWarnings("nullness") // queue is never null, nor the peeked element - @Override public void update(Instant timeOfRecords, T element) { - if (random.nextInt(sampleRate) == 0) { - long bytes = sizeEstimator.sizeOf(element); - synchronized (deque) { - if (deque.isEmpty() - || TimestampConverter.toSeconds(timeOfRecords) > deque.getLast().getSeconds()) { - deque.addLast(new ThroughputEntry(timeOfRecords, bytes)); - } else { - deque.getLast().addBytes(bytes); - } - cleanQueue(deque.getLast().getTimestamp()); - } - } - } - - /** Returns the estimated throughput bytes for now. */ - @Override - public double get() { - return getFrom(Instant.now()); - } - - /** - * Returns the estimated throughput bytes for a specified time. 
- * - * @param time the specified timestamp to check throughput - */ - @Override - public double getFrom(Instant time) { - synchronized (deque) { - cleanQueue(time); - if (deque.size() == 0) { - return 0D; - } - BigDecimal throughput = BigDecimal.ZERO; - for (ThroughputEntry entry : deque) { - throughput = throughput.add(entry.getBytes()); - } - return throughput - // Prevents negative values - .max(BigDecimal.ZERO) - .divide(BigDecimal.valueOf(windowSizeSeconds), MathContext.DECIMAL128) - .multiply(BigDecimal.valueOf(sampleRate)) - // Cap it to Double.MAX_VALUE - .min(MAX_DOUBLE) - .doubleValue(); + // Always updates on first element re-estimates size based on sample rate. + // This is expensive so we avoid doing it too often. + if (elementCount % sampleRate == 0) { + currentElementSizeEstimate = BigDecimal.valueOf(sizeEstimator.sizeOf(element)); } + lastElementTimestamp = timeOfRecords; + elementCount += 1; + totalThroughputEstimate = totalThroughputEstimate.add(currentElementSizeEstimate); } - private void cleanQueue(Instant time) { - while (deque.size() > 0) { - final ThroughputEntry entry = deque.getFirst(); - if (entry != null - && entry.getSeconds() >= TimestampConverter.toSeconds(time) - windowSizeSeconds) { - break; - } - // Remove the element if the timestamp of the first element is beyond - // the time range to look backward. - deque.removeFirst(); + /** Returns the estimated throughput bytes for this run. */ + public BigDecimal get() { + if (elementCount == 0) { + return BigDecimal.ZERO; + } else { + BigDecimal processingTimeMillis = + BigDecimal.valueOf(new Duration(startTimestamp, lastElementTimestamp).getMillis()) + // Avoid divide by zero by rounding up to 1 ms when the difference is less + // than a full millisecond + .max(BigDecimal.ONE); + + return totalThroughputEstimate + .divide(processingTimeMillis, 3, RoundingMode.DOWN) + .multiply(BigDecimal.valueOf(1000)); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/CoderSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/CoderSizeEstimator.java new file mode 100644 index 0000000000000..528bbd7baf88b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/CoderSizeEstimator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; + +/** + * This class is used to estimate the size in bytes of a given element. It uses the given {@link + * Coder} to calculate the size of the element. + */ +@Internal +public class CoderSizeEstimator implements SizeEstimator, Serializable { + + private static final long serialVersionUID = 5564948506493524158L; + + private static class SizeEstimatorObserver extends ElementByteSizeObserver + implements Serializable { + + private static final long serialVersionUID = 4569562919962045617L; + private long observedBytes; + + @Override + protected void reportElementSize(long elementByteSize) { + observedBytes = elementByteSize; + } + } + + private final Coder coder; + private final SizeEstimatorObserver observer; + + public CoderSizeEstimator(Coder coder) { + this.coder = coder; + this.observer = new SizeEstimatorObserver(); + } + + /** + * Estimates the size in bytes of the given element with the configured {@link Coder} . + * + * @param element the element instance to be estimated + * @return the estimated size in bytes of the given element + */ + @Override + public long sizeOf(T element) { + try { + coder.registerByteSizeObserver(element, observer); + observer.advance(); + + return observer.observedBytes; + } catch (Exception e) { + throw new EncodingException(e); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimator.java similarity index 59% rename from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimator.java rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimator.java index 98b5761feda72..3af627e12ec12 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimator.java @@ -18,43 +18,23 @@ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator; import org.apache.beam.sdk.annotations.Internal; -import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * NoOp implementation of a throughput estimator. This will always return 0 as the throughput and it - * will warn users that this is being used (it should not be used in production). + * NoOp implementation of a size estimator. This will always return 0 as the size and it will warn + * users that this is being used (it should not be used in production). */ @Internal -public class NullThroughputEstimator implements ThroughputEstimator { +public class NullSizeEstimator implements SizeEstimator { private static final long serialVersionUID = 7088120208289907630L; - private static final Logger LOG = LoggerFactory.getLogger(NullThroughputEstimator.class); + private static final Logger LOG = LoggerFactory.getLogger(NullSizeEstimator.class); - /** - * NoOp. 
- * - * @param timeOfRecords ignored - * @param element ignored - */ @Override - public void update(Instant timeOfRecords, T element) { + public long sizeOf(T element) { LOG.warn( - "Trying to update throughput using {}, this operation will have no effect", - this.getClass().getSimpleName()); - } - - /** - * Always returns 0. - * - * @param time ignored - * @return 0 - */ - @Override - public double getFrom(Instant time) { - LOG.warn( - "Trying to retrieve throughput using {}, this operation will always return 0", + "Trying to estimate size using {}, this operation will always return 0", this.getClass().getSimpleName()); return 0; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java index 30d6245059653..b79013f41af2f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java @@ -17,54 +17,6 @@ */ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator; -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Internal; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.util.common.ElementByteSizeObserver; - -/** - * This class is used to estimate the size in bytes of a given element. It uses the given {@link - * Coder} to calculate the size of the element. - */ -@Internal -public class SizeEstimator implements Serializable { - - private static final long serialVersionUID = 5564948506493524158L; - - private static class SizeEstimatorObserver extends ElementByteSizeObserver - implements Serializable { - - private static final long serialVersionUID = 4569562919962045617L; - private long observedBytes; - - @Override - protected void reportElementSize(long elementByteSize) { - observedBytes = elementByteSize; - } - } - - private final Coder coder; - private final SizeEstimatorObserver observer; - - public SizeEstimator(Coder coder) { - this.coder = coder; - this.observer = new SizeEstimatorObserver(); - } - - /** - * Estimates the size in bytes of the given element with the configured {@link Coder} . - * - * @param element the element instance to be estimated - * @return the estimated size in bytes of the given element - */ - public long sizeOf(T element) { - try { - coder.registerByteSizeObserver(element, observer); - observer.advance(); - - return observer.observedBytes; - } catch (Exception e) { - throw new EncodingException(e); - } - } +public interface SizeEstimator { + long sizeOf(T element); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/ThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/ThroughputEstimator.java deleted file mode 100644 index aeedb31ed134a..0000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/ThroughputEstimator.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator; - -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Internal; -import org.joda.time.Instant; - -/** An estimator to calculate the throughput of the outputted elements from a DoFn. */ -@Internal -public interface ThroughputEstimator extends Serializable { - /** - * Updates the estimator with the size of the records. - * - * @param timeOfRecords the committed timestamp of the records - * @param element the element to estimate the byte size of - */ - void update(Instant timeOfRecords, T element); - - /** Returns the estimated throughput for now. */ - default double get() { - return getFrom(Instant.now()); - } - - /** - * Returns the estimated throughput for a specified time. - * - * @param time the specified timestamp to check throughput - */ - double getFrom(Instant time); -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java index ce2cf11a842e5..fe3cdb15a2382 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java @@ -20,6 +20,7 @@ import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; import com.google.cloud.bigtable.data.v2.models.CloseStream; import java.io.Serializable; +import java.math.BigDecimal; import java.util.Objects; import org.apache.beam.sdk.annotations.Internal; import org.checkerframework.checker.nullness.qual.Nullable; @@ -41,15 +42,25 @@ public class StreamProgress implements Serializable { private @Nullable ChangeStreamContinuationToken currentToken; private @Nullable Instant estimatedLowWatermark; + private @Nullable BigDecimal throughputEstimate; + private @Nullable Instant lastRunTimestamp; private @Nullable CloseStream closeStream; private boolean failToLock; + private boolean isHeartbeat; public StreamProgress() {} public StreamProgress( - @Nullable ChangeStreamContinuationToken token, Instant estimatedLowWatermark) { + @Nullable ChangeStreamContinuationToken token, + Instant estimatedLowWatermark, + BigDecimal throughputEstimate, + Instant lastRunTimestamp, + boolean isHeartbeat) { this.currentToken = token; this.estimatedLowWatermark = estimatedLowWatermark; + this.throughputEstimate = throughputEstimate; + this.lastRunTimestamp = lastRunTimestamp; + this.isHeartbeat = isHeartbeat; } public StreamProgress(@Nullable CloseStream closeStream) { @@ -64,6 +75,14 @@ public StreamProgress(@Nullable CloseStream closeStream) { return estimatedLowWatermark; } + public @Nullable BigDecimal getThroughputEstimate() { + return 
throughputEstimate; + } + + public @Nullable Instant getLastRunTimestamp() { + return lastRunTimestamp; + } + public @Nullable CloseStream getCloseStream() { return closeStream; } @@ -76,6 +95,10 @@ public void setFailToLock(boolean failToLock) { this.failToLock = failToLock; } + public boolean isHeartbeat() { + return this.isHeartbeat; + } + public boolean isEmpty() { return closeStream == null && currentToken == null; } @@ -92,7 +115,10 @@ public boolean equals(@Nullable Object o) { return Objects.equals(getCurrentToken(), that.getCurrentToken()) && Objects.equals(getEstimatedLowWatermark(), that.getEstimatedLowWatermark()) && Objects.equals(getCloseStream(), that.getCloseStream()) - && (isFailToLock() == that.isFailToLock()); + && (isFailToLock() == that.isFailToLock()) + && Objects.equals(getThroughputEstimate(), that.getThroughputEstimate()) + && Objects.equals(getLastRunTimestamp(), that.getLastRunTimestamp()) + && (isHeartbeat() == that.isHeartbeat()); } @Override @@ -111,6 +137,12 @@ public String toString() { + closeStream + ", failToLock=" + failToLock + + ", throughputEstimate=" + + throughputEstimate + + ", lastRunTimestamp=" + + lastRunTimestamp + + ", isHeartbeat=" + + isHeartbeat + '}'; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java index 6fe272009ccb4..2e340c2b30b5b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java @@ -30,15 +30,17 @@ import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.CloseStream; import com.google.cloud.bigtable.data.v2.models.Heartbeat; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.protobuf.ByteString; import com.google.rpc.Status; +import java.math.BigDecimal; import java.util.Collections; import java.util.Optional; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; @@ -49,8 +51,13 @@ import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.Silent.class) public class ChangeStreamActionTest { private ChangeStreamMetrics metrics; @@ -58,9 +65,10 @@ public class ChangeStreamActionTest { private RestrictionTracker tracker; private PartitionRecord partitionRecord; - private DoFn.OutputReceiver> receiver; + private DoFn.OutputReceiver> receiver; 
private ManualWatermarkEstimator watermarkEstimator; - private ThroughputEstimator> throughputEstimator; + private BytesThroughputEstimator> throughputEstimator; + @Captor private ArgumentCaptor streamProgressArgumentCaptor; @Before public void setUp() { @@ -69,10 +77,12 @@ public void setUp() { partitionRecord = mock(PartitionRecord.class); receiver = mock(DoFn.OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); - throughputEstimator = mock(ThroughputEstimator.class); + throughputEstimator = mock(BytesThroughputEstimator.class); - action = new ChangeStreamAction(metrics, throughputEstimator); + action = new ChangeStreamAction(metrics); when(tracker.tryClaim(any())).thenReturn(true); + when(partitionRecord.getPartition()).thenReturn(ByteStringRange.create("a", "b")); + when(throughputEstimator.get()).thenReturn(BigDecimal.valueOf(1000)); } @Test @@ -87,14 +97,41 @@ public void testHeartBeat() { .thenReturn(changeStreamContinuationToken); final Optional result = - action.run(partitionRecord, mockHeartBeat, tracker, receiver, watermarkEstimator, false); + action.run( + partitionRecord, + mockHeartBeat, + tracker, + receiver, + watermarkEstimator, + throughputEstimator, + false); assertFalse(result.isPresent()); verify(metrics).incHeartbeatCount(); verify(watermarkEstimator).setWatermark(eq(lowWatermark)); - StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark); - verify(tracker).tryClaim(eq(streamProgress)); - verify(throughputEstimator, never()).update(any(), any()); + StreamProgress streamProgress = + new StreamProgress( + changeStreamContinuationToken, + lowWatermark, + BigDecimal.valueOf(1000), + Instant.now(), + true); + verify(tracker).tryClaim(streamProgressArgumentCaptor.capture()); + assertEquals( + streamProgress.getCurrentToken(), + streamProgressArgumentCaptor.getValue().getCurrentToken()); + assertEquals( + streamProgress.getThroughputEstimate(), + streamProgressArgumentCaptor.getValue().getThroughputEstimate()); + assertEquals( + streamProgress.getEstimatedLowWatermark(), + streamProgressArgumentCaptor.getValue().getEstimatedLowWatermark()); + assertEquals( + streamProgress.isHeartbeat(), streamProgressArgumentCaptor.getValue().isHeartbeat()); + KV record = + KV.of(ByteStringRange.serializeToByteString(partitionRecord.getPartition()), mockHeartBeat); + verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH)); + verify(throughputEstimator).update(any(), eq(record)); } @Test @@ -109,14 +146,20 @@ public void testCloseStreamResume() { .thenReturn(Collections.singletonList(changeStreamContinuationToken)); final Optional result = - action.run(partitionRecord, mockCloseStream, tracker, receiver, watermarkEstimator, false); + action.run( + partitionRecord, + mockCloseStream, + tracker, + receiver, + watermarkEstimator, + throughputEstimator, + false); assertTrue(result.isPresent()); assertEquals(DoFn.ProcessContinuation.resume(), result.get()); verify(metrics).incClosestreamCount(); StreamProgress streamProgress = new StreamProgress(mockCloseStream); verify(tracker).tryClaim(eq(streamProgress)); - verify(throughputEstimator, never()).update(any(), any()); } @Test @@ -134,18 +177,41 @@ public void testChangeStreamMutationUser() { Mockito.when(changeStreamMutation.getEstimatedLowWatermark()) .thenReturn(toThreetenInstant(lowWatermark)); Mockito.when(changeStreamMutation.getType()).thenReturn(ChangeStreamMutation.MutationType.USER); - KV record = + KV record = KV.of(changeStreamMutation.getRowKey(), 
changeStreamMutation); final Optional result = action.run( - partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false); + partitionRecord, + changeStreamMutation, + tracker, + receiver, + watermarkEstimator, + throughputEstimator, + false); assertFalse(result.isPresent()); verify(metrics).incChangeStreamMutationUserCounter(); verify(metrics, never()).incChangeStreamMutationGcCounter(); - StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark); - verify(tracker).tryClaim(eq(streamProgress)); + StreamProgress streamProgress = + new StreamProgress( + changeStreamContinuationToken, + lowWatermark, + BigDecimal.valueOf(1000), + Instant.now(), + false); + verify(tracker).tryClaim(streamProgressArgumentCaptor.capture()); + assertEquals( + streamProgress.getCurrentToken(), + streamProgressArgumentCaptor.getValue().getCurrentToken()); + assertEquals( + streamProgress.getThroughputEstimate(), + streamProgressArgumentCaptor.getValue().getThroughputEstimate()); + assertEquals( + streamProgress.getEstimatedLowWatermark(), + streamProgressArgumentCaptor.getValue().getEstimatedLowWatermark()); + assertEquals( + streamProgress.isHeartbeat(), streamProgressArgumentCaptor.getValue().isHeartbeat()); verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH)); verify(watermarkEstimator).setWatermark(eq(lowWatermark)); verify(throughputEstimator).update(any(), eq(record)); @@ -167,18 +233,41 @@ public void testChangeStreamMutationGc() { .thenReturn(toThreetenInstant(lowWatermark)); Mockito.when(changeStreamMutation.getType()) .thenReturn(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION); - KV record = + KV record = KV.of(changeStreamMutation.getRowKey(), changeStreamMutation); final Optional result = action.run( - partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false); + partitionRecord, + changeStreamMutation, + tracker, + receiver, + watermarkEstimator, + throughputEstimator, + false); assertFalse(result.isPresent()); verify(metrics).incChangeStreamMutationGcCounter(); verify(metrics, never()).incChangeStreamMutationUserCounter(); - StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark); - verify(tracker).tryClaim(eq(streamProgress)); + StreamProgress streamProgress = + new StreamProgress( + changeStreamContinuationToken, + lowWatermark, + BigDecimal.valueOf(1000), + Instant.now(), + false); + verify(tracker).tryClaim(streamProgressArgumentCaptor.capture()); + assertEquals( + streamProgress.getCurrentToken(), + streamProgressArgumentCaptor.getValue().getCurrentToken()); + assertEquals( + streamProgress.getThroughputEstimate(), + streamProgressArgumentCaptor.getValue().getThroughputEstimate()); + assertEquals( + streamProgress.getEstimatedLowWatermark(), + streamProgressArgumentCaptor.getValue().getEstimatedLowWatermark()); + assertEquals( + streamProgress.isHeartbeat(), streamProgressArgumentCaptor.getValue().isHeartbeat()); verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH)); verify(watermarkEstimator).setWatermark(eq(lowWatermark)); verify(throughputEstimator).update(any(), eq(record)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java index f997db07a8aea..305be15688516 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java @@ -30,7 +30,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; -import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.CloseStream; import com.google.cloud.bigtable.data.v2.models.Heartbeat; @@ -45,6 +44,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker; @@ -71,8 +71,9 @@ public class ReadChangeStreamPartitionActionTest { // private PartitionRecord partitionRecord; private StreamProgress restriction; private RestrictionTracker tracker; - private DoFn.OutputReceiver> receiver; + private DoFn.OutputReceiver> receiver; private ManualWatermarkEstimator watermarkEstimator; + private CoderSizeEstimator> sizeEstimator; private ByteStringRange partition; private String uuid; @@ -84,11 +85,17 @@ public void setUp() throws Exception { changeStreamDao = mock(ChangeStreamDao.class); metrics = mock(ChangeStreamMetrics.class); changeStreamAction = mock(ChangeStreamAction.class); + sizeEstimator = mock(CoderSizeEstimator.class); Duration heartbeatDuration = Duration.standardSeconds(1); action = new ReadChangeStreamPartitionAction( - metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration); + metadataTableDao, + changeStreamDao, + metrics, + changeStreamAction, + heartbeatDuration, + sizeEstimator); restriction = mock(StreamProgress.class); tracker = mock(ReadChangeStreamPartitionProgressTracker.class); @@ -124,7 +131,7 @@ public void testLockingRowSucceed() throws IOException { when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean())) .thenReturn(responses); - when(changeStreamAction.run(any(), any(), any(), any(), any(), anyBoolean())) + when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean())) .thenReturn(Optional.of(DoFn.ProcessContinuation.stop())); final DoFn.ProcessContinuation result = @@ -133,7 +140,7 @@ public void testLockingRowSucceed() throws IOException { assertEquals(DoFn.ProcessContinuation.stop(), result); // Verify that on successful lock, we don't tryClaim on the tracker. 
verify(tracker, never()).tryClaim(any()); - verify(changeStreamAction).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean()); } @Test @@ -150,7 +157,7 @@ public void testLockingRowFailsStops() throws IOException { StreamProgress streamProgress = new StreamProgress(); streamProgress.setFailToLock(true); verify(tracker).tryClaim(streamProgress); - verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean()); } @Test @@ -170,7 +177,7 @@ public void testLockingRowNotNeededAfterFirstRun() throws IOException { when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean())) .thenReturn(responses); - when(changeStreamAction.run(any(), any(), any(), any(), any(), anyBoolean())) + when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean())) .thenReturn(Optional.of(DoFn.ProcessContinuation.stop())); final DoFn.ProcessContinuation result = @@ -179,7 +186,7 @@ public void testLockingRowNotNeededAfterFirstRun() throws IOException { assertEquals(DoFn.ProcessContinuation.stop(), result); // Verify that on successful lock, we don't tryClaim on the tracker. verify(tracker, never()).tryClaim(any()); - verify(changeStreamAction).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean()); } @Test @@ -198,7 +205,7 @@ public void testLockingRowNotNeededAfterFirstRunNotSame() throws IOException { StreamProgress streamProgress = new StreamProgress(); streamProgress.setFailToLock(true); verify(tracker).tryClaim(streamProgress); - verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean()); } @Test @@ -217,13 +224,13 @@ public void testThatChangeStreamWorkerCounterIsIncrementedOnInitialRun() throws when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean())) .thenReturn(responses); - when(changeStreamAction.run(any(), any(), any(), any(), any(), anyBoolean())) + when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean())) .thenReturn(Optional.of(DoFn.ProcessContinuation.stop())); final DoFn.ProcessContinuation result = action.run(partitionRecord, tracker, receiver, watermarkEstimator); assertEquals(DoFn.ProcessContinuation.stop(), result); - verify(changeStreamAction).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean()); } @Test @@ -240,7 +247,7 @@ public void testCloseStreamTerminateOKStatus() throws IOException { action.run(partitionRecord, tracker, receiver, watermarkEstimator); assertEquals(DoFn.ProcessContinuation.stop(), result); // Should terminate before reaching processing stream partition responses. - verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean()); // Should not try claim any restriction when processing CloseStream verify(tracker, (never())).tryClaim(any()); // Should decrement the metric on termination. 
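For orientation amid these matcher updates: every `run(any(), any(), any(), any(), any(), anyBoolean())` stub gains a sixth `any()` because ChangeStreamAction.run now also receives the throughput estimator for the current ReadChangeStream request. A minimal sketch of the widened signature the mocks are matching; the parameter names and generics here are inferred from this patch's call sites, not quoted from the source:

    // Sketch only (inferred, not authoritative): the new sixth argument is the
    // estimator whose per-second throughput reading is stored on the
    // StreamProgress that run() claims on the tracker.
    Optional<DoFn.ProcessContinuation> run(
        PartitionRecord partitionRecord,
        ChangeStreamRecord record,
        RestrictionTracker<StreamProgress, StreamProgress> tracker,
        DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
        ManualWatermarkEstimator<Instant> watermarkEstimator,
        BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator,
        boolean shouldDebug);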
@@ -266,7 +273,7 @@ public void testCloseStreamTerminateNotOutOfRangeStatus() throws IOException { action.run(partitionRecord, tracker, receiver, watermarkEstimator); assertEquals(DoFn.ProcessContinuation.stop(), result); // Should terminate before reaching processing stream partition responses. - verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean()); // Should not try claim any restriction when processing CloseStream verify(tracker, (never())).tryClaim(any()); // Should decrement the metric on termination. @@ -301,7 +308,7 @@ public void testCloseStreamWritesContinuationTokens() throws IOException { action.run(partitionRecord, tracker, receiver, watermarkEstimator); assertEquals(DoFn.ProcessContinuation.stop(), result); // Should terminate before reaching processing stream partition responses. - verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean()); // Should not try claim any restriction when processing CloseStream verify(tracker, (never())).tryClaim(any()); // Should decrement the metric on termination. @@ -353,7 +360,7 @@ public void testCloseStreamNewPartitionMerge() throws IOException { action.run(partitionRecord, tracker, receiver, watermarkEstimator); assertEquals(DoFn.ProcessContinuation.stop(), result); // Should terminate before reaching processing stream partition responses. - verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean()); // Should not try claim any restriction when processing CloseStream verify(tracker, (never())).tryClaim(any()); // Should decrement the metric on termination. @@ -399,7 +406,7 @@ public void testCloseStreamMergeWithoutNewPartitionsField() throws IOException { action.run(partitionRecord, tracker, receiver, watermarkEstimator); assertEquals(DoFn.ProcessContinuation.stop(), result); // Should terminate before reaching processing stream partition responses. - verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean()); + verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean()); // Should not try claim any restriction when processing CloseStream verify(tracker, (never())).tryClaim(any()); // Should decrement the metric on termination. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFnTest.java new file mode 100644 index 0000000000000..46fe78dd878e9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFnTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; +import com.google.cloud.bigtable.data.v2.models.CloseStream; +import com.google.cloud.bigtable.data.v2.models.Heartbeat; +import com.google.protobuf.ByteString; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class FilterForMutationDoFnTest { + + private FilterForMutationDoFn doFn; + @Mock private DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> outputReceiver; + + @Before + public void setup() { + doFn = new FilterForMutationDoFn(); + } + + @Test + public void shouldNotOutputHeartbeats() { + Heartbeat heartbeat = mock(Heartbeat.class); + doFn.processElement(KV.of(ByteString.copyFromUtf8("test"), heartbeat), outputReceiver); + verify(outputReceiver, never()).output(any()); + } + + @Test + public void shouldOutputChangeStreamMutations() { + ChangeStreamMutation mutation = mock(ChangeStreamMutation.class); + doFn.processElement(KV.of(ByteString.copyFromUtf8("test"), mutation), outputReceiver); + verify(outputReceiver, times(1)).output(KV.of(ByteString.copyFromUtf8("test"), mutation)); + } + + @Test + public void shouldNotOutputCloseStreams() { + // This shouldn't happen in practice, but if it did, CloseStream records should not be + // returned to users. + CloseStream closeStream = mock(CloseStream.class); + doFn.processElement(KV.of(ByteString.copyFromUtf8("test"), closeStream), outputReceiver); + verify(outputReceiver, never()).output(any()); + } +}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index e8f59c2bd1776..cb82fc6d19ca6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -33,6 +33,7 @@ import com.google.cloud.bigtable.data.v2.models.Range; import com.google.protobuf.ByteString; import java.io.IOException; +import java.math.BigDecimal; import java.util.Collections; import java.util.Iterator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; @@ -42,8 +43,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
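The hunks below rewire ReadChangeStreamPartitionDoFn from a DoFn-level BytesThroughputEstimator to an injected CoderSizeEstimator, with the throughput estimate carried per partition inside each claimed StreamProgress. As a rough reconstruction of the getSize behavior the new tests pin down (assembled from the assertions; the StreamProgress accessors match this patch, while the surrounding arithmetic and joda-time Duration math are assumptions, not the verbatim implementation):

    // Reconstruction from the tests, not the actual source: the restriction's
    // size is the stored throughput estimate (bytes/sec) multiplied by the
    // watermark lag in seconds, floored at zero; heartbeats report zero.
    public double getSize(StreamProgress streamProgress) {
      if (streamProgress.isHeartbeat() || streamProgress.getThroughputEstimate() == null) {
        return 0D;
      }
      long watermarkLagSeconds =
          Math.max(
              0L,
              new Duration(streamProgress.getEstimatedLowWatermark(), Instant.now())
                  .getStandardSeconds());
      return streamProgress.getThroughputEstimate().doubleValue() * watermarkLagSeconds;
    }

This is consistent with testProcessElementAndGetSize (a 20 B/s estimate times a roughly ten-second watermark lag, within the delta of 10) and with testGetSizeCantBeNegative (a low watermark in the future yields zero rather than a negative size).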
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; @@ -58,7 +58,7 @@ public class ReadChangeStreamPartitionDoFnTest { private ChangeStreamDao changeStreamDao; private MetadataTableDao metadataTableDao; - private SizeEstimator<KV<ByteString, ChangeStreamMutation>> sizeEstimator; + private CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator; private ReadChangeStreamPartitionDoFn doFn; @Before @@ -74,22 +74,28 @@ public void setup() throws IOException { ActionFactory actionFactory = mock(ActionFactory.class); ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class); - sizeEstimator = mock(SizeEstimator.class); - BytesThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator = - new BytesThroughputEstimator<>( - ReadChangeStreamPartitionDoFn.THROUGHPUT_ESTIMATION_WINDOW_SECONDS, sizeEstimator, 1); - ChangeStreamAction changeStreamAction = new ChangeStreamAction(metrics, throughputEstimator); + sizeEstimator = mock(CoderSizeEstimator.class); + ChangeStreamAction changeStreamAction = new ChangeStreamAction(metrics); ReadChangeStreamPartitionAction readChangeStreamPartitionAction = new ReadChangeStreamPartitionAction( - metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration); - when(actionFactory.changeStreamAction(metrics, throughputEstimator)) - .thenReturn(changeStreamAction); + metadataTableDao, + changeStreamDao, + metrics, + changeStreamAction, + heartbeatDuration, + sizeEstimator); + when(actionFactory.changeStreamAction(metrics)).thenReturn(changeStreamAction); when(actionFactory.readChangeStreamPartitionAction( - metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration)) + metadataTableDao, + changeStreamDao, + metrics, + changeStreamAction, + heartbeatDuration, + sizeEstimator)) .thenReturn(readChangeStreamPartitionAction); doFn = new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics); - doFn.setThroughputEstimator(throughputEstimator); + doFn.setSizeEstimator(sizeEstimator); } @Test @@ -112,7 +118,7 @@ public void testProcessElementAndGetSize() throws IOException, InterruptedExcept ReadChangeStreamPartitionProgressTracker restrictionTracker = mock(ReadChangeStreamPartitionProgressTracker.class); when(restrictionTracker.currentRestriction()).thenReturn(new StreamProgress()); - DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver = + DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver = mock(DoFn.OutputReceiver.class); ManualWatermarkEstimator<Instant> watermarkEstimator = mock(ManualWatermarkEstimator.class); doFn.setup(); @@ -138,10 +144,43 @@ when(restrictionTracker.tryClaim(any())).thenReturn(true, true, false); doFn.processElement(partition, restrictionTracker, receiver, watermarkEstimator); - double sizeEstimate = doFn.getSize(new StreamProgress(testToken, tenSecondsAgo)); + double sizeEstimate = + doFn.getSize( + new StreamProgress( + testToken, tenSecondsAgo, BigDecimal.valueOf(20), Instant.now(), false)); // we should have output two 100B mutations in the past 10s long bytesPerSecond = (mutationSize * 2) / 10; - assertEquals(sizeEstimate, bytesPerSecond * watermarkLag, .1d); + assertEquals(sizeEstimate, bytesPerSecond * watermarkLag, 10); verify(receiver, times(2)).outputWithTimestamp(KV.of(rowKey, mockMutation), Instant.EPOCH); } + + @Test + public void testGetSizeCantBeNegative() throws IOException { + long mutationSize = 100L; + when(sizeEstimator.sizeOf(any())).thenReturn(mutationSize); + Range.ByteStringRange partitionRange = Range.ByteStringRange.create("", ""); + ChangeStreamContinuationToken testToken = + ChangeStreamContinuationToken.create(partitionRange, "test"); + doFn.setup(); + + double mutationEstimate = + doFn.getSize( + new StreamProgress( + testToken, + Instant.now().plus(Duration.standardMinutes(10)), + BigDecimal.valueOf(1000), + Instant.now().plus(Duration.standardMinutes(10)), + false)); + assertEquals(0, mutationEstimate, 0); + + double heartbeatEstimate = + doFn.getSize( + new StreamProgress( + testToken, + Instant.now().plus(Duration.standardMinutes(10)), + BigDecimal.valueOf(1000), + Instant.now().plus(Duration.standardMinutes(10)), + true)); + assertEquals(0, heartbeatEstimate, 0); + } }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java index e10f730720668..90dd863bba106 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java @@ -29,113 +29,79 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils; import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter; import org.joda.time.Instant; -import org.junit.Before; import org.junit.Test; public class BytesThroughputEstimatorTest { private static final double DELTA = 1e-10; - private static final int WINDOW_SIZE_SECONDS = 10; - private BytesThroughputEstimator<byte[]> estimator; - - @Before - public void setup() { - final SizeEstimator<byte[]> sizeEstimator = new SizeEstimator<>(new TestCoder()); - estimator = new BytesThroughputEstimator<>(WINDOW_SIZE_SECONDS, sizeEstimator, 1); - } + private final SizeEstimator<byte[]> sizeEstimator = new CoderSizeEstimator<>(new TestCoder()); @Test public void testThroughputIsZeroWhenNothingsBeenRegistered() { - assertEquals(0D, estimator.get(), DELTA); - assertEquals(0D, estimator.getFrom(Instant.now()), DELTA); + BytesThroughputEstimator<byte[]> estimator = + new BytesThroughputEstimator<>(sizeEstimator, Instant.now()); + assertEquals(0D, estimator.get().doubleValue(), DELTA); } @Test public void testThroughputCalculation() { + BytesThroughputEstimator<byte[]> estimator = + new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(0)); estimator.update(Instant.ofEpochSecond(2), new byte[10]); estimator.update(Instant.ofEpochSecond(3), new byte[20]); estimator.update(Instant.ofEpochSecond(5), new byte[30]); estimator.update(Instant.ofEpochSecond(10), new byte[40]); // (10 + 20 + 30 + 40) / 10 sec window = 10 - assertEquals(10D, estimator.getFrom(Instant.ofEpochSecond(11)), DELTA); - - estimator.update(Instant.ofEpochSecond(20), new byte[10]); - estimator.update(Instant.ofEpochSecond(21), new byte[20]); - estimator.update(Instant.ofEpochSecond(21), new byte[10]); - estimator.update(Instant.ofEpochSecond(29), new byte[40]); + assertEquals(10D, estimator.get().doubleValue(), DELTA); + + BytesThroughputEstimator<byte[]> estimator2 = + new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(20)); + estimator2.update(Instant.ofEpochSecond(21), new byte[10]); + estimator2.update(Instant.ofEpochSecond(22), new byte[20]); + estimator2.update(Instant.ofEpochSecond(23), new byte[10]); + estimator2.update(Instant.ofEpochSecond(30), new byte[40]); // (10 + 20 + 10 + 40) / 10 sec window = 8 - assertEquals(8D, estimator.getFrom(Instant.ofEpochSecond(30)), DELTA); + assertEquals(8D, estimator2.get().doubleValue(), DELTA); - estimator.update(Instant.ofEpochSecond(31), new byte[10]); - estimator.update(Instant.ofEpochSecond(35), new byte[40]); + BytesThroughputEstimator<byte[]> estimator3 = + new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(30)); + estimator3.update(Instant.ofEpochSecond(31), new byte[10]); + estimator3.update(Instant.ofEpochSecond(40), new byte[40]); // (10 + 40) / 10 sec window = 5 - assertEquals(5D, estimator.getFrom(Instant.ofEpochSecond(41)), DELTA); - - // No values in the past 10 seconds - assertEquals(0D, estimator.getFrom(Instant.ofEpochSecond(50)), DELTA); + assertEquals(5D, estimator3.get().doubleValue(), DELTA); } @Test public void testThroughputIsAccumulatedWithin60SecondsWindow() { - List<ImmutablePair<Instant, byte[]>> pairs = generateTestData(100, 0, 10); + List<ImmutablePair<Instant, byte[]>> pairs = generateTestData(100, 0, 11); pairs.sort(Comparator.comparing(ImmutablePair::getLeft)); - final Instant lastUpdateTimestamp = pairs.get(pairs.size() - 1).getLeft(); BigDecimal sum = BigDecimal.valueOf(0L); for (ImmutablePair<Instant, byte[]> pair : pairs) { sum = sum.add(BigDecimal.valueOf(pair.getRight().length)); } - final BigDecimal want = - sum.divide(BigDecimal.valueOf(WINDOW_SIZE_SECONDS), MathContext.DECIMAL128); + final BigDecimal want = sum.divide(BigDecimal.valueOf(10), MathContext.DECIMAL128); + BytesThroughputEstimator<byte[]> estimator = + new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(0)); for (ImmutablePair<Instant, byte[]> pair : pairs) { estimator.update(pair.getLeft(), pair.getRight()); } - double actual = estimator.getFrom(Instant.ofEpochSecond(10)); - assertEquals(want.doubleValue(), actual, DELTA); - - // After window without updates the throughput should be zero - final Instant afterWindowTimestamp = - Instant.ofEpochSecond( - TimestampConverter.toSeconds(lastUpdateTimestamp) + WINDOW_SIZE_SECONDS + 1); - assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA); + double actual = estimator.get().doubleValue(); + assertEquals(want.doubleValue(), actual, 1); } @Test - public void testThroughputIsAccumulatedWithin50SecondsWindow() { - final List<ImmutablePair<Instant, byte[]>> excludedPairs = generateTestData(300, 0, 10); - final List<ImmutablePair<Instant, byte[]>> expectedPairs = generateTestData(50, 10, 20); - final List<ImmutablePair<Instant, byte[]>> pairs = - Stream.concat(excludedPairs.stream(), expectedPairs.stream()) - .sorted(Comparator.comparing(ImmutablePair::getLeft)) - .collect(Collectors.toList()); - final Instant lastUpdateTimestamp = pairs.get(pairs.size() - 1).getLeft(); - - BigDecimal sum = BigDecimal.valueOf(0L); - for (ImmutablePair<Instant, byte[]> pair : expectedPairs) { - sum = sum.add(BigDecimal.valueOf(pair.getRight().length)); - } - final BigDecimal want = - sum.divide(BigDecimal.valueOf(WINDOW_SIZE_SECONDS), MathContext.DECIMAL128); - for (ImmutablePair<Instant, byte[]> pair : pairs) { - estimator.update(pair.getLeft(), pair.getRight()); - } - - double actual = estimator.getFrom(Instant.ofEpochSecond(20)); - assertEquals(want.doubleValue(), actual, DELTA); - - // After window without updates the throughput should be zero - final Instant afterWindowTimestamp = - Instant.ofEpochSecond( - TimestampConverter.toSeconds(lastUpdateTimestamp) + WINDOW_SIZE_SECONDS + 1); - assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA); + public void testThroughputHandlesNoTimeDifference() { + BytesThroughputEstimator<byte[]> estimator = + new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(0)); + estimator.update(Instant.ofEpochSecond(0), new byte[10]); + // 10 bytes over an elapsed time rounded up to 1 ms, scaled to bytes per + // second: (10 / 1) * 1000 + assertEquals(10000D, estimator.get().doubleValue(), DELTA); } private List<ImmutablePair<Instant, byte[]>> generateTestData(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimatorTest.java similarity index 68% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimatorTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimatorTest.java index 266dc1255e529..a229c0e66bef3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimatorTest.java @@ -19,20 +19,16 @@ import static org.junit.Assert.assertEquals; -import org.joda.time.Instant; import org.junit.Test; -public class NullThroughputEstimatorTest { +public class NullSizeEstimatorTest { private static final double DELTA = 1e-10; @Test public void alwaysReturns0AsEstimatedThroughput() { - final NullThroughputEstimator<byte[]> estimator = new NullThroughputEstimator<>(); - assertEquals(estimator.get(), 0D, DELTA); - - estimator.update(Instant.ofEpochSecond(1), new byte[10]); - assertEquals(estimator.getFrom(Instant.ofEpochSecond(1)), 0D, DELTA); - estimator.update(Instant.ofEpochSecond(2), new byte[20]); - assertEquals(estimator.getFrom(Instant.ofEpochSecond(2)), 0D, DELTA); + final NullSizeEstimator<byte[]> estimator = new NullSizeEstimator<>(); + assertEquals(estimator.sizeOf(new byte[40]), 0D, DELTA); + assertEquals(estimator.sizeOf(new byte[20]), 0D, DELTA); + assertEquals(estimator.sizeOf(new byte[10]), 0D, DELTA); } }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java index b0bc8de7c8114..e2f0cbb62a614 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java @@ -26,6 +26,7 @@ import 
com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; import com.google.cloud.bigtable.data.v2.models.Range; +import java.math.BigDecimal; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.joda.time.Instant; import org.junit.Test; @@ -41,7 +42,8 @@ public void testTryClaim() { ChangeStreamContinuationToken changeStreamContinuationToken = ChangeStreamContinuationToken.create(Range.ByteStringRange.create("a", "b"), "1234"); final StreamProgress streamProgress2 = - new StreamProgress(changeStreamContinuationToken, Instant.now()); + new StreamProgress( + changeStreamContinuationToken, Instant.now(), BigDecimal.ONE, Instant.now(), false); assertTrue(tracker.tryClaim(streamProgress2)); assertEquals(streamProgress2, tracker.currentRestriction()); assertEquals(