diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index fc8b5e78e8a05..319c31390217a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -29,6 +29,7 @@
 import com.google.bigtable.v2.RowFilter;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.KeyOffset;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -50,10 +51,10 @@
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.FilterForMutationDoFn;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.InitializeDoFn;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
@@ -2068,22 +2069,20 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
       ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
           new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);

-      PCollection<KV<ByteString, ChangeStreamMutation>> output =
+      PCollection<KV<ByteString, ChangeStreamRecord>> readChangeStreamOutput =
           input
               .apply(Impulse.create())
               .apply("Initialize", ParDo.of(initializeDoFn))
               .apply("DetectNewPartition", ParDo.of(detectNewPartitionsDoFn))
               .apply("ReadChangeStreamPartition", ParDo.of(readChangeStreamPartitionDoFn));

-      Coder<KV<ByteString, ChangeStreamMutation>> outputCoder = output.getCoder();
-      SizeEstimator<KV<ByteString, ChangeStreamMutation>> sizeEstimator =
-          new SizeEstimator<>(outputCoder);
-      BytesThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator =
-          new BytesThroughputEstimator<>(
-              ReadChangeStreamPartitionDoFn.THROUGHPUT_ESTIMATION_WINDOW_SECONDS, sizeEstimator);
-      readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);
+      Coder<KV<ByteString, ChangeStreamRecord>> outputCoder = readChangeStreamOutput.getCoder();
+      CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator =
+          new CoderSizeEstimator<>(outputCoder);
+      readChangeStreamPartitionDoFn.setSizeEstimator(sizeEstimator);

-      return output;
+      return readChangeStreamOutput.apply(
+          "FilterForMutation", ParDo.of(new FilterForMutationDoFn()));
     }

     @AutoValue.Builder
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
index 43623b5de160c..6aabcb081ad4e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action;

-import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import javax.annotation.Nullable;
@@ -25,7 +25,7 @@
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
 import org.apache.beam.sdk.values.KV;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -56,11 +56,9 @@ public class ActionFactory implements Serializable {
    *
    * @return singleton instance of the {@link ChangeStreamAction}
    */
-  public synchronized ChangeStreamAction changeStreamAction(
-      ChangeStreamMetrics metrics,
-      ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator) {
+  public synchronized ChangeStreamAction changeStreamAction(ChangeStreamMetrics metrics) {
     if (changeStreamAction == null) {
-      changeStreamAction = new ChangeStreamAction(metrics, throughputEstimator);
+      changeStreamAction = new ChangeStreamAction(metrics);
     }
     return changeStreamAction;
   }
@@ -145,11 +143,17 @@ public synchronized ReadChangeStreamPartitionAction readChangeStreamPartitionAct
       ChangeStreamDao changeStreamDao,
       ChangeStreamMetrics metrics,
       ChangeStreamAction changeStreamAction,
-      Duration heartbeatDuration) {
+      Duration heartbeatDuration,
+      SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator) {
     if (readChangeStreamPartitionAction == null) {
       readChangeStreamPartitionAction =
          new ReadChangeStreamPartitionAction(
-              metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration);
+              metadataTableDao,
+              changeStreamDao,
+              metrics,
+              changeStreamAction,
+              heartbeatDuration,
+              sizeEstimator);
     }
     return readChangeStreamPartitionAction;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
index 4c3f9c5a83353..3ab3b1973b973 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java
@@ -31,7 +31,7 @@
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -46,20 +46,15 @@
 @Internal
 public class ChangeStreamAction {
   private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamAction.class);
   private final ChangeStreamMetrics metrics;
-  private final ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator;

   /**
    * Constructs ChangeStreamAction to process individual ChangeStreamRecord.
    *
    * @param metrics record beam metrics.
    */
-  public ChangeStreamAction(
-      ChangeStreamMetrics metrics,
-      ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator) {
+  public ChangeStreamAction(ChangeStreamMetrics metrics) {
     this.metrics = metrics;
-    this.throughputEstimator = throughputEstimator;
   }

   /**
@@ -111,14 +106,27 @@ public Optional<DoFn.ProcessContinuation> run(
       PartitionRecord partitionRecord,
       ChangeStreamRecord record,
       RestrictionTracker<StreamProgress, StreamProgress> tracker,
-      DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
+      DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
+      BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator,
       boolean shouldDebug) {
     if (record instanceof Heartbeat) {
       Heartbeat heartbeat = (Heartbeat) record;
       final Instant watermark = toJodaTime(heartbeat.getEstimatedLowWatermark());
+
+      // These will be filtered, so the key doesn't really matter, but the most logical thing to
+      // key a heartbeat by is the partition it corresponds to.
+      ByteString heartbeatKey =
+          Range.ByteStringRange.serializeToByteString(partitionRecord.getPartition());
+      KV<ByteString, ChangeStreamRecord> outputRecord = KV.of(heartbeatKey, heartbeat);
+      throughputEstimator.update(Instant.now(), outputRecord);
       StreamProgress streamProgress =
-          new StreamProgress(heartbeat.getChangeStreamContinuationToken(), watermark);
+          new StreamProgress(
+              heartbeat.getChangeStreamContinuationToken(),
+              watermark,
+              throughputEstimator.get(),
+              Instant.now(),
+              true);
       watermarkEstimator.setWatermark(watermark);

       if (shouldDebug) {
@@ -142,6 +150,15 @@ public Optional<DoFn.ProcessContinuation> run(
         return Optional.of(DoFn.ProcessContinuation.stop());
       }
       metrics.incHeartbeatCount();
+      // We output heartbeats so that they are factored into throughput and can be used to
+      // autoscale. These will be filtered in a downstream step and never returned to users. This
+      // is to prevent the autoscaler from scaling down when we have large tables with no
+      // throughput but still need enough workers to keep up with heartbeats.
+
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(outputRecord, Instant.EPOCH);
     } else if (record instanceof CloseStream) {
       CloseStream closeStream = (CloseStream) record;
       StreamProgress streamProgress = new StreamProgress(closeStream);
@@ -185,7 +202,17 @@ public Optional<DoFn.ProcessContinuation> run(
               partitionRecord.getPartition().getStart(), partitionRecord.getPartition().getEnd()),
           changeStreamMutation.getToken());
-      StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, watermark);
+
+      KV<ByteString, ChangeStreamRecord> outputRecord =
+          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
+      throughputEstimator.update(Instant.now(), outputRecord);
+      StreamProgress streamProgress =
+          new StreamProgress(
+              changeStreamContinuationToken,
+              watermark,
+              throughputEstimator.get(),
+              Instant.now(),
+              false);
       // If the tracker fails to claim the streamProgress, it most likely means the runner
       // initiated a checkpoint. See ReadChangeStreamPartitionProgressTracker for more information
      // regarding runner initiated checkpoints.
@@ -206,9 +233,6 @@ public Optional<DoFn.ProcessContinuation> run(
         metrics.updateProcessingDelayFromCommitTimestamp(
             Instant.now().getMillis() - delay.getMillis());

-      KV<ByteString, ChangeStreamMutation> outputRecord =
-          KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);
-      throughputEstimator.update(Instant.now(), outputRecord);
       // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
       // limits the ability to window on commit time of any data changes. It is still possible to
       // window on processing time.
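Note on the Instant.EPOCH output above: because every record is emitted at timestamp 0, event-time windowing on commit timestamps will not work downstream, but processing-time triggers still do. A minimal, illustrative sketch of consuming the connector output that way (the project/instance/table IDs and the one-minute trigger interval are invented for the example, not taken from this change):

    PCollection<KV<ByteString, ChangeStreamMutation>> changes =
        pipeline.apply(
            BigtableIO.readChangeStream()
                .withProjectId("my-project")
                .withInstanceId("my-instance")
                .withTableId("my-table"));

    // Event-time triggers would never fire (all elements sit at timestamp 0),
    // so group output by processing time instead.
    changes.apply(
        Window.<KV<ByteString, ChangeStreamMutation>>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());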
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
index 2270cbd08ac13..87273c9894d91 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java
@@ -25,7 +25,6 @@
 import com.google.api.gax.rpc.ServerStream;
 import com.google.cloud.bigtable.common.Status;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
-import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.CloseStream;
 import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
@@ -40,6 +39,8 @@
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
@@ -66,18 +67,21 @@ public class ReadChangeStreamPartitionAction {
   private final ChangeStreamMetrics metrics;
   private final ChangeStreamAction changeStreamAction;
   private final Duration heartbeatDuration;
+  private final SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;

   public ReadChangeStreamPartitionAction(
       MetadataTableDao metadataTableDao,
       ChangeStreamDao changeStreamDao,
       ChangeStreamMetrics metrics,
       ChangeStreamAction changeStreamAction,
-      Duration heartbeatDuration) {
+      Duration heartbeatDuration,
+      SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator) {
     this.metadataTableDao = metadataTableDao;
     this.changeStreamDao = changeStreamDao;
     this.metrics = metrics;
     this.changeStreamAction = changeStreamAction;
     this.heartbeatDuration = heartbeatDuration;
+    this.sizeEstimator = sizeEstimator;
   }

   /**
@@ -131,12 +135,14 @@ public ReadChangeStreamPartitionAction(
   public ProcessContinuation run(
       PartitionRecord partitionRecord,
       RestrictionTracker<StreamProgress, StreamProgress> tracker,
-      OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
+      OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator)
       throws IOException {
     // Watermark being delayed beyond 5 minutes signals a possible problem.
     boolean shouldDebug =
         watermarkEstimator.getState().plus(Duration.standardMinutes(5)).isBeforeNow();
+    BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator =
+        new BytesThroughputEstimator<>(sizeEstimator, Instant.now());

     if (shouldDebug) {
       LOG.info(
@@ -298,7 +304,13 @@ public ProcessContinuation run(
     for (ChangeStreamRecord record : stream) {
       Optional<ProcessContinuation> result =
           changeStreamAction.run(
-              partitionRecord, record, tracker, receiver, watermarkEstimator, shouldDebug);
+              partitionRecord,
+              record,
+              tracker,
+              receiver,
+              watermarkEstimator,
+              throughputEstimator,
+              shouldDebug);
       // changeStreamAction will usually return Optional.empty() except for when a checkpoint
       // (either runner or pipeline initiated) is required.
       if (result.isPresent()) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFn.java
new file mode 100644
index 0000000000000..968e2ecddaa65
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFn.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
+
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+public class FilterForMutationDoFn
+    extends DoFn<KV<ByteString, ChangeStreamRecord>, KV<ByteString, ChangeStreamMutation>>
+    implements Serializable {
+
+  @ProcessElement
+  public void processElement(
+      @Element KV<ByteString, ChangeStreamRecord> changeStreamRecordKV,
+      OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver) {
+    ChangeStreamRecord inputRecord = changeStreamRecordKV.getValue();
+    if (inputRecord instanceof ChangeStreamMutation) {
+      receiver.output(KV.of(changeStreamRecordKV.getKey(), (ChangeStreamMutation) inputRecord));
+    }
+  }
+}
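FilterForMutationDoFn above is the seam that keeps heartbeats internal: anything that is not a ChangeStreamMutation is dropped, and mutations pass through with their key, narrowed back to the user-facing type. A JUnit-style sketch of that contract, calling processElement directly (Mockito mocks are used only because the Bigtable model classes are not trivially constructible; the truncated FilterForMutationDoFnTest at the end of this diff covers the same behavior):

    FilterForMutationDoFn fn = new FilterForMutationDoFn();
    DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver =
        Mockito.mock(DoFn.OutputReceiver.class);
    ByteString key = ByteString.copyFromUtf8("row-key");
    ChangeStreamMutation mutation = Mockito.mock(ChangeStreamMutation.class);

    // A heartbeat is swallowed; a mutation is forwarded unchanged.
    fn.processElement(KV.of(key, (ChangeStreamRecord) Mockito.mock(Heartbeat.class)), receiver);
    fn.processElement(KV.of(key, (ChangeStreamRecord) mutation), receiver);

    Mockito.verify(receiver).output(KV.of(key, mutation));
    Mockito.verifyNoMoreInteractions(receiver);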
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index 9fcf9e12968f5..91210caa5972d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;

-import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction;
@@ -29,9 +31,9 @@
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.NullThroughputEstimator;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.NullSizeEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
@@ -41,17 +43,18 @@
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 // Allows for readChangeStreamPartitionAction setup
-@SuppressWarnings("initialization.fields.uninitialized")
+@SuppressWarnings({"initialization.fields.uninitialized", "dereference.of.nullable"})
 @Internal
 @UnboundedPerElement
 public class ReadChangeStreamPartitionDoFn
-    extends DoFn<PartitionRecord, KV<ByteString, ChangeStreamMutation>> {
+    extends DoFn<PartitionRecord, KV<ByteString, ChangeStreamRecord>> {
   private static final long serialVersionUID = 4418739381635104479L;
   private static final BigDecimal MAX_DOUBLE = BigDecimal.valueOf(Double.MAX_VALUE);
   private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
@@ -61,8 +64,7 @@ public class ReadChangeStreamPartitionDoFn
   private final DaoFactory daoFactory;
   private final ChangeStreamMetrics metrics;
   private final ActionFactory actionFactory;
-  private ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator;
-
+  private SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;
   private ReadChangeStreamPartitionAction readChangeStreamPartitionAction;

   public ReadChangeStreamPartitionDoFn(
@@ -74,7 +76,7 @@ public ReadChangeStreamPartitionDoFn(
     this.daoFactory = daoFactory;
     this.metrics = metrics;
     this.actionFactory = actionFactory;
-    this.throughputEstimator = new NullThroughputEstimator<>();
+    this.sizeEstimator = new NullSizeEstimator<>();
   }

   @GetInitialWatermarkEstimatorState
@@ -106,30 +108,55 @@ public double getSize(@Restriction StreamProgress streamProgress) {
       return 0d;
     }
     Instant lowWatermark = streamProgress.getEstimatedLowWatermark();
+    BigDecimal estimatedThroughput = streamProgress.getThroughputEstimate();
+    Instant lastRunTimestamp = streamProgress.getLastRunTimestamp();
     // These should only be null if:
     // 1) We've failed to lock for the partition, in which case we expect 0 throughput
     // 2) We've received a CloseStream, in which case we won't process any more data for
     //    this partition
     // 3) RCSP has just started and hasn't completed a checkpoint yet, in which case we can't
     //    estimate throughput yet
-    if (lowWatermark == null) {
+    if (lowWatermark == null || estimatedThroughput == null || lastRunTimestamp == null) {
       return 0;
     }
+    String partition = "";
+    if (streamProgress.getCurrentToken() != null) {
+      partition =
+          ByteStringRangeHelper.formatByteStringRange(
+              Preconditions.checkNotNull(streamProgress.getCurrentToken()).getPartition());
+    }
+
+    // Heartbeat lowWatermark takes up to a minute to update on the server. We don't want
+    // this to count against the backlog and prevent scaling down, so we estimate heartbeat
+    // backlog using the time we most recently processed a heartbeat. Otherwise (for mutations),
+    // we use the watermark.
+    Duration processingTimeLag =
+        Duration.millis(
+            Instant.now().getMillis() - streamProgress.getLastRunTimestamp().getMillis());
     Duration watermarkLag = Duration.millis(Instant.now().getMillis() - lowWatermark.getMillis());
-    BigDecimal estimatedThroughput = BigDecimal.valueOf(throughputEstimator.get());
+    long lagInMillis =
+        (streamProgress.isHeartbeat() ? processingTimeLag : watermarkLag).getMillis();
     // Return the estimated bytes per second throughput multiplied by the amount of known work
     // outstanding (watermark lag). Cap at max double to avoid overflow.
-    final double estimatedSize =
+    double estimatedSize =
         estimatedThroughput
-            .multiply(BigDecimal.valueOf(watermarkLag.getStandardSeconds()))
+            .multiply(BigDecimal.valueOf(lagInMillis))
+            .divide(BigDecimal.valueOf(1000), 3, RoundingMode.DOWN)
             .min(MAX_DOUBLE)
+            // Lag can be negative from clock skew. We treat that as caught up, so
+            // it should return zero.
+            .max(BigDecimal.ZERO)
             .doubleValue();
+
     LOG.debug(
-        "Estimated size: throughputBytes: {} x watermarkLag {} = {}",
+        "Estimated size (per second): partition: {}, isHeartbeat: {}, throughputBytes: {} x watermarkLagMillis {} = {}, lastRun = {}",
+        partition,
+        streamProgress.isHeartbeat(),
         estimatedThroughput,
-        watermarkLag,
-        estimatedSize);
+        lagInMillis,
+        estimatedSize,
+        streamProgress.getLastRunTimestamp());
     return estimatedSize;
   }
@@ -137,18 +164,22 @@ public double getSize(@Restriction StreamProgress streamProgress) {
   public void setup() throws IOException {
     MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao();
     ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
-    ChangeStreamAction changeStreamAction =
-        actionFactory.changeStreamAction(this.metrics, this.throughputEstimator);
+    ChangeStreamAction changeStreamAction = actionFactory.changeStreamAction(this.metrics);
     readChangeStreamPartitionAction =
         actionFactory.readChangeStreamPartitionAction(
-            metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration);
+            metadataTableDao,
+            changeStreamDao,
+            metrics,
+            changeStreamAction,
+            heartbeatDuration,
+            sizeEstimator);
   }

   @ProcessElement
   public ProcessContinuation processElement(
       @Element PartitionRecord partitionRecord,
       RestrictionTracker<StreamProgress, StreamProgress> tracker,
-      OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
+      OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator)
       throws InterruptedException, IOException {
     return readChangeStreamPartitionAction.run(
@@ -158,10 +189,10 @@ public ProcessContinuation processElement(
   /**
    * Sets the estimator to track throughput for each DoFn instance.
    *
-   * @param throughputEstimator an estimator to calculate DoFn instance level throughput
+   * @param sizeEstimator an estimator to calculate the size of records for throughput estimates
    */
-  public void setThroughputEstimator(
-      BytesThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator) {
-    this.throughputEstimator = throughputEstimator;
+  public void setSizeEstimator(
+      CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator) {
+    this.sizeEstimator = sizeEstimator;
   }
 }
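The reworked getSize above prices a restriction's backlog as bytes per second times seconds of lag, choosing processing-time lag for heartbeats and watermark lag for mutations. The arithmetic restated on its own, with example numbers (2 MB/s, 45 s behind) that are illustrative rather than taken from the code:

    BigDecimal estimatedThroughput = BigDecimal.valueOf(2_000_000); // bytes/s from StreamProgress
    long lagInMillis = 45_000; // watermark lag, or time since the last heartbeat run

    double estimatedSize =
        estimatedThroughput
            .multiply(BigDecimal.valueOf(lagInMillis))
            .divide(BigDecimal.valueOf(1000), 3, RoundingMode.DOWN)
            .min(BigDecimal.valueOf(Double.MAX_VALUE)) // cap to avoid overflow
            .max(BigDecimal.ZERO) // negative lag from clock skew counts as caught up
            .doubleValue();
    // 2_000_000 bytes/s x 45 s = 90_000_000 bytes of estimated outstanding work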
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java
index d9fa19e87e1cb..ec7a71ba5d3ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimator.java
@@ -17,15 +17,12 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator;

-import java.io.Serializable;
 import java.math.BigDecimal;
-import java.math.MathContext;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Random;
+import java.math.RoundingMode;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 import org.joda.time.Instant;

 /**
@@ -42,64 +39,36 @@
  * every element
  */
 @Internal
-public class BytesThroughputEstimator<T> implements ThroughputEstimator<T> {
+public class BytesThroughputEstimator<T> {

-  private static final long serialVersionUID = -1147130541208370666L;
-  private static final BigDecimal MAX_DOUBLE = BigDecimal.valueOf(Double.MAX_VALUE);
+  private static final long serialVersionUID = 6014227751984587954L;
   private static final int DEFAULT_SAMPLE_RATE = 50;

-  /** Keeps track of how many bytes of throughput have been seen in a given timestamp. */
-  private static class ThroughputEntry implements Serializable {
-
-    private static final long serialVersionUID = 3752325891215855332L;
-
-    private final Instant instant;
-    private BigDecimal bytes;
-
-    public ThroughputEntry(Instant instant, long bytes) {
-      this.instant = instant;
-      this.bytes = BigDecimal.valueOf(bytes);
-    }
-
-    public Instant getTimestamp() {
-      return instant;
-    }
-
-    public long getSeconds() {
-      return TimestampConverter.toSeconds(instant);
-    }
-
-    public BigDecimal getBytes() {
-      return bytes;
-    }
-
-    public void addBytes(long bytesToAdd) {
-      bytes = bytes.add(BigDecimal.valueOf(bytesToAdd));
-    }
-  }
-
-  // The deque holds a number of windows in the past in order to calculate
-  // a rolling windowing throughput.
-  private final Deque<ThroughputEntry> deque;
-  // The number of seconds to be accounted for when calculating the throughput
-  private final int windowSizeSeconds;
-  // Estimates the size in bytes of throughput elements
+  // Estimates the size in bytes of throughput elements
   private final SizeEstimator<T> sizeEstimator;
   private final int sampleRate;
-  private final Random random;
+  private long elementCount;
+  private BigDecimal currentElementSizeEstimate;
+  private final Instant startTimestamp;
+  private Instant lastElementTimestamp;
+  private BigDecimal totalThroughputEstimate;

-  public BytesThroughputEstimator(int windowSizeSeconds, SizeEstimator<T> sizeEstimator) {
-    this(windowSizeSeconds, sizeEstimator, DEFAULT_SAMPLE_RATE);
+  public BytesThroughputEstimator(
+      SizeEstimator<T> sizeEstimator, @Nullable Instant lastRunTimestamp) {
+    this(
+        sizeEstimator,
+        DEFAULT_SAMPLE_RATE,
+        lastRunTimestamp != null ? lastRunTimestamp : Instant.now());
   }

   @VisibleForTesting
   public BytesThroughputEstimator(
-      int windowSizeSeconds, SizeEstimator<T> sizeEstimator, int sampleRate) {
-    this.deque = new ArrayDeque<>();
-    this.windowSizeSeconds = windowSizeSeconds;
+      SizeEstimator<T> sizeEstimator, int sampleRate, Instant startTimestamp) {
     this.sizeEstimator = sizeEstimator;
     this.sampleRate = sampleRate;
-    this.random = new Random();
+    this.startTimestamp = startTimestamp;
+    this.elementCount = 0;
+    this.currentElementSizeEstimate = BigDecimal.ZERO;
+    lastElementTimestamp = this.startTimestamp;
+    totalThroughputEstimate = BigDecimal.ZERO;
   }

   /**
@@ -108,66 +77,31 @@ public BytesThroughputEstimator(
    * @param timeOfRecords the committed timestamp of the records
    * @param element the element to estimate the byte size of
    */
-  @SuppressWarnings("nullness") // queue is never null, nor the peeked element
-  @Override
   public void update(Instant timeOfRecords, T element) {
-    if (random.nextInt(sampleRate) == 0) {
-      long bytes = sizeEstimator.sizeOf(element);
-      synchronized (deque) {
-        if (deque.isEmpty()
-            || TimestampConverter.toSeconds(timeOfRecords) > deque.getLast().getSeconds()) {
-          deque.addLast(new ThroughputEntry(timeOfRecords, bytes));
-        } else {
-          deque.getLast().addBytes(bytes);
-        }
-        cleanQueue(deque.getLast().getTimestamp());
-      }
-    }
-  }
-
-  /** Returns the estimated throughput bytes for now. */
-  @Override
-  public double get() {
-    return getFrom(Instant.now());
-  }
-
-  /**
-   * Returns the estimated throughput bytes for a specified time.
-   *
-   * @param time the specified timestamp to check throughput
-   */
-  @Override
-  public double getFrom(Instant time) {
-    synchronized (deque) {
-      cleanQueue(time);
-      if (deque.size() == 0) {
-        return 0D;
-      }
-      BigDecimal throughput = BigDecimal.ZERO;
-      for (ThroughputEntry entry : deque) {
-        throughput = throughput.add(entry.getBytes());
-      }
-      return throughput
-          // Prevents negative values
-          .max(BigDecimal.ZERO)
-          .divide(BigDecimal.valueOf(windowSizeSeconds), MathContext.DECIMAL128)
-          .multiply(BigDecimal.valueOf(sampleRate))
-          // Cap it to Double.MAX_VALUE
-          .min(MAX_DOUBLE)
-          .doubleValue();
+    // Always update on the first element, then re-estimate the size once every sampleRate
+    // elements. This is expensive, so we avoid doing it too often.
+    if (elementCount % sampleRate == 0) {
+      currentElementSizeEstimate = BigDecimal.valueOf(sizeEstimator.sizeOf(element));
     }
+    lastElementTimestamp = timeOfRecords;
+    elementCount += 1;
+    totalThroughputEstimate = totalThroughputEstimate.add(currentElementSizeEstimate);
   }

-  private void cleanQueue(Instant time) {
-    while (deque.size() > 0) {
-      final ThroughputEntry entry = deque.getFirst();
-      if (entry != null
-          && entry.getSeconds() >= TimestampConverter.toSeconds(time) - windowSizeSeconds) {
-        break;
-      }
-      // Remove the element if the timestamp of the first element is beyond
-      // the time range to look backward.
-      deque.removeFirst();
+  /** Returns the estimated throughput bytes for this run. */
+  public BigDecimal get() {
+    if (elementCount == 0) {
+      return BigDecimal.ZERO;
+    } else {
+      BigDecimal processingTimeMillis =
+          BigDecimal.valueOf(new Duration(startTimestamp, lastElementTimestamp).getMillis())
+              // Avoid divide by zero by rounding up to 1 ms when the difference is less
+              // than a full millisecond
+              .max(BigDecimal.ONE);
+
+      return totalThroughputEstimate
+          .divide(processingTimeMillis, 3, RoundingMode.DOWN)
+          .multiply(BigDecimal.valueOf(1000));
     }
   }
 }
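The estimator is now a plain per-run accumulator: it samples one element size every sampleRate elements, attributes that size to every element, and divides the accumulated bytes by the run's wall-clock span. A worked example of get() with invented numbers:

    // 1,000 elements processed; every 50th was measured at ~800 bytes, so
    // totalThroughputEstimate accumulated roughly 800 * 1,000 bytes.
    BigDecimal totalThroughputEstimate = BigDecimal.valueOf(800L * 1000);
    long runMillis = 4_000; // lastElementTimestamp - startTimestamp, floored at 1 ms

    BigDecimal bytesPerSecond =
        totalThroughputEstimate
            .divide(BigDecimal.valueOf(Math.max(runMillis, 1)), 3, RoundingMode.DOWN)
            .multiply(BigDecimal.valueOf(1000));
    // 800,000 bytes / 4,000 ms * 1,000 = 200,000 bytes/s, the value persisted in the
    // next StreamProgress and later read back by getSize.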
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/CoderSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/CoderSizeEstimator.java
new file mode 100644
index 0000000000000..528bbd7baf88b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/CoderSizeEstimator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+
+/**
+ * This class is used to estimate the size in bytes of a given element. It uses the given {@link
+ * Coder} to calculate the size of the element.
+ */
+@Internal
+public class CoderSizeEstimator<T> implements SizeEstimator<T>, Serializable {
+
+  private static final long serialVersionUID = 5564948506493524158L;
+
+  private static class SizeEstimatorObserver extends ElementByteSizeObserver
+      implements Serializable {
+
+    private static final long serialVersionUID = 4569562919962045617L;
+    private long observedBytes;
+
+    @Override
+    protected void reportElementSize(long elementByteSize) {
+      observedBytes = elementByteSize;
+    }
+  }
+
+  private final Coder<T> coder;
+  private final SizeEstimatorObserver observer;
+
+  public CoderSizeEstimator(Coder<T> coder) {
+    this.coder = coder;
+    this.observer = new SizeEstimatorObserver();
+  }
+
+  /**
+   * Estimates the size in bytes of the given element with the configured {@link Coder}.
+   *
+   * @param element the element instance to be estimated
+   * @return the estimated size in bytes of the given element
+   */
+  @Override
+  public long sizeOf(T element) {
+    try {
+      coder.registerByteSizeObserver(element, observer);
+      observer.advance();
+
+      return observer.observedBytes;
+    } catch (Exception e) {
+      throw new EncodingException(e);
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimator.java
similarity index 59%
rename from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimator.java
rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimator.java
index 98b5761feda72..3af627e12ec12 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimator.java
@@ -18,43 +18,23 @@
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator;

 import org.apache.beam.sdk.annotations.Internal;
-import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /**
- * NoOp implementation of a throughput estimator. This will always return 0 as the throughput and
- * it will warn users that this is being used (it should not be used in production).
+ * NoOp implementation of a size estimator. This will always return 0 as the size and it will warn
+ * users that this is being used (it should not be used in production).
  */
 @Internal
-public class NullThroughputEstimator<T> implements ThroughputEstimator<T> {
+public class NullSizeEstimator<T> implements SizeEstimator<T> {

   private static final long serialVersionUID = 7088120208289907630L;
-  private static final Logger LOG = LoggerFactory.getLogger(NullThroughputEstimator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(NullSizeEstimator.class);

-  /**
-   * NoOp.
-   *
-   * @param timeOfRecords ignored
-   * @param element ignored
-   */
   @Override
-  public void update(Instant timeOfRecords, T element) {
+  public long sizeOf(T element) {
     LOG.warn(
-        "Trying to update throughput using {}, this operation will have no effect",
-        this.getClass().getSimpleName());
-  }
-
-  /**
-   * Always returns 0.
-   *
-   * @param time ignored
-   * @return 0
-   */
-  @Override
-  public double getFrom(Instant time) {
-    LOG.warn(
-        "Trying to retrieve throughput using {}, this operation will always return 0",
+        "Trying to estimate size using {}, this operation will always return 0",
         this.getClass().getSimpleName());
     return 0;
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java
index 30d6245059653..b79013f41af2f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/SizeEstimator.java
@@ -17,54 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator;

-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-
-/**
- * This class is used to estimate the size in bytes of a given element. It uses the given {@link
- * Coder} to calculate the size of the element.
- */
-@Internal
-public class SizeEstimator<T> implements Serializable {
-
-  private static final long serialVersionUID = 5564948506493524158L;
-
-  private static class SizeEstimatorObserver extends ElementByteSizeObserver
-      implements Serializable {
-
-    private static final long serialVersionUID = 4569562919962045617L;
-    private long observedBytes;
-
-    @Override
-    protected void reportElementSize(long elementByteSize) {
-      observedBytes = elementByteSize;
-    }
-  }
-
-  private final Coder<T> coder;
-  private final SizeEstimatorObserver observer;
-
-  public SizeEstimator(Coder<T> coder) {
-    this.coder = coder;
-    this.observer = new SizeEstimatorObserver();
-  }
-
-  /**
-   * Estimates the size in bytes of the given element with the configured {@link Coder}.
-   *
-   * @param element the element instance to be estimated
-   * @return the estimated size in bytes of the given element
-   */
-  public long sizeOf(T element) {
-    try {
-      coder.registerByteSizeObserver(element, observer);
-      observer.advance();
-
-      return observer.observedBytes;
-    } catch (Exception e) {
-      throw new EncodingException(e);
-    }
-  }
-}
+public interface SizeEstimator<T> {
+  long sizeOf(T element);
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/ThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/ThroughputEstimator.java
deleted file mode 100644
index aeedb31ed134a..0000000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/ThroughputEstimator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Internal;
-import org.joda.time.Instant;
-
-/** An estimator to calculate the throughput of the outputted elements from a DoFn. */
-@Internal
-public interface ThroughputEstimator<T> extends Serializable {
-  /**
-   * Updates the estimator with the size of the records.
-   *
-   * @param timeOfRecords the committed timestamp of the records
-   * @param element the element to estimate the byte size of
-   */
-  void update(Instant timeOfRecords, T element);
-
-  /** Returns the estimated throughput for now. */
-  default double get() {
-    return getFrom(Instant.now());
-  }
-
-  /**
-   * Returns the estimated throughput for a specified time.
-   *
-   * @param time the specified timestamp to check throughput
-   */
-  double getFrom(Instant time);
-}
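Net shape of the estimator refactor: the serializable ThroughputEstimator interface is deleted, SizeEstimator shrinks to the single-method interface above (with CoderSizeEstimator and NullSizeEstimator as its implementations), and rate bookkeeping moves into the per-run BytesThroughputEstimator. Because the new interface has exactly one abstract method, a test double can be a simple lambda; a hypothetical sketch (the UTF-8 size function is not part of this change):

    // Any size function satisfies the new seam; no Coder machinery required.
    SizeEstimator<String> utf8Size = s -> s.getBytes(StandardCharsets.UTF_8).length;

    BytesThroughputEstimator<String> estimator =
        new BytesThroughputEstimator<>(utf8Size, Instant.now());
    estimator.update(Instant.now(), "row-key:mutation-payload");
    BigDecimal bytesPerSecond = estimator.get(); // throughput estimate for this run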
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java
index ce2cf11a842e5..fe3cdb15a2382 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java
@@ -20,6 +20,7 @@
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
 import com.google.cloud.bigtable.data.v2.models.CloseStream;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.Objects;
 import org.apache.beam.sdk.annotations.Internal;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -41,15 +42,25 @@ public class StreamProgress implements Serializable {
   private @Nullable ChangeStreamContinuationToken currentToken;
   private @Nullable Instant estimatedLowWatermark;
+  private @Nullable BigDecimal throughputEstimate;
+  private @Nullable Instant lastRunTimestamp;
   private @Nullable CloseStream closeStream;
   private boolean failToLock;
+  private boolean isHeartbeat;

   public StreamProgress() {}

   public StreamProgress(
-      @Nullable ChangeStreamContinuationToken token, Instant estimatedLowWatermark) {
+      @Nullable ChangeStreamContinuationToken token,
+      Instant estimatedLowWatermark,
+      BigDecimal throughputEstimate,
+      Instant lastRunTimestamp,
+      boolean isHeartbeat) {
     this.currentToken = token;
     this.estimatedLowWatermark = estimatedLowWatermark;
+    this.throughputEstimate = throughputEstimate;
+    this.lastRunTimestamp = lastRunTimestamp;
+    this.isHeartbeat = isHeartbeat;
   }

   public StreamProgress(@Nullable CloseStream closeStream) {
@@ -64,6 +75,14 @@ public StreamProgress(@Nullable CloseStream closeStream) {
     return estimatedLowWatermark;
   }

+  public @Nullable BigDecimal getThroughputEstimate() {
+    return throughputEstimate;
+  }
+
+  public @Nullable Instant getLastRunTimestamp() {
+    return lastRunTimestamp;
+  }
+
   public @Nullable CloseStream getCloseStream() {
     return closeStream;
   }
@@ -76,6 +95,10 @@ public void setFailToLock(boolean failToLock) {
     this.failToLock = failToLock;
   }

+  public boolean isHeartbeat() {
+    return this.isHeartbeat;
+  }
+
   public boolean isEmpty() {
     return closeStream == null && currentToken == null;
   }
@@ -92,7 +115,10 @@ public boolean equals(@Nullable Object o) {
     return Objects.equals(getCurrentToken(), that.getCurrentToken())
         && Objects.equals(getEstimatedLowWatermark(), that.getEstimatedLowWatermark())
         && Objects.equals(getCloseStream(), that.getCloseStream())
-        && (isFailToLock() == that.isFailToLock());
+        && (isFailToLock() == that.isFailToLock())
+        && Objects.equals(getThroughputEstimate(), that.getThroughputEstimate())
+        && Objects.equals(getLastRunTimestamp(), that.getLastRunTimestamp())
+        && (isHeartbeat() == that.isHeartbeat());
   }

   @Override
@@ -111,6 +137,12 @@ public String toString() {
         + closeStream
         + ", failToLock="
         + failToLock
+        + ", throughputEstimate="
+        + throughputEstimate
+        + ", lastRunTimestamp="
+        + lastRunTimestamp
+        + ", isHeartbeat="
+        + isHeartbeat
         + '}';
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
index 6fe272009ccb4..2e340c2b30b5b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamActionTest.java
@@ -30,15 +30,17 @@
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
 import com.google.cloud.bigtable.data.v2.models.CloseStream;
 import com.google.cloud.bigtable.data.v2.models.Heartbeat;
 import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
 import com.google.protobuf.ByteString;
 import com.google.rpc.Status;
+import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.ThroughputEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
@@ -49,8 +51,13 @@
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;

+@RunWith(MockitoJUnitRunner.Silent.class)
 public class ChangeStreamActionTest {

   private ChangeStreamMetrics metrics;
@@ -58,9 +65,10 @@ public class ChangeStreamActionTest {
   private RestrictionTracker<StreamProgress, StreamProgress> tracker;
   private PartitionRecord partitionRecord;
-  private DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver;
+  private DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver;
   private ManualWatermarkEstimator<Instant> watermarkEstimator;
-  private ThroughputEstimator<KV<ByteString, ChangeStreamMutation>> throughputEstimator;
+  private BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator;
+  @Captor private ArgumentCaptor<StreamProgress> streamProgressArgumentCaptor;

   @Before
   public void setUp() {
@@ -69,10 +77,12 @@ public void setUp() {
     partitionRecord = mock(PartitionRecord.class);
     receiver = mock(DoFn.OutputReceiver.class);
     watermarkEstimator = mock(ManualWatermarkEstimator.class);
-    throughputEstimator = mock(ThroughputEstimator.class);
+    throughputEstimator = mock(BytesThroughputEstimator.class);

-    action = new ChangeStreamAction(metrics, throughputEstimator);
+    action = new ChangeStreamAction(metrics);
     when(tracker.tryClaim(any())).thenReturn(true);
+    when(partitionRecord.getPartition()).thenReturn(ByteStringRange.create("a", "b"));
+    when(throughputEstimator.get()).thenReturn(BigDecimal.valueOf(1000));
   }

   @Test
@@ -87,14 +97,41 @@ public void testHeartBeat() {
         .thenReturn(changeStreamContinuationToken);

     final Optional<DoFn.ProcessContinuation> result =
-        action.run(partitionRecord, mockHeartBeat, tracker, receiver, watermarkEstimator, false);
+        action.run(
+            partitionRecord,
+            mockHeartBeat,
+            tracker,
+            receiver,
+            watermarkEstimator,
+            throughputEstimator,
+            false);

     assertFalse(result.isPresent());
     verify(metrics).incHeartbeatCount();
     verify(watermarkEstimator).setWatermark(eq(lowWatermark));
-    StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
-    verify(tracker).tryClaim(eq(streamProgress));
-    verify(throughputEstimator, never()).update(any(), any());
+    StreamProgress streamProgress =
+        new StreamProgress(
+            changeStreamContinuationToken,
+            lowWatermark,
+            BigDecimal.valueOf(1000),
+            Instant.now(),
+            true);
+    verify(tracker).tryClaim(streamProgressArgumentCaptor.capture());
+    assertEquals(
+        streamProgress.getCurrentToken(),
+        streamProgressArgumentCaptor.getValue().getCurrentToken());
+    assertEquals(
+        streamProgress.getThroughputEstimate(),
+        streamProgressArgumentCaptor.getValue().getThroughputEstimate());
+    assertEquals(
+        streamProgress.getEstimatedLowWatermark(),
+        streamProgressArgumentCaptor.getValue().getEstimatedLowWatermark());
+    assertEquals(
+        streamProgress.isHeartbeat(), streamProgressArgumentCaptor.getValue().isHeartbeat());
+    KV<ByteString, ChangeStreamRecord> record =
+        KV.of(
+            ByteStringRange.serializeToByteString(partitionRecord.getPartition()), mockHeartBeat);
+    verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
+    verify(throughputEstimator).update(any(), eq(record));
   }

   @Test
@@ -109,14 +146,20 @@ public void testCloseStreamResume() {
         .thenReturn(Collections.singletonList(changeStreamContinuationToken));

     final Optional<DoFn.ProcessContinuation> result =
-        action.run(partitionRecord, mockCloseStream, tracker, receiver, watermarkEstimator, false);
+        action.run(
+            partitionRecord,
+            mockCloseStream,
+            tracker,
+            receiver,
+            watermarkEstimator,
+            throughputEstimator,
+            false);

     assertTrue(result.isPresent());
     assertEquals(DoFn.ProcessContinuation.resume(), result.get());
     verify(metrics).incClosestreamCount();
     StreamProgress streamProgress = new StreamProgress(mockCloseStream);
     verify(tracker).tryClaim(eq(streamProgress));
-    verify(throughputEstimator, never()).update(any(), any());
   }

   @Test
@@ -134,18 +177,41 @@ public void testChangeStreamMutationUser() {
     Mockito.when(changeStreamMutation.getEstimatedLowWatermark())
         .thenReturn(toThreetenInstant(lowWatermark));
     Mockito.when(changeStreamMutation.getType()).thenReturn(ChangeStreamMutation.MutationType.USER);
-    KV<ByteString, ChangeStreamMutation> record =
+    KV<ByteString, ChangeStreamRecord> record =
         KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);

     final Optional<DoFn.ProcessContinuation> result =
         action.run(
-            partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false);
+            partitionRecord,
+            changeStreamMutation,
+            tracker,
+            receiver,
+            watermarkEstimator,
+            throughputEstimator,
+            false);

     assertFalse(result.isPresent());
     verify(metrics).incChangeStreamMutationUserCounter();
     verify(metrics, never()).incChangeStreamMutationGcCounter();
-    StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
-    verify(tracker).tryClaim(eq(streamProgress));
+    StreamProgress streamProgress =
+        new StreamProgress(
+            changeStreamContinuationToken,
+            lowWatermark,
+            BigDecimal.valueOf(1000),
+            Instant.now(),
+            false);
+    verify(tracker).tryClaim(streamProgressArgumentCaptor.capture());
+    assertEquals(
+        streamProgress.getCurrentToken(),
+        streamProgressArgumentCaptor.getValue().getCurrentToken());
+    assertEquals(
+        streamProgress.getThroughputEstimate(),
+        streamProgressArgumentCaptor.getValue().getThroughputEstimate());
+    assertEquals(
+        streamProgress.getEstimatedLowWatermark(),
+        streamProgressArgumentCaptor.getValue().getEstimatedLowWatermark());
+    assertEquals(
+        streamProgress.isHeartbeat(), streamProgressArgumentCaptor.getValue().isHeartbeat());
     verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
     verify(watermarkEstimator).setWatermark(eq(lowWatermark));
     verify(throughputEstimator).update(any(), eq(record));
@@ -167,18 +233,41 @@ public void testChangeStreamMutationGc() {
     Mockito.when(changeStreamMutation.getEstimatedLowWatermark())
         .thenReturn(toThreetenInstant(lowWatermark));
     Mockito.when(changeStreamMutation.getType())
         .thenReturn(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
-    KV<ByteString, ChangeStreamMutation> record =
+    KV<ByteString, ChangeStreamRecord> record =
         KV.of(changeStreamMutation.getRowKey(), changeStreamMutation);

     final Optional<DoFn.ProcessContinuation> result =
         action.run(
-            partitionRecord, changeStreamMutation, tracker, receiver, watermarkEstimator, false);
+            partitionRecord,
+            changeStreamMutation,
+            tracker,
+            receiver,
+            watermarkEstimator,
+            throughputEstimator,
+            false);

     assertFalse(result.isPresent());
     verify(metrics).incChangeStreamMutationGcCounter();
     verify(metrics, never()).incChangeStreamMutationUserCounter();
-    StreamProgress streamProgress = new StreamProgress(changeStreamContinuationToken, lowWatermark);
-    verify(tracker).tryClaim(eq(streamProgress));
+    StreamProgress streamProgress =
+        new StreamProgress(
+            changeStreamContinuationToken,
+            lowWatermark,
+            BigDecimal.valueOf(1000),
+            Instant.now(),
+            false);
+    verify(tracker).tryClaim(streamProgressArgumentCaptor.capture());
+    assertEquals(
+        streamProgress.getCurrentToken(),
+        streamProgressArgumentCaptor.getValue().getCurrentToken());
+    assertEquals(
+        streamProgress.getThroughputEstimate(),
+        streamProgressArgumentCaptor.getValue().getThroughputEstimate());
+    assertEquals(
+        streamProgress.getEstimatedLowWatermark(),
+        streamProgressArgumentCaptor.getValue().getEstimatedLowWatermark());
+    assertEquals(
+        streamProgress.isHeartbeat(), streamProgressArgumentCaptor.getValue().isHeartbeat());
     verify(receiver).outputWithTimestamp(eq(record), eq(Instant.EPOCH));
     verify(watermarkEstimator).setWatermark(eq(lowWatermark));
     verify(throughputEstimator).update(any(), eq(record));
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java @@ -30,7 +30,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken; -import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.CloseStream; import com.google.cloud.bigtable.data.v2.models.Heartbeat; @@ -45,6 +44,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker; @@ -71,8 +71,9 @@ public class ReadChangeStreamPartitionActionTest { // private PartitionRecord partitionRecord; private StreamProgress restriction; private RestrictionTracker tracker; - private DoFn.OutputReceiver> receiver; + private DoFn.OutputReceiver> receiver; private ManualWatermarkEstimator watermarkEstimator; + private CoderSizeEstimator> sizeEstimator; private ByteStringRange partition; private String uuid; @@ -84,11 +85,17 @@ public void setUp() throws Exception { changeStreamDao = mock(ChangeStreamDao.class); metrics = mock(ChangeStreamMetrics.class); changeStreamAction = mock(ChangeStreamAction.class); + sizeEstimator = mock(CoderSizeEstimator.class); Duration heartbeatDuration = Duration.standardSeconds(1); action = new ReadChangeStreamPartitionAction( - metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration); + metadataTableDao, + changeStreamDao, + metrics, + changeStreamAction, + heartbeatDuration, + sizeEstimator); restriction = mock(StreamProgress.class); tracker = mock(ReadChangeStreamPartitionProgressTracker.class); @@ -124,7 +131,7 @@ public void testLockingRowSucceed() throws IOException { when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean())) .thenReturn(responses); - when(changeStreamAction.run(any(), any(), any(), any(), any(), anyBoolean())) + when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean())) .thenReturn(Optional.of(DoFn.ProcessContinuation.stop())); final DoFn.ProcessContinuation result = @@ -133,7 +140,7 @@ public void testLockingRowSucceed() throws IOException { assertEquals(DoFn.ProcessContinuation.stop(), result); // Verify that on successful lock, we don't tryClaim on the tracker. 
    verify(tracker, never()).tryClaim(any());
-    verify(changeStreamAction).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean());
   }

   @Test
@@ -150,7 +157,7 @@ public void testLockingRowFailsStops() throws IOException {
     StreamProgress streamProgress = new StreamProgress();
     streamProgress.setFailToLock(true);
     verify(tracker).tryClaim(streamProgress);
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
   }

   @Test
@@ -170,7 +177,7 @@ public void testLockingRowNotNeededAfterFirstRun() throws IOException {
     when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean()))
         .thenReturn(responses);
-    when(changeStreamAction.run(any(), any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean()))
         .thenReturn(Optional.of(DoFn.ProcessContinuation.stop()));
     final DoFn.ProcessContinuation result =
@@ -179,7 +186,7 @@
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Verify that on successful lock, we don't tryClaim on the tracker.
     verify(tracker, never()).tryClaim(any());
-    verify(changeStreamAction).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean());
   }

   @Test
@@ -198,7 +205,7 @@ public void testLockingRowNotNeededAfterFirstRunNotSame() throws IOException {
     StreamProgress streamProgress = new StreamProgress();
     streamProgress.setFailToLock(true);
     verify(tracker).tryClaim(streamProgress);
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
   }

   @Test
@@ -217,13 +224,13 @@ public void testThatChangeStreamWorkerCounterIsIncrementedOnInitialRun() throws
     when(changeStreamDao.readChangeStreamPartition(any(), any(), any(), any(), anyBoolean()))
         .thenReturn(responses);
-    when(changeStreamAction.run(any(), any(), any(), any(), any(), anyBoolean()))
+    when(changeStreamAction.run(any(), any(), any(), any(), any(), any(), anyBoolean()))
         .thenReturn(Optional.of(DoFn.ProcessContinuation.stop()));
     final DoFn.ProcessContinuation result =
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
-    verify(changeStreamAction).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction).run(any(), any(), any(), any(), any(), any(), anyBoolean());
   }

   @Test
@@ -240,7 +247,7 @@ public void testCloseStreamTerminateOKStatus() throws IOException {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
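Every stub and verification in this file gains one extra any() because ChangeStreamAction.run now threads a throughput estimator through. Reconstructed from the matchers and the call sites in ChangeStreamActionTest, the widened signature is roughly the following (parameter names are assumptions, not taken from the source):

    Optional<DoFn.ProcessContinuation> run(
        PartitionRecord partitionRecord,
        ChangeStreamRecord record,
        RestrictionTracker<StreamProgress, StreamProgress> tracker,
        DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver,
        ManualWatermarkEstimator<Instant> watermarkEstimator,
        BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator,
        boolean shouldDebug);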
@@ -266,7 +273,7 @@ public void testCloseStreamTerminateNotOutOfRangeStatus() throws IOException {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
@@ -301,7 +308,7 @@ public void testCloseStreamWritesContinuationTokens() throws IOException {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
@@ -353,7 +360,7 @@ public void testCloseStreamNewPartitionMerge() throws IOException {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
@@ -399,7 +406,7 @@ public void testCloseStreamMergeWithoutNewPartitionsField() throws IOException {
         action.run(partitionRecord, tracker, receiver, watermarkEstimator);
     assertEquals(DoFn.ProcessContinuation.stop(), result);
     // Should terminate before reaching processing stream partition responses.
-    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), anyBoolean());
+    verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any(), anyBoolean());
     // Should not try claim any restriction when processing CloseStream
     verify(tracker, (never())).tryClaim(any());
     // Should decrement the metric on termination.
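The next file is a new test for FilterForMutationDoFn, the stage appended to the change stream expansion so that only user-visible mutations leave the transform. The DoFn itself is not part of this hunk; the sketch below is what the tests pin down, assuming an instanceof check is how the filtering is done:

    // Sketch only: keep ChangeStreamMutations, drop Heartbeats and CloseStreams,
    // which exist purely to advance watermarks and signal partition changes.
    public void processElement(
        KV<ByteString, ChangeStreamRecord> element,
        DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver) {
      if (element.getValue() instanceof ChangeStreamMutation) {
        receiver.output(KV.of(element.getKey(), (ChangeStreamMutation) element.getValue()));
      }
    }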
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFnTest.java
new file mode 100644
index 0000000000000..46fe78dd878e9
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/FilterForMutationDoFnTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.CloseStream;
+import com.google.cloud.bigtable.data.v2.models.Heartbeat;
+import com.google.protobuf.ByteString;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class FilterForMutationDoFnTest {
+
+  private FilterForMutationDoFn doFn;
+  @Mock private DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> outputReceiver;
+
+  @Before
+  public void setup() {
+    doFn = new FilterForMutationDoFn();
+  }
+
+  @Test
+  public void shouldNotOutputHeartbeats() {
+    Heartbeat heartbeat = mock(Heartbeat.class);
+    doFn.processElement(KV.of(ByteString.copyFromUtf8("test"), heartbeat), outputReceiver);
+    verify(outputReceiver, never()).output(any());
+  }
+
+  @Test
+  public void shouldOutputChangeStreamMutations() {
+    ChangeStreamMutation mutation = mock(ChangeStreamMutation.class);
+    doFn.processElement(KV.of(ByteString.copyFromUtf8("test"), mutation), outputReceiver);
+    verify(outputReceiver, times(1)).output(KV.of(ByteString.copyFromUtf8("test"), mutation));
+  }
+
+  @Test
+  public void shouldNotOutputCloseStreams() {
+    // This shouldn't happen, but if it did we wouldn't want CloseStream records
+    // returned to users.
+    CloseStream closeStream = mock(CloseStream.class);
+    doFn.processElement(KV.of(ByteString.copyFromUtf8("test"), closeStream), outputReceiver);
+    verify(outputReceiver, never()).output(any());
+  }
+}
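The ReadChangeStreamPartitionDoFn tests that follow check the reworked getSize(), which now multiplies the throughput estimate stored in the restriction by the partition's watermark lag instead of consulting a DoFn-level estimator. Worked with the numbers from testProcessElementAndGetSize (the lag value is illustrative, not from the source):

    // Two 100-byte mutations over a 10-second window:
    long bytesPerSecond = (100L * 2) / 10;            // 20 B/s
    long watermarkLagSeconds = 300L;                  // assumed lag for illustration
    double sizeEstimate = bytesPerSecond * (double) watermarkLagSeconds; // 6000 bytes remaining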
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index e8f59c2bd1776..cb82fc6d19ca6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -33,6 +33,7 @@
 import com.google.cloud.bigtable.data.v2.models.Range;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.Iterator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
@@ -42,8 +43,7 @@
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.BytesThroughputEstimator;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.SizeEstimator;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEstimator;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker;
 import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
@@ -58,7 +58,7 @@ public class ReadChangeStreamPartitionDoFnTest {
   private ChangeStreamDao changeStreamDao;
   private MetadataTableDao metadataTableDao;
-  private SizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;
+  private CoderSizeEstimator<KV<ByteString, ChangeStreamRecord>> sizeEstimator;
   private ReadChangeStreamPartitionDoFn doFn;

   @Before
@@ -74,22 +74,28 @@ public void setup() throws IOException {
     ActionFactory actionFactory = mock(ActionFactory.class);
     ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
-    sizeEstimator = mock(SizeEstimator.class);
-    BytesThroughputEstimator<KV<ByteString, ChangeStreamRecord>> throughputEstimator =
-        new BytesThroughputEstimator<>(
-            ReadChangeStreamPartitionDoFn.THROUGHPUT_ESTIMATION_WINDOW_SECONDS, sizeEstimator, 1);
-    ChangeStreamAction changeStreamAction = new ChangeStreamAction(metrics, throughputEstimator);
+    sizeEstimator = mock(CoderSizeEstimator.class);
+    ChangeStreamAction changeStreamAction = new ChangeStreamAction(metrics);
     ReadChangeStreamPartitionAction readChangeStreamPartitionAction =
         new ReadChangeStreamPartitionAction(
-            metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration);
-    when(actionFactory.changeStreamAction(metrics, throughputEstimator))
-        .thenReturn(changeStreamAction);
+            metadataTableDao,
+            changeStreamDao,
+            metrics,
+            changeStreamAction,
+            heartbeatDuration,
+            sizeEstimator);
+    when(actionFactory.changeStreamAction(metrics)).thenReturn(changeStreamAction);
     when(actionFactory.readChangeStreamPartitionAction(
-            metadataTableDao, changeStreamDao, metrics, changeStreamAction, heartbeatDuration))
+            metadataTableDao,
+            changeStreamDao,
+            metrics,
+            changeStreamAction,
+            heartbeatDuration,
+            sizeEstimator))
         .thenReturn(readChangeStreamPartitionAction);
     doFn = new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics);
-    doFn.setThroughputEstimator(throughputEstimator);
+    doFn.setSizeEstimator(sizeEstimator);
   }

   @Test
@@ -112,7 +118,7 @@ public void testProcessElementAndGetSize() throws IOException, InterruptedExcept
     ReadChangeStreamPartitionProgressTracker restrictionTracker =
         mock(ReadChangeStreamPartitionProgressTracker.class);
     when(restrictionTracker.currentRestriction()).thenReturn(new StreamProgress());
-    DoFn.OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver =
+    DoFn.OutputReceiver<KV<ByteString, ChangeStreamRecord>> receiver =
         mock(DoFn.OutputReceiver.class);
     ManualWatermarkEstimator<Instant> watermarkEstimator = mock(ManualWatermarkEstimator.class);
     doFn.setup();
@@ -138,10 +144,43 @@ public void testProcessElementAndGetSize() throws IOException, InterruptedExcept
     when(restrictionTracker.tryClaim(any())).thenReturn(true, true, false);
     doFn.processElement(partition, restrictionTracker, receiver, watermarkEstimator);
-    double sizeEstimate = doFn.getSize(new StreamProgress(testToken, tenSecondsAgo));
+    double sizeEstimate =
+        doFn.getSize(
+            new StreamProgress(
+                testToken, tenSecondsAgo, BigDecimal.valueOf(20), Instant.now(), false));
     // we should have output 2 100B mutations in the past 10s
     long bytesPerSecond = (mutationSize * 2) / 10;
-    assertEquals(sizeEstimate, bytesPerSecond * watermarkLag, .1d);
+    assertEquals(sizeEstimate, bytesPerSecond * watermarkLag, 10);
     verify(receiver, times(2)).outputWithTimestamp(KV.of(rowKey, mockMutation), Instant.EPOCH);
   }
+
+  @Test
+  public void testGetSizeCantBeNegative() throws IOException {
+    long mutationSize = 100L;
+    when(sizeEstimator.sizeOf(any())).thenReturn(mutationSize);
+    Range.ByteStringRange partitionRange = Range.ByteStringRange.create("", "");
+    ChangeStreamContinuationToken testToken =
+        ChangeStreamContinuationToken.create(partitionRange, "test");
+    doFn.setup();
+
+    double mutationEstimate =
+        doFn.getSize(
+            new StreamProgress(
+                testToken,
+                Instant.now().plus(Duration.standardMinutes(10)),
+                BigDecimal.valueOf(1000),
+                Instant.now().plus(Duration.standardMinutes(10)),
+                false));
+    assertEquals(0, mutationEstimate, 0);
+
+    double heartbeatEstimate =
+        doFn.getSize(
+            new StreamProgress(
+                testToken,
+                Instant.now().plus(Duration.standardMinutes(10)),
+                BigDecimal.valueOf(1000),
+                Instant.now().plus(Duration.standardMinutes(10)),
+                true));
+    assertEquals(0, heartbeatEstimate, 0);
+  }
 }
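BytesThroughputEstimatorTest below reflects the estimator's new shape: rather than a rolling WINDOW_SIZE_SECONDS buffer queried via getFrom(), each instance measures a single window that opens at the Instant passed to its constructor, and get() returns bytes per second as a BigDecimal. Typical usage, mirroring the first assertions (the middle constructor argument appears to be a sampling rate; that reading is an assumption):

    BytesThroughputEstimator<byte[]> estimator =
        new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(0));
    estimator.update(Instant.ofEpochSecond(2), new byte[10]);
    estimator.update(Instant.ofEpochSecond(10), new byte[40]);
    // 50 bytes over the 10-second window => 5 bytes/sec
    BigDecimal throughput = estimator.get();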
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java
index e10f730720668..90dd863bba106 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/BytesThroughputEstimatorTest.java
@@ -29,113 +29,79 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
 import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
 import org.joda.time.Instant;
-import org.junit.Before;
 import org.junit.Test;

 public class BytesThroughputEstimatorTest {
   private static final double DELTA = 1e-10;
-  private static final int WINDOW_SIZE_SECONDS = 10;
-  private BytesThroughputEstimator<byte[]> estimator;
-
-  @Before
-  public void setup() {
-    final SizeEstimator<byte[]> sizeEstimator = new SizeEstimator<>(new TestCoder());
-    estimator = new BytesThroughputEstimator<>(WINDOW_SIZE_SECONDS, sizeEstimator, 1);
-  }
+  private final SizeEstimator<byte[]> sizeEstimator = new CoderSizeEstimator<>(new TestCoder());

   @Test
   public void testThroughputIsZeroWhenNothingsBeenRegistered() {
-    assertEquals(0D, estimator.get(), DELTA);
-    assertEquals(0D, estimator.getFrom(Instant.now()), DELTA);
+    BytesThroughputEstimator<byte[]> estimator =
+        new BytesThroughputEstimator<>(sizeEstimator, Instant.now());
+    assertEquals(0D, estimator.get().doubleValue(), DELTA);
   }

   @Test
   public void testThroughputCalculation() {
+    BytesThroughputEstimator<byte[]> estimator =
+        new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(0));
     estimator.update(Instant.ofEpochSecond(2), new byte[10]);
     estimator.update(Instant.ofEpochSecond(3), new byte[20]);
     estimator.update(Instant.ofEpochSecond(5), new byte[30]);
     estimator.update(Instant.ofEpochSecond(10), new byte[40]);
     // (10 + 20 + 30 + 40) / 10 sec window = 10
-    assertEquals(10D, estimator.getFrom(Instant.ofEpochSecond(11)), DELTA);
-
-    estimator.update(Instant.ofEpochSecond(20), new byte[10]);
-    estimator.update(Instant.ofEpochSecond(21), new byte[20]);
-    estimator.update(Instant.ofEpochSecond(21), new byte[10]);
-    estimator.update(Instant.ofEpochSecond(29), new byte[40]);
+    assertEquals(10D, estimator.get().doubleValue(), DELTA);
+
+    BytesThroughputEstimator<byte[]> estimator2 =
+        new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(20));
+    estimator2.update(Instant.ofEpochSecond(21), new byte[10]);
+    estimator2.update(Instant.ofEpochSecond(22), new byte[20]);
+    estimator2.update(Instant.ofEpochSecond(23), new byte[10]);
+    estimator2.update(Instant.ofEpochSecond(30), new byte[40]);
     // (10 + 20 + 10 + 40) / 10 sec window = 8
-    assertEquals(8D, estimator.getFrom(Instant.ofEpochSecond(30)), DELTA);
+    assertEquals(8D, estimator2.get().doubleValue(), DELTA);

-    estimator.update(Instant.ofEpochSecond(31), new byte[10]);
-    estimator.update(Instant.ofEpochSecond(35), new byte[40]);
+    BytesThroughputEstimator<byte[]> estimator3 =
+        new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(30));
+    estimator3.update(Instant.ofEpochSecond(31), new byte[10]);
+    estimator3.update(Instant.ofEpochSecond(40), new byte[40]);
     // (10 + 40) / 10 sec window = 5
-    assertEquals(5D, estimator.getFrom(Instant.ofEpochSecond(41)), DELTA);
-
-    // No values in the past 10 seconds
-    assertEquals(0D, estimator.getFrom(Instant.ofEpochSecond(50)), DELTA);
+    assertEquals(5D, estimator3.get().doubleValue(), DELTA);
   }

   @Test
   public void testThroughputIsAccumulatedWithin60SecondsWindow() {
-    List<ImmutablePair<Instant, byte[]>> pairs = generateTestData(100, 0, 10);
+    List<ImmutablePair<Instant, byte[]>> pairs = generateTestData(100, 0, 11);
     pairs.sort(Comparator.comparing(ImmutablePair::getLeft));
-    final Instant lastUpdateTimestamp = pairs.get(pairs.size() - 1).getLeft();

     BigDecimal sum = BigDecimal.valueOf(0L);
     for (ImmutablePair<Instant, byte[]> pair : pairs) {
       sum = sum.add(BigDecimal.valueOf(pair.getRight().length));
     }
-    final BigDecimal want =
-        sum.divide(BigDecimal.valueOf(WINDOW_SIZE_SECONDS), MathContext.DECIMAL128);
+    final BigDecimal want = sum.divide(BigDecimal.valueOf(10), MathContext.DECIMAL128);

+    BytesThroughputEstimator<byte[]> estimator =
+        new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(0));
     for (ImmutablePair<Instant, byte[]> pair : pairs) {
       estimator.update(pair.getLeft(), pair.getRight());
     }

-    double actual = estimator.getFrom(Instant.ofEpochSecond(10));
-    assertEquals(want.doubleValue(), actual, DELTA);
-
-    // After window without updates the throughput should be zero
-    final Instant afterWindowTimestamp =
-        Instant.ofEpochSecond(
-            TimestampConverter.toSeconds(lastUpdateTimestamp) + WINDOW_SIZE_SECONDS + 1);
-    assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA);
+    double actual = estimator.get().doubleValue();
+    assertEquals(want.doubleValue(), actual, 1);
   }

   @Test
-  public void testThroughputIsAccumulatedWithin50SecondsWindow() {
-    final List<ImmutablePair<Instant, byte[]>> excludedPairs = generateTestData(300, 0, 10);
-    final List<ImmutablePair<Instant, byte[]>> expectedPairs = generateTestData(50, 10, 20);
-    final List<ImmutablePair<Instant, byte[]>> pairs =
-        Stream.concat(excludedPairs.stream(), expectedPairs.stream())
-            .sorted(Comparator.comparing(ImmutablePair::getLeft))
-            .collect(Collectors.toList());
-    final Instant lastUpdateTimestamp = pairs.get(pairs.size() - 1).getLeft();
-
-    BigDecimal sum = BigDecimal.valueOf(0L);
-    for (ImmutablePair<Instant, byte[]> pair : expectedPairs) {
-      sum = sum.add(BigDecimal.valueOf(pair.getRight().length));
-    }
-    final BigDecimal want =
-        sum.divide(BigDecimal.valueOf(WINDOW_SIZE_SECONDS), MathContext.DECIMAL128);
-    for (ImmutablePair<Instant, byte[]> pair : pairs) {
-      estimator.update(pair.getLeft(), pair.getRight());
-    }
-
-    double actual = estimator.getFrom(Instant.ofEpochSecond(20));
-    assertEquals(want.doubleValue(), actual, DELTA);
-
-    // After window without updates the throughput should be zero
-    final Instant afterWindowTimestamp =
-        Instant.ofEpochSecond(
-            TimestampConverter.toSeconds(lastUpdateTimestamp) + WINDOW_SIZE_SECONDS + 1);
-    assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA);
+  public void testThroughputHandlesNoTimeDifference() {
+    BytesThroughputEstimator<byte[]> estimator =
+        new BytesThroughputEstimator<>(sizeEstimator, 1, Instant.ofEpochSecond(0));
+    estimator.update(Instant.ofEpochSecond(0), new byte[10]);
+    // (10 / 1) * 1000 because we round up to one millisecond
+    assertEquals(10000D, estimator.get().doubleValue(), DELTA);
   }

   private List<ImmutablePair<Instant, byte[]>> generateTestData(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimatorTest.java
similarity index 68%
rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimatorTest.java
rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimatorTest.java
index 266dc1255e529..a229c0e66bef3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullThroughputEstimatorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/estimator/NullSizeEstimatorTest.java
@@ -19,20 +19,16 @@

 import static org.junit.Assert.assertEquals;

-import org.joda.time.Instant;
 import org.junit.Test;

-public class NullThroughputEstimatorTest {
+public class NullSizeEstimatorTest {
   private static final double DELTA = 1e-10;

   @Test
   public void alwaysReturns0AsEstimatedThroughput() {
-    final NullThroughputEstimator<byte[]> estimator = new NullThroughputEstimator<>();
-    assertEquals(estimator.get(), 0D, DELTA);
-
-    estimator.update(Instant.ofEpochSecond(1), new byte[10]);
-    assertEquals(estimator.getFrom(Instant.ofEpochSecond(1)), 0D, DELTA);
-    estimator.update(Instant.ofEpochSecond(2), new byte[20]);
-    assertEquals(estimator.getFrom(Instant.ofEpochSecond(2)), 0D, DELTA);
+    final NullSizeEstimator<byte[]> estimator = new NullSizeEstimator<>();
+    assertEquals(estimator.sizeOf(new byte[40]), 0D, DELTA);
+    assertEquals(estimator.sizeOf(new byte[20]), 0D, DELTA);
+    assertEquals(estimator.sizeOf(new byte[10]), 0D, DELTA);
   }
 }
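The remaining file exercises the five-argument StreamProgress constructor introduced by this change. Reading the getters asserted in ChangeStreamActionTest against the argument order, the fields line up as below; the fourth argument is not asserted anywhere in this diff, so its role as the estimate's capture time is an inference:

    StreamProgress progress =
        new StreamProgress(
            changeStreamContinuationToken,  // token to resume the partition from
            lowWatermark,                   // getEstimatedLowWatermark()
            BigDecimal.valueOf(1000),       // getThroughputEstimate(), bytes/sec
            Instant.now(),                  // presumably when the estimate was taken
            false);                         // isHeartbeat()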
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java
index b0bc8de7c8114..e2f0cbb62a614 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTrackerTest.java
@@ -26,6 +26,7 @@
 import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
 import com.google.cloud.bigtable.data.v2.models.Range;
+import java.math.BigDecimal;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -41,7 +42,8 @@ public void testTryClaim() {
     ChangeStreamContinuationToken changeStreamContinuationToken =
         ChangeStreamContinuationToken.create(Range.ByteStringRange.create("a", "b"), "1234");
     final StreamProgress streamProgress2 =
-        new StreamProgress(changeStreamContinuationToken, Instant.now());
+        new StreamProgress(
+            changeStreamContinuationToken, Instant.now(), BigDecimal.ONE, Instant.now(), false);
     assertTrue(tracker.tryClaim(streamProgress2));
     assertEquals(streamProgress2, tracker.currentRestriction());
     assertEquals(