Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ChangeStreamRecordAdapter and ChangeStreamStateMachine #1334

Merged
merged 4 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,28 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangeStreamMutation otherChangeStreamMutation = (ChangeStreamMutation) o;
return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode());
ChangeStreamMutation other = (ChangeStreamMutation) o;
return Objects.equal(this.rowKey, other.rowKey)
&& Objects.equal(this.type, other.type)
&& Objects.equal(this.sourceClusterId, other.sourceClusterId)
&& Objects.equal(this.commitTimestamp, other.commitTimestamp)
&& Objects.equal(this.tieBreaker, other.tieBreaker)
&& Objects.equal(this.token, other.token)
&& Objects.equal(this.lowWatermark, other.lowWatermark)
&& Objects.equal(this.entries.build(), other.entries.build());
}

@Override
public int hashCode() {
return Objects.hashCode(
rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries);
rowKey,
type,
sourceClusterId,
commitTimestamp,
tieBreaker,
token,
lowWatermark,
entries.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import javax.annotation.Nonnull;

/**
* An extension point that allows end users to plug in a custom implementation of logical change
* stream records. This is useful in cases where the user would like to apply advanced client side
* filtering(for example, only keep DeleteFamily in the mutations). This adapter acts like a factory
* for a SAX style change stream record builder.
*/
public interface ChangeStreamRecordAdapter<ChangeStreamRecordT> {
/** Creates a new instance of a {@link ChangeStreamRecordBuilder}. */
ChangeStreamRecordBuilder<ChangeStreamRecordT> createChangeStreamRecordBuilder();

/** Checks if the given change stream record is a Heartbeat. */
@InternalApi("Used in Changestream beam pipeline.")
boolean isHeartbeat(ChangeStreamRecordT record);

/**
* Get the token from the given Heartbeat record. If the given record is not a Heartbeat, it will
* throw an Exception.
*/
@InternalApi("Used in Changestream beam pipeline.")
String getTokenFromHeartbeat(ChangeStreamRecordT heartbeatRecord);

/** Checks if the given change stream record is a ChangeStreamMutation. */
@InternalApi("Used in Changestream beam pipeline.")
boolean isChangeStreamMutation(ChangeStreamRecordT record);

/**
* Get the token from the given ChangeStreamMutation record. If the given record is not a
* ChangeStreamMutation, it will throw an Exception.
*/
@InternalApi("Used in Changestream beam pipeline.")
String getTokenFromChangeStreamMutation(ChangeStreamRecordT record);

/**
* A SAX style change stream record factory. It is responsible for creating one of the three types
* of change stream record: heartbeat, close stream, and a change stream mutation.
*
* <p>State management is handled external to the implementation of this class:
*
* <ol>
* Case 1: Heartbeat
* <li>Exactly 1 {@code onHeartbeat}.
* </ol>
*
* <ol>
* Case 2: CloseStream
* <li>Exactly 1 {@code onCloseStream}.
* </ol>
*
* <ol>
* Case 3: ChangeStreamMutation. A change stream mutation consists of one or more mods, where
* the SetCells might be chunked. There are 3 different types of mods that a ReadChangeStream
* response can have:
* <li>DeleteFamily -> Exactly 1 {@code deleteFamily}
* <li>DeleteCell -> Exactly 1 {@code deleteCell}
* <li>SetCell -> Exactly 1 {@code startCell}, At least 1 {@code CellValue}, Exactly 1 {@code
* finishCell}.
* </ol>
*
* <p>The whole flow of constructing a ChangeStreamMutation is:
*
* <ol>
* <li>Exactly 1 {@code startUserMutation} or {@code startGcMutation}.
* <li>At least 1 DeleteFamily/DeleteCell/SetCell mods.
* <li>Exactly 1 {@code finishChangeStreamMutation}.
* </ol>
*
* <p>Note: For a non-chunked SetCell, only 1 {@code CellValue} will be called. For a chunked
* SetCell, more than 1 {@code CellValue}s will be called.
*
* <p>Note: DeleteRow's won't appear in data changes since they'll be converted to multiple
* DeleteFamily's.
*/
interface ChangeStreamRecordBuilder<ChangeStreamRecordT> {
/**
* Called to create a heartbeat. This will be called at most once. If called, the current change
* stream record must not include any data changes or close stream messages.
*/
ChangeStreamRecordT onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat);

/**
* Called to create a close stream message. This will be called at most once. If called, the
* current change stream record must not include any data changes or heartbeats.
*/
ChangeStreamRecordT onCloseStream(ReadChangeStreamResponse.CloseStream closeStream);

/**
* Called to start a new user initiated ChangeStreamMutation. This will be called at most once.
* If called, the current change stream record must not include any close stream message or
* heartbeat.
*/
void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker);

/**
* Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most
* once. If called, the current change stream record must not include any close stream message
* or heartbeat.
*/
void startGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker);

/** Called to add a DeleteFamily mod. */
void deleteFamily(@Nonnull String familyName);

/** Called to add a DeleteCell mod. */
void deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange);

