Skip to content

Commit

Permalink
feat: Return MutationType and bigtable.common.Status instead of raw p… (
Browse files Browse the repository at this point in the history
#1359)

* feat: Return MutationType and bigtable.common.Status instead of raw protos

* fix: remove unused import

* fix: fix test

Co-authored-by: Teng Zhong <tengzhong@google.com>
  • Loading branch information
tengzhonger and Teng Zhong authored Aug 16, 2022
1 parent c3086d9 commit 351a151
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalExtensionOnly;
import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -69,10 +68,14 @@
public class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 8419520253162024218L;

public enum MutationType {
USER,
GARBAGE_COLLECTION
}

private final ByteString rowKey;

/** Possible values: USER/GARBAGE_COLLECTION. */
private final Type type;
private final MutationType type;

/** This should only be set when type==USER. */
private final String sourceClusterId;
Expand Down Expand Up @@ -108,7 +111,7 @@ static Builder createUserMutation(
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker) {
return new Builder(rowKey, Type.USER, sourceClusterId, commitTimestamp, tieBreaker);
return new Builder(rowKey, MutationType.USER, sourceClusterId, commitTimestamp, tieBreaker);
}

/**
Expand All @@ -118,7 +121,7 @@ static Builder createUserMutation(
*/
static Builder createGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
return new Builder(rowKey, MutationType.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
Expand All @@ -142,7 +145,7 @@ public ByteString getRowKey() {

/** Get the type of the current mutation. */
@Nonnull
public Type getType() {
public MutationType getType() {
return this.type;
}

Expand Down Expand Up @@ -190,7 +193,7 @@ Builder toBuilder() {
public static class Builder {
private final ByteString rowKey;

private final Type type;
private final MutationType type;

private final String sourceClusterId;

Expand All @@ -206,7 +209,7 @@ public static class Builder {

private Builder(
ByteString rowKey,
Type type,
MutationType type,
String sourceClusterId,
Timestamp commitTimestamp,
int tieBreaker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ private CloseStream(Status status, List<StreamContinuationToken> continuationTok
}

@InternalApi("Used in Changestream beam pipeline.")
public Status getStatus() {
return this.status;
public com.google.cloud.bigtable.common.Status getStatus() {
return com.google.cloud.bigtable.common.Status.fromProto(this.status);
}

@InternalApi("Used in Changestream beam pipeline.")
Expand Down Expand Up @@ -88,21 +88,21 @@ public boolean equals(Object o) {
return false;
}
CloseStream record = (CloseStream) o;
return Objects.equal(status, record.getStatus())
return Objects.equal(getStatus(), record.getStatus())
&& Objects.equal(
changeStreamContinuationTokens.build(), record.getChangeStreamContinuationTokens());
getChangeStreamContinuationTokens(), record.getChangeStreamContinuationTokens());
}

@Override
public int hashCode() {
return Objects.hashCode(status, changeStreamContinuationTokens);
return Objects.hashCode(getStatus(), getChangeStreamContinuationTokens());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("status", status)
.add("changeStreamContinuationTokens", changeStreamContinuationTokens)
.add("status", getStatus())
.add("changeStreamContinuationTokens", getChangeStreamContinuationTokens())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.common.primitives.Longs;
Expand Down Expand Up @@ -72,8 +71,7 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti

// Test the getters.
Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key"));
Assert.assertEquals(
changeStreamMutation.getType(), ReadChangeStreamResponse.DataChange.Type.USER);
Assert.assertEquals(changeStreamMutation.getType(), ChangeStreamMutation.MutationType.USER);
Assert.assertEquals(changeStreamMutation.getSourceClusterId(), "fake-source-cluster-id");
Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp);
Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0);
Expand Down Expand Up @@ -115,8 +113,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException {
// Test the getters.
Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key"));
Assert.assertEquals(
changeStreamMutation.getType(),
ReadChangeStreamResponse.DataChange.Type.GARBAGE_COLLECTION);
changeStreamMutation.getType(), ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
Assert.assertNull(changeStreamMutation.getSourceClusterId());
Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp);
Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void closeStreamTest() {
.build();
CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto);

Assert.assertEquals(status, actualCloseStream.getStatus());
Assert.assertEquals(status, actualCloseStream.getStatus().toProto());
Assert.assertEquals(
actualCloseStream.getChangeStreamContinuationTokens().get(0).getPartition(),
ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void closeStreamTest() {
ChangeStreamRecord record = results.get(0);
Assert.assertTrue(record instanceof CloseStream);
CloseStream closeStream = (CloseStream) record;
Assert.assertEquals(closeStream.getStatus(), closeStreamProto.getStatus());
Assert.assertEquals(closeStream.getStatus().toProto(), closeStreamProto.getStatus());
Assert.assertEquals(closeStream.getChangeStreamContinuationTokens().size(), 1);
ChangeStreamContinuationToken changeStreamContinuationToken =
closeStream.getChangeStreamContinuationTokens().get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamPartition;
Expand Down Expand Up @@ -151,7 +152,8 @@ public void test() throws Exception {
} else if (record instanceof CloseStream) {
CloseStream closeStream = (CloseStream) record;
ReadChangeStreamResponse.CloseStream.Builder builder =
ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(closeStream.getStatus());
ReadChangeStreamResponse.CloseStream.newBuilder()
.setStatus(closeStream.getStatus().toProto());
for (ChangeStreamContinuationToken token :
closeStream.getChangeStreamContinuationTokens()) {
builder.addContinuationTokens(
Expand Down Expand Up @@ -179,7 +181,14 @@ public void test() throws Exception {
ReadChangeStreamTest.TestChangeStreamMutation.Builder builder =
ReadChangeStreamTest.TestChangeStreamMutation.newBuilder();
builder.setRowKey(changeStreamMutation.getRowKey());
builder.setType(changeStreamMutation.getType());
Type type = Type.UNRECOGNIZED;
if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
type = Type.USER;
} else if (changeStreamMutation.getType()
== ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
type = Type.GARBAGE_COLLECTION;
}
builder.setType(type);
if (changeStreamMutation.getSourceClusterId() != null) {
builder.setSourceClusterId(changeStreamMutation.getSourceClusterId());
}
Expand Down

0 comments on commit 351a151

Please sign in to comment.