From 16aa63fad126b4cc6329ae64e62006ef6e553388 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Mon, 1 Aug 2022 18:00:20 -0400 Subject: [PATCH 1/4] Add ChangeStreamRecordAdapter and ChangeStreamStateMachine These will be used later for ChangeStreamMergingCallable. --- .../data/v2/models/ChangeStreamMutation.java | 20 +- .../v2/models/ChangeStreamRecordAdapter.java | 151 +++++ .../DefaultChangeStreamRecordAdapter.java | 175 ++++++ .../ChangeStreamStateMachine.java | 581 ++++++++++++++++++ .../v2/models/ChangeStreamMutationTest.java | 20 +- .../DefaultChangeStreamRecordAdapterTest.java | 352 +++++++++++ .../ChangeStreamStateMachineTest.java | 61 ++ 7 files changed, 1339 insertions(+), 21 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index b79b184e7a..10571ecd1f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -321,14 +321,28 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - ChangeStreamMutation 
otherChangeStreamMutation = (ChangeStreamMutation) o; - return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode()); + ChangeStreamMutation other = (ChangeStreamMutation) o; + return Objects.equal(this.rowKey, other.rowKey) + && Objects.equal(this.type, other.type) + && Objects.equal(this.sourceClusterId, other.sourceClusterId) + && Objects.equal(this.commitTimestamp, other.commitTimestamp) + && Objects.equal(this.tieBreaker, other.tieBreaker) + && Objects.equal(this.token, other.token) + && Objects.equal(this.lowWatermark, other.lowWatermark) + && Objects.equal(this.entries.build(), other.entries.build()); } @Override public int hashCode() { return Objects.hashCode( - rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries); + rowKey, + type, + sourceClusterId, + commitTimestamp, + tieBreaker, + token, + lowWatermark, + entries.build()); } @Override diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java new file mode 100644 index 0000000000..2aa6cc334f --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -0,0 +1,151 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigtable.data.v2.models; + +import com.google.api.core.InternalApi; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import javax.annotation.Nonnull; + +/** + * An extension point that allows end users to plug in a custom implementation of logical change + * stream records. This is useful in cases where the user would like to apply advanced client side + * filtering. This adapter acts like a factory for a SAX style change stream record builder. + */ +public interface ChangeStreamRecordAdapter { + /** Creates a new instance of a {@link ChangeStreamRecordBuilder}. */ + ChangeStreamRecordBuilder createChangeStreamRecordBuilder(); + + /** Checks if the given change stream record is a Heartbeat. */ + @InternalApi("Used in Changestream beam pipeline.") + boolean isHeartbeat(ChangeStreamRecordT record); + + /** + * Get the token from the given Heartbeat record. If the given record is not a Heartbeat, it will + * throw an Exception. + */ + @InternalApi("Used in Changestream beam pipeline.") + String getTokenFromHeartbeat(ChangeStreamRecordT heartbeatRecord); + + /** Checks if the given change stream record is a ChangeStreamMutation. */ + @InternalApi("Used in Changestream beam pipeline.") + boolean isChangeStreamMutation(ChangeStreamRecordT record); + + /** + * Get the token from the given ChangeStreamMutation record. If the given record is not a + * ChangeStreamMutation, it will throw an Exception. + */ + String getTokenFromChangeStreamMutation(ChangeStreamRecordT record); + + /** + * A SAX style change stream record factory. It is responsible for creating one of the three types + * of change stream record: heartbeat, close stream, and a change stream mutation. + * + *

State management is handled external to the implementation of this class: + * + *

    + * Case 1: Heartbeat + *
  1. Exactly 1 {@code onHeartbeat}. + *
+ * + *
    + * Case 2: CloseStream + *
  1. Exactly 1 {@code onCloseStream}. + *
+ * + *
    + * Case 3: ChangeStreamMutation. A change stream mutation consists of one or more {@link + * com.google.bigtable.v2.Mutation}s, where the SetCells might be chunked. There are 4 different + * types of mods that a ReadChangeStream response can have: 1) DeleteFamily -> {@code + * deleteFamily}. 2) DeleteCell -> {@code deleteCells}. 3) non-chunked SetCell. 4) chunked SetCell. + * Mods 3) and 4) are supposed to be handled by a sequence of: a) Exactly 1 {@code startCell}. b) + * Exactly 1 {@code cellValue} for non-chunked SetCell, or at least 2 {@code cellValue} for + * chunked SetCell. c) Exactly 1 {@code finishCell}. Note: DeleteRow won't appear in the data + * change since it'll be converted to multiple DeleteFamily's. + *

    The whole flow of constructing a ChangeStreamMutation is: + *

  1. Exactly 1 {@code startUserMutation} or {@code startGcMutation}. + *
  2. At least 1 1)/2)/3)/4) mod. + *
  3. Exactly 1 {@code finishChangeStreamMutation}. + *
+ */ + interface ChangeStreamRecordBuilder { + /** + * Called to create a heartbeat. This will be called at most once. If called, the current change + * stream record must not include any data changes or close stream messages. + */ + ChangeStreamRecordT onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat); + + /** + * Called to create a close stream message. This will be called at most once. If called, the + * current change stream record must not include any data changes or heartbeats. + */ + ChangeStreamRecordT onCloseStream(ReadChangeStreamResponse.CloseStream closeStream); + + /** + * Called to start a new user initiated ChangeStreamMutation. This will be called at most once. + * If called, the current change stream record must not include any close stream message or + * heartbeat. + */ + void startUserMutation( + @Nonnull ByteString rowKey, + @Nonnull String sourceClusterId, + @Nonnull Timestamp commitTimestamp, + int tieBreaker); + + /** + * Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most + * once. If called, the current change stream record must not include any close stream message + * or heartbeat. + */ + void startGcMutation( + @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker); + + /** Called to add a DeleteFamily mod. */ + void deleteFamily(@Nonnull String familyName); + + /** Called to add a DeleteCell mod. */ + void deleteCells( + @Nonnull String familyName, + @Nonnull ByteString qualifier, + @Nonnull TimestampRange timestampRange); + + /** + * Called to start a SetCell. In case of a non-chunked cell, the following order is guaranteed: + * 1) Exactly 1 {@code startCell}. 2) Exactly 1 {@code cellValue}. 3) Exactly 1 {@code + * finishCell}. In case of a chunked cell, the following order is guaranteed: 1) Exactly 1 + * {@code startCell}. 2) At least 2 {@code cellValue}. 3) Exactly 1 {@code finishCell}. 
+ */ + void startCell(String family, ByteString qualifier, long timestampMicros); + + /** + * Called once per non-chunked cell, or at least twice per chunked cell to concatenate the cell + * value. + */ + void cellValue(ByteString value); + + /** Called once per cell to signal the end of the value (unless reset). */ + void finishCell(); + + /** Called once per stream record to signal that all mods have been processed (unless reset). */ + ChangeStreamRecordT finishChangeStreamMutation( + @Nonnull String token, @Nonnull Timestamp lowWatermark); + + /** Called when the current in progress change stream record should be dropped */ + void reset(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java new file mode 100644 index 0000000000..d8eb632e54 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java @@ -0,0 +1,175 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigtable.data.v2.models; + +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import javax.annotation.Nonnull; + +/** + * Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link + * ChangeStreamRecord}s to represent change stream records. + */ +public class DefaultChangeStreamRecordAdapter + implements ChangeStreamRecordAdapter { + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecordBuilder createChangeStreamRecordBuilder() { + return new DefaultChangeStreamRecordBuilder(); + } + + /** {@inheritDoc} */ + @Override + public boolean isHeartbeat(ChangeStreamRecord record) { + return record instanceof Heartbeat; + } + + /** {@inheritDoc} */ + @Override + public String getTokenFromHeartbeat(ChangeStreamRecord record) { + Preconditions.checkArgument(isHeartbeat(record), "record is not a Heartbeat."); + return ((Heartbeat) record).getChangeStreamContinuationToken().getToken(); + } + + /** {@inheritDoc} */ + @Override + public boolean isChangeStreamMutation(ChangeStreamRecord record) { + return record instanceof ChangeStreamMutation; + } + + /** {@inheritDoc} */ + @Override + public String getTokenFromChangeStreamMutation(ChangeStreamRecord record) { + Preconditions.checkArgument( + isChangeStreamMutation(record), "record is not a ChangeStreamMutation."); + return ((ChangeStreamMutation) record).getToken(); + } + + /** {@inheritDoc} */ + static class DefaultChangeStreamRecordBuilder + implements ChangeStreamRecordBuilder { + private ChangeStreamMutation.Builder changeStreamMutationBuilder = null; + + // For the current SetCell. 
+ private String family; + private ByteString qualifier; + private long timestampMicros; + private ByteString value; + + public DefaultChangeStreamRecordBuilder() { + reset(); + } + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecord onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + Preconditions.checkArgument( + this.changeStreamMutationBuilder == null, + "Can not create a Heartbeat when there is an existing ChangeStreamMutation being built."); + return Heartbeat.fromProto(heartbeat); + } + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + Preconditions.checkArgument( + this.changeStreamMutationBuilder == null, + "Can not create a CloseStream when there is an existing ChangeStreamMutation being built."); + return CloseStream.fromProto(closeStream); + } + + /** {@inheritDoc} */ + @Override + public void startUserMutation( + @Nonnull ByteString rowKey, + @Nonnull String sourceClusterId, + @Nonnull Timestamp commitTimestamp, + int tieBreaker) { + this.changeStreamMutationBuilder = + ChangeStreamMutation.createUserMutation( + rowKey, sourceClusterId, commitTimestamp, tieBreaker); + } + + /** {@inheritDoc} */ + @Override + public void startGcMutation( + @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) { + this.changeStreamMutationBuilder = + ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker); + } + + /** {@inheritDoc} */ + @Override + public void deleteFamily(@Nonnull String familyName) { + this.changeStreamMutationBuilder.deleteFamily(familyName); + } + + /** {@inheritDoc} */ + @Override + public void deleteCells( + @Nonnull String familyName, + @Nonnull ByteString qualifier, + @Nonnull TimestampRange timestampRange) { + this.changeStreamMutationBuilder.deleteCells(familyName, qualifier, timestampRange); + } + + /** {@inheritDoc} */ + @Override + public void startCell(String family, ByteString qualifier, long 
timestampMicros) { + this.family = family; + this.qualifier = qualifier; + this.timestampMicros = timestampMicros; + this.value = ByteString.EMPTY; + } + + /** {@inheritDoc} */ + @Override + public void cellValue(ByteString value) { + this.value = this.value.concat(value); + } + + /** {@inheritDoc} */ + @Override + public void finishCell() { + this.changeStreamMutationBuilder.setCell( + this.family, this.qualifier, this.timestampMicros, this.value); + } + + /** {@inheritDoc} */ + @Override + public ChangeStreamRecord finishChangeStreamMutation( + @Nonnull String token, @Nonnull Timestamp lowWatermark) { + this.changeStreamMutationBuilder.setToken(token); + this.changeStreamMutationBuilder.setLowWatermark(lowWatermark); + return this.changeStreamMutationBuilder.build(); + } + + /** {@inheritDoc} */ + @Override + public void reset() { + changeStreamMutationBuilder = null; + + family = null; + qualifier = null; + timestampMicros = 0; + value = null; + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java new file mode 100644 index 0000000000..0496fa9a5c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java @@ -0,0 +1,581 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import com.google.bigtable.v2.Mutation; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter.ChangeStreamRecordBuilder; +import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; +import com.google.common.base.Preconditions; + +/** + * A state machine to produce {@link ChangeStreamRecordT}s from a stream of {@link + * ReadChangeStreamResponse}. 1) Produces a {@link + * com.google.cloud.bigtable.data.v2.models.Heartbeat} from a {@link + * ReadChangeStreamResponse.Heartbeat}. 2) Produces a {@link + * com.google.cloud.bigtable.data.v2.models.CloseStream} from a {@link + * ReadChangeStreamResponse.CloseStream}. 3) Produces a {@link + * com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation} from a sequence of {@link + * ReadChangeStreamResponse.DataChange}s. Note that there can be two types of chunking: 3_1) {@link + * ReadChangeStreamResponse.DataChange}s are chunked into multiple {@link + * ReadChangeStreamResponse}s. For example, a logical mutation has two mods, where the first mod is + * sent by the first {@link ReadChangeStreamResponse.DataChange} and the second mod is sent by the + * second {@link ReadChangeStreamResponse.DataChange}. 3_2) {@link + * com.google.bigtable.v2.ReadChangeStreamResponse.MutationChunk} has a chunked {@link + * com.google.bigtable.v2.Mutation.SetCell} mutation. For example, a logical mutation has one big + * {@link com.google.bigtable.v2.Mutation.SetCell} mutation which is chunked into two {@link + * ReadChangeStreamResponse}s. 
The first {@link ReadChangeStreamResponse.DataChange} has the first + * half of the cell value, and the second {@link ReadChangeStreamResponse.DataChange} has the second + * half. For both types of chunking, this state machine will merge all mods into a logical mutation, + * represented by a {@link com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation}. + * + *

Building of the actual change stream record object is delegated to a {@link + * ChangeStreamRecordBuilder}. This class is not thread safe. + * + *

The inputs are: + * + *

    + *
  • Heartbeats represented by {@link ReadChangeStreamResponse.Heartbeat}. + *
  • CloseStreams represented by {@link ReadChangeStreamResponse.CloseStream}. + *
  • DataChanges, represented by {@link ReadChangeStreamResponse.DataChange}, that must be + * merged to form logical mutations. + *
  • ChangeStreamRecord consumption events that reset the state machine for the next change + * stream record. + *
+ * + *

The outputs are: + * + *

    + *
  • Heartbeat records represented by {@link + * com.google.cloud.bigtable.data.v2.models.Heartbeat}. + *
  • CloseStream records represented by {@link + * com.google.cloud.bigtable.data.v2.models.CloseStream}. + *
  • Logical mutation records represented by {@link + * com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation}. + *
+ * + *

Expected Usage: + * + *

{@code
+ * ChangeStreamStateMachine<MyChangeStreamRecord> changeStreamStateMachine = new ChangeStreamStateMachine<>(myChangeStreamRecordAdapter.createChangeStreamRecordBuilder());
+ * while(responseIterator.hasNext()) {
+ *   ReadChangeStreamResponse response = responseIterator.next();
+ *   if (response.hasHeartbeat()) {
+ *     changeStreamStateMachine.handleHeartbeat(response.getHeartbeat());
+ *   } else if (response.hasCloseStream()) {
+ *     changeStreamStateMachine.handleCloseStream(response.getCloseStream());
+ *   } else {
+ *       changeStreamStateMachine.handleDataChange(response.getDataChange());
+ *   }
+ *   if (changeStreamStateMachine.hasCompleteChangeStreamRecord()) {
+ *       MyChangeStreamRecord record = changeStreamStateMachine.consumeChangeStreamRecord();
+ *       // do something with the change stream record.
+ *   }
+ * }
+ * }
+ * + *

Package-private for internal use. + * + * @param The type of row the adapter will build + */ +final class ChangeStreamStateMachine { + private final ChangeStreamRecordBuilder builder; + private State currentState; + // debug stats + private int numHeartbeats = 0; + private int numCloseStreams = 0; + private int numDataChanges = 0; + private int numNonCellMods = 0; + private int numCellChunks = 0; // 1 for non-chunked cell. + private int actualTotalSizeOfChunkedSetCell = 0; + private ChangeStreamRecordT completeChangeStreamRecord; + + /** + * Initialize a new state machine that's ready for a new change stream record. + * + * @param builder The builder that will build the final change stream record. + */ + ChangeStreamStateMachine(ChangeStreamRecordBuilder builder) { + this.builder = builder; + reset(); + } + + /** + * Handle heartbeat events from the server. + * + *

+ *
Valid states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD} + *
Resulting states: + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME} + *
+ */ + void handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + try { + numHeartbeats++; + currentState = currentState.handleHeartbeat(heartbeat); + } catch (RuntimeException e) { + currentState = null; + throw e; + } + } + + /** + * Handle CloseStream events from the server. + * + *
+ *
Valid states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD} + *
Resulting states: + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME} + *
+ */ + void handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + try { + numCloseStreams++; + currentState = currentState.handleCloseStream(closeStream); + } catch (RuntimeException e) { + currentState = null; + throw e; + } + } + + /** + * Feeds a new dataChange into the state machine. If the dataChange is invalid, the state machine + * will throw an exception and should not be used for further input. + * + *
+ *
Valid states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD} + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD} + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE} + *
Resulting states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD} + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE} + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME} + *
+ * + * @param dataChange The new chunk to process. + * @throws ChangeStreamStateMachine.InvalidInputException When the chunk is not applicable to the + * current state. + * @throws IllegalStateException When the internal state is inconsistent + */ + void handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { + try { + numDataChanges++; + currentState = currentState.handleMod(dataChange, 0); + } catch (RuntimeException e) { + currentState = null; + throw e; + } + } + + /** + * Returns the last completed change stream record and transitions to awaiting a new change stream + * record. + * + * @return The last completed change stream record. + * @throws IllegalStateException If the last dataChange did not complete a change stream record. + */ + ChangeStreamRecordT consumeChangeStreamRecord() { + Preconditions.checkState( + completeChangeStreamRecord != null, "No change stream record to consume."); + Preconditions.checkState( + currentState == AWAITING_STREAM_RECORD_CONSUME, + "Change stream record not ready to consume: " + currentState); + ChangeStreamRecordT changeStreamRecord = completeChangeStreamRecord; + reset(); + return changeStreamRecord; + } + + /** Checks if there is a complete change stream record to be consumed. */ + boolean hasCompleteChangeStreamRecord() { + return currentState == AWAITING_STREAM_RECORD_CONSUME; + } + /** + * Checks if the state machine is in the middle of processing a change stream record. + * + * @return True If there is a change stream record in progress. + */ + boolean isChangeStreamRecordInProgress() { + return currentState != AWAITING_NEW_STREAM_RECORD; + } + + private void reset() { + currentState = AWAITING_NEW_STREAM_RECORD; + numHeartbeats = 0; + numCloseStreams = 0; + numDataChanges = 0; + numNonCellMods = 0; + numCellChunks = 0; + actualTotalSizeOfChunkedSetCell = 0; + completeChangeStreamRecord = null; + + builder.reset(); + } + + /** + * Base class for all the state machine's internal states. + * + *

Each state can consume 3 events: Heartbeat, CloseStream and a DataChange. By default, the + * default implementation will just throw an IllegalStateException unless the subclass adds + * explicit handling for these events. + */ + abstract static class State { + /** + * Accepts a Heartbeat by the server. And completes the current change stream record. + * + * @throws IllegalStateException If the subclass can't handle heartbeat events. + */ + ChangeStreamStateMachine.State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + throw new IllegalStateException(); + } + + /** + * Accepts a CloseStream by the server. And completes the current change stream record. + * + * @throws IllegalStateException If the subclass can't handle CloseStream events. + */ + ChangeStreamStateMachine.State handleCloseStream( + ReadChangeStreamResponse.CloseStream closeStream) { + throw new IllegalStateException(); + } + + /** + * Accepts a new mod and transitions to the next state. + * + * @param dataChange The DataChange that holds the new mod to process. + * @param index The index of the mod in the DataChange. + * @return The next state. + * @throws IllegalStateException If the subclass can't handle the mod. + * @throws ChangeStreamStateMachine.InvalidInputException If the subclass determines that this + * dataChange is invalid. + */ + ChangeStreamStateMachine.State handleMod( + ReadChangeStreamResponse.DataChange dataChange, int index) { + throw new IllegalStateException(); + } + } + + /** + * The default state when the state machine is awaiting a ReadChangeStream response to start a new + * change stream record. It will notify the builder of the new change stream record and transits + * to one of the following states: + * + *

+ *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}, in case of a Heartbeat + * or a CloseStream. + *
The state reached by {@link ChangeStreamStateMachine#AWAITING_NEW_MOD} after it handles the first mod, in case of a DataChange. + *
+ */ + private final State AWAITING_NEW_STREAM_RECORD = + new State() { + @Override + State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + validate( + completeChangeStreamRecord == null, + "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet."); + completeChangeStreamRecord = builder.onHeartbeat(heartbeat); + return AWAITING_STREAM_RECORD_CONSUME; + } + + @Override + State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + validate( + completeChangeStreamRecord == null, + "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet."); + completeChangeStreamRecord = builder.onCloseStream(closeStream); + return AWAITING_STREAM_RECORD_CONSUME; + } + + @Override + State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + validate( + completeChangeStreamRecord == null, + "AWAITING_NEW_STREAM_RECORD: Existing ChangeStreamRecord not consumed yet."); + validate( + !dataChange.getRowKey().isEmpty(), + "AWAITING_NEW_STREAM_RECORD: First data change missing rowKey."); + validate( + dataChange.hasCommitTimestamp(), + "AWAITING_NEW_STREAM_RECORD: First data change missing commit timestamp."); + validate( + index == 0, + "AWAITING_NEW_STREAM_RECORD: First data change should start with the first mod."); + validate( + dataChange.getChunksCount() > 0, + "AWAITING_NEW_STREAM_RECORD: First data change missing mods."); + if (dataChange.getType() == Type.GARBAGE_COLLECTION) { + validate( + dataChange.getSourceClusterId().isEmpty(), + "AWAITING_NEW_STREAM_RECORD: GC mutation shouldn't have source cluster id."); + builder.startGcMutation( + dataChange.getRowKey(), + dataChange.getCommitTimestamp(), + dataChange.getTiebreaker()); + } else if (dataChange.getType() == Type.USER) { + validate( + !dataChange.getSourceClusterId().isEmpty(), + "AWAITING_NEW_STREAM_RECORD: User initiated data change missing source cluster id."); + builder.startUserMutation( + dataChange.getRowKey(), + 
dataChange.getSourceClusterId(), + dataChange.getCommitTimestamp(), + dataChange.getTiebreaker()); + } else { + validate(false, "AWAITING_NEW_STREAM_RECORD: Unexpected type: " + dataChange.getType()); + } + return AWAITING_NEW_MOD.handleMod(dataChange, index); + } + }; + + /** + * A state to handle the next Mod. + * + *
+ *
Valid exit states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD}. Current mod is added, and we have more + * mods to expect. + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE}. Current mod is the first chunk of a + * chunked SetCell. + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. Current mod is the last + * mod of the current logical mutation. + *
+ */ + private final State AWAITING_NEW_MOD = + new State() { + @Override + State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) { + throw new IllegalStateException( + "AWAITING_NEW_MOD: Can't handle a Heartbeat in the middle of building a ChangeStreamMutation."); + } + + @Override + State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) { + throw new IllegalStateException( + "AWAITING_NEW_MOD: Can't handle a CloseStream in the middle of building a ChangeStreamMutation."); + } + + @Override + State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + validate( + 0 <= index && index <= dataChange.getChunksCount() - 1, + "AWAITING_NEW_MOD: Index out of bound."); + ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index); + Mutation mod = chunk.getMutation(); + // Case 1: SetCell + if (mod.hasSetCell()) { + // Start the Cell and delegate to AWAITING_CELL_VALUE to add the cell value. + Mutation.SetCell setCell = chunk.getMutation().getSetCell(); + if (chunk.hasChunkInfo()) { + // If it has chunk info, it must be the first chunk of a chunked SetCell. 
+ validate( + chunk.getChunkInfo().getChunkedValueOffset() == 0, + "First chunk of a chunked cell must start with offset==0."); + actualTotalSizeOfChunkedSetCell = 0; + } + builder.startCell( + setCell.getFamilyName(), + setCell.getColumnQualifier(), + setCell.getTimestampMicros()); + return AWAITING_CELL_VALUE.handleMod(dataChange, index); + } + // Case 2: DeleteFamily + if (mod.hasDeleteFromFamily()) { + numNonCellMods++; + builder.deleteFamily(mod.getDeleteFromFamily().getFamilyName()); + return checkAndFinishMutationIfNeeded(dataChange, index + 1); + } + // Case 3: DeleteCell + if (mod.hasDeleteFromColumn()) { + numNonCellMods++; + builder.deleteCells( + mod.getDeleteFromColumn().getFamilyName(), + mod.getDeleteFromColumn().getColumnQualifier(), + TimestampRange.create( + mod.getDeleteFromColumn().getTimeRange().getStartTimestampMicros(), + mod.getDeleteFromColumn().getTimeRange().getEndTimestampMicros())); + return checkAndFinishMutationIfNeeded(dataChange, index + 1); + } + throw new IllegalStateException("AWAITING_NEW_MOD: Unexpected mod type"); + } + }; + + /** + * A state that represents a cell's value continuation. + * + *
+ *
Valid exit states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_MOD}. Current chunked SetCell is added, and + * we have more mods to expect. + *
{@link ChangeStreamStateMachine#AWAITING_CELL_VALUE}. Current chunked SetCell has more + * cell values to expect. + *
{@link ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. Current chunked SetCell + * is the last mod of the current logical mutation. + *
+   */
+  private final State AWAITING_CELL_VALUE =
+      new State() {
+        /** Always throws: a Heartbeat is not accepted while a SetCell is still being built. */
+        @Override
+        State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
+          throw new IllegalStateException(
+              "AWAITING_CELL_VALUE: Can't handle a Heartbeat in the middle of building a SetCell.");
+        }
+
+        /** Always throws: a CloseStream is not accepted while a SetCell is still being built. */
+        @Override
+        State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
+          throw new IllegalStateException(
+              "AWAITING_CELL_VALUE: Can't handle a CloseStream in the middle of building a SetCell.");
+        }
+
+        /**
+         * Appends the value of the SetCell at {@code index} to the cell being built and picks the
+         * next state.
+         *
+         * @param dataChange the DataChange whose chunks are being consumed
+         * @param index position of the chunk to handle; must lie in {@code [0, chunksCount)}
+         * @return AWAITING_CELL_VALUE when a chunked SetCell still has chunks outstanding,
+         *     otherwise whatever {@code checkAndFinishMutationIfNeeded} returns for the next index
+         * @throws InvalidInputException (raised by {@code validate}) when the index is out of
+         *     range, the mod is not a SetCell, the declared chunked size is not positive or does
+         *     not match the accumulated size, or a non-last chunk is not the final mod of the
+         *     response
+         */
+        @Override
+        State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
+          validate(
+              0 <= index && index <= dataChange.getChunksCount() - 1,
+              "AWAITING_CELL_VALUE: Index out of bound.");
+          ReadChangeStreamResponse.MutationChunk chunk = dataChange.getChunks(index);
+          validate(
+              chunk.getMutation().hasSetCell(),
+              "AWAITING_CELL_VALUE: Current mod is not a SetCell.");
+          Mutation.SetCell setCell = chunk.getMutation().getSetCell();
+          numCellChunks++;
+          builder.cellValue(setCell.getValue());
+          // Case 1: Current SetCell is chunked.
+          if (chunk.hasChunkInfo()) {
+            // Reject zero AND negative sizes up front, matching what the message promises;
+            // a negative size could otherwise only surface later as a size mismatch.
+            validate(
+                chunk.getChunkInfo().getChunkedValueSize() > 0,
+                "AWAITING_CELL_VALUE: Chunked value size must be positive.");
+            actualTotalSizeOfChunkedSetCell += setCell.getValue().size();
+            // If it's the last chunk of the chunked SetCell, finish the cell.
+            if (chunk.getChunkInfo().getLastChunk()) {
+              builder.finishCell();
+              validate(
+                  actualTotalSizeOfChunkedSetCell == chunk.getChunkInfo().getChunkedValueSize(),
+                  "Chunked value size in ChunkInfo doesn't match the actual total size. "
+                      + "ChunkInfo: "
+                      + chunk.getChunkInfo().getChunkedValueSize()
+                      + "; actual total size: "
+                      + actualTotalSizeOfChunkedSetCell);
+              return checkAndFinishMutationIfNeeded(dataChange, index + 1);
+            }
+            // If this is not the last chunk of a chunked SetCell, then this must be the last mod of
+            // the current response, and we're expecting the rest of the chunked cells in the
+            // following ReadChangeStream response.
+            validate(
+                index == dataChange.getChunksCount() - 1,
+                "AWAITING_CELL_VALUE: A non-last chunk of a chunked SetCell must be the last mod "
+                    + "of the current response.");
+            return AWAITING_CELL_VALUE;
+          }
+          // Case 2: Current SetCell is not chunked.
+          builder.finishCell();
+          return checkAndFinishMutationIfNeeded(dataChange, index + 1);
+        }
+      };
+
+  /**
+   * A state that represents a completed change stream record. It prevents new change stream records
+   * from being read until the current one has been consumed.
+   *
+   *
+ *
Valid exit states: + *
{@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD}. + *
+   */
+  private final State AWAITING_STREAM_RECORD_CONSUME =
+      new State() {
+        // All three handlers reject their input with this one message.
+        private final String rejection =
+            "AWAITING_STREAM_RECORD_CONSUME: Skipping completed change stream record.";
+
+        /** Always throws: no Heartbeat is accepted in this state. */
+        @Override
+        State handleHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
+          throw new IllegalStateException(rejection);
+        }
+
+        /** Always throws: no CloseStream is accepted in this state. */
+        @Override
+        State handleCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
+          throw new IllegalStateException(rejection);
+        }
+
+        /** Always throws: no mod is accepted in this state. */
+        @Override
+        State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
+          throw new IllegalStateException(rejection);
+        }
+      };
+
+  /**
+   * Check if we should continue handling mods in the current DataChange or wrap up. There are 3
+   * cases:
+   *
+   *
    + *
  • 1) index < dataChange.getChunksCount() -> continue to handle the next mod. + *
  • 2) index == dataChange.getChunksCount() 2_1) dataChange.done == true -> current change + * stream mutation is complete. Wrap it up and return {@link + * ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. 2_2) dataChange.done != true -> + * current change stream mutation isn't complete. Return {@link + * ChangeStreamStateMachine#AWAITING_NEW_MOD} to wait for more mods in the next + * ReadChangeStreamResponse. + *
+   */
+  private State checkAndFinishMutationIfNeeded(
+      ReadChangeStreamResponse.DataChange dataChange, int index) {
+    final int totalChunks = dataChange.getChunksCount();
+    validate(
+        index >= 0 && index <= totalChunks, "checkAndFinishMutationIfNeeded: index out of bound.");
+    if (index < totalChunks) {
+      // Case 1): mods remain in this DataChange, so keep going with the next one.
+      return AWAITING_NEW_MOD.handleMod(dataChange, index);
+    }
+    // Every mod of this DataChange has been handled.
+    if (!dataChange.getDone()) {
+      // Case 2_2): the DataChange itself is chunked. The record is not finished, so wait for
+      // further mods in the next ReadChangeStreamResponse rather than for a new record.
+      return AWAITING_NEW_MOD;
+    }
+    // Case 2_1): the change stream mutation is complete; it must carry a token and a watermark.
+    validate(!dataChange.getToken().isEmpty(), "Last data change missing token");
+    validate(dataChange.hasLowWatermark(), "Last data change missing lowWatermark");
+    completeChangeStreamRecord =
+        builder.finishChangeStreamMutation(dataChange.getToken(), dataChange.getLowWatermark());
+    return AWAITING_STREAM_RECORD_CONSUME;
+  }
+
+  /**
+   * Throws an {@link InvalidInputException} when {@code condition} is false. The exception message
+   * is {@code message} followed by the current values of the state machine's counters.
+   */
+  private void validate(boolean condition, String message) {
+    if (condition) {
+      return;
+    }
+    StringBuilder details = new StringBuilder();
+    details
+        .append(message)
+        .append(". numHeartbeats: ")
+        .append(numHeartbeats)
+        .append(", numCloseStreams: ")
+        .append(numCloseStreams)
+        .append(", numDataChanges: ")
+        .append(numDataChanges)
+        .append(", numNonCellMods: ")
+        .append(numNonCellMods)
+        .append(", numCellChunks: ")
+        .append(numCellChunks)
+        .append(", actualTotalSizeOfChunkedSetCell: ")
+        .append(actualTotalSizeOfChunkedSetCell);
+    throw new InvalidInputException(details.toString());
+  }
+
+  /** Unchecked exception raised by {@code validate} when a check on the input fails. */
+  static class InvalidInputException extends RuntimeException {
+    InvalidInputException(String message) {
+      super(message);
+    }
+  }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
index 938213fb36..a14fe001cd 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java
@@ -87,15 +87,7 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti
     oos.close();
     ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
     ChangeStreamMutation actual = (ChangeStreamMutation) ois.readObject();
-    Assert.assertEquals(actual.getRowKey(), changeStreamMutation.getRowKey());
-    Assert.assertEquals(actual.getType(), changeStreamMutation.getType());
-    Assert.assertEquals(actual.getSourceClusterId(), changeStreamMutation.getSourceClusterId());
-    Assert.assertEquals(actual.getCommitTimestamp(), changeStreamMutation.getCommitTimestamp());
-    Assert.assertEquals(actual.getTieBreaker(), changeStreamMutation.getTieBreaker());
-    Assert.assertEquals(actual.getToken(), changeStreamMutation.getToken());
-    Assert.assertEquals(actual.getLowWatermark(), changeStreamMutation.getLowWatermark());
-    assertThat(actual.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT))
-        .isEqualTo(changeStreamMutation.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT));
+    Assert.assertEquals(actual, changeStreamMutation);
} @Test @@ -138,15 +130,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException { oos.close(); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); ChangeStreamMutation actual = (ChangeStreamMutation) ois.readObject(); - Assert.assertEquals(actual.getRowKey(), changeStreamMutation.getRowKey()); - Assert.assertEquals(actual.getType(), changeStreamMutation.getType()); - Assert.assertEquals(actual.getSourceClusterId(), changeStreamMutation.getSourceClusterId()); - Assert.assertEquals(actual.getCommitTimestamp(), changeStreamMutation.getCommitTimestamp()); - Assert.assertEquals(actual.getTieBreaker(), changeStreamMutation.getTieBreaker()); - Assert.assertEquals(actual.getToken(), changeStreamMutation.getToken()); - Assert.assertEquals(actual.getLowWatermark(), changeStreamMutation.getLowWatermark()); - assertThat(actual.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT)) - .isEqualTo(changeStreamMutation.toRowMutation(TABLE_ID).toProto(REQUEST_CONTEXT)); + Assert.assertEquals(actual, changeStreamMutation); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java new file mode 100644 index 0000000000..b2511cc67b --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -0,0 +1,352 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.Mutation; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.TimestampRange; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter.ChangeStreamRecordBuilder; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.rpc.Status; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DefaultChangeStreamRecordAdapterTest { + + private final DefaultChangeStreamRecordAdapter adapter = new DefaultChangeStreamRecordAdapter(); + private ChangeStreamRecordBuilder changeStreamRecordBuilder; + + @Before + public void setUp() { + changeStreamRecordBuilder = adapter.createChangeStreamRecordBuilder(); + } + + @Test + public void heartbeatTest() { + ReadChangeStreamResponse.Heartbeat expectedHeartbeat = + ReadChangeStreamResponse.Heartbeat.newBuilder() + .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) + .setContinuationToken( + StreamContinuationToken.newBuilder().setToken("random-token").build()) + .build(); + assertThat(changeStreamRecordBuilder.onHeartbeat(expectedHeartbeat)) + .isEqualTo(Heartbeat.fromProto(expectedHeartbeat)); + // Call again. 
+ assertThat(changeStreamRecordBuilder.onHeartbeat(expectedHeartbeat)) + .isEqualTo(Heartbeat.fromProto(expectedHeartbeat)); + } + + @Test + public void closeStreamTest() { + ReadChangeStreamResponse.CloseStream expectedCloseStream = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder().setToken("random-token").build()) + .setStatus(Status.newBuilder().setCode(0).build()) + .build(); + assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) + .isEqualTo(CloseStream.fromProto(expectedCloseStream)); + // Call again. + assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) + .isEqualTo(CloseStream.fromProto(expectedCloseStream)); + } + + @Test + public void singleDeleteFamilyTest() { + // This is the mod we get from the ReadChangeStreamResponse. + Mutation.DeleteFromFamily deleteFromFamily = + Mutation.DeleteFromFamily.newBuilder().setFamilyName("fake-family").build(); + + // This is the expected logical mutation in the change stream record. + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteFamily("fake-family") + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + // This is the actual change stream record built from the changeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteFamily(deleteFromFamily.getFamilyName()); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. 
+ assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void singleDeleteCellTest() { + // This is the mod we get from the ReadChangeStreamResponse. + Mutation.DeleteFromColumn deleteFromColumn = + Mutation.DeleteFromColumn.newBuilder() + .setFamilyName("fake-family") + .setColumnQualifier(ByteString.copyFromUtf8("fake-qualifier")) + .setTimeRange( + TimestampRange.newBuilder() + .setStartTimestampMicros(1000L) + .setEndTimestampMicros(2000L) + .build()) + .build(); + + // This is the expected logical mutation in the change stream record. + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteCells( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + Range.TimestampRange.create(1000L, 2000L)) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + // This is the actual change stream record built from the changeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteCells( + deleteFromColumn.getFamilyName(), + deleteFromColumn.getColumnQualifier(), + Range.TimestampRange.create( + deleteFromColumn.getTimeRange().getStartTimestampMicros(), + deleteFromColumn.getTimeRange().getEndTimestampMicros())); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. 
+ assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void singleNonChunkedCellTest() { + // This is the expected logical mutation in the change stream record. + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("fake-value")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + // This is the actual change stream record built from the changeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("fake-value")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. 
+ assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void singleChunkedCellTest() { + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("fake-value1-value2")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("fake-value1")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value2")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + // Call again. 
+ assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } + + @Test + public void multipleChunkedCellsTest() { + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation.Builder expectedChangeStreamMutationBuilder = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + for (int i = 0; i < 10; ++i) { + expectedChangeStreamMutationBuilder.setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8(i + "-fake-value1-value2-value3")); + } + expectedChangeStreamMutationBuilder.setToken("fake-token").setLowWatermark(fakeLowWatermark); + + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + for (int i = 0; i < 10; ++i) { + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8(i + "-fake-value1")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value2")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value3")); + changeStreamRecordBuilder.finishCell(); + } + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutationBuilder.build()); + // Call again. + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutationBuilder.build()); + } + + @Test + public void multipleDifferentModsTest() { + // This is the expected logical mutation in the change stream record, which contains one + // DeleteFromFamily, + // one non-chunked cell, and one chunked cell. 
+ Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation.Builder expectedChangeStreamMutationBuilder = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteFamily("fake-family") + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("non-chunked-value")) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("chunked-value")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark); + + // This is the actual change stream record built from the changeStreamRecordBuilder. + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteFamily("fake-family"); + // Add non-chunked cell. + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("non-chunked-value")); + changeStreamRecordBuilder.finishCell(); + // Add chunked cell. + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("chunked")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutationBuilder.build()); + } + + @Test + public void resetTest() { + // Build a Heartbeat. 
+ ReadChangeStreamResponse.Heartbeat expectedHeartbeat = + ReadChangeStreamResponse.Heartbeat.newBuilder() + .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) + .setContinuationToken( + StreamContinuationToken.newBuilder().setToken("random-token").build()) + .build(); + assertThat(changeStreamRecordBuilder.onHeartbeat(expectedHeartbeat)) + .isEqualTo(Heartbeat.fromProto(expectedHeartbeat)); + + // Reset and build a CloseStream. + changeStreamRecordBuilder.reset(); + ReadChangeStreamResponse.CloseStream expectedCloseStream = + ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens( + StreamContinuationToken.newBuilder().setToken("random-token").build()) + .setStatus(Status.newBuilder().setCode(0).build()) + .build(); + assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) + .isEqualTo(CloseStream.fromProto(expectedCloseStream)); + + // Reset and build a DeleteFamily. + changeStreamRecordBuilder.reset(); + Mutation deleteFromFamily = + Mutation.newBuilder() + .setDeleteFromFamily( + Mutation.DeleteFromFamily.newBuilder().setFamilyName("fake-family").build()) + .build(); + Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); + Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + ChangeStreamMutation expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .deleteFamily("fake-family") + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.deleteFamily(deleteFromFamily.getDeleteFromFamily().getFamilyName()); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + + // Reset a build a cell. 
+ changeStreamRecordBuilder.reset(); + expectedChangeStreamMutation = + ChangeStreamMutation.createUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) + .setCell( + "fake-family", + ByteString.copyFromUtf8("fake-qualifier"), + 100L, + ByteString.copyFromUtf8("fake-value1-value2")) + .setToken("fake-token") + .setLowWatermark(fakeLowWatermark) + .build(); + + changeStreamRecordBuilder.startUserMutation( + ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); + changeStreamRecordBuilder.startCell( + "fake-family", ByteString.copyFromUtf8("fake-qualifier"), 100L); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("fake-value1")); + changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value2")); + changeStreamRecordBuilder.finishCell(); + assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) + .isEqualTo(expectedChangeStreamMutation); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java new file mode 100644 index 0000000000..d86df91c35 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachineTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; +import com.google.cloud.bigtable.data.v2.models.DefaultChangeStreamRecordAdapter; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ChangeStreamStateMachineTest { + ChangeStreamStateMachine changeStreamStateMachine; + + @Before + public void setUp() throws Exception { + changeStreamStateMachine = + new ChangeStreamStateMachine<>( + new DefaultChangeStreamRecordAdapter().createChangeStreamRecordBuilder()); + } + + @Test + public void testErrorHandlingStats() { + ReadChangeStreamResponse.DataChange dataChange = + ReadChangeStreamResponse.DataChange.newBuilder().build(); + + ChangeStreamStateMachine.InvalidInputException actualError = null; + try { + changeStreamStateMachine.handleDataChange(dataChange); + } catch (ChangeStreamStateMachine.InvalidInputException e) { + actualError = e; + } + + assertThat(actualError) + .hasMessageThat() + .containsMatch("AWAITING_NEW_STREAM_RECORD: First data change missing rowKey"); + assertThat(actualError).hasMessageThat().contains("numHeartbeats: 0"); + assertThat(actualError).hasMessageThat().contains("numCloseStreams: 0"); + assertThat(actualError).hasMessageThat().contains("numDataChanges: 1"); + assertThat(actualError).hasMessageThat().contains("numNonCellMods: 0"); + assertThat(actualError).hasMessageThat().contains("numCellChunks: 0"); + assertThat(actualError).hasMessageThat().contains("actualTotalSizeOfChunkedSetCell: 0"); + } +} From bd4ad0318b7f20eecde151b6260237e88aed7742 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Mon, 1 Aug 2022 22:32:32 -0400 Subject: 
[PATCH 2/4] fix: Fix styles and add some tests. --- .../v2/models/ChangeStreamRecordAdapter.java | 55 ++++--- .../bigtable/data/v2/models/Heartbeat.java | 2 +- .../ChangeStreamStateMachine.java | 75 +++++----- .../DefaultChangeStreamRecordAdapterTest.java | 140 +++++++++++++++--- 4 files changed, 192 insertions(+), 80 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java index 2aa6cc334f..2972f25c9d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -48,8 +48,9 @@ public interface ChangeStreamRecordAdapter { /** * Get the token from the given ChangeStreamMutation record. If the given record is not a - * Heartbeat, it will throw an Exception. + * ChangeStreamMutation, it will throw an Exception. */ + @InternalApi("Used in Changestream beam pipeline.") String getTokenFromChangeStreamMutation(ChangeStreamRecordT record); /** @@ -69,19 +70,28 @@ public interface ChangeStreamRecordAdapter { * * *
    - * Case 3: ChangeStreamMutation. A change stream mutation consists of one or more {@link - * com.google.bigtable.v2.Mutation}s, where the SetCells might be chunked. There are 4 different - * types of mods that a ReadChangeStream response can have: 1) DeleteFamily -> {@code - * deleteFamily}. 2) DeleteCell -> {@code deleteCell}. 3) non-chunked SetCell 4) chunked SetCell - * Mod 3) and 4) are supposed to be handled by a sequence of: a) Exactly 1 {@code startCell} b) - * Exactly 1 {@code CellValue} for non-chunked SetCell, or at least 2 {@code CellValue} for - * chunked SetCell c) Exactly 1 {@code finishCell}. Note: DeleteRow won't appear in the data - * change since it'll be converted to multiple DeleteFamily's. - *

    The whole flow of constructing a ChangeStreamMutation is: - *

  1. Exactly 1 {@code startChangeStreamMutation}. - *
  2. At least 1 1)/2)/3)/4) mod. - *
  3. Exactly 1 {@code finishChangeStreamRecord}. + * Case 3: ChangeStreamMutation. A change stream mutation consists of one or more mods, where + * the SetCells might be chunked. There are 3 different types of mods that a ReadChangeStream + * response can have: + *
  4. DeleteFamily -> Exactly 1 {@code deleteFamily} + *
  5. DeleteCell -> Exactly 1 {@code deleteCells} + *
  6. SetCell -> Exactly 1 {@code startCell}, at least 1 {@code cellValue}, exactly 1 {@code + * finishCell}. *
+ * + *

The whole flow of constructing a ChangeStreamMutation is: + * + *

    + *
  1. Exactly 1 {@code startUserMutation} or {@code startGcMutation}. + *
  2. At least 1 DeleteFamily/DeleteCell/SetCell mods. + *
  3. Exactly 1 {@code finishChangeStreamMutation}. + *
+ * + *

Note: For a non-chunked SetCell, {@code cellValue} will be called exactly once. For a chunked + * SetCell, {@code cellValue} will be called more than once. + * + *

Note: DeleteRow's won't appear in data changes since they'll be converted to multiple + * DeleteFamily's. */ interface ChangeStreamRecordBuilder { /** @@ -125,10 +135,21 @@ void deleteCells( @Nonnull TimestampRange timestampRange); /** - * Called to start a SetCell. In case of a non-chunked cell, the following order is guaranteed: - * 1) Exactly 1 {@code startCell}. 2) Exactly 1 {@code cellValue}. 3) Exactly 1 {@code - * finishCell}. In case of a chunked cell, the following order is guaranteed: 1) Exactly 1 - * {@code startCell}. 2) At least 2 {@code cellValue}. 3) Exactly 1 {@code finishCell}. + * Called to start a SetCell. + * + *

    + * In case of a non-chunked cell, the following order is guaranteed: + *
  1. Exactly 1 {@code startCell}. + *
  2. Exactly 1 {@code cellValue}. + *
  3. Exactly 1 {@code finishCell}. + *
+ * + *
    + * In case of a chunked cell, the following order is guaranteed: + *
  1. Exactly 1 {@code startCell}. + *
  2. At least 2 {@code cellValue}. + *
  3. Exactly 1 {@code finishCell}. + *
*/ void startCell(String family, ByteString qualifier, long timestampMicros); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index db82657e49..f2371c8507 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -26,7 +26,7 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608504L; - public static Heartbeat create( + private static Heartbeat create( ChangeStreamContinuationToken changeStreamContinuationToken, Timestamp lowWatermark) { return new AutoValue_Heartbeat(changeStreamContinuationToken, lowWatermark); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java index 0496fa9a5c..bb3eb3d15d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java @@ -23,25 +23,25 @@ import com.google.common.base.Preconditions; /** - * A state machine to produce {@link ChangeStreamRecordT}s from a stream of {@link - * ReadChangeStreamResponse}. 1) Produces a {@link - * com.google.cloud.bigtable.data.v2.models.Heartbeat} from a {@link - * ReadChangeStreamResponse.Heartbeat}. 2) Produces a {@link - * com.google.cloud.bigtable.data.v2.models.CloseStream} from a {@link - * ReadChangeStreamResponse.CloseStream}. 
3) Produces a {@link - * com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation} from a sequence of {@link - * ReadChangeStreamResponse.DataChange}s. Note that there can be two types of chunking: 3_1) {@link - * ReadChangeStreamResponse.DataChange}s are chunked into multiple {@link - * ReadChangeStreamResponse}s. For example, a logical mutation has two mods, where the first mod is - * sent by the first {@link ReadChangeStreamResponse.DataChange} and the second mod is sent by the - * second {@link ReadChangeStreamResponse.DataChange}. 3_2) {@link - * com.google.bigtable.v2.ReadChangeStreamResponse.MutationChunk} has a chunked {@link - * com.google.bigtable.v2.Mutation.SetCell} mutation. For example, a logical mutation has one big - * {@link com.google.bigtable.v2.Mutation.SetCell} mutation which is chunked into two {@link - * ReadChangeStreamResponse}s. The first {@link ReadChangeStreamResponse.DataChange} has the first - * half of the cell value, and the second {@link ReadChangeStreamResponse.DataChange} has the second - * half. For both types of chunking, this state machine will merge all mods into a logical mutation, - * represented by a {@link com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation}. + * A state machine to produce change stream records from a stream of {@link + * ReadChangeStreamResponse}. A change stream record can be a heartbeat, a close stream message or a + * logical mutation. + * + *

Note that there can be two types of chunking for a logical mutation: + * + *

    + *
  • Non-SetCell chunking. For example, a logical mutation has two mods, where the first mod is + * sent by the first {@link ReadChangeStreamResponse} and the second mod is sent by the second + * {@link ReadChangeStreamResponse}. + *
  • {@link ReadChangeStreamResponse.MutationChunk} has a chunked {@link + * com.google.bigtable.v2.Mutation.SetCell} mutation. For example, a logical mutation has one + * big {@link Mutation.SetCell} mutation which is chunked into two {@link + * ReadChangeStreamResponse}s. The first {@link ReadChangeStreamResponse.DataChange} has the + * first half of the cell value, and the second {@link ReadChangeStreamResponse.DataChange} + * has the second half. + *
+ * + * This state machine handles both types of chunking. * *

Building of the actual change stream record object is delegated to a {@link * ChangeStreamRecordBuilder}. This class is not thread safe. @@ -49,10 +49,10 @@ *

The inputs are: * *

    - *
  • Heartbeats represented by {@link ReadChangeStreamResponse.Heartbeat}. - *
  • CloseStreams represented by {@link ReadChangeStreamResponse.CloseStream}. - *
  • DataChanges, represented by {@link ReadChangeStreamResponse.DataChange}, that must be - * merged to form logical mutations. + *
  • {@link ReadChangeStreamResponse.Heartbeat}s. + *
  • {@link ReadChangeStreamResponse.CloseStream}s. + *
  • {@link ReadChangeStreamResponse.DataChange}s, that must be merged to form logical + * mutations. *
  • ChangeStreamRecord consumption events that reset the state machine for the next change * stream record. *
@@ -60,12 +60,9 @@ *

The outputs are: * *

    - *
  • Heartbeat records represented by {@link - * com.google.cloud.bigtable.data.v2.models.Heartbeat}. - *
  • CloseStream records represented by {@link - * com.google.cloud.bigtable.data.v2.models.CloseStream}. - *
  • Logical mutation records represented by {@link - * com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation}. + *
  • Heartbeat records. + *
  • CloseStream records. + *
  • Logical mutation records. *
* *

Expected Usage: @@ -185,10 +182,10 @@ void handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { } /** - * Returns the last completed change stream record and transitions to awaiting a new change stream + * Returns the completed change stream record and transitions to awaiting a new change stream * record. * - * @return The last completed change stream record. + * @return The completed change stream record. * @throws IllegalStateException If the last dataChange did not complete a change stream record. */ ChangeStreamRecordT consumeChangeStreamRecord() { @@ -231,9 +228,9 @@ private void reset() { /** * Base class for all the state machine's internal states. * - *

Each state can consume 3 events: Heartbeat, CloseStream and a DataChange. By default, the - * default implementation will just throw an IllegalStateException unless the subclass adds - * explicit handling for these events. + *

Each state can consume 3 events: Heartbeat, CloseStream and a Mod. The default + * implementation will just throw an IllegalStateException unless the subclass adds explicit + * handling for these events. */ abstract static class State { /** @@ -522,12 +519,12 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { * *

    *
  • 1) index < dataChange.getChunksCount() -> continue to handle the next mod. - *
  • 2) index == dataChange.getChunksCount() 2_1) dataChange.done == true -> current change + *
  • 2_1) index == dataChange.getChunksCount() && dataChange.done == true -> current change * stream mutation is complete. Wrap it up and return {@link - * ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. 2_2) dataChange.done != true -> - * current change stream mutation isn't complete. Return {@link - * ChangeStreamStateMachine#AWAITING_NEW_MOD} to wait for more mods in the next - * ReadChangeStreamResponse. + * ChangeStreamStateMachine#AWAITING_STREAM_RECORD_CONSUME}. + *
  • 2_2) index == dataChange.getChunksCount() && dataChange.done != true -> current change + * stream mutation isn't complete. Return {@link ChangeStreamStateMachine#AWAITING_NEW_MOD} + * to wait for more mods in the next ReadChangeStreamResponse. *
*/ private State checkAndFinishMutationIfNeeded( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java index b2511cc67b..e29b914ffc 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -25,8 +25,11 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.rpc.Status; +import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -36,11 +39,88 @@ public class DefaultChangeStreamRecordAdapterTest { private final DefaultChangeStreamRecordAdapter adapter = new DefaultChangeStreamRecordAdapter(); private ChangeStreamRecordBuilder changeStreamRecordBuilder; + @Rule public ExpectedException expect = ExpectedException.none(); + @Before public void setUp() { changeStreamRecordBuilder = adapter.createChangeStreamRecordBuilder(); } + @Test + public void isHeartbeatTest() { + ChangeStreamRecord heartbeatRecord = + Heartbeat.fromProto(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + ChangeStreamRecord changeStreamMutationRecord = + ChangeStreamMutation.createGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) + .setToken("token") + .setLowWatermark(Timestamp.getDefaultInstance()) + .build(); + Assert.assertTrue(adapter.isHeartbeat(heartbeatRecord)); + Assert.assertFalse(adapter.isHeartbeat(closeStreamRecord)); + 
Assert.assertFalse(adapter.isHeartbeat(changeStreamMutationRecord)); + } + + @Test + public void getTokenFromHeartbeatTest() { + ChangeStreamRecord heartbeatRecord = + Heartbeat.fromProto( + ReadChangeStreamResponse.Heartbeat.newBuilder() + .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) + .setContinuationToken( + StreamContinuationToken.newBuilder().setToken("heartbeat-token").build()) + .build()); + Assert.assertEquals(adapter.getTokenFromHeartbeat(heartbeatRecord), "heartbeat-token"); + } + + @Test(expected = IllegalArgumentException.class) + public void getTokenFromHeartbeatInvalidTypeTest() { + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + adapter.getTokenFromHeartbeat(closeStreamRecord); + expect.expectMessage("record is not a Heartbeat."); + } + + @Test + public void isChangeStreamMutationTest() { + ChangeStreamRecord heartbeatRecord = + Heartbeat.fromProto(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + ChangeStreamRecord changeStreamMutationRecord = + ChangeStreamMutation.createGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) + .setToken("token") + .setLowWatermark(Timestamp.getDefaultInstance()) + .build(); + Assert.assertFalse(adapter.isChangeStreamMutation(heartbeatRecord)); + Assert.assertFalse(adapter.isChangeStreamMutation(closeStreamRecord)); + Assert.assertTrue(adapter.isChangeStreamMutation(changeStreamMutationRecord)); + } + + @Test + public void getTokenFromChangeStreamMutationTest() { + ChangeStreamRecord changeStreamMutationRecord = + ChangeStreamMutation.createGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) + .setToken("change-stream-mutation-token") + .setLowWatermark(Timestamp.getDefaultInstance()) + .build(); + Assert.assertEquals( + 
adapter.getTokenFromChangeStreamMutation(changeStreamMutationRecord), + "change-stream-mutation-token"); + } + + @Test(expected = IllegalArgumentException.class) + public void getTokenFromChangeStreamMutationInvalidTypeTest() { + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + adapter.getTokenFromChangeStreamMutation(closeStreamRecord); + expect.expectMessage("record is not a ChangeStreamMutation."); + } + @Test public void heartbeatTest() { ReadChangeStreamResponse.Heartbeat expectedHeartbeat = @@ -71,15 +151,30 @@ public void closeStreamTest() { .isEqualTo(CloseStream.fromProto(expectedCloseStream)); } + @Test(expected = IllegalArgumentException.class) + public void createHeartbeatWithExistingMutationShouldFailTest() { + changeStreamRecordBuilder.startGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0); + changeStreamRecordBuilder.onHeartbeat(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); + } + + @Test(expected = IllegalArgumentException.class) + public void createCloseStreamWithExistingMutationShouldFailTest() { + changeStreamRecordBuilder.startGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0); + changeStreamRecordBuilder.onCloseStream( + ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + } + @Test public void singleDeleteFamilyTest() { - // This is the mod we get from the ReadChangeStreamResponse. + // Suppose this is the mod we get from the ReadChangeStreamResponse. Mutation.DeleteFromFamily deleteFromFamily = Mutation.DeleteFromFamily.newBuilder().setFamilyName("fake-family").build(); - - // This is the expected logical mutation in the change stream record. Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + + // Expected logical mutation in the change stream record. 
ChangeStreamMutation expectedChangeStreamMutation = ChangeStreamMutation.createUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) @@ -88,7 +183,7 @@ public void singleDeleteFamilyTest() { .setLowWatermark(fakeLowWatermark) .build(); - // This is the actual change stream record built from the changeStreamRecordBuilder. + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); changeStreamRecordBuilder.deleteFamily(deleteFromFamily.getFamilyName()); @@ -101,7 +196,7 @@ public void singleDeleteFamilyTest() { @Test public void singleDeleteCellTest() { - // This is the mod we get from the ReadChangeStreamResponse. + // Suppose this is the mod we get from the ReadChangeStreamResponse. Mutation.DeleteFromColumn deleteFromColumn = Mutation.DeleteFromColumn.newBuilder() .setFamilyName("fake-family") @@ -112,10 +207,10 @@ public void singleDeleteCellTest() { .setEndTimestampMicros(2000L) .build()) .build(); - - // This is the expected logical mutation in the change stream record. Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); + + // Expected logical mutation in the change stream record. ChangeStreamMutation expectedChangeStreamMutation = ChangeStreamMutation.createUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0) @@ -127,7 +222,7 @@ public void singleDeleteCellTest() { .setLowWatermark(fakeLowWatermark) .build(); - // This is the actual change stream record built from the changeStreamRecordBuilder. + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. 
changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); changeStreamRecordBuilder.deleteCells( @@ -145,7 +240,7 @@ public void singleDeleteCellTest() { @Test public void singleNonChunkedCellTest() { - // This is the expected logical mutation in the change stream record. + // Expected logical mutation in the change stream record. Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); ChangeStreamMutation expectedChangeStreamMutation = @@ -160,7 +255,8 @@ public void singleNonChunkedCellTest() { .setLowWatermark(fakeLowWatermark) .build(); - // This is the actual change stream record built from the changeStreamRecordBuilder. + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + // Suppose the SetCell is not chunked and the state machine calls `cellValue()` once. changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); changeStreamRecordBuilder.startCell( @@ -176,6 +272,7 @@ public void singleNonChunkedCellTest() { @Test public void singleChunkedCellTest() { + // Expected logical mutation in the change stream record. Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); ChangeStreamMutation expectedChangeStreamMutation = @@ -190,6 +287,9 @@ public void singleChunkedCellTest() { .setLowWatermark(fakeLowWatermark) .build(); + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. + // Suppose the SetCell is chunked into two pieces and the state machine calls `cellValue()` + // twice. 
changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); changeStreamRecordBuilder.startCell( @@ -206,6 +306,7 @@ public void singleChunkedCellTest() { @Test public void multipleChunkedCellsTest() { + // Expected logical mutation in the change stream record. Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); ChangeStreamMutation.Builder expectedChangeStreamMutationBuilder = @@ -220,6 +321,7 @@ public void multipleChunkedCellsTest() { } expectedChangeStreamMutationBuilder.setToken("fake-token").setLowWatermark(fakeLowWatermark); + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); for (int i = 0; i < 10; ++i) { @@ -230,6 +332,7 @@ public void multipleChunkedCellsTest() { changeStreamRecordBuilder.cellValue(ByteString.copyFromUtf8("-value3")); changeStreamRecordBuilder.finishCell(); } + // Check that they're the same. assertThat(changeStreamRecordBuilder.finishChangeStreamMutation("fake-token", fakeLowWatermark)) .isEqualTo(expectedChangeStreamMutationBuilder.build()); // Call again. @@ -239,8 +342,7 @@ public void multipleChunkedCellsTest() { @Test public void multipleDifferentModsTest() { - // This is the expected logical mutation in the change stream record, which contains one - // DeleteFromFamily, + // Expected logical mutation in the change stream record, which contains one DeleteFromFamily, // one non-chunked cell, and one chunked cell. 
Timestamp fakeCommitTimestamp = Timestamp.newBuilder().setSeconds(1000).build(); Timestamp fakeLowWatermark = Timestamp.newBuilder().setSeconds(2000).build(); @@ -261,7 +363,7 @@ public void multipleDifferentModsTest() { .setToken("fake-token") .setLowWatermark(fakeLowWatermark); - // This is the actual change stream record built from the changeStreamRecordBuilder. + // Create the ChangeStreamMutation through the ChangeStreamRecordBuilder. changeStreamRecordBuilder.startUserMutation( ByteString.copyFromUtf8("key"), "fake-source-cluster-id", fakeCommitTimestamp, 0); changeStreamRecordBuilder.deleteFamily("fake-family"); @@ -284,22 +386,14 @@ public void multipleDifferentModsTest() { public void resetTest() { // Build a Heartbeat. ReadChangeStreamResponse.Heartbeat expectedHeartbeat = - ReadChangeStreamResponse.Heartbeat.newBuilder() - .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) - .setContinuationToken( - StreamContinuationToken.newBuilder().setToken("random-token").build()) - .build(); + ReadChangeStreamResponse.Heartbeat.getDefaultInstance(); assertThat(changeStreamRecordBuilder.onHeartbeat(expectedHeartbeat)) .isEqualTo(Heartbeat.fromProto(expectedHeartbeat)); // Reset and build a CloseStream. 
changeStreamRecordBuilder.reset(); ReadChangeStreamResponse.CloseStream expectedCloseStream = - ReadChangeStreamResponse.CloseStream.newBuilder() - .addContinuationTokens( - StreamContinuationToken.newBuilder().setToken("random-token").build()) - .setStatus(Status.newBuilder().setCode(0).build()) - .build(); + ReadChangeStreamResponse.CloseStream.getDefaultInstance(); assertThat(changeStreamRecordBuilder.onCloseStream(expectedCloseStream)) .isEqualTo(CloseStream.fromProto(expectedCloseStream)); From 18b0869e32e2d000878a26f80d7731eb09857e26 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Tue, 2 Aug 2022 16:31:32 -0400 Subject: [PATCH 3/4] fix: Address comments --- .../bigtable/data/v2/models/ChangeStreamRecordAdapter.java | 3 ++- .../data/v2/stub/changestream/ChangeStreamStateMachine.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java index 2972f25c9d..14ebb4192c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -25,7 +25,8 @@ /** * An extension point that allows end users to plug in a custom implementation of logical change * stream records. This is useful in cases where the user would like to apply advanced client side - * filtering. This adapter acts like a factory for a SAX style change stream record builder. + * filtering(for example, only keep DeleteFamily's in the mutations). This adapter acts like a + * factory for a SAX style change stream record builder. */ public interface ChangeStreamRecordAdapter { /** Creates a new instance of a {@link ChangeStreamRecordBuilder}. 
*/ diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java index bb3eb3d15d..c96c71f3b4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java @@ -454,7 +454,7 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { // Case 1: Current SetCell is chunked. if (chunk.hasChunkInfo()) { validate( - chunk.getChunkInfo().getChunkedValueSize() != 0, + chunk.getChunkInfo().getChunkedValueSize() > 0, "AWAITING_CELL_VALUE: Chunked value size must be positive."); actualTotalSizeOfChunkedSetCell += setCell.getValue().size(); // If it's the last chunk of the chunked SetCell, finish the cell. @@ -536,7 +536,9 @@ private State checkAndFinishMutationIfNeeded( if (index < dataChange.getChunksCount()) { return AWAITING_NEW_MOD.handleMod(dataChange, index); } - // We have finished handling all the mods in this DataChange. + // If we reach here, it means that all the mods in this DataChange have been handled. We should + // finish up the logical mutation or wait for more mods in the next ReadChangeStreamResponse, + // depending on whether the current response is the last response for the logical mutation. if (dataChange.getDone()) { // Case 2_1): Current change stream mutation is complete. 
validate(!dataChange.getToken().isEmpty(), "Last data change missing token"); From 9d192fcc9b7fc18974fa2f1e3db28c790ec23a62 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 3 Aug 2022 15:44:13 -0400 Subject: [PATCH 4/4] fix: Update comments --- .../v2/models/ChangeStreamRecordAdapter.java | 4 +- .../ChangeStreamStateMachine.java | 58 ++++++++++--------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java index 14ebb4192c..6e9715a407 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -25,8 +25,8 @@ /** * An extension point that allows end users to plug in a custom implementation of logical change * stream records. This is useful in cases where the user would like to apply advanced client side - * filtering(for example, only keep DeleteFamily's in the mutations). This adapter acts like a - * factory for a SAX style change stream record builder. + * filtering(for example, only keep DeleteFamily in the mutations). This adapter acts like a factory + * for a SAX style change stream record builder. */ public interface ChangeStreamRecordAdapter { /** Creates a new instance of a {@link ChangeStreamRecordBuilder}. 
*/ diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java index c96c71f3b4..7ab7fa2b7b 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java @@ -24,15 +24,15 @@ /** * A state machine to produce change stream records from a stream of {@link - * ReadChangeStreamResponse}. A change stream record can be a heartbeat, a close stream message or a - * logical mutation. + * ReadChangeStreamResponse}. A change stream record can be a Heartbeat, a CloseStream or a + * ChangeStreamMutation. * - *

Note that there can be two types of chunking for a logical mutation: + *

There could be two types of chunking for a ChangeStreamMutation: * *

    - *
  • Non-SetCell chunking. For example, a logical mutation has two mods, where the first mod is - * sent by the first {@link ReadChangeStreamResponse} and the second mod is sent by the second - * {@link ReadChangeStreamResponse}. + *
  • Non-SetCell chunking. For example, a ChangeStreamMutation has two mods, DeleteFamily and + * DeleteColumn. DeleteFamily is sent in the first {@link ReadChangeStreamResponse} and + * DeleteColumn is sent in the second {@link ReadChangeStreamResponse}. *
  • {@link ReadChangeStreamResponse.MutationChunk} has a chunked {@link * com.google.bigtable.v2.Mutation.SetCell} mutation. For example, a logical mutation has one * big {@link Mutation.SetCell} mutation which is chunked into two {@link @@ -51,8 +51,8 @@ *
      *
    • {@link ReadChangeStreamResponse.Heartbeat}s. *
    • {@link ReadChangeStreamResponse.CloseStream}s. - *
    • {@link ReadChangeStreamResponse.DataChange}s, that must be merged to form logical - * mutations. + *
    • {@link ReadChangeStreamResponse.DataChange}s that must be merged into a + * ChangeStreamMutation. *
    • ChangeStreamRecord consumption events that reset the state machine for the next change * stream record. *
    @@ -62,7 +62,7 @@ *
      *
    • Heartbeat records. *
    • CloseStream records. - *
    • Logical mutation records. + *
    • ChangeStreamMutation records. *
    * *

    Expected Usage: @@ -182,8 +182,8 @@ void handleDataChange(ReadChangeStreamResponse.DataChange dataChange) { } /** - * Returns the completed change stream record and transitions to awaiting a new change stream - * record. + * Returns the completed change stream record and transitions to {@link + * ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD}. * * @return The completed change stream record. * @throws IllegalStateException If the last dataChange did not complete a change stream record. @@ -193,7 +193,7 @@ ChangeStreamRecordT consumeChangeStreamRecord() { completeChangeStreamRecord != null, "No change stream record to consume."); Preconditions.checkState( currentState == AWAITING_STREAM_RECORD_CONSUME, - "Change stream record not ready to consume: " + currentState); + "Change stream record is not ready to consume: " + currentState); ChangeStreamRecordT changeStreamRecord = completeChangeStreamRecord; reset(); return changeStreamRecord; @@ -201,7 +201,7 @@ ChangeStreamRecordT consumeChangeStreamRecord() { /** Checks if there is a complete change stream record to be consumed. */ boolean hasCompleteChangeStreamRecord() { - return currentState == AWAITING_STREAM_RECORD_CONSUME; + return completeChangeStreamRecord != null && currentState == AWAITING_STREAM_RECORD_CONSUME; } /** * Checks if the state machine is in the middle of processing a change stream record. @@ -253,7 +253,8 @@ ChangeStreamStateMachine.State handleCloseStream( } /** - * Accepts a new mod and transitions to the next state. + * Accepts a new mod and transitions to the next state. A mod could be a DeleteFamily, a + * DeleteColumn, or a SetCell. * * @param dataChange The DataChange that holds the new mod to process. * @param index The index of the mod in the DataChange. 
@@ -451,7 +452,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { Mutation.SetCell setCell = chunk.getMutation().getSetCell(); numCellChunks++; builder.cellValue(setCell.getValue()); - // Case 1: Current SetCell is chunked. + // Case 1: Current SetCell is chunked. For example: [ReadChangeStreamResponse1: + // {DeleteColumn, DeleteFamily, SetCell_1}, ReadChangeStreamResponse2: {SetCell_2, + // DeleteFamily}]. if (chunk.hasChunkInfo()) { validate( chunk.getChunkInfo().getChunkedValueSize() > 0, @@ -468,14 +471,16 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { + "; actual total size: " + actualTotalSizeOfChunkedSetCell); return checkAndFinishMutationIfNeeded(dataChange, index + 1); + } else { + // If this is not the last chunk of a chunked SetCell, then this must be the last mod + // of the current response, and we're expecting the rest of the chunked cells in the + // following ReadChangeStream response. + validate( + index == dataChange.getChunksCount() - 1, + "AWAITING_CELL_VALUE: Current mod is a chunked SetCell " + + "but not the last chunk, but it's not the last mod of the current response."); + return AWAITING_CELL_VALUE; } - // If this is not the last chunk of a chunked SetCell, then this must be the last mod of - // the current response, and we're expecting the rest of the chunked cells in the - // following ReadChangeStream response. - validate( - index == dataChange.getChunksCount() - 1, - "AWAITING_CELL_VALUE: Current SetCell is not chunked."); - return AWAITING_CELL_VALUE; } // Case 2: Current SetCell is not chunked. builder.finishCell(); @@ -485,12 +490,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) { /** * A state that represents a completed change stream record. It prevents new change stream records - * from being read until the current one has been consumed. - * - *

    - *
    Valid exit states: - *
    {@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD}. - *
    + * from being read until the current one has been consumed. The caller is supposed to consume the + * change stream record by calling {@link ChangeStreamStateMachine#consumeChangeStreamRecord()} + * which will reset the state to {@link ChangeStreamStateMachine#AWAITING_NEW_STREAM_RECORD}. */ private final State AWAITING_STREAM_RECORD_CONSUME = new State() {