From 351a1513d75136de3e6147dd4d65600bd2c9d06b Mon Sep 17 00:00:00 2001 From: tengzhonger <109308630+tengzhonger@users.noreply.github.com> Date: Tue, 16 Aug 2022 10:49:50 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Return=20MutationType=20and=20bigtable.?= =?UTF-8?q?common.Status=20instead=20of=20raw=20p=E2=80=A6=20(#1359)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Return MutationType and bigtable.common.Status instead of raw protos * fix: remove unused import * fix: fix test Co-authored-by: Teng Zhong --- .../data/v2/models/ChangeStreamMutation.java | 19 +++++++++++-------- .../bigtable/data/v2/models/CloseStream.java | 14 +++++++------- .../v2/models/ChangeStreamMutationTest.java | 7 ++----- .../v2/models/ChangeStreamRecordTest.java | 2 +- ...ChangeStreamRecordMergingCallableTest.java | 2 +- ...ReadChangeStreamMergingAcceptanceTest.java | 13 +++++++++++-- 6 files changed, 33 insertions(+), 24 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index cfb8bb30b7..3d11d5d4fc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -16,7 +16,6 @@ package com.google.cloud.bigtable.data.v2.models; import com.google.api.core.InternalExtensionOnly; -import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger; import com.google.common.base.MoreObjects; @@ -69,10 +68,14 @@ public class ChangeStreamMutation implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 8419520253162024218L; + public enum MutationType { + USER, + GARBAGE_COLLECTION + } + private final ByteString rowKey; - /** Possible values: USER/GARBAGE_COLLECTION. */ - private final Type type; + private final MutationType type; /** This should only be set when type==USER. */ private final String sourceClusterId; @@ -108,7 +111,7 @@ static Builder createUserMutation( @Nonnull String sourceClusterId, @Nonnull Timestamp commitTimestamp, int tieBreaker) { - return new Builder(rowKey, Type.USER, sourceClusterId, commitTimestamp, tieBreaker); + return new Builder(rowKey, MutationType.USER, sourceClusterId, commitTimestamp, tieBreaker); } /** @@ -118,7 +121,7 @@ static Builder createUserMutation( */ static Builder createGcMutation( @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) { - return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker); + return new Builder(rowKey, MutationType.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker); } private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { @@ -142,7 +145,7 @@ public ByteString getRowKey() { /** Get the type of the current mutation. */ @Nonnull - public Type getType() { + public MutationType getType() { return this.type; } @@ -190,7 +193,7 @@ Builder toBuilder() { public static class Builder { private final ByteString rowKey; - private final Type type; + private final MutationType type; private final String sourceClusterId; @@ -206,7 +209,7 @@ public static class Builder { private Builder( ByteString rowKey, - Type type, + MutationType type, String sourceClusterId, Timestamp commitTimestamp, int tieBreaker) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index 346b0b60a7..795e05029a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -50,8 +50,8 @@ private CloseStream(Status status, List continuationTok } @InternalApi("Used in Changestream beam pipeline.") - public Status getStatus() { - return this.status; + public com.google.cloud.bigtable.common.Status getStatus() { + return com.google.cloud.bigtable.common.Status.fromProto(this.status); } @InternalApi("Used in Changestream beam pipeline.") @@ -88,21 +88,21 @@ public boolean equals(Object o) { return false; } CloseStream record = (CloseStream) o; - return Objects.equal(status, record.getStatus()) + return Objects.equal(getStatus(), record.getStatus()) && Objects.equal( - changeStreamContinuationTokens.build(), record.getChangeStreamContinuationTokens()); + getChangeStreamContinuationTokens(), record.getChangeStreamContinuationTokens()); } @Override public int hashCode() { - return Objects.hashCode(status, changeStreamContinuationTokens); + return Objects.hashCode(getStatus(), getChangeStreamContinuationTokens()); } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("status", status) - .add("changeStreamContinuationTokens", changeStreamContinuationTokens) + .add("status", getStatus()) + .add("changeStreamContinuationTokens", getChangeStreamContinuationTokens()) .toString(); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java index a14fe001cd..1052a1646d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java @@ -19,7 +19,6 @@ import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowsRequest; -import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.common.primitives.Longs; @@ -72,8 +71,7 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti // Test the getters. Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key")); - Assert.assertEquals( - changeStreamMutation.getType(), ReadChangeStreamResponse.DataChange.Type.USER); + Assert.assertEquals(changeStreamMutation.getType(), ChangeStreamMutation.MutationType.USER); Assert.assertEquals(changeStreamMutation.getSourceClusterId(), "fake-source-cluster-id"); Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp); Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0); @@ -115,8 +113,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException { // Test the getters. Assert.assertEquals(changeStreamMutation.getRowKey(), ByteString.copyFromUtf8("key")); Assert.assertEquals( - changeStreamMutation.getType(), - ReadChangeStreamResponse.DataChange.Type.GARBAGE_COLLECTION); + changeStreamMutation.getType(), ChangeStreamMutation.MutationType.GARBAGE_COLLECTION); Assert.assertNull(changeStreamMutation.getSourceClusterId()); Assert.assertEquals(changeStreamMutation.getCommitTimestamp(), fakeCommitTimestamp); Assert.assertEquals(changeStreamMutation.getTieBreaker(), 0); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index c6aa7580dd..ed54628f67 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -157,7 +157,7 @@ public void closeStreamTest() { .build(); CloseStream actualCloseStream = CloseStream.fromProto(closeStreamProto); - Assert.assertEquals(status, actualCloseStream.getStatus()); + Assert.assertEquals(status, actualCloseStream.getStatus().toProto()); Assert.assertEquals( actualCloseStream.getChangeStreamContinuationTokens().get(0).getPartition(), ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen())); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java index 5cc04f764d..56cc5aa845 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java @@ -114,7 +114,7 @@ public void closeStreamTest() { ChangeStreamRecord record = results.get(0); Assert.assertTrue(record instanceof CloseStream); CloseStream closeStream = (CloseStream) record; - Assert.assertEquals(closeStream.getStatus(), closeStreamProto.getStatus()); + Assert.assertEquals(closeStream.getStatus().toProto(), closeStreamProto.getStatus()); Assert.assertEquals(closeStream.getChangeStreamContinuationTokens().size(), 1); ChangeStreamContinuationToken changeStreamContinuationToken = closeStream.getChangeStreamContinuationTokens().get(0); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java index ef8b9fec9f..483562aa4a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java @@ -24,6 +24,7 @@ import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.ReadChangeStreamRequest; import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; import com.google.bigtable.v2.RowRange; import com.google.bigtable.v2.StreamContinuationToken; import com.google.bigtable.v2.StreamPartition; @@ -151,7 +152,8 @@ public void test() throws Exception { } else if (record instanceof CloseStream) { CloseStream closeStream = (CloseStream) record; ReadChangeStreamResponse.CloseStream.Builder builder = - ReadChangeStreamResponse.CloseStream.newBuilder().setStatus(closeStream.getStatus()); + ReadChangeStreamResponse.CloseStream.newBuilder() + .setStatus(closeStream.getStatus().toProto()); for (ChangeStreamContinuationToken token : closeStream.getChangeStreamContinuationTokens()) { builder.addContinuationTokens( @@ -179,7 +181,14 @@ public void test() throws Exception { ReadChangeStreamTest.TestChangeStreamMutation.Builder builder = ReadChangeStreamTest.TestChangeStreamMutation.newBuilder(); builder.setRowKey(changeStreamMutation.getRowKey()); - builder.setType(changeStreamMutation.getType()); + Type type = Type.UNRECOGNIZED; + if (changeStreamMutation.getType() == ChangeStreamMutation.MutationType.USER) { + type = Type.USER; + } else if (changeStreamMutation.getType() + == ChangeStreamMutation.MutationType.GARBAGE_COLLECTION) { + type = Type.GARBAGE_COLLECTION; + } + builder.setType(type); if (changeStreamMutation.getSourceClusterId() != null) { builder.setSourceClusterId(changeStreamMutation.getSourceClusterId()); }