diff --git a/common/constant/default.go b/common/constant/default.go index 93ab319d08..8c0f4ffbd3 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -17,6 +17,8 @@ package constant +import "math" + const ( Dubbo = "dubbo" ProviderProtocol = "provider" @@ -91,3 +93,11 @@ const ( ServiceDiscoveryDefaultGroup = "DEFAULT_GROUP" NotAvailable = "N/A" ) + +const ( + DefaultMaxServerRecvMsgSize = 1024 * 1024 * 4 + DefaultMaxServerSendMsgSize = math.MaxInt32 + + DefaultMaxCallRecvMsgSize = 1024 * 1024 * 4 + DefaultMaxCallSendMsgSize = math.MaxInt32 +) diff --git a/config/protocol_config.go b/config/protocol_config.go index 8c4f8ecb84..39cb95f1d7 100644 --- a/config/protocol_config.go +++ b/config/protocol_config.go @@ -31,6 +31,12 @@ type ProtocolConfig struct { Ip string `yaml:"ip" json:"ip,omitempty" property:"ip"` Port string `default:"20000" yaml:"port" json:"port,omitempty" property:"port"` Params interface{} `yaml:"params" json:"params,omitempty" property:"params"` + + // MaxServerSendMsgSize max size of server send message, 1mb=1000kb=1000000b 1mib=1024kb=1048576b. + // more detail to see https://pkg.go.dev/github.com/dustin/go-humanize#pkg-constants + MaxServerSendMsgSize string `yaml:"max-server-send-msg-size" json:"max-server-send-msg-size,omitempty"` + // MaxServerRecvMsgSize max size of server receive message + MaxServerRecvMsgSize string `default:"4mib" yaml:"max-server-recv-msg-size" json:"max-server-recv-msg-size,omitempty"` } // Prefix dubbo.config-center @@ -77,6 +83,16 @@ func (pcb *ProtocolConfigBuilder) SetParams(params interface{}) *ProtocolConfigB return pcb } +func (pcb *ProtocolConfigBuilder) SetMaxServerSendMsgSize(maxServerSendMsgSize string) *ProtocolConfigBuilder { + pcb.protocolConfig.MaxServerSendMsgSize = maxServerSendMsgSize + return pcb +} + +func (pcb *ProtocolConfigBuilder) SetMaxServerRecvMsgSize(maxServerRecvMsgSize string) *ProtocolConfigBuilder { + pcb.protocolConfig.MaxServerRecvMsgSize = maxServerRecvMsgSize + return pcb +} + func (pcb *ProtocolConfigBuilder) Build() *ProtocolConfig { return pcb.protocolConfig } diff --git a/config/protocol_config_test.go b/config/protocol_config_test.go index b27812046d..9068c91e74 100644 --- a/config/protocol_config_test.go +++ b/config/protocol_config_test.go @@ -35,6 +35,7 @@ func TestGetProtocolsConfig(t *testing.T) { // default assert.Equal(t, "dubbo", protocols["dubbo"].Name) assert.Equal(t, string("20000"), protocols["dubbo"].Port) + assert.Equal(t, "4mib", protocols["dubbo"].MaxServerRecvMsgSize) }) t.Run("use config", func(t *testing.T) { @@ -45,5 +46,7 @@ func TestGetProtocolsConfig(t *testing.T) { // default assert.Equal(t, "dubbo", protocols["dubbo"].Name) assert.Equal(t, string("20000"), protocols["dubbo"].Port) + assert.Equal(t, "4mib", protocols["dubbo"].MaxServerSendMsgSize) + assert.Equal(t, "4mib", protocols["dubbo"].MaxServerRecvMsgSize) }) } diff --git a/config/service_config.go b/config/service_config.go index 92a4e269cf..f66175321f 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -76,7 +76,6 @@ type ServiceConfig struct { NotRegister bool `yaml:"not_register" json:"not_register,omitempty" property:"not_register"` ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"` Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"` - GrpcMaxMessageSize int `default:"4" yaml:"max_message_size" json:"max_message_size,omitempty"` TracingKey string `yaml:"tracing-key" json:"tracing-key,omitempty" propertiy:"tracing-key"` RCProtocolsMap map[string]*ProtocolConfig @@ -278,6 +277,9 @@ func (s *ServiceConfig) Export() error { common.WithMethods(strings.Split(methods, ",")), common.WithToken(s.Token), common.WithParamsValue(constant.MetadataTypeKey, s.metadataType), + // fix https://github.com/apache/dubbo-go/issues/2176 + common.WithParamsValue(constant.MaxServerSendMsgSize, proto.MaxServerSendMsgSize), + common.WithParamsValue(constant.MaxServerRecvMsgSize, proto.MaxServerRecvMsgSize), ) if len(s.Tag) > 0 { ivkURL.AddParam(constant.Tagkey, s.Tag) @@ -331,13 +333,13 @@ func (s *ServiceConfig) Export() error { return nil } -//setRegistrySubURL set registry sub url is ivkURl +// setRegistrySubURL set registry sub url is ivkURl func setRegistrySubURL(ivkURL *common.URL, regUrl *common.URL) { ivkURL.AddParam(constant.RegistryKey, regUrl.GetParam(constant.RegistryKey, "")) regUrl.SubURL = ivkURL } -//loadProtocol filter protocols by ids +// loadProtocol filter protocols by ids func loadProtocol(protocolIds []string, protocols map[string]*ProtocolConfig) []*ProtocolConfig { returnProtocols := make([]*ProtocolConfig, 0, len(protocols)) for _, v := range protocolIds { @@ -435,7 +437,6 @@ func (s *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER)) urlMap.Set(constant.ReleaseKey, "dubbo-golang-"+constant.Version) urlMap.Set(constant.SideKey, (common.RoleType(common.PROVIDER)).Role()) - urlMap.Set(constant.MessageSizeKey, strconv.Itoa(s.GrpcMaxMessageSize)) // todo: move urlMap.Set(constant.SerializationKey, s.Serialization) // application config info diff --git a/config/testdata/config/protocol/application.yaml b/config/testdata/config/protocol/application.yaml index 4f06d63f49..c248092136 100644 --- a/config/testdata/config/protocol/application.yaml +++ b/config/testdata/config/protocol/application.yaml @@ -4,4 +4,9 @@ dubbo: timeout: 5s group: dev address: nacos://127.0.0.1:8848 - protocols: \ No newline at end of file + protocols: + dubbo: + name: dubbo + port: 20000 + max-server-send-msg-size: 4mib + max-server-recv-msg-size: 4mib \ No newline at end of file diff --git a/go.mod b/go.mod index ad35627c97..4822ecefdb 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,8 @@ require ( github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5 github.com/dubbogo/gost v1.13.2 github.com/dubbogo/grpc-go v1.42.10 - github.com/dubbogo/triple v1.2.2-rc2 + github.com/dubbogo/triple v1.2.2-rc3 + github.com/dustin/go-humanize v1.0.0 github.com/emicklei/go-restful/v3 v3.10.1 github.com/envoyproxy/go-control-plane v0.11.0 github.com/fsnotify/fsnotify v1.6.0 diff --git a/go.sum b/go.sum index 4c5f130c9b..91dd337e5f 100644 --- a/go.sum +++ b/go.sum @@ -405,7 +405,6 @@ github.com/apache/dubbo-getty v1.4.9 h1:Y8l1EYJqIc7BnmyfYtvG4H4Nmu4v7P1uS31fFQGd github.com/apache/dubbo-getty v1.4.9/go.mod h1:6qmrqBSPGs3B35zwEuGhEYNVsx1nfGT/xzV2yOt2amM= github.com/apache/dubbo-go-hessian2 v1.9.1/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE= github.com/apache/dubbo-go-hessian2 v1.9.3/go.mod h1:xQUjE7F8PX49nm80kChFvepA/AvqAZ0oh/UaB6+6pBE= -github.com/apache/dubbo-go-hessian2 v1.11.4/go.mod h1:QP9Tc0w/B/mDopjusebo/c7GgEfl6Lz8jeuFg8JA6yw= github.com/apache/dubbo-go-hessian2 v1.12.0 h1:n2JXPMGc4u/ihBbOt25d3mmv1k92X9TvLnqfgyNscKQ= github.com/apache/dubbo-go-hessian2 v1.12.0/go.mod h1:QP9Tc0w/B/mDopjusebo/c7GgEfl6Lz8jeuFg8JA6yw= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -524,8 +523,8 @@ github.com/dubbogo/grpc-go v1.42.10/go.mod h1:JMkPt1mIHL96GAFeYsMoMjew6f1ROKycik github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU= github.com/dubbogo/net v0.0.4/go.mod h1:1CGOnM7X3he+qgGNqjeADuE5vKZQx/eMSeUkpU3ujIc= github.com/dubbogo/triple v1.0.9/go.mod h1:1t9me4j4CTvNDcsMZy6/OGarbRyAUSY0tFXGXHCp7Iw= -github.com/dubbogo/triple v1.2.2-rc2 h1:2AaLd+uKwnNnR3qOIXTNPU/OHk77qIDNGMX3GstEtaY= -github.com/dubbogo/triple v1.2.2-rc2/go.mod h1:8qprF2uJX82IE5hjiIuswp416sEr0oL/+bb7IjiizYs= +github.com/dubbogo/triple v1.2.2-rc3 h1:9rxLqru35MmJkypCHJMiZb1VzwH+zmbPBend9Cq+VOI= +github.com/dubbogo/triple v1.2.2-rc3/go.mod h1:9pgEahtmsY/avYJp3dzUQE8CMMVe1NtGBmUhfICKLJk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -1153,8 +1152,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= -github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-client-go v2.29.1+incompatible h1:R9ec3zO3sGpzs0abd43Y+fBZRJ9uiH6lXyR/+u6brW4= +github.com/uber/jaeger-client-go v2.29.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.2.6 h1:tGiWC9HENWE2tqYycIqFTNorMmFRVhNwCpDOpWqnk8E= diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go index 9675cc9781..29f2af03fa 100644 --- a/protocol/dubbo3/dubbo3_invoker.go +++ b/protocol/dubbo3/dubbo3_invoker.go @@ -28,6 +28,7 @@ import ( import ( "github.com/dubbogo/gost/log/logger" + "github.com/dustin/go-humanize" "github.com/dubbogo/grpc-go/metadata" @@ -84,16 +85,16 @@ func NewDubboInvoker(url *common.URL) (*DubboInvoker, error) { triConfig.WithHeaderGroup(url.GetParam(constant.GroupKey, "")), triConfig.WithLogger(logger.GetLogger()), } - if maxCall := url.GetParam(constant.MaxCallRecvMsgSize, ""); maxCall != "" { - if size, err := strconv.Atoi(maxCall); err == nil && size != 0 { - opts = append(opts, triConfig.WithGRPCMaxCallRecvMessageSize(size)) - } + maxCallRecvMsgSize := constant.DefaultMaxCallRecvMsgSize + if maxCall, err := humanize.ParseBytes(url.GetParam(constant.MaxCallRecvMsgSize, "")); err == nil && maxCall != 0 { + maxCallRecvMsgSize = int(maxCall) } - if maxCall := url.GetParam(constant.MaxCallSendMsgSize, ""); maxCall != "" { - if size, err := strconv.Atoi(maxCall); err == nil && size != 0 { - opts = append(opts, triConfig.WithGRPCMaxCallSendMessageSize(size)) - } + maxCallSendMsgSize := constant.DefaultMaxCallSendMsgSize + if maxCall, err := humanize.ParseBytes(url.GetParam(constant.MaxCallSendMsgSize, "")); err == nil && maxCall != 0 { + maxCallSendMsgSize = int(maxCall) } + opts = append(opts, triConfig.WithGRPCMaxCallRecvMessageSize(maxCallRecvMsgSize)) + opts = append(opts, triConfig.WithGRPCMaxCallSendMessageSize(maxCallSendMsgSize)) tracingKey := url.GetParam(constant.TracingConfigKey, "") if tracingKey != "" { diff --git a/protocol/dubbo3/dubbo3_protocol.go b/protocol/dubbo3/dubbo3_protocol.go index 6d298e13f5..ea5a9ece83 100644 --- a/protocol/dubbo3/dubbo3_protocol.go +++ b/protocol/dubbo3/dubbo3_protocol.go @@ -21,12 +21,12 @@ import ( "context" "fmt" "reflect" - "strconv" "sync" ) import ( "github.com/dubbogo/gost/log/logger" + "github.com/dustin/go-humanize" "github.com/dubbogo/grpc-go" "github.com/dubbogo/grpc-go/metadata" @@ -242,16 +242,16 @@ func (dp *DubboProtocol) openServer(url *common.URL, tripleCodecType tripleConst } } - if maxCall := url.GetParam(constant.MaxServerRecvMsgSize, ""); maxCall != "" { - if size, err := strconv.Atoi(maxCall); err == nil && size != 0 { - opts = append(opts, triConfig.WithGRPCMaxServerRecvMessageSize(size)) - } + maxServerRecvMsgSize := constant.DefaultMaxServerRecvMsgSize + if recvMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxServerRecvMsgSize, "")); err == nil && recvMsgSize != 0 { + maxServerRecvMsgSize = int(recvMsgSize) } - if maxCall := url.GetParam(constant.MaxServerSendMsgSize, ""); maxCall != "" { - if size, err := strconv.Atoi(maxCall); err == nil && size != 0 { - opts = append(opts, triConfig.WithGRPCMaxServerSendMessageSize(size)) - } + maxServerSendMsgSize := constant.DefaultMaxServerSendMsgSize + if sendMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxServerSendMsgSize, "")); err == nil && sendMsgSize != 0 { + maxServerSendMsgSize = int(sendMsgSize) } + opts = append(opts, triConfig.WithGRPCMaxServerRecvMessageSize(maxServerRecvMsgSize)) + opts = append(opts, triConfig.WithGRPCMaxServerSendMessageSize(maxServerSendMsgSize)) triOption := triConfig.NewTripleOption(opts...) diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go index a3c3d3485e..fdb127858b 100644 --- a/protocol/grpc/client.go +++ b/protocol/grpc/client.go @@ -19,13 +19,13 @@ package grpc import ( "reflect" - "strconv" "sync" "time" ) import ( "github.com/dubbogo/gost/log/logger" + "github.com/dustin/go-humanize" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" @@ -61,7 +61,16 @@ func NewClient(url *common.URL) (*Client, error) { // If not, will return NoopTracer. tracer := opentracing.GlobalTracer() dialOpts := make([]grpc.DialOption, 0, 4) - maxMessageSize, _ := strconv.Atoi(url.GetParam(constant.MessageSizeKey, "4")) + + // set max send and recv msg size + maxCallRecvMsgSize := constant.DefaultMaxCallRecvMsgSize + if recvMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallRecvMsgSize, "")); err == nil && recvMsgSize > 0 { + maxCallRecvMsgSize = int(recvMsgSize) + } + maxCallSendMsgSize := constant.DefaultMaxCallSendMsgSize + if sendMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallSendMsgSize, "")); err == nil && sendMsgSize > 0 { + maxCallSendMsgSize = int(sendMsgSize) + } // consumer config client connectTimeout //connectTimeout := config.GetConsumerConfig().ConnectTimeout @@ -74,8 +83,8 @@ func NewClient(url *common.URL) (*Client, error) { grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer, otgrpc.LogPayloads())), grpc.WithDefaultCallOptions( grpc.CallContentSubtype(clientConf.ContentSubType), - grpc.MaxCallRecvMsgSize(1024*1024*maxMessageSize), - grpc.MaxCallSendMsgSize(1024*1024*maxMessageSize), + grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize), + grpc.MaxCallSendMsgSize(maxCallSendMsgSize), ), ) tlsConfig := config.GetRootConfig().TLSConfig diff --git a/protocol/grpc/client_test.go b/protocol/grpc/client_test.go index 5a5ed63e05..4dc48689e8 100644 --- a/protocol/grpc/client_test.go +++ b/protocol/grpc/client_test.go @@ -19,6 +19,8 @@ package grpc import ( "context" + "fmt" + "github.com/dustin/go-humanize" "testing" ) @@ -88,3 +90,8 @@ func TestStreamClient(t *testing.T) { assert.NoError(t, err) routeguide.RunRouteChat(routeChatStream) } + +func TestT(t *testing.T) { + bytes, err := humanize.ParseBytes("0") + fmt.Println(bytes, err) +} diff --git a/protocol/grpc/grpc_protocol.go b/protocol/grpc/grpc_protocol.go index 5521109f4d..ac610bd2f1 100644 --- a/protocol/grpc/grpc_protocol.go +++ b/protocol/grpc/grpc_protocol.go @@ -18,7 +18,6 @@ package grpc import ( - "strconv" "sync" ) @@ -28,7 +27,6 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/protocol" ) @@ -82,9 +80,7 @@ func (gp *GrpcProtocol) openServer(url *common.URL) { panic("[GrpcProtocol]" + url.Key() + "is not existing") } - grpcMessageSize, _ := strconv.Atoi(url.GetParam(constant.MessageSizeKey, "4")) srv := NewServer() - srv.SetBufferSize(grpcMessageSize) gp.serverMap[url.Location] = srv srv.Start(url) } diff --git a/protocol/grpc/server.go b/protocol/grpc/server.go index 533e4085a2..f1538630c7 100644 --- a/protocol/grpc/server.go +++ b/protocol/grpc/server.go @@ -27,6 +27,7 @@ import ( import ( "github.com/dubbogo/gost/log/logger" + "github.com/dustin/go-humanize" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" @@ -40,6 +41,7 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/config" "dubbo.apache.org/dubbo-go/v3/protocol" ) @@ -81,6 +83,15 @@ func (s *Server) Start(url *common.URL) { panic(err) } + maxServerRecvMsgSize := constant.DefaultMaxServerRecvMsgSize + if recvMsgSize, convertErr := humanize.ParseBytes(url.GetParam(constant.MaxServerRecvMsgSize, "")); convertErr == nil && recvMsgSize != 0 { + maxServerRecvMsgSize = int(recvMsgSize) + } + maxServerSendMsgSize := constant.DefaultMaxServerSendMsgSize + if sendMsgSize, convertErr := humanize.ParseBytes(url.GetParam(constant.MaxServerSendMsgSize, "")); err == convertErr && sendMsgSize != 0 { + maxServerSendMsgSize = int(sendMsgSize) + } + // If global trace instance was set, then server tracer instance // can be get. If not, will return NoopTracer. tracer := opentracing.GlobalTracer() @@ -88,8 +99,8 @@ func (s *Server) Start(url *common.URL) { serverOpts = append(serverOpts, grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)), grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)), - grpc.MaxRecvMsgSize(1024*1024*s.bufferSize), - grpc.MaxSendMsgSize(1024*1024*s.bufferSize), + grpc.MaxRecvMsgSize(maxServerRecvMsgSize), + grpc.MaxSendMsgSize(maxServerSendMsgSize), ) tlsConfig := config.GetRootConfig().TLSConfig