Skip to content

Commit

Permalink
encoding: delete v1 proto codec and prefer v1 codecs over v2 codecs
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 23, 2024
1 parent 3d95421 commit 7fe17d1
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 112 deletions.
10 changes: 5 additions & 5 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ type baseCodec interface {
// with encoding.GetCodec and if it is registered wraps it with newCodecV1Bridge
// to turn it into an encoding.CodecV2. Returns nil otherwise.
func getCodec(name string) encoding.CodecV2 {
codecV2 := encoding.GetCodecV2(name)
if codecV2 != nil {
return codecV2
}

codecV1 := encoding.GetCodec(name)
if codecV1 != nil {
return newCodecV1Bridge(codecV1)
}

codecV2 := encoding.GetCodecV2(name)
if codecV2 != nil {
return codecV2
}

return nil
}

Expand Down
58 changes: 41 additions & 17 deletions encoding/proto/proto.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright 2018 gRPC authors.
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,13 @@
*
*/

// Package proto defines the protobuf codec. Importing this package will
// register the codec.
package proto

Check failure on line 19 in encoding/proto/proto.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.22)

at least one file in a package should have a package comment (ST1000)

import (
"fmt"

"google.golang.org/grpc/encoding"
"google.golang.org/grpc/mem"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/protoadapt"
)
Expand All @@ -32,28 +31,39 @@ import (
const Name = "proto"

func init() {
encoding.RegisterCodec(codec{})
encoding.RegisterCodecV2(&codecV2{})
}

// codec is a Codec implementation with protobuf. It is the default codec for gRPC.
type codec struct{}
// codec is a CodecV2 implementation with protobuf. It is the default codec for
// gRPC.
type codecV2 struct{}

func (codec) Marshal(v any) ([]byte, error) {
var _ encoding.CodecV2 = (*codecV2)(nil)

func (c *codecV2) Marshal(v any) (data mem.BufferSlice, err error) {
vv := messageV2Of(v)
if vv == nil {
return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v)
}

return proto.Marshal(vv)
}

func (codec) Unmarshal(data []byte, v any) error {
vv := messageV2Of(v)
if vv == nil {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
size := proto.Size(vv)
if mem.IsBelowBufferPoolingThreshold(size) {
buf, err := proto.Marshal(vv)
if err != nil {
return nil, err
}
data = append(data, mem.SliceBuffer(buf))
} else {
pool := mem.DefaultBufferPool()
buf := pool.Get(size)
if _, err := (proto.MarshalOptions{}).MarshalAppend((*buf)[:0], vv); err != nil {
pool.Put(buf)
return nil, err
}
data = append(data, mem.NewBuffer(buf, pool))
}

return proto.Unmarshal(data, vv)
return data, nil
}

func messageV2Of(v any) proto.Message {
Expand All @@ -67,6 +77,20 @@ func messageV2Of(v any) proto.Message {
return nil
}

func (codec) Name() string {
func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) (err error) {
vv := messageV2Of(v)
if vv == nil {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}

buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
defer buf.Free()
// TODO: Upgrade proto.Unmarshal to support mem.BufferSlice. Right now, it's not
// really possible without a major overhaul of the proto package, but the
// vtprotobuf library may be able to support this.
return proto.Unmarshal(buf.ReadOnlyData(), vv)
}

func (c *codecV2) Name() string {
return Name
}
6 changes: 3 additions & 3 deletions encoding/proto/proto_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func BenchmarkProtoCodec(b *testing.B) {
protoStructs := setupBenchmarkProtoCodecInputs(s)
name := fmt.Sprintf("MinPayloadSize:%v/SetParallelism(%v)", s, p)
b.Run(name, func(b *testing.B) {
codec := &codec{}
codec := &codecV2{}
b.SetParallelism(p)
b.RunParallel(func(pb *testing.PB) {
benchmarkProtoCodec(codec, protoStructs, pb, b)
Expand All @@ -78,7 +78,7 @@ func BenchmarkProtoCodec(b *testing.B) {
}
}

func benchmarkProtoCodec(codec *codec, protoStructs []proto.Message, pb *testing.PB, b *testing.B) {
func benchmarkProtoCodec(codec *codecV2, protoStructs []proto.Message, pb *testing.PB, b *testing.B) {
counter := 0
for pb.Next() {
counter++
Expand All @@ -87,7 +87,7 @@ func benchmarkProtoCodec(codec *codec, protoStructs []proto.Message, pb *testing
}
}

func fastMarshalAndUnmarshal(codec encoding.Codec, protoStruct proto.Message, b *testing.B) {
func fastMarshalAndUnmarshal(codec encoding.CodecV2, protoStruct proto.Message, b *testing.B) {
marshaledBytes, err := codec.Marshal(protoStruct)
if err != nil {
b.Errorf("codec.Marshal(_) returned an error")
Expand Down
13 changes: 7 additions & 6 deletions encoding/proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (

"google.golang.org/grpc/encoding"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/mem"
pb "google.golang.org/grpc/test/codec_perf"
)

func marshalAndUnmarshal(t *testing.T, codec encoding.Codec, expectedBody []byte) {
func marshalAndUnmarshal(t *testing.T, codec encoding.CodecV2, expectedBody []byte) {
p := &pb.Buffer{}
p.Body = expectedBody

Expand All @@ -55,7 +56,7 @@ func Test(t *testing.T) {
}

func (s) TestBasicProtoCodecMarshalAndUnmarshal(t *testing.T) {
marshalAndUnmarshal(t, codec{}, []byte{1, 2, 3})
marshalAndUnmarshal(t, &codecV2{}, []byte{1, 2, 3})
}

// Try to catch possible race conditions around use of pools
Expand All @@ -75,7 +76,7 @@ func (s) TestConcurrentUsage(t *testing.T) {
}

var wg sync.WaitGroup
codec := codec{}
codec := &codecV2{}

for i := 0; i < numGoRoutines; i++ {
wg.Add(1)
Expand All @@ -93,16 +94,16 @@ func (s) TestConcurrentUsage(t *testing.T) {
// TestStaggeredMarshalAndUnmarshalUsingSamePool tries to catch potential errors in which slices get
// stomped on during reuse of a proto.Buffer.
func (s) TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) {
codec1 := codec{}
codec2 := codec{}
codec1 := &codecV2{}
codec2 := &codecV2{}

expectedBody1 := []byte{1, 2, 3}
expectedBody2 := []byte{4, 5, 6}

proto1 := pb.Buffer{Body: expectedBody1}
proto2 := pb.Buffer{Body: expectedBody2}

var m1, m2 []byte
var m1, m2 mem.BufferSlice
var err error

if m1, err = codec1.Marshal(&proto1); err != nil {
Expand Down
81 changes: 0 additions & 81 deletions encoding/proto/proto_v2.go

This file was deleted.

0 comments on commit 7fe17d1

Please sign in to comment.