From 0211c95e0404aad31be5bec6d5855f0bc5358161 Mon Sep 17 00:00:00 2001 From: ron-gal <125445217+ron-gal@users.noreply.github.com> Date: Mon, 29 Jul 2024 13:25:47 -0400 Subject: [PATCH] feat(bigtable): Add MergeToCell support to the bigtable emulator and client (#10366) * feat(bigtable): Add MergeToCell support to the bigtable emulator and client * feat(bigtable): Add MergeToCell support to the bigtable emulator and client * add integration tests * add integration tests --- bigtable/bigtable.go | 14 ++++++ bigtable/bigtable_test.go | 63 +++++++++++++++++++++++- bigtable/bttest/inmem.go | 29 +++++++++++ bigtable/bttest/inmem_test.go | 92 ++++++++++++++++++++++++++++++++++- bigtable/integration_test.go | 80 ++++++++++++++++++++++++++++++ 5 files changed, 276 insertions(+), 2 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 834c6b622f79..576146899de8 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1167,6 +1167,20 @@ func (m *Mutation) addToCell(family, column string, ts Timestamp, value *btpb.Va }}}) } +// MergeBytesToCell queues a MergeToCell mutation that merges value, a raw-bytes accumulator, into the cell at family:column and timestamp ts of an aggregate column family; ts is passed through TruncateToMilliseconds before being sent. +func (m *Mutation) MergeBytesToCell(family, column string, ts Timestamp, value []byte) { + m.mergeToCell(family, column, ts, &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: value}}) +} + +func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb.Value) { + m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{ + FamilyName: family, + ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte(column)}}, + Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: int64(ts.TruncateToMilliseconds())}}, + Input: value, + }}}) +} + // entryErr is a container that combines an entry with the error that was returned for it. // Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed. 
type entryErr struct { diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index d1a85b05c3fd..aa26eb29b703 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -754,7 +754,7 @@ func TestHeaderPopulatedWithAppProfile(t *testing.T) { } } -func TestMutateRowsWithAggregates(t *testing.T) { +func TestMutateRowsWithAggregates_AddToCell(t *testing.T) { testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) if err != nil { t.Fatalf("NewEmulatedEnv failed: %v", err) @@ -814,3 +814,64 @@ func TestMutateRowsWithAggregates(t *testing.T) { t.Error() } } + +func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) { + testEnv, err := NewEmulatedEnv(IntegrationTestConfig{}) + if err != nil { + t.Fatalf("NewEmulatedEnv failed: %v", err) + } + conn, err := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)), + ) + if err != nil { + t.Fatalf("grpc.Dial failed: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + adminClient, err := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewAdminClient failed: %v", err) + } + defer adminClient.Close() + + tableConf := &TableConf{ + TableID: testEnv.config.Table, + ColumnFamilies: map[string]Family{ + "f": { + ValueType: AggregateType{ + Input: Int64Type{}, + Aggregator: SumAggregator{}, + }, + }, + }, + } + if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil { + t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) + } + + client, err := NewClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if err != nil { + t.Fatalf("NewClient failed: %v", err) + } + defer client.Close() + table := client.Open(testEnv.config.Table) + + m := NewMutation() + m.MergeBytesToCell("f", "q", 0, 
binary.BigEndian.AppendUint64([]byte{}, 1000)) + err = table.Apply(ctx, "row1", m) + if err != nil { + t.Fatalf("Apply failed: %v", err) + } + + m = NewMutation() + m.MergeBytesToCell("f", "q", 0, binary.BigEndian.AppendUint64([]byte{}, 2000)) + err = table.Apply(ctx, "row1", m) + if err != nil { + t.Fatalf("Apply failed: %v", err) + } + + row, err := table.ReadRow(ctx, "row1") + if err != nil || len(row["f"]) == 0 || !bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) { // check err and length first so a failed read reports instead of panicking + t.Errorf("ReadRow(%q) = %v, err = %v; want merged sum 3000", "row1", row, err) + } +} diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go index 4ae9ab8a57b5..f7144bad9f43 100644 --- a/bigtable/bttest/inmem.go +++ b/bigtable/bttest/inmem.go @@ -1147,6 +1147,35 @@ func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*co f := r.getOrCreateFamily(fam, fs[fam].order) f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf) + case *btpb.Mutation_MergeToCell_: + add := mut.MergeToCell + var cf, ok = fs[add.FamilyName] + if !ok { + return fmt.Errorf("unknown family %q", add.FamilyName) + } + if cf.valueType == nil || cf.valueType.GetAggregateType() == nil { + return fmt.Errorf("illegal attempt to use MergeToCell on non-aggregate cell") + } + ts := add.Timestamp.GetRawTimestampMicros() + if ts < 0 { + return fmt.Errorf("MergeToCell must set timestamp >= 0") + } + + fam := add.FamilyName + col := string(add.GetColumnQualifier().GetRawValue()) + + var value []byte + switch v := add.GetInput().GetKind().(type) { // nil-safe getters: a request without Input falls through to the default error instead of panicking + case *btpb.Value_RawValue: + value = v.RawValue + default: + return fmt.Errorf("only []bytes values are supported") + } + + newCell := cell{ts: ts, value: value} + f := r.getOrCreateFamily(fam, fs[fam].order) + f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell, cf) + case *btpb.Mutation_DeleteFromColumn_: del := mut.DeleteFromColumn if _, ok := fs[del.FamilyName]; !ok { diff --git a/bigtable/bttest/inmem_test.go b/bigtable/bttest/inmem_test.go index 0eb688b8a796..ad4fe1d86166 100644 --- a/bigtable/bttest/inmem_test.go 
+++ b/bigtable/bttest/inmem_test.go @@ -1794,7 +1794,7 @@ func TestFilters(t *testing.T) { } } -func TestMutateRowsAggregate(t *testing.T) { +func TestMutateRowsAggregate_AddToCell(t *testing.T) { ctx := context.Background() s := &server{ @@ -1884,6 +1884,96 @@ func TestMutateRowsAggregate(t *testing.T) { } } +func TestMutateRowsAggregate_MergeToCell(t *testing.T) { + ctx := context.Background() + + s := &server{ + tables: make(map[string]*table), + } + + tblInfo, err := populateTable(ctx, s) + if err != nil { + t.Fatal(err) + } + + _, err = s.ModifyColumnFamilies(ctx, &btapb.ModifyColumnFamiliesRequest{ + Name: tblInfo.Name, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: "sum", + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{ + Create: &btapb.ColumnFamily{ + ValueType: &btapb.Type{ + Kind: &btapb.Type_AggregateType{ + AggregateType: &btapb.Type_Aggregate{ + InputType: &btapb.Type{ + Kind: &btapb.Type_Int64Type{}, + }, + Aggregator: &btapb.Type_Aggregate_Sum_{ + Sum: &btapb.Type_Aggregate_Sum{}, + }, + }, + }, + }, + }, + }}, + }}) + + if err != nil { + t.Fatal(err) + } + + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: tblInfo.GetName(), + RowKey: []byte("row1"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{ + FamilyName: "sum", + ColumnQualifier: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: []byte("col1")}}, + Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}}, + Input: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: binary.BigEndian.AppendUint64([]byte{}, 1)}}, + }}, + }}, + }) + + if err != nil { + t.Fatal(err) + } + + _, err = s.MutateRow(ctx, &btpb.MutateRowRequest{ + TableName: tblInfo.GetName(), + RowKey: []byte("row1"), + Mutations: []*btpb.Mutation{{ + Mutation: &btpb.Mutation_MergeToCell_{MergeToCell: &btpb.Mutation_MergeToCell{ + FamilyName: "sum", + ColumnQualifier: &btpb.Value{Kind: 
&btpb.Value_RawValue{RawValue: []byte("col1")}}, + Timestamp: &btpb.Value{Kind: &btpb.Value_RawTimestampMicros{RawTimestampMicros: 0}}, + Input: &btpb.Value{Kind: &btpb.Value_RawValue{RawValue: binary.BigEndian.AppendUint64([]byte{}, 2)}}, + }}, + }}, + }) + + if err != nil { + t.Fatal(err) + } + + mock := &MockReadRowsServer{} + err = s.ReadRows(&btpb.ReadRowsRequest{ + TableName: tblInfo.GetName(), + Rows: &btpb.RowSet{ + RowKeys: [][]byte{ + []byte("row1"), + }, + }}, mock) + if err != nil { + t.Fatal(err) + } + got := mock.responses[0] + + if !bytes.Equal(got.Chunks[0].Value, binary.BigEndian.AppendUint64([]byte{}, 3)) { + t.Errorf("merged cell value = %v, want big-endian uint64 3", got.Chunks[0].Value) + } +} + func Test_Mutation_DeleteFromColumn(t *testing.T) { ctx := context.Background() diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 7f3c5a4a4c7f..240e5bc54222 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -18,6 +18,7 @@ package bigtable import ( "context" + "encoding/binary" "flag" "fmt" "log" @@ -280,6 +281,73 @@ func TestIntegration_ReadRowList(t *testing.T) { } } +func TestIntegration_Aggregates(t *testing.T) { + ctx := context.Background() + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) + if err != nil { + t.Fatal(err) + } + defer cleanup() + key := "some-key" + family := "sum" + column := "col" + mut := NewMutation() + mut.AddIntToCell(family, column, 1000, 5) + + // Add 5 to empty cell. + if err := table.Apply(ctx, key, mut); err != nil { + t.Fatalf("Mutating row %q: %v", key, err) + } + row, err := table.ReadRow(ctx, key) + if err != nil { + t.Fatalf("Reading a row: %v", err) + } + wantRow := Row{ + family: []ReadItem{ + {Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 5)}, + }, + } + if !testutil.Equal(row, wantRow) { + t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) + } + + // Add 5 again. 
+ if err := table.Apply(ctx, key, mut); err != nil { + t.Fatalf("Mutating row %q: %v", key, err) + } + row, err = table.ReadRow(ctx, key) + if err != nil { + t.Fatalf("Reading a row: %v", err) + } + wantRow = Row{ + family: []ReadItem{ + {Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 10)}, + }, + } + if !testutil.Equal(row, wantRow) { + t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) + } + + // Merge 5, which translates in the backend to adding 5 for sum column families. + mut2 := NewMutation() + mut2.MergeBytesToCell(family, column, 1000, binary.BigEndian.AppendUint64([]byte{}, 5)) + if err := table.Apply(ctx, key, mut2); err != nil { // apply the merge mutation (mut2), not the earlier AddIntToCell mutation + t.Fatalf("Mutating row %q: %v", key, err) + } + row, err = table.ReadRow(ctx, key) + if err != nil { + t.Fatalf("Reading a row: %v", err) + } + wantRow = Row{ + family: []ReadItem{ + {Row: key, Column: fmt.Sprintf("%s:%s", family, column), Timestamp: 1000, Value: binary.BigEndian.AppendUint64([]byte{}, 15)}, + }, + } + if !testutil.Equal(row, wantRow) { + t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) + } +} + func TestIntegration_ReadRowListReverse(t *testing.T) { ctx := context.Background() _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) @@ -4221,6 +4289,18 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C return nil, nil, nil, nil, "", nil, err } + err = retryOnUnavailable(ctx, func() error { + return adminClient.CreateColumnFamilyWithConfig(ctx, tableName, "sum", Family{ValueType: AggregateType{ + Input: Int64Type{}, + Aggregator: SumAggregator{}, + }}) + }) + if err != nil { + cancel() + t.Logf("Error creating aggregate column family: %v", err) + return nil, nil, nil, nil, "", nil, err + } + return testEnv, client, adminClient, client.Open(tableName), tableName, func() { if err := adminClient.DeleteTable(ctx, tableName); err != nil { t.Errorf("DeleteTable got error %v", err)