From 7fe17d1bf6cca0644668a305cc2b708cd48e4399 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 23 Aug 2024 10:29:42 -0700 Subject: [PATCH] encoding: delete v1 proto codec and prefer v1 codecs over v2 codecs --- codec.go | 10 ++-- encoding/proto/proto.go | 58 ++++++++++++------ encoding/proto/proto_benchmark_test.go | 6 +- encoding/proto/proto_test.go | 13 +++-- encoding/proto/proto_v2.go | 81 -------------------------- 5 files changed, 56 insertions(+), 112 deletions(-) delete mode 100644 encoding/proto/proto_v2.go diff --git a/codec.go b/codec.go index d3c2b35bf7e0..37b5c6811f9c 100644 --- a/codec.go +++ b/codec.go @@ -39,16 +39,16 @@ type baseCodec interface { // with encoding.GetCodec and if it is registered wraps it with newCodecV1Bridge // to turn it into an encoding.CodecV2. Returns nil otherwise. func getCodec(name string) encoding.CodecV2 { - codecV2 := encoding.GetCodecV2(name) - if codecV2 != nil { - return codecV2 - } - codecV1 := encoding.GetCodec(name) if codecV1 != nil { return newCodecV1Bridge(codecV1) } + codecV2 := encoding.GetCodecV2(name) + if codecV2 != nil { + return codecV2 + } + return nil } diff --git a/encoding/proto/proto.go b/encoding/proto/proto.go index 66d5cdf03ec5..1be86c6bdd7a 100644 --- a/encoding/proto/proto.go +++ b/encoding/proto/proto.go @@ -1,6 +1,6 @@ /* * - * Copyright 2018 gRPC authors. + * Copyright 2024 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,13 @@ * */ -// Package proto defines the protobuf codec. Importing this package will -// register the codec. package proto import ( "fmt" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/mem" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/protoadapt" ) @@ -32,28 +31,39 @@ import ( const Name = "proto" func init() { - encoding.RegisterCodec(codec{}) + encoding.RegisterCodecV2(&codecV2{}) } -// codec is a Codec implementation with protobuf. It is the default codec for gRPC. -type codec struct{} +// codec is a CodecV2 implementation with protobuf. It is the default codec for +// gRPC. +type codecV2 struct{} -func (codec) Marshal(v any) ([]byte, error) { +var _ encoding.CodecV2 = (*codecV2)(nil) + +func (c *codecV2) Marshal(v any) (data mem.BufferSlice, err error) { vv := messageV2Of(v) if vv == nil { - return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v) + return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v) } - return proto.Marshal(vv) -} - -func (codec) Unmarshal(data []byte, v any) error { - vv := messageV2Of(v) - if vv == nil { - return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v) + size := proto.Size(vv) + if mem.IsBelowBufferPoolingThreshold(size) { + buf, err := proto.Marshal(vv) + if err != nil { + return nil, err + } + data = append(data, mem.SliceBuffer(buf)) + } else { + pool := mem.DefaultBufferPool() + buf := pool.Get(size) + if _, err := (proto.MarshalOptions{}).MarshalAppend((*buf)[:0], vv); err != nil { + pool.Put(buf) + return nil, err + } + data = append(data, mem.NewBuffer(buf, pool)) } - return proto.Unmarshal(data, vv) + return data, nil } func messageV2Of(v any) proto.Message { @@ -67,6 +77,20 @@ func messageV2Of(v any) proto.Message { return nil } -func (codec) Name() string { +func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) (err error) { + vv := messageV2Of(v) + if vv == nil { + return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v) + } + + buf := data.MaterializeToBuffer(mem.DefaultBufferPool()) + defer buf.Free() + // TODO: Upgrade proto.Unmarshal to support mem.BufferSlice. Right now, it's not + // really possible without a major overhaul of the proto package, but the + // vtprotobuf library may be able to support this. + return proto.Unmarshal(buf.ReadOnlyData(), vv) +} + +func (c *codecV2) Name() string { return Name } diff --git a/encoding/proto/proto_benchmark_test.go b/encoding/proto/proto_benchmark_test.go index 8b4cec756342..58007dcbfecc 100644 --- a/encoding/proto/proto_benchmark_test.go +++ b/encoding/proto/proto_benchmark_test.go @@ -68,7 +68,7 @@ func BenchmarkProtoCodec(b *testing.B) { protoStructs := setupBenchmarkProtoCodecInputs(s) name := fmt.Sprintf("MinPayloadSize:%v/SetParallelism(%v)", s, p) b.Run(name, func(b *testing.B) { - codec := &codec{} + codec := &codecV2{} b.SetParallelism(p) b.RunParallel(func(pb *testing.PB) { benchmarkProtoCodec(codec, protoStructs, pb, b) @@ -78,7 +78,7 @@ func BenchmarkProtoCodec(b *testing.B) { } } -func benchmarkProtoCodec(codec *codec, protoStructs []proto.Message, pb *testing.PB, b *testing.B) { +func benchmarkProtoCodec(codec *codecV2, protoStructs []proto.Message, pb *testing.PB, b *testing.B) { counter := 0 for pb.Next() { counter++ @@ -87,7 +87,7 @@ func benchmarkProtoCodec(codec *codec, protoStructs []proto.Message, pb *testing } } -func fastMarshalAndUnmarshal(codec encoding.Codec, protoStruct proto.Message, b *testing.B) { +func fastMarshalAndUnmarshal(codec encoding.CodecV2, protoStruct proto.Message, b *testing.B) { marshaledBytes, err := codec.Marshal(protoStruct) if err != nil { b.Errorf("codec.Marshal(_) returned an error") diff --git a/encoding/proto/proto_test.go b/encoding/proto/proto_test.go index d017eb8ec30d..117f3cb97fe8 100644 --- a/encoding/proto/proto_test.go +++ b/encoding/proto/proto_test.go @@ -25,10 +25,11 @@ import ( "google.golang.org/grpc/encoding" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/mem" pb "google.golang.org/grpc/test/codec_perf" ) -func marshalAndUnmarshal(t *testing.T, codec encoding.Codec, expectedBody []byte) { +func marshalAndUnmarshal(t *testing.T, codec encoding.CodecV2, expectedBody []byte) { p := &pb.Buffer{} p.Body = expectedBody @@ -55,7 +56,7 @@ func Test(t *testing.T) { } func (s) TestBasicProtoCodecMarshalAndUnmarshal(t *testing.T) { - marshalAndUnmarshal(t, codec{}, []byte{1, 2, 3}) + marshalAndUnmarshal(t, &codecV2{}, []byte{1, 2, 3}) } // Try to catch possible race conditions around use of pools @@ -75,7 +76,7 @@ func (s) TestConcurrentUsage(t *testing.T) { } var wg sync.WaitGroup - codec := codec{} + codec := &codecV2{} for i := 0; i < numGoRoutines; i++ { wg.Add(1) @@ -93,8 +94,8 @@ func (s) TestConcurrentUsage(t *testing.T) { // TestStaggeredMarshalAndUnmarshalUsingSamePool tries to catch potential errors in which slices get // stomped on during reuse of a proto.Buffer. func (s) TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) { - codec1 := codec{} - codec2 := codec{} + codec1 := &codecV2{} + codec2 := &codecV2{} expectedBody1 := []byte{1, 2, 3} expectedBody2 := []byte{4, 5, 6} @@ -102,7 +103,7 @@ func (s) TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) { proto1 := pb.Buffer{Body: expectedBody1} proto2 := pb.Buffer{Body: expectedBody2} - var m1, m2 []byte + var m1, m2 mem.BufferSlice var err error if m1, err = codec1.Marshal(&proto1); err != nil { diff --git a/encoding/proto/proto_v2.go b/encoding/proto/proto_v2.go deleted file mode 100644 index 367a3cd66832..000000000000 --- a/encoding/proto/proto_v2.go +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Copyright 2024 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package proto - -import ( - "fmt" - - "google.golang.org/grpc/encoding" - "google.golang.org/grpc/mem" - "google.golang.org/protobuf/proto" -) - -func init() { - encoding.RegisterCodecV2(&codecV2{}) -} - -// codec is a CodecV2 implementation with protobuf. It is the default codec for -// gRPC. -type codecV2 struct{} - -var _ encoding.CodecV2 = (*codecV2)(nil) - -func (c *codecV2) Marshal(v any) (data mem.BufferSlice, err error) { - vv := messageV2Of(v) - if vv == nil { - return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v) - } - - size := proto.Size(vv) - if mem.IsBelowBufferPoolingThreshold(size) { - buf, err := proto.Marshal(vv) - if err != nil { - return nil, err - } - data = append(data, mem.SliceBuffer(buf)) - } else { - pool := mem.DefaultBufferPool() - buf := pool.Get(size) - if _, err := (proto.MarshalOptions{}).MarshalAppend((*buf)[:0], vv); err != nil { - pool.Put(buf) - return nil, err - } - data = append(data, mem.NewBuffer(buf, pool)) - } - - return data, nil -} - -func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) (err error) { - vv := messageV2Of(v) - if vv == nil { - return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v) - } - - buf := data.MaterializeToBuffer(mem.DefaultBufferPool()) - defer buf.Free() - // TODO: Upgrade proto.Unmarshal to support mem.BufferSlice. Right now, it's not - // really possible without a major overhaul of the proto package, but the - // vtprotobuf library may be able to support this. - return proto.Unmarshal(buf.ReadOnlyData(), vv) -} - -func (c *codecV2) Name() string { - return Name -}