Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for MergeToCell API #2258

Merged
merged 16 commits into from
Jul 24, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ Builder addToCell(@Nonnull String familyName, Value qualifier, Value timestamp,
return this;
}

Builder mergeToCell(@Nonnull String familyName, Value qualifier, Value timestamp, Value input) {
this.entriesBuilder().add(MergeToCell.create(familyName, qualifier, timestamp, input));
return this;
}

abstract ChangeStreamMutation build();
}

Expand Down Expand Up @@ -210,6 +215,13 @@ public RowMutation toRowMutation(@Nonnull String tableId) {
addToCell.getQualifier(),
addToCell.getTimestamp(),
addToCell.getInput());
} else if (entry instanceof MergeToCell) {
MergeToCell mergeToCell = (MergeToCell) entry;
rowMutation.mergeToCell(
mergeToCell.getFamily(),
mergeToCell.getQualifier(),
mergeToCell.getTimestamp(),
mergeToCell.getInput());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
Expand Down Expand Up @@ -242,6 +254,13 @@ public RowMutationEntry toRowMutationEntry() {
addToCell.getQualifier(),
addToCell.getTimestamp(),
addToCell.getInput());
} else if (entry instanceof MergeToCell) {
MergeToCell mergeToCell = (MergeToCell) entry;
rowMutationEntry.mergeToCell(
mergeToCell.getFamily(),
mergeToCell.getQualifier(),
mergeToCell.getTimestamp(),
mergeToCell.getInput());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ void addToCell(
@Nonnull Value timestamp,
@Nonnull Value value);

void mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value value);

/**
* Called to start a SetCell.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ public void addToCell(
this.changeStreamMutationBuilder.addToCell(familyName, qualifier, timestamp, input);
}

@Override
public void mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
this.changeStreamMutationBuilder.mergeToCell(familyName, qualifier, timestamp, input);
}

/** {@inheritDoc} */
@Override
public void startCell(String family, ByteString qualifier, long timestampMicros) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nonnull;

/** Representation of an MergeToCell mod in a data change. */
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@AutoValue
public abstract class MergeToCell implements Entry, Serializable {
public static MergeToCell create(
@Nonnull String family,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
return new AutoValue_MergeToCell(family, qualifier, timestamp, input);
}

@Nonnull
public abstract String getFamily();

@Nonnull
public abstract Value getQualifier();

@Nonnull
public abstract Value getTimestamp();

@Nonnull
public abstract Value getInput();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.bigtable.v2.Mutation.DeleteFromColumn;
import com.google.bigtable.v2.Mutation.DeleteFromFamily;
import com.google.bigtable.v2.Mutation.DeleteFromRow;
import com.google.bigtable.v2.Mutation.MergeToCell;
import com.google.bigtable.v2.Mutation.SetCell;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -308,6 +309,24 @@ public Mutation addToCell(
return this;
}

@Override
public Mutation mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value value) {
com.google.bigtable.v2.Mutation.Builder builder = com.google.bigtable.v2.Mutation.newBuilder();
MergeToCell.Builder mergeToCellBuilder = builder.getMergeToCellBuilder();
mergeToCellBuilder.setFamilyName(familyName);

qualifier.buildTo(mergeToCellBuilder.getColumnQualifierBuilder());
timestamp.buildTo(mergeToCellBuilder.getTimestampBuilder());
value.buildTo(mergeToCellBuilder.getInputBuilder());

addMutation(builder.build());
return this;
}

private void addMutation(com.google.bigtable.v2.Mutation mutation) {
Preconditions.checkState(numMutations + 1 <= MAX_MUTATIONS, "Too many mutations per row");
Preconditions.checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ default T addToCell(
return addToCell(familyName, ByteString.copyFromUtf8(qualifier), timestamp, value);
}

/**
* Merges a ByteString accumulator value to a cell in an aggregate column family.
*
* <p>This is a convenience override that converts Strings to ByteStrings.
*
* <p>Note: The timestamp values are in microseconds but must match the granularity of the
* table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond
* granularity). For example: `1571902339435000`.
*/
default T mergeToCell(
@Nonnull String familyName, @Nonnull String qualifier, long timestamp, ByteString value) {
return mergeToCell(familyName, ByteString.copyFromUtf8(qualifier), timestamp, value);
}

/**
* Adds an int64 value to an aggregate cell. The column family must be an aggregate family and
* have an "int64" input type or this mutation will be rejected.
Expand All @@ -155,6 +169,22 @@ default T addToCell(
Value.IntValue.create(input));
}

/**
* Merges a ByteString accumulator value to a cell in an aggregate column family.
*
* <p>Note: The timestamp values are in microseconds but must match the granularity of the
* table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond
* granularity). For example: `1571902339435000`.
*/
default T mergeToCell(
@Nonnull String familyName, @Nonnull ByteString qualifier, long timestamp, ByteString input) {
return mergeToCell(
familyName,
Value.RawValue.create(qualifier),
Value.RawTimestamp.create(timestamp),
Value.RawValue.create(input));
}

/**
* Adds a {@link Value} to an aggregate cell. The column family must be an aggregate family and
* have an input type matching the type of {@link Value} or this mutation will be rejected.
Expand All @@ -168,4 +198,18 @@ T addToCell(
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input);

/**
* Merges a {@link Value} accumulator to an aggregate cell. The column family must be an aggregate
* family or this mutation will be rejected.
*
* <p>Note: The timestamp values are in microseconds but must match the granularity of the
* table(defaults to `MILLIS`). Therefore, the given value must be a multiple of 1000 (millisecond
* granularity). For example: `1571902339435000`.
*/
T mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ public RowMutation addToCell(
return this;
}

@Override
public RowMutation mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
mutation.mergeToCell(familyName, qualifier, timestamp, input);
return this;
}

