Skip to content

Commit

Permalink
add MaxRecvMsgSize/MaxSendMsgSize
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed May 23, 2022
1 parent c3bc411 commit 460b71f
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 22 deletions.
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

// MaxRecvMsgSize/MaxSendMsgSize is the max gRPC message size in bytes the server can receive/send.
MaxRecvMsgSize int
MaxSendMsgSize int

WarningApplyDuration time.Duration
WarningUnaryRequestDuration time.Duration

Expand Down
17 changes: 14 additions & 3 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package embed
import (
"errors"
"fmt"
"math"
"net"
"net/http"
"net/url"
Expand All @@ -31,7 +32,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/tlsutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/flags"
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/server/v3/config"
Expand Down Expand Up @@ -59,6 +60,8 @@ const (
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultWarningUnaryRequestDuration = 300 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxRecvMsgSize = 4 * 1024 * 1024 // 4MB
DefaultMaxSendMsgSize = math.MaxInt32
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
Expand Down Expand Up @@ -205,6 +208,10 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// MaxRecvMsgSize/MaxSendMsgSize is the max gRPC message size in bytes the server can receive/send.
MaxRecvMsgSize int `json:"max-recv-msg-bytes"`
MaxSendMsgSize int `json:"max-send-msg-bytes"`

LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
Expand Down Expand Up @@ -460,8 +467,12 @@ func NewConfig() *Config {
SnapshotCount: etcdserver.DefaultSnapshotCount,
SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,

MaxRecvMsgSize: DefaultMaxRecvMsgSize,
MaxSendMsgSize: DefaultMaxSendMsgSize,

ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

ExperimentalWarningUnaryRequestDuration: DefaultWarningUnaryRequestDuration,
Expand Down
2 changes: 2 additions & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxRecvMsgSize: cfg.MaxRecvMsgSize,
MaxSendMsgSize: cfg.MaxSendMsgSize,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
Expand Down
10 changes: 3 additions & 7 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)
const maxStreams = math.MaxUint32

func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
Expand Down Expand Up @@ -66,8 +62,8 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(chainUnaryInterceptors...)))
opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(chainStreamInterceptors...)))

opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxRecvMsgSize(s.Cfg.MaxRecvMsgSize))
opts = append(opts, grpc.MaxSendMsgSize(s.Cfg.MaxSendMsgSize))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))

grpcServer := grpc.NewServer(append(opts, gopts...)...)
Expand Down
5 changes: 4 additions & 1 deletion server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import (
"go.uber.org/zap"
)

const minWatchProgressInterval = 100 * time.Millisecond
const (
minWatchProgressInterval = 100 * time.Millisecond
grpcOverheadBytes = 512 * 1024
)

type watchServer struct {
lg *zap.Logger
Expand Down
22 changes: 19 additions & 3 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/tlsutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/grpc_testing"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/server/v3/config"
Expand Down Expand Up @@ -142,8 +142,12 @@ type ClusterConfig struct {

QuotaBackendBytes int64

MaxTxnOps uint
MaxRequestBytes uint
MaxTxnOps uint
MaxRequestBytes uint

MaxRecvMsgSize int
MaxSendMsgSize int

SnapshotCount uint64
SnapshotCatchUpEntries uint64

Expand Down Expand Up @@ -267,6 +271,8 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
QuotaBackendBytes: c.Cfg.QuotaBackendBytes,
MaxTxnOps: c.Cfg.MaxTxnOps,
MaxRequestBytes: c.Cfg.MaxRequestBytes,
MaxRecvMsgSize: c.Cfg.MaxRecvMsgSize,
MaxSendMsgSize: c.Cfg.MaxSendMsgSize,
SnapshotCount: c.Cfg.SnapshotCount,
SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries,
GrpcKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
Expand Down Expand Up @@ -588,6 +594,8 @@ type MemberConfig struct {
QuotaBackendBytes int64
MaxTxnOps uint
MaxRequestBytes uint
MaxRecvMsgSize int
MaxSendMsgSize int
SnapshotCount uint64
SnapshotCatchUpEntries uint64
GrpcKeepAliveMinTime time.Duration
Expand Down Expand Up @@ -664,6 +672,14 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if m.MaxRequestBytes == 0 {
m.MaxRequestBytes = embed.DefaultMaxRequestBytes
}
m.MaxRecvMsgSize = mcfg.MaxRecvMsgSize
if m.MaxRecvMsgSize == 0 {
m.MaxRecvMsgSize = embed.DefaultMaxRecvMsgSize
}
m.MaxSendMsgSize = mcfg.MaxSendMsgSize
if m.MaxSendMsgSize == 0 {
m.MaxSendMsgSize = embed.DefaultMaxSendMsgSize
}
m.SnapshotCount = etcdserver.DefaultSnapshotCount
if mcfg.SnapshotCount != 0 {
m.SnapshotCount = mcfg.SnapshotCount
Expand Down
10 changes: 9 additions & 1 deletion tests/integration/clientv3/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -686,13 +686,15 @@ func TestKVLargeRequests(t *testing.T) {
tests := []struct {
// make sure that "MaxCallSendMsgSize" < server-side default send/recv limit
maxRequestBytesServer uint
maxRecvMsgBytesServer int
maxCallSendBytesClient int
maxCallRecvBytesClient int

valueSize int
expectError error
}{
{
maxRecvMsgBytesServer: 256,
maxRequestBytesServer: 256,
maxCallSendBytesClient: 0,
maxCallRecvBytesClient: 0,
Expand All @@ -704,6 +706,7 @@ func TestKVLargeRequests(t *testing.T) {
// "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)"
{

maxRecvMsgBytesServer: 7*1024*1024 + 512*1024,
maxRequestBytesServer: 7*1024*1024 + 512*1024,
maxCallSendBytesClient: 7 * 1024 * 1024,
maxCallRecvBytesClient: 0,
Expand All @@ -712,27 +715,31 @@ func TestKVLargeRequests(t *testing.T) {
},

{
maxRecvMsgBytesServer: 10 * 1024 * 1024,
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 100 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: rpctypes.ErrRequestTooLarge,
},
{
maxRecvMsgBytesServer: 10 * 1024 * 1024,
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: status.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
},
{
maxRecvMsgBytesServer: 10 * 1024 * 1024,
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 100 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: rpctypes.ErrRequestTooLarge,
},
{
maxRecvMsgBytesServer: 10 * 1024 * 1024,
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
Expand All @@ -745,6 +752,7 @@ func TestKVLargeRequests(t *testing.T) {
&integration2.ClusterConfig{
Size: 1,
MaxRequestBytes: test.maxRequestBytesServer,
MaxRecvMsgSize: test.maxRecvMsgBytesServer + 512*1024,
ClientMaxCallSendMsgSize: test.maxCallSendBytesClient,
ClientMaxCallRecvMsgSize: test.maxCallRecvBytesClient,
},
Expand Down
14 changes: 7 additions & 7 deletions tests/integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/integration"

"google.golang.org/grpc"
Expand Down Expand Up @@ -1893,18 +1893,19 @@ func TestV3LargeRequests(t *testing.T) {
integration.BeforeTest(t)
tests := []struct {
maxRequestBytes uint
maxRecvMsgSize int
valueSize int
expectError error
}{
// don't set to 0. use 0 as the default.
{256, 1024, rpctypes.ErrGRPCRequestTooLarge},
{10 * 1024 * 1024, 9 * 1024 * 1024, nil},
{10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
{10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
{256, 256, 1024, rpctypes.ErrGRPCRequestTooLarge},
{10 * 1024 * 1024, 10 * 1024 * 1024, 9 * 1024 * 1024, nil},
{10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
{10 * 1024 * 1024, 10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
}
for i, test := range tests {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes, MaxRecvMsgSize: test.maxRecvMsgSize + 512*1024})
defer clus.Terminate(t)
kvcli := integration.ToGRPC(clus.Client(0)).KV
reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
Expand All @@ -1922,7 +1923,6 @@ func TestV3LargeRequests(t *testing.T) {
t.Errorf("#%d: range expected no error, got %v", i, err)
}
}

})
}
}
Expand Down

0 comments on commit 460b71f

Please sign in to comment.