Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Return MutationType and bigtable.common.Status instead of raw p… #1359

Merged
merged 3 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalExtensionOnly;
import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -69,10 +68,14 @@
public class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 8419520253162024218L;

public enum MutationType {
USER,
GARBAGE_COLLECTION
}

private final ByteString rowKey;

/** Possible values: USER/GARBAGE_COLLECTION. */
private final Type type;
private final MutationType type;

/** This should only be set when type==USER. */
private final String sourceClusterId;
Expand Down Expand Up @@ -108,7 +111,7 @@ static Builder createUserMutation(
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker) {
return new Builder(rowKey, Type.USER, sourceClusterId, commitTimestamp, tieBreaker);
return new Builder(rowKey, MutationType.USER, sourceClusterId, commitTimestamp, tieBreaker);
}

/**
Expand All @@ -118,7 +121,7 @@ static Builder createUserMutation(
*/
static Builder createGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
return new Builder(rowKey, MutationType.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
Expand All @@ -142,7 +145,7 @@ public ByteString getRowKey() {

/** Get the type of the current mutation. */
@Nonnull
public Type getType() {
public MutationType getType() {
return this.type;
}

Expand Down Expand Up @@ -190,7 +193,7 @@ Builder toBuilder() {
public static class Builder {
private final ByteString rowKey;

private final Type type;
private final MutationType type;

private final String sourceClusterId;

Expand All @@ -206,7 +209,7 @@ public static class Builder {

private Builder(
ByteString rowKey,
Type type,
MutationType type,
String sourceClusterId,
Timestamp commitTimestamp,
int tieBreaker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ private CloseStream(Status status, List<StreamContinuationToken> continuationTok
}

@InternalApi("Used in Changestream beam pipeline.")
public Status getStatus() {
return this.status;
public com.google.cloud.bigtable.common.Status getStatus() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why long import?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I already import import com.google.rpc.Status;. CloseStream holds the proto type status (i.e. com.google.rpc.Status), and getStatus() return the wrapper class(i.e. com.google.cloud.bigtable.common.Status)

return com.google.cloud.bigtable.common.Status.fromProto(this.status);
}

@InternalApi("Used in Changestream beam pipeline.")
Expand Down Expand Up @@ -88,21 +88,21 @@ public boolean equals(Object o) {
return false;
}
CloseStream record = (CloseStream) o;
return Objects.equal(status, record.getStatus())
return Objects.equal(getStatus(), record.getStatus())
&& Objects.equal(
changeStreamContinuationTokens.build(), record.getChangeStreamContinuationTokens());
getChangeStreamContinuationTokens(), record.getChangeStreamContinuationTokens());
}

@Override
public int hashCode() {
return Objects.hashCode(status, changeStreamContinuationTokens);
return Objects.hashCode(getStatus(), getChangeStreamContinuationTokens());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("status", status)
.add("changeStreamContinuationTokens", changeStreamContinuationTokens)
.add("status", getStatus())
.add("changeStreamContinuationTokens", getChangeStreamContinuationTokens())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.common.primitives.Longs;
Expand Down Expand Up @@ -72,8 +71,7 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti

// Test the getters.
Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key"));
Assert.assertEquals(
changeStreamMutation.getType(), ReadChangeStreamResponse.DataChange.Type.USER);
Assert.assertEquals(changeStreamMutation.getType(), ChangeStreamMutation.MutationType.USER);
Assert.assertEquals(changeStreamMutation.getSourceClusterId(), "fake-source-cluster-id");
Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp);
Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0);
Expand Down Expand Up @@ -115,8 +113,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException {
// Test the getters.
Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key"));
Assert.assertEquals(
changeStreamMutation.getType(),
ReadChangeStreamResponse.DataChange.Type.GARBAGE_COLLECTION);
changeStreamMutation.getType(), ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
Assert.assertNull(changeStreamMutation.getSourceClusterId());
Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp);
Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void closeStreamTest() {
.build();
CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto);

Assert.assertEquals(status, actualCloseStream.getStatus());
Assert.assertEquals(status, actualCloseStream.getStatus().toProto());
Assert.assertEquals(
actualCloseStream.getChangeStreamContinuationTokens().get(0).getPartition(),
ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void closeStreamTest() {
ChangeStreamRecord record = results.get(0);
Assert.assertTrue(record instanceof CloseStream);
CloseStream closeStream = (CloseStream) record;
Assert.assertEquals(closeStream.getStatus(), closeStreamProto.getStatus());
Assert.assertEquals(closeStream.getStatus().toProto(), closeStreamProto.getStatus());
Assert.assertEquals(closeStream.getChangeStreamContinuationTokens().size(), 1);
ChangeStreamContinuationToken changeStreamContinuationToken =
closeStream.getChangeStreamContinuationTokens().get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.ReadChangeStreamRequest;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamPartition;
Expand Down Expand Up @@ -151,7 +152,8 @@ public void test() throws Exception {
} else if (record instanceof CloseStream) {
CloseStream closeStream = (CloseStream) record;
ReadChangeStreamResponse.CloseStream.Builder builder =
ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(closeStream.getStatus());
ReadChangeStreamResponse.CloseStream.newBuilder()
.setStatus(closeStream.getStatus().toProto());
for (ChangeStreamContinuationToken token :
closeStream.getChangeStreamContinuationTokens()) {
builder.addContinuationTokens(
Expand Down Expand Up @@ -179,7 +181,14 @@ public void test() throws Exception {
ReadChangeStreamTest.TestChangeStreamMutation.Builder builder =
ReadChangeStreamTest.TestChangeStreamMutation.newBuilder();
builder.setRowKey(changeStreamMutation.getRowKey());
builder.setType(changeStreamMutation.getType());
Type type = Type.UNRECOGNIZED;
if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) {
type = Type.USER;
} else if (changeStreamMutation.getType()
== ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) {
type = Type.GARBAGE_COLLECTION;
}
builder.setType(type);
if (changeStreamMutation.getSourceClusterId() != null) {
builder.setSourceClusterId(changeStreamMutation.getSourceClusterId());
}
Expand Down