Skip to content

Commit

Permalink
feat(bigtable): Add MergeToCell support to the bigtable emulator and …
Browse files Browse the repository at this point in the history
…client (#10366)

* feat(bigtable): Add MergeToCell support to the bigtable emulator and client

* feat(bigtable): Add MergeToCell support to the bigtable emulator and client

* add integration tests

* add integration tests
  • Loading branch information
ron-gal committed Jul 29, 2024
1 parent a49ab59 commit 0211c95
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 2 deletions.
14 changes: 14 additions & 0 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,20 @@ func (m *Mutation) addToCell(family, column string, ts Timestamp, value *btpb.Va
}}})
}

// MergeBytesToCell merges a bytes accumulator value to a cell in an aggregate column family.
func (m *Mutation) MergeBytesToCell(family, column string, ts Timestamp, value []byte) {
m.mergeToCell(family, column, ts, &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: value}})
}

func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb.Value) {
m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{
FamilyName: family,
ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte(column)}},
Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: int64(ts.TruncateToMilliseconds())}},
Input: value,
}}})
}

// entryErr is a container that combines an entry with the error that was returned for it.
// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
type entryErr struct {
Expand Down
63 changes: 62 additions & 1 deletion bigtable/bigtable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func TestHeaderPopulatedWithAppProfile(t *testing.T) {
}
}

func TestMutateRowsWithAggregates(t *testing.T) {
func TestMutateRowsWithAggregates_AddToCell(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
Expand Down Expand Up @@ -814,3 +814,64 @@ func TestMutateRowsWithAggregates(t *testing.T) {
t.Error()
}
}

func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) {
testEnv, err := NewEmulatedEnv(IntegrationTestConfig{})
if err != nil {
t.Fatalf("NewEmulatedEnv failed: %v", err)
}
conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
)
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer adminClient.Close()

tableConf := &TableConf{
TableID: testEnv.config.Table,
ColumnFamilies: map[string]Family{
"f": {
ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
},
},
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}

client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if err != nil {
t.Fatalf("NewClient failed: %v", err)
}
defer client.Close()
table := client.Open(testEnv.config.Table)

m := NewMutation()
m.MergeBytesToCell("f", "q", 0, binary.BigEndian.AppendUint64([]byte{}, 1000))
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}

m = NewMutation()
m.MergeBytesToCell("f", "q", 0, binary.BigEndian.AppendUint64([]byte{}, 2000))
err = table.Apply(ctx, "row1", m)
if err != nil {
t.Fatalf("Apply failed: %v", err)
}

row, err := table.ReadRow(ctx, "row1")
if !bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) {
t.Error()
}
}
29 changes: 29 additions & 0 deletions bigtable/bttest/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,35 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co
f := r.getOrCreateFamily(fam, fs[fam].order)
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf)

case *btpb.Mutation_MergeToCell_:
add := mut.MergeToCell
var cf, ok = fs[add.FamilyName]
if !ok {
return fmt.Errorf("unknown family %q", add.FamilyName)
}
if cf.valueType == nil || cf.valueType.GetAggregateType() == nil {
return fmt.Errorf("illegal attempt to use MergeToCell on non-aggregate cell")
}
ts := add.Timestamp.GetRawTimestampMicros()
if ts < 0 {
return fmt.Errorf("MergeToCell must set timestamp >= 0")
}

fam := add.FamilyName
col := string(add.GetColumnQualifier().GetRawValue())

var value []byte
switch v := add.Input.Kind.(type) {
case *btpb.Value_RawValue:
value = v.RawValue
default:
return fmt.Errorf("only []bytes values are supported")
}

newCell := cell{ts: ts, value: value}
f := r.getOrCreateFamily(fam, fs[fam].order)
f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf)

case *btpb.Mutation_DeleteFromColumn_:
del := mut.DeleteFromColumn
if _, ok := fs[del.FamilyName]; !ok {
Expand Down
92 changes: 91 additions & 1 deletion bigtable/bttest/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,7 +1794,7 @@ func TestFilters(t *testing.T) {
}
}

func TestMutateRowsAggregate(t *testing.T) {
func TestMutateRowsAggregate_AddToCell(t *testing.T) {
ctx := context.Background()

s := &server{
Expand Down Expand Up @@ -1884,6 +1884,96 @@ func TestMutateRowsAggregate(t *testing.T) {
}
}

func TestMutateRowsAggregate_MergeToCell(t *testing.T) {
ctx := context.Background()

s := &server{
tables: make(map[string]*table),
}

tblInfo, err := populateTable(ctx, s)
if err != nil {
t.Fatal(err)
}

_, err = s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{
Name: tblInfo.Name,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: "sum",
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{
Create: &btapb.ColumnFamily{
ValueType: &btapb.Type{
Kind: &btapb.Type_AggregateType{
AggregateType: &btapb.Type_Aggregate{
InputType: &btapb.Type{
Kind: &btapb.Type_Int64Type{},
},
Aggregator: &btapb.Type_Aggregate_Sum_{
Sum: &btapb.Type_Aggregate_Sum{},
},
},
},
},
},
}},
}})

if err != nil {
t.Fatal(err)
}

_, err = s.MutateRow(ctx, &btpb.MutateRowRequest{
TableName: tblInfo.GetName(),
RowKey: []byte("row1"),
Mutations: []*btpb.Mutation{{
Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{
FamilyName: "sum",
ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte("col1")}},
Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}},
Input: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: binary.BigEndian.AppendUint64([]byte{}, 1)}},
}},
}},
})

if err != nil {
t.Fatal(err)
}

_, err = s.MutateRow(ctx, &btpb.MutateRowRequest{
TableName: tblInfo.GetName(),
RowKey: []byte("row1"),
Mutations: []*btpb.Mutation{{
Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{
FamilyName: "sum",
ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte("col1")}},
Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}},
Input: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: binary.BigEndian.AppendUint64([]byte{}, 2)}},
}},
}},
})

if err != nil {
t.Fatal(err)
}

mock := &MockReadRowsServer{}
err = s.ReadRows(&btpb.ReadRowsRequest{
TableName: tblInfo.GetName(),
Rows: &btpb.RowSet{
RowKeys: [][]byte{
[]byte("row1"),
},
}}, mock)
if err != nil {
t.Fatal(err)
}
got := mock.responses[0]

if !bytes.Equal(got.Chunks[0].Value, binary.BigEndian.AppendUint64([]byte{}, 3)) {
t.Error()
}
}

func Test_Mutation_DeleteFromColumn(t *testing.T) {
ctx := context.Background()

Expand Down
80 changes: 80 additions & 0 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bigtable

import (
"context"
"encoding/binary"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -280,6 +281,73 @@ func TestIntegration_ReadRowList(t *testing.T) {
}
}

func TestIntegration_Aggregates(t *testing.T) {
ctx := context.Background()
_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()
key := "some-key"
family := "sum"
column := "col"
mut := NewMutation()
mut.AddIntToCell(family, column, 1000, 5)

// Add 5 to empty cell.
if err := table.Apply(ctx, key, mut); err != nil {
t.Fatalf("Mutating row %q: %v", key, err)
}
row, err := table.ReadRow(ctx, key)
if err != nil {
t.Fatalf("Reading a row: %v", err)
}
wantRow := Row{
family: []ReadItem{
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 5)},
},
}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}

// Add 5 again.
if err := table.Apply(ctx, key, mut); err != nil {
t.Fatalf("Mutating row %q: %v", key, err)
}
row, err = table.ReadRow(ctx, key)
if err != nil {
t.Fatalf("Reading a row: %v", err)
}
wantRow = Row{
family: []ReadItem{
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 10)},
},
}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}

// Merge 5, which translates in the backend to adding 5 for sum column families.
mut2 := NewMutation()
mut2.MergeBytesToCell(family, column, 1000, binary.BigEndian.AppendUint64([]byte{}, 5))
if err := table.Apply(ctx, key, mut); err != nil {
t.Fatalf("Mutating row %q: %v", key, err)
}
row, err = table.ReadRow(ctx, key)
if err != nil {
t.Fatalf("Reading a row: %v", err)
}
wantRow = Row{
family: []ReadItem{
{Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 15)},
},
}
if !testutil.Equal(row, wantRow) {
t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
}
}

func TestIntegration_ReadRowListReverse(t *testing.T) {
ctx := context.Background()
_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
Expand Down Expand Up @@ -4221,6 +4289,18 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C
return nil, nil, nil, nil, "", nil, err
}

err = retryOnUnavailable(ctx, func() error {
return adminClient.CreateColumnFamilyWithConfig(ctx, tableName, "sum", Family{ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
}})
})
if err != nil {
cancel()
t.Logf("Error creating aggregate column family: %v", err)
return nil, nil, nil, nil, "", nil, err
}

return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
if err := adminClient.DeleteTable(ctx, tableName); err != nil {
t.Errorf("DeleteTable got error %v", err)
Expand Down

0 comments on commit 0211c95

Please sign in to comment.