/**
* Called to start a SetCell.
*
* <ol>
* In case of a non-chunked cell, the following order is guaranteed:
* <li>Exactly 1 {@code startCell}.
* <li>Exactly 1 {@code cellValue}.
* <li>Exactly 1 {@code finishCell}.
* </ol>
*
* <ol>
* In case of a chunked cell, the following order is guaranteed:
* <li>Exactly 1 {@code startCell}.
* <li>At least 2 {@code cellValue}.
* <li>Exactly 1 {@code finishCell}.
* </ol>
*/
void startCell(String family, ByteString qualifier, long timestampMicros);

/**
* Called once per non-chunked cell, or at least twice per chunked cell to concatenate the cell
* value.
*/
void cellValue(ByteString value);

/** Called once per cell to signal the end of the value (unless reset). */
void finishCell();

/** Called once per stream record to signal that all mods have been processed (unless reset). */
ChangeStreamRecordT finishChangeStreamMutation(
@Nonnull String token, @Nonnull Timestamp lowWatermark);

/** Called when the current in progress change stream record should be dropped */
void reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import javax.annotation.Nonnull;

/**
* Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link
* ChangeStreamRecord}s to represent change stream records.
*/
public class DefaultChangeStreamRecordAdapter
implements ChangeStreamRecordAdapter<ChangeStreamRecord> {

/** {@inheritDoc} */
@Override
public ChangeStreamRecordBuilder<ChangeStreamRecord> createChangeStreamRecordBuilder() {
return new DefaultChangeStreamRecordBuilder();
}

/** {@inheritDoc} */
@Override
public boolean isHeartbeat(ChangeStreamRecord record) {
return record instanceof Heartbeat;
}

/** {@inheritDoc} */
@Override
public String getTokenFromHeartbeat(ChangeStreamRecord record) {
Preconditions.checkArgument(isHeartbeat(record), "record is not a Heartbeat.");
return ((Heartbeat) record).getChangeStreamContinuationToken().getToken();
}

/** {@inheritDoc} */
@Override
public boolean isChangeStreamMutation(ChangeStreamRecord record) {
return record instanceof ChangeStreamMutation;
}

/** {@inheritDoc} */
@Override
public String getTokenFromChangeStreamMutation(ChangeStreamRecord record) {
Preconditions.checkArgument(
isChangeStreamMutation(record), "record is not a ChangeStreamMutation.");
return ((ChangeStreamMutation) record).getToken();
}

/** {@inheritDoc} */
static class DefaultChangeStreamRecordBuilder
implements ChangeStreamRecordBuilder<ChangeStreamRecord> {
private ChangeStreamMutation.Builder changeStreamMutationBuilder = null;

// For the current SetCell.
private String family;
private ByteString qualifier;
private long timestampMicros;
private ByteString value;

public DefaultChangeStreamRecordBuilder() {
reset();
}

/** {@inheritDoc} */
@Override
public ChangeStreamRecord onHeartbeat(ReadChangeStreamResponse.Heartbeat heartbeat) {
Preconditions.checkArgument(
this.changeStreamMutationBuilder == null,
"Can not create a Heartbeat when there is an existing ChangeStreamMutation being built.");
return Heartbeat.fromProto(heartbeat);
}

/** {@inheritDoc} */
@Override
public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream closeStream) {
Preconditions.checkArgument(
this.changeStreamMutationBuilder == null,
"Can not create a CloseStream when there is an existing ChangeStreamMutation being built.");
return CloseStream.fromProto(closeStream);
}

/** {@inheritDoc} */
@Override
public void startUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createUserMutation(
rowKey, sourceClusterId, commitTimestamp, tieBreaker);
}

/** {@inheritDoc} */
@Override
public void startGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
this.changeStreamMutationBuilder =
ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker);
}

/** {@inheritDoc} */
@Override
public void deleteFamily(@Nonnull String familyName) {
this.changeStreamMutationBuilder.deleteFamily(familyName);
}

/** {@inheritDoc} */
@Override
public void deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
this.changeStreamMutationBuilder.deleteCells(familyName, qualifier, timestampRange);
}

/** {@inheritDoc} */
@Override
public void startCell(String family, ByteString qualifier, long timestampMicros) {
this.family = family;
this.qualifier = qualifier;
this.timestampMicros = timestampMicros;
this.value = ByteString.EMPTY;
}

/** {@inheritDoc} */
@Override
public void cellValue(ByteString value) {
this.value = this.value.concat(value);
}

/** {@inheritDoc} */
@Override
public void finishCell() {
this.changeStreamMutationBuilder.setCell(
this.family, this.qualifier, this.timestampMicros, this.value);
}

/** {@inheritDoc} */
@Override
public ChangeStreamRecord finishChangeStreamMutation(
@Nonnull String token, @Nonnull Timestamp lowWatermark) {
this.changeStreamMutationBuilder.setToken(token);
this.changeStreamMutationBuilder.setLowWatermark(lowWatermark);
return this.changeStreamMutationBuilder.build();
}

/** {@inheritDoc} */
@Override
public void reset() {
changeStreamMutationBuilder = null;

family = null;
qualifier = null;
timestampMicros = 0;
value = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;

public static Heartbeat create(
private static Heartbeat create(
ChangeStreamContinuationToken changeStreamContinuationToken, Timestamp lowWatermark) {
return new AutoValue_Heartbeat(changeStreamContinuationToken, lowWatermark);
}
Expand Down
Loading