@InternalApi
public MutateRowRequest toProto(RequestContext requestContext) {
MutateRowRequest.Builder builder = MutateRowRequest.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ public RowMutationEntry addToCell(
return this;
}

@Override
public RowMutationEntry mergeToCell(
@Nonnull String familyName,
@Nonnull Value qualifier,
@Nonnull Value timestamp,
@Nonnull Value input) {
mutation.mergeToCell(familyName, qualifier, timestamp, input);
return this;
}

@InternalApi
public MutateRowsRequest.Entry toProto() {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,16 @@ State handleDataChange(ReadChangeStreamResponse.DataChange dataChange) {
Value.fromProto(mod.getAddToCell().getColumnQualifier()),
Value.fromProto(mod.getAddToCell().getTimestamp()),
Value.fromProto(mod.getAddToCell().getInput()));
continue;
}
// Case 5: MergeToCell
if (mod.hasMergeToCell()) {
builder.mergeToCell(
mod.getMergeToCell().getFamilyName(),
Value.fromProto(mod.getMergeToCell().getColumnQualifier()),
Value.fromProto(mod.getMergeToCell().getTimestamp()),
Value.fromProto(mod.getMergeToCell().getInput()));
continue;
}
throw new IllegalStateException(
"Received unknown mod type. You may need to upgrade your Bigtable client.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.util.Base64;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -67,6 +69,11 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti
Value.rawValue(ByteString.copyFromUtf8("col1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
.mergeToCell(
"agg-family",
Value.rawValue(ByteString.copyFromUtf8("col2")),
Value.rawTimestamp(1000),
Value.rawValue(Base64.getEncoder().encode(BigInteger.valueOf(1234).toByteArray())))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
Expand Down Expand Up @@ -150,6 +157,11 @@ public void toRowMutationTest() {
Value.rawValue(ByteString.copyFromUtf8("qual1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
.mergeToCell(
"agg-family",
Value.rawValue(ByteString.copyFromUtf8("qual2")),
Value.rawTimestamp(1000),
Value.rawValue(Base64.getEncoder().encode(BigInteger.valueOf(1234).toByteArray())))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
Expand All @@ -161,7 +173,7 @@ public void toRowMutationTest() {
NameUtil.formatTableName(
REQUEST_CONTEXT.getProjectId(), REQUEST_CONTEXT.getInstanceId(), TABLE_ID);
assertThat(mutateRowRequest.getTableName()).isEqualTo(tableName);
assertThat(mutateRowRequest.getMutationsList()).hasSize(4);
assertThat(mutateRowRequest.getMutationsList()).hasSize(5);
assertThat(mutateRowRequest.getMutations(0).getSetCell().getValue())
.isEqualTo(ByteString.copyFromUtf8("fake-value"));
assertThat(mutateRowRequest.getMutations(1).getDeleteFromFamily().getFamilyName())
Expand All @@ -178,6 +190,16 @@ public void toRowMutationTest() {
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.intValue(1234).toProto())
.build());
assertThat(mutateRowRequest.getMutations(3).getMergeToCell())
.isEqualTo(
Mutation.MergeToCell.newBuilder()
.setFamilyName("agg-family")
.setColumnQualifier(Value.rawValue(ByteString.copyFromUtf8("qual2")).toProto())
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(
Value.rawValue(Base64.getEncoder().encode(BigInteger.valueOf(1234).toByteArray()))
.toProto())
.build());
}

@Test
Expand Down Expand Up @@ -220,6 +242,11 @@ public void toRowMutationEntryTest() {
Value.rawValue(ByteString.copyFromUtf8("qual1")),
Value.rawTimestamp(1000),
Value.intValue(1234))
.mergeToCell(
"agg-family",
Value.rawValue(ByteString.copyFromUtf8("qual2")),
Value.rawTimestamp(1000),
Value.rawValue(Base64.getEncoder().encode(BigInteger.valueOf(1234).toByteArray())))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();
Expand All @@ -228,7 +255,7 @@ public void toRowMutationEntryTest() {
RowMutationEntry rowMutationEntry = changeStreamMutation.toRowMutationEntry();
MutateRowsRequest.Entry mutateRowsRequestEntry = rowMutationEntry.toProto();
assertThat(mutateRowsRequestEntry.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
assertThat(mutateRowsRequestEntry.getMutationsList()).hasSize(4);
assertThat(mutateRowsRequestEntry.getMutationsList()).hasSize(5);
assertThat(mutateRowsRequestEntry.getMutations(0).getSetCell().getValue())
.isEqualTo(ByteString.copyFromUtf8("fake-value"));
assertThat(mutateRowsRequestEntry.getMutations(1).getDeleteFromFamily().getFamilyName())
Expand All @@ -245,6 +272,16 @@ public void toRowMutationEntryTest() {
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(Value.intValue(1234).toProto())
.build());
assertThat(mutateRowsRequestEntry.getMutations(4).getMergeToCell())
.isEqualTo(
Mutation.MergeToCell.newBuilder()
.setFamilyName("agg-family")
.setColumnQualifier(Value.rawValue(ByteString.copyFromUtf8("qual2")).toProto())
.setTimestamp(Value.rawTimestamp(1000).toProto())
.setInput(
Value.rawValue(Base64.getEncoder().encode(BigInteger.valueOf(1234).toByteArray()))
.toProto())
.build());
}

@Test
Expand Down
Loading
Loading