Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): add column family type to FamilyInfo in TableInfo #10520

Merged
merged 5 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ type FamilyInfo struct {
Name string
GCPolicy string
FullGCPolicy GCPolicy
ValueType Type
}

func (ac *AdminClient) getTable(ctx context.Context, table string, view btapb.Table_View) (*btapb.Table, error) {
Expand Down Expand Up @@ -638,6 +639,7 @@ func (ac *AdminClient) TableInfo(ctx context.Context, table string) (*TableInfo,
Name: name,
GCPolicy: GCRuleToString(fam.GcRule),
FullGCPolicy: gcRuleToPolicy(fam.GcRule),
ValueType: protoToType(fam.ValueType),
})
}
// we expect DeletionProtection to be in the response because Table_SCHEMA_VIEW is being used in this function
Expand Down
14 changes: 14 additions & 0 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4027,6 +4027,20 @@ func TestIntegration_TestUpdateColumnFamilyValueType(t *testing.T) {
if err != nil {
t.Fatalf("Failed to update value type of family: %v", err)
}

table, err := adminClient.TableInfo(ctx, tblConf.TableID)
if err != nil {
t.Fatalf("Failed to get table info: %v", err)
}
if len(table.FamilyInfos) != 0 {
t.Fatalf("Unexpected number of family infos. Got %d, want %d", len(table.FamilyInfos), 0)
}
if table.FamilyInfos[0].Name != "cf" {
t.Errorf("Unexpected family name. Got %q, want %q", table.FamilyInfos[0].Name, "cf")
}
if _, ok := table.FamilyInfos[0].ValueType.proto().GetKind().(*btapb.Type_StringType); !ok {
t.Errorf("Unexpected value type. Got %T, want *btapb.Type_StringType", table.FamilyInfos[0].ValueType.proto().GetKind())
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
Expand Down
86 changes: 86 additions & 0 deletions bigtable/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ type Type interface {
proto() *btapb.Type
}

type unknown[T interface{}] struct {
wrapped *T
}

func (u unknown[T]) proto() *T {
return u.wrapped
}

// BytesEncoding represents the encoding of a Bytes type.
type BytesEncoding interface {
proto() *btapb.Type_Bytes_Encoding
Expand Down Expand Up @@ -142,6 +150,17 @@ func (sum SumAggregator) fillProto(proto *btapb.Type_Aggregate) {
proto.Aggregator = &btapb.Type_Aggregate_Sum_{Sum: &btapb.Type_Aggregate_Sum{}}
}

type unknownAggregator struct {
wrapped *btapb.Type_Aggregate
}

func (ua unknownAggregator) fillProto(proto *btapb.Type_Aggregate) {
if ua.wrapped == nil {
return
}
proto.Aggregator = ua.wrapped.Aggregator
}

// AggregateType represents an aggregate. See types.proto for more details
// on aggregate types.
type AggregateType struct {
Expand All @@ -157,3 +176,70 @@ func (agg AggregateType) proto() *btapb.Type {
agg.Aggregator.fillProto(protoAgg)
return &btapb.Type{Kind: &btapb.Type_AggregateType{AggregateType: protoAgg}}
}

func protoToType(pb *btapb.Type) Type {
if pb == nil {
return unknown[btapb.Type]{wrapped: nil}
}

switch t := pb.Kind.(type) {
case *btapb.Type_Int64Type:
return int64ProtoToType(t.Int64Type)
case *btapb.Type_BytesType:
return bytesProtoToType(t.BytesType)
case *btapb.Type_AggregateType:
return aggregateProtoToType(t.AggregateType)
default:
return unknown[btapb.Type]{wrapped: pb}
}
}

func bytesEncodingProtoToType(be *btapb.Type_Bytes_Encoding) BytesEncoding {
if be == nil {
return unknown[btapb.Type_Bytes_Encoding]{wrapped: be}
}

switch be.Encoding.(type) {
case *btapb.Type_Bytes_Encoding_Raw_:
return RawBytesEncoding{}
default:
return unknown[btapb.Type_Bytes_Encoding]{wrapped: be}
}
}

func bytesProtoToType(b *btapb.Type_Bytes) BytesType {
return BytesType{Encoding: bytesEncodingProtoToType(b.Encoding)}
}

func int64EncodingProtoToEncoding(ie *btapb.Type_Int64_Encoding) Int64Encoding {
if ie == nil {
return unknown[btapb.Type_Int64_Encoding]{wrapped: ie}
}

switch e := ie.Encoding.(type) {
case *btapb.Type_Int64_Encoding_BigEndianBytes_:
return BigEndianBytesEncoding{Bytes: bytesProtoToType(e.BigEndianBytes.BytesType)}
default:
return unknown[btapb.Type_Int64_Encoding]{wrapped: ie}
}
}

func int64ProtoToType(i *btapb.Type_Int64) Type {
return Int64Type{Encoding: int64EncodingProtoToEncoding(i.Encoding)}
}

func aggregateProtoToType(agg *btapb.Type_Aggregate) Type {
if agg == nil {
return AggregateType{Input: nil, Aggregator: unknownAggregator{wrapped: agg}}
}

it := protoToType(agg.InputType)
var aggregator Aggregator
switch agg.Aggregator.(type) {
case *btapb.Type_Aggregate_Sum_:
aggregator = SumAggregator{}
default:
aggregator = unknownAggregator{wrapped: agg}
}
return AggregateType{Input: it, Aggregator: aggregator}
}
64 changes: 62 additions & 2 deletions bigtable/type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"google.golang.org/protobuf/proto"
)

func TestInt64Proto(t *testing.T) {
want := &btapb.Type{
func aggregateProto() *btapb.Type {
return &btapb.Type{
Kind: &btapb.Type_Int64Type{
Int64Type: &btapb.Type_Int64{
Encoding: &btapb.Type_Int64_Encoding{
Expand All @@ -43,7 +43,10 @@ func TestInt64Proto(t *testing.T) {
},
},
}
}

func TestInt64Proto(t *testing.T) {
want := aggregateProto()
got := Int64Type{}.proto()
if !proto.Equal(got, want) {
t.Errorf("got type %v, want: %v", got, want)
Expand Down Expand Up @@ -102,3 +105,60 @@ func TestAggregateProto(t *testing.T) {
t.Errorf("got type %v, want: %v", got, want)
}
}

func TestProtoBijection(t *testing.T) {
want := aggregateProto()
got := protoToType(want).proto()
if !proto.Equal(got, want) {
t.Errorf("got type %v, want: %v", got, want)
}
}

func TestNilChecks(t *testing.T) {
// protoToType
if val, ok := protoToType(nil).(unknown[btapb.Type]); !ok {
t.Errorf("got: %T, wanted unknown[btapb.Type]", val)
}
if val, ok := protoToType(&btapb.Type{}).(unknown[btapb.Type]); !ok {
t.Errorf("got: %T, wanted unknown[btapb.Type]", val)
}

// bytesEncodingProtoToType
if val, ok := bytesEncodingProtoToType(nil).(unknown[btapb.Type_Bytes_Encoding]); !ok {
t.Errorf("got: %T, wanted unknown[btapb.Type_Bytes_Encoding]", val)
}
if val, ok := bytesEncodingProtoToType(&btapb.Type_Bytes_Encoding{}).(unknown[btapb.Type_Bytes_Encoding]); !ok {
t.Errorf("got: %T, wanted unknown[btapb.Type_Bytes_Encoding]", val)
}

// int64EncodingProtoToEncoding
if val, ok := int64EncodingProtoToEncoding(nil).(unknown[btapb.Type_Int64_Encoding]); !ok {
t.Errorf("got: %T, wanted unknown[btapb.Type_Int64_Encoding]", val)
}
if val, ok := int64EncodingProtoToEncoding(&btapb.Type_Int64_Encoding{}).(unknown[btapb.Type_Int64_Encoding]); !ok {
t.Errorf("got: %T, wanted unknown[btapb.Type_Int64_Encoding]", val)
}

// aggregateProtoToType
aggType1, ok := aggregateProtoToType(nil).(AggregateType)
if !ok {
t.Fatalf("got: %T, wanted AggregateType", aggType1)
}
if val, ok := aggType1.Aggregator.(unknownAggregator); !ok {
t.Errorf("got: %T, wanted unknownAggregator", val)
}
if aggType1.Input != nil {
t.Errorf("got: %v, wanted nil", aggType1.Input)
}

aggType2, ok := aggregateProtoToType(&btapb.Type_Aggregate{}).(AggregateType)
if !ok {
t.Fatalf("got: %T, wanted AggregateType", aggType2)
}
if val, ok := aggType2.Aggregator.(unknownAggregator); !ok {
t.Errorf("got: %T, wanted unknownAggregator", val)
}
if val, ok := aggType2.Input.(unknown[btapb.Type]); !ok {
t.Errorf("got: %T, wanted unknown[btapb.Type]", val)
}
}
Loading