From 4d47771b877ac76ec0945c4655ee0403c088804e Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sat, 30 Mar 2019 22:11:51 +0800 Subject: [PATCH 1/3] add transport interface in client --- client/client_transport.go | 35 ++++++ client/invoker/invoker.go | 24 ++-- .../app/config.go => client_config.go} | 26 ++--- examples/dubbo/go-client/app/client.go | 87 +++++++++----- examples/dubbo/go-client/app/config.go | 108 ------------------ examples/dubbo/go-client/app/test.go | 3 +- examples/jsonrpc/go-client/app/client.go | 34 +++--- examples/jsonrpc/go-client/app/test.go | 10 +- jsonrpc/http.go | 47 ++------ registry/options.go | 23 ++-- 10 files changed, 166 insertions(+), 231 deletions(-) create mode 100644 client/client_transport.go rename examples/{jsonrpc/go-client/app/config.go => client_config.go} (84%) delete mode 100644 examples/dubbo/go-client/app/config.go diff --git a/client/client_transport.go b/client/client_transport.go new file mode 100644 index 0000000000..14544cc92d --- /dev/null +++ b/client/client_transport.go @@ -0,0 +1,35 @@ +package client + +import ( + "context" + "github.com/dubbo/dubbo-go/service" +) + +type Transport interface { + Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error + NewRequest(conf service.ServiceConfig, method string, args interface{}) Request +} + +////////////////////////////////////////////// +// Request +////////////////////////////////////////////// + +type Request struct { + ID int64 + Group string + Protocol string + Version string + Service string + Method string + Args interface{} + ContentType string +} + +func (r *Request) ServiceConfig() service.ServiceConfigIf { + return &service.ServiceConfig{ + Protocol: r.Protocol, + Service: r.Service, + Group: r.Group, + Version: r.Version, + } +} diff --git a/client/invoker/invoker.go b/client/invoker/invoker.go index f91d0de8c7..eac4f16119 100644 --- a/client/invoker/invoker.go +++ b/client/invoker/invoker.go @@ -2,6 +2,7 @@ package invoker import ( "context" + "github.com/dubbo/dubbo-go/client" "sync" "time" ) @@ -13,7 +14,6 @@ import ( import ( "github.com/dubbo/dubbo-go/client/loadBalance" - "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/registry" "github.com/dubbo/dubbo-go/service" ) @@ -21,6 +21,7 @@ import ( type Options struct { ServiceTTL time.Duration selector loadBalance.Selector + Transport client.Transport } type Option func(*Options) @@ -29,6 +30,13 @@ func WithServiceTTL(ttl time.Duration) Option { o.ServiceTTL = ttl } } + +func WithClientTransport(client client.Transport) Option { + return func(o *Options) { + o.Transport = client + } +} + func WithLBSelector(selector loadBalance.Selector) Option { return func(o *Options) { o.selector = selector @@ -37,14 +45,12 @@ func WithLBSelector(selector loadBalance.Selector) Option { type Invoker struct { Options - //TODO:we should provider a transport client interface - Client *jsonrpc.HTTPClient cacheServiceMap map[string]*ServiceArray registry registry.Registry listenerLock sync.Mutex } -func NewInvoker(registry registry.Registry, client *jsonrpc.HTTPClient, opts ...Option) *Invoker { +func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) { options := Options{ //default 300s ServiceTTL: time.Duration(300e9), @@ -53,14 +59,16 @@ func NewInvoker(registry registry.Registry, client *jsonrpc.HTTPClient, opts ... for _, opt := range opts { opt(&options) } + if options.Transport == nil { + return nil, jerrors.New("Must specify the client transport !") + } invoker := &Invoker{ Options: options, - Client: client, cacheServiceMap: make(map[string]*ServiceArray), registry: registry, } invoker.Listen() - return invoker + return invoker, nil } func (ivk *Invoker) Listen() { @@ -144,7 +152,7 @@ func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArra return newSvcArr, nil } -func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error { +func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req client.Request, resp interface{}) error { serviceArray, err := ivk.getService(serviceConf) if err != nil { @@ -157,7 +165,7 @@ func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service. if err != nil { return err } - if err = ivk.Client.Call(ctx, url, req, resp); err != nil { + if err = ivk.Transport.Call(ctx, url, req, resp); err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) return err } diff --git a/examples/jsonrpc/go-client/app/config.go b/examples/client_config.go similarity index 84% rename from examples/jsonrpc/go-client/app/config.go rename to examples/client_config.go index 3d706ed4bf..63bd492e65 100644 --- a/examples/jsonrpc/go-client/app/config.go +++ b/examples/client_config.go @@ -1,4 +1,4 @@ -package main +package examples import ( "fmt" @@ -17,9 +17,9 @@ import ( ) import ( + "github.com/dubbo/dubbo-go/registry" "github.com/dubbo/dubbo-go/registry/zookeeper" "github.com/dubbo/dubbo-go/service" - "github.com/dubbo/dubbo-go/registry" ) const ( @@ -27,10 +27,6 @@ const ( APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" ) -var ( - clientConfig *ClientConfig -) - type ( // Client holds supported types by the multiconfig package ClientConfig struct { @@ -40,28 +36,30 @@ type ( // client Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"` - connectTimeout time.Duration + ConnectTimeout time.Duration Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` // 500ms, 1m - requestTimeout time.Duration + RequestTimeout time.Duration // codec & selector & transport & registry Selector string `default:"cache" yaml:"selector" json:"selector,omitempty"` Selector_TTL string `default:"10m" yaml:"selector_ttl" json:"selector_ttl,omitempty"` //client load balance algorithm - ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"` - Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"` + ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"` + Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"` // application Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` - ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"` + ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"` // 一个客户端只允许使用一个service的其中一个group和其中一个version Service_List []service.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"` } ) -func initClientConfig() error { +func InitClientConfig() *ClientConfig { + var ( - confFile string + clientConfig *ClientConfig + confFile string ) // configure @@ -105,5 +103,5 @@ func initClientConfig() error { } log.LoadConfiguration(confFile) - return nil + return clientConfig } diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 00f03669c7..d0b092c38d 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -2,6 +2,9 @@ package main import ( "fmt" + "github.com/dubbo/dubbo-go/dubbo" + "github.com/dubbo/dubbo-go/plugins" + "github.com/dubbo/dubbo-go/registry/zookeeper" "net/http" _ "net/http/pprof" "os" @@ -19,39 +22,34 @@ import ( ) import ( + "github.com/dubbo/dubbo-go/client/invoker" + "github.com/dubbo/dubbo-go/examples" "github.com/dubbo/dubbo-go/public" "github.com/dubbo/dubbo-go/registry" ) var ( survivalTimeout int = 10e9 - clientRegistry *registry.ZkConsumerRegistry + clientInvoker *invoker.Invoker ) func main() { - var ( - err error - ) - err = initClientConfig() - if err != nil { - log.Error("initClientConfig() = error{%#v}", err) - return - } - initProfiling() - initClient() + clientConfig := examples.InitClientConfig() + initProfiling(clientConfig) + initClient(clientConfig) time.Sleep(3e9) gxlog.CInfo("\n\n\nstart to test dubbo") - testDubborpc("A003") + testDubborpc(clientConfig, "A003") time.Sleep(3e9) initSignal() } -func initClient() { +func initClient(clientConfig *examples.ClientConfig) { var ( err error codecType public.CodecType @@ -63,32 +61,24 @@ func initClient() { } // registry - clientRegistry, err = registry.NewZkConsumerRegistry( - registry.ApplicationConf(clientConfig.Application_Config), - registry.RegistryConf(clientConfig.Registry_Config), - registry.BalanceMode(registry.SM_RoundRobin), - registry.ServiceTTL(300e9), + clientRegistry, err := plugins.PluggableRegistries[clientConfig.Registry]( + registry.WithDubboType(registry.CONSUMER), + registry.WithApplicationConf(clientConfig.Application_Config), + zookeeper.WithRegistryConf(clientConfig.ZkRegistryConfig), ) if err != nil { panic(fmt.Sprintf("fail to init registry.Registy, err:%s", jerrors.ErrorStack(err))) return } - for _, service := range clientConfig.Service_List { - err = clientRegistry.Register(service) - if err != nil { - panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err))) - return - } - } // consumer - clientConfig.requestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) + clientConfig.RequestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Request_Timeout{%#v}) = error{%v}", clientConfig.Request_Timeout, err)) return } - clientConfig.connectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) + clientConfig.ConnectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Connect_Timeout{%#v}) = error{%v}", clientConfig.Connect_Timeout, err)) @@ -101,13 +91,54 @@ func initClient() { panic(fmt.Sprintf("unknown protocol %s", clientConfig.Service_List[idx].Protocol)) } } + + for _, service := range clientConfig.Service_List { + err = clientRegistry.ConsumerRegister(&service) + if err != nil { + panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err))) + return + } + } + + //read the client lb config in config.yml + configClientLB := plugins.PluggableLoadbalance[clientConfig.ClientLoadBalance]() + + //init dubbo rpc client & init invoker + var cltD *dubbo.Client + + cltD, err = dubbo.NewClient(&dubbo.ClientConfig{ + PoolSize: 64, + PoolTTL: 600, + ConnectionNum: 2, // 不能太大 + FailFastTimeout: "5s", + SessionTimeout: "20s", + HeartbeatPeriod: "5s", + GettySessionParam: dubbo.GettySessionParam{ + CompressEncoding: false, // 必须false + TcpNoDelay: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpKeepAlive: true, + TcpWBufSize: 65536, + PkgRQSize: 1024, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 1024, + SessionName: "client", + }, + }) + clientInvoker, err = invoker.NewInvoker(clientRegistry, + invoker.WithClientTransport(cltD), + invoker.WithLBSelector(configClientLB)) } func uninitClient() { log.Close() } -func initProfiling() { +func initProfiling(clientConfig *examples.ClientConfig) { if !clientConfig.Pprof_Enabled { return } diff --git a/examples/dubbo/go-client/app/config.go b/examples/dubbo/go-client/app/config.go deleted file mode 100644 index cc02b1bbf5..0000000000 --- a/examples/dubbo/go-client/app/config.go +++ /dev/null @@ -1,108 +0,0 @@ -package main - -import ( - "fmt" - "github.com/dubbo/dubbo-go/registry/zookeeper" - "github.com/dubbo/dubbo-go/service" - "io/ioutil" - "os" - "path" - "time" -) - -import ( - "github.com/AlexStocks/goext/log" - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" - yaml "gopkg.in/yaml.v2" -) - -import ( - "github.com/dubbo/dubbo-go/registry" -) - -const ( - APP_CONF_FILE = "APP_CONF_FILE" - APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" -) - -var ( - clientConfig *ClientConfig -) - -type ( - // Client holds supported types by the multiconfig package - ClientConfig struct { - // pprof - Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` - Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` - - // client - Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"` - connectTimeout time.Duration - - Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` // 500ms, 1m - requestTimeout time.Duration - - // codec & selector & transport & registry - Selector string `default:"cache" yaml:"selector" json:"selector,omitempty"` - Selector_TTL string `default:"10m" yaml:"selector_ttl" json:"selector_ttl,omitempty"` - //client load balance algorithm - ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"` - Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"` - // application - Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` - ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"` - // 一个客户端只允许使用一个service的其中一个group和其中一个version - Service_List []service.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"` - } -) - -func initClientConfig() error { - var ( - confFile string - ) - - // configure - confFile = os.Getenv(APP_CONF_FILE) - if confFile == "" { - panic(fmt.Sprintf("application configure file name is nil")) - return nil // I know it is of no usage. Just Err Protection. - } - if path.Ext(confFile) != ".yml" { - panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile)) - return nil - } - clientConfig = new(ClientConfig) - - confFileStream, err := ioutil.ReadFile(confFile) - if err != nil { - panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err))) - return nil - } - err = yaml.Unmarshal(confFileStream, clientConfig) - if err != nil { - panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))) - return nil - } - if clientConfig.ZkRegistryConfig.Timeout, err = time.ParseDuration(clientConfig.ZkRegistryConfig.TimeoutStr); err != nil { - panic(fmt.Sprintf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s", clientConfig.ZkRegistryConfig.TimeoutStr, err)) - return nil - } - - gxlog.CInfo("config{%#v}\n", clientConfig) - - // log - confFile = os.Getenv(APP_LOG_CONF_FILE) - if confFile == "" { - panic(fmt.Sprintf("log configure file name is nil")) - return nil - } - if path.Ext(confFile) != ".xml" { - panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", confFile)) - return nil - } - log.LoadConfiguration(confFile) - - return nil -} diff --git a/examples/dubbo/go-client/app/test.go b/examples/dubbo/go-client/app/test.go index a4471d5c1a..5e78c983eb 100644 --- a/examples/dubbo/go-client/app/test.go +++ b/examples/dubbo/go-client/app/test.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/dubbo/dubbo-go/examples" "github.com/dubbogo/hessian2" _ "net/http/pprof" ) @@ -18,7 +19,7 @@ import ( "github.com/dubbo/dubbo-go/registry" ) -func testDubborpc(userKey string) { +func testDubborpc(clientConfig *examples.ClientConfig,userKey string) { var ( err error service string diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index 0b7bb05d90..bb3e853e2d 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -20,6 +20,7 @@ import ( import ( "github.com/dubbo/dubbo-go/client/invoker" + "github.com/dubbo/dubbo-go/examples" "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/plugins" "github.com/dubbo/dubbo-go/public" @@ -33,32 +34,26 @@ var ( ) func main() { - var ( - err error - ) - err = initClientConfig() - if err != nil { - log.Error("initClientConfig() = error{%#v}", err) - return - } - initProfiling() - initClient() + clientConfig := examples.InitClientConfig() + + initProfiling(clientConfig) + initClient(clientConfig) time.Sleep(3e9) gxlog.CInfo("\n\n\nstart to test jsonrpc") - testJsonrpc("A003", "GetUser") + testJsonrpc(clientConfig, "A003", "GetUser") time.Sleep(3e9) gxlog.CInfo("\n\n\nstart to test jsonrpc illegal method") - testJsonrpc("A003", "GetUser1") + testJsonrpc(clientConfig, "A003", "GetUser1") initSignal() } -func initClient() { +func initClient(clientConfig *examples.ClientConfig) { var ( codecType public.CodecType ) @@ -80,13 +75,13 @@ func initClient() { } // consumer - clientConfig.requestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) + clientConfig.RequestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Request_Timeout{%#v}) = error{%v}", clientConfig.Request_Timeout, err)) return } - clientConfig.connectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) + clientConfig.ConnectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Connect_Timeout{%#v}) = error{%v}", clientConfig.Connect_Timeout, err)) @@ -114,12 +109,13 @@ func initClient() { //init http client & init invoker clt := jsonrpc.NewHTTPClient( &jsonrpc.HTTPOptions{ - HandshakeTimeout: clientConfig.connectTimeout, - HTTPTimeout: clientConfig.requestTimeout, + HandshakeTimeout: clientConfig.ConnectTimeout, + HTTPTimeout: clientConfig.RequestTimeout, }, ) - clientInvoker = invoker.NewInvoker(clientRegistry, clt, + clientInvoker, err = invoker.NewInvoker(clientRegistry, + invoker.WithClientTransport(clt), invoker.WithLBSelector(configClientLB)) } @@ -128,7 +124,7 @@ func uninitClient() { log.Close() } -func initProfiling() { +func initProfiling(clientConfig *examples.ClientConfig) { if !clientConfig.Pprof_Enabled { return } diff --git a/examples/jsonrpc/go-client/app/test.go b/examples/jsonrpc/go-client/app/test.go index d0018884d3..c649e6ea7f 100644 --- a/examples/jsonrpc/go-client/app/test.go +++ b/examples/jsonrpc/go-client/app/test.go @@ -3,7 +3,7 @@ package main import ( "context" "fmt" - + "github.com/dubbo/dubbo-go/client" _ "net/http/pprof" ) @@ -12,12 +12,12 @@ import ( ) import ( - "github.com/dubbo/dubbo-go/jsonrpc" + "github.com/dubbo/dubbo-go/examples" "github.com/dubbo/dubbo-go/public" "github.com/dubbo/dubbo-go/service" ) -func testJsonrpc(userKey string, method string) { +func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method string) { var ( err error svc string @@ -25,7 +25,7 @@ func testJsonrpc(userKey string, method string) { user *JsonRPCUser ctx context.Context conf service.ServiceConfig - req jsonrpc.Request + req client.Request ) serviceIdx = -1 @@ -49,7 +49,7 @@ func testJsonrpc(userKey string, method string) { Service: clientConfig.Service_List[serviceIdx].Service, } // Attention the last parameter : []UserKey{userKey} - req = clientInvoker.Client.NewRequest(conf, method, []string{userKey}) + req = clientInvoker.Transport.NewRequest(conf, method, []string{userKey}) ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", diff --git a/jsonrpc/http.go b/jsonrpc/http.go index 053a135bd0..fb6050cf60 100644 --- a/jsonrpc/http.go +++ b/jsonrpc/http.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "github.com/dubbo/dubbo-go/client" "github.com/dubbo/dubbo-go/service" "io/ioutil" "net" @@ -24,30 +25,6 @@ import ( "github.com/dubbo/dubbo-go/public" ) -////////////////////////////////////////////// -// Request -////////////////////////////////////////////// - -type Request struct { - ID int64 - group string - protocol string - version string - service string - method string - args interface{} - contentType string -} - -func (r *Request) ServiceConfig() service.ServiceConfigIf { - return &service.ServiceConfig{ - Protocol: r.protocol, - Service: r.service, - Group: r.group, - Version: r.version, - } -} - ////////////////////////////////////////////// // HTTP Client ////////////////////////////////////////////// @@ -86,19 +63,19 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } -func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) Request { - return Request{ +func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) client.Request { + return client.Request{ ID: atomic.AddInt64(&c.ID, 1), - group: conf.Group, - protocol: conf.Protocol, - version: conf.Version, - service: conf.Service, - method: method, - args: args, + Group: conf.Group, + Protocol: conf.Protocol, + Version: conf.Version, + Service: conf.Service, + Method: method, + Args: args, } } -func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req Request, rsp interface{}) error { +func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req client.Request, rsp interface{}) error { // header httpHeader := http.Header{} httpHeader.Set("Content-Type", "application/json") @@ -122,8 +99,8 @@ func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req codec := newJsonClientCodec() codecData := CodecData{ ID: req.ID, - Method: req.method, - Args: req.args, + Method: req.Method, + Args: req.Args, } reqBody, err := codec.Write(&codecData) if err != nil { diff --git a/registry/options.go b/registry/options.go index aa9ce8cc84..d286d48c5a 100644 --- a/registry/options.go +++ b/registry/options.go @@ -11,6 +11,7 @@ var ( DubboNodes = [...]string{"consumers", "configurators", "routers", "providers"} DubboRole = [...]string{"consumer", "", "", "provider"} ) + type DubboType int func (t DubboType) String() string { @@ -20,6 +21,7 @@ func (t DubboType) String() string { func (t DubboType) Role() string { return DubboRole[t] } + type ApplicationConfig struct { Organization string `yaml:"organization" json:"organization,omitempty"` Name string `yaml:"name" json:"name,omitempty"` @@ -29,19 +31,14 @@ type ApplicationConfig struct { Environment string `yaml:"environment" json:"environment,omitempty"` } - - - -type OptionInf interface{ - OptionName()string +type OptionInf interface { + OptionName() string } -type Options struct{ +type Options struct { ApplicationConfig - DubboType DubboType + DubboType DubboType } - - //func (c *ApplicationConfig) ToString() string { // return fmt.Sprintf("ApplicationConfig is {name:%s, version:%s, owner:%s, module:%s, organization:%s}", // c.Name, c.Version, c.Owner, c.Module, c.Organization) @@ -49,17 +46,17 @@ type Options struct{ type Option func(*Options) -func(Option)OptionName() string { +func (Option) OptionName() string { return "Abstact option func" } -func WithDubboType(tp DubboType)Option{ - return func (o *Options){ +func WithDubboType(tp DubboType) Option { + return func(o *Options) { o.DubboType = tp } } -func WithApplicationConf(conf ApplicationConfig) Option { +func WithApplicationConf(conf ApplicationConfig) Option { return func(o *Options) { o.ApplicationConfig = conf } From c1f89b0b531018ef718dda70676a111bf0baf6e1 Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sat, 30 Mar 2019 22:58:15 +0800 Subject: [PATCH 2/3] rollback add transport interface in client & success to integrate to call dubbo~ --- client/client_transport.go | 62 +++++++++---------- client/invoker/invoker.go | 46 +++++++++++--- client/loadBalance/load_balance.go | 4 +- dubbo/client.go | 10 +-- examples/dubbo/go-client/app/client.go | 6 +- examples/dubbo/go-client/app/test.go | 55 +++------------- .../dubbo/go-client/profiles/dev/client.yml | 3 +- examples/jsonrpc/go-client/app/client.go | 2 +- examples/jsonrpc/go-client/app/test.go | 8 +-- jsonrpc/http.go | 47 ++++++++++---- 10 files changed, 129 insertions(+), 114 deletions(-) diff --git a/client/client_transport.go b/client/client_transport.go index 14544cc92d..e3d31b60ef 100644 --- a/client/client_transport.go +++ b/client/client_transport.go @@ -1,35 +1,31 @@ package client -import ( - "context" - "github.com/dubbo/dubbo-go/service" -) - -type Transport interface { - Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error - NewRequest(conf service.ServiceConfig, method string, args interface{}) Request -} - -////////////////////////////////////////////// -// Request -////////////////////////////////////////////// - -type Request struct { - ID int64 - Group string - Protocol string - Version string - Service string - Method string - Args interface{} - ContentType string -} - -func (r *Request) ServiceConfig() service.ServiceConfigIf { - return &service.ServiceConfig{ - Protocol: r.Protocol, - Service: r.Service, - Group: r.Group, - Version: r.Version, - } -} +// +//type Transport interface { +// Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error +// NewRequest(conf service.ServiceConfig, method string, args interface{}) Request +//} +// +//////////////////////////////////////////////// +//// Request +//////////////////////////////////////////////// +// +//type Request struct { +// ID int64 +// Group string +// Protocol string +// Version string +// Service string +// Method string +// Args interface{} +// ContentType string +//} +// +//func (r *Request) ServiceConfig() service.ServiceConfigIf { +// return &service.ServiceConfig{ +// Protocol: r.Protocol, +// Service: r.Service, +// Group: r.Group, +// Version: r.Version, +// } +//} diff --git a/client/invoker/invoker.go b/client/invoker/invoker.go index eac4f16119..704de9231b 100644 --- a/client/invoker/invoker.go +++ b/client/invoker/invoker.go @@ -2,7 +2,7 @@ package invoker import ( "context" - "github.com/dubbo/dubbo-go/client" + "github.com/dubbo/dubbo-go/dubbo" "sync" "time" ) @@ -14,6 +14,7 @@ import ( import ( "github.com/dubbo/dubbo-go/client/loadBalance" + "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/registry" "github.com/dubbo/dubbo-go/service" ) @@ -21,7 +22,9 @@ import ( type Options struct { ServiceTTL time.Duration selector loadBalance.Selector - Transport client.Transport + //TODO:we should provider a transport client interface + HttpClient *jsonrpc.HTTPClient + DubboClient *dubbo.Client } type Option func(*Options) @@ -31,9 +34,14 @@ func WithServiceTTL(ttl time.Duration) Option { } } -func WithClientTransport(client client.Transport) Option { +func WithHttpClient(client *jsonrpc.HTTPClient) Option { return func(o *Options) { - o.Transport = client + o.HttpClient = client + } +} +func WithDubboClient(client *dubbo.Client) Option { + return func(o *Options) { + o.DubboClient = client } } @@ -59,8 +67,8 @@ func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) { for _, opt := range opts { opt(&options) } - if options.Transport == nil { - return nil, jerrors.New("Must specify the client transport !") + if options.HttpClient == nil && options.DubboClient == nil { + return nil, jerrors.New("Must specify the transport client!") } invoker := &Invoker{ Options: options, @@ -152,7 +160,7 @@ func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArra return newSvcArr, nil } -func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req client.Request, resp interface{}) error { +func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error { serviceArray, err := ivk.getService(serviceConf) if err != nil { @@ -165,10 +173,32 @@ func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service. if err != nil { return err } - if err = ivk.Transport.Call(ctx, url, req, resp); err != nil { + if err = ivk.HttpClient.Call(ctx, url, req, resp); err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) return err } log.Info("response result:%s", resp) return nil } + +func (ivk *Invoker) DubboCall(reqId int64, serviceConf *service.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { + + serviceArray, err := ivk.getService(serviceConf) + if err != nil { + return err + } + if len(serviceArray.arr) == 0 { + return jerrors.New("cannot find svc " + serviceConf.String()) + } + url, err := ivk.selector.Select(reqId, serviceArray) + if err != nil { + return err + } + //TODO:这里要改一下call方法改为接收指针类型 + if err = ivk.DubboClient.Call(url.Ip+":"+url.Port, *url, method, args, reply, opts...); err != nil { + log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) + return err + } + log.Info("response result:%s", reply) + return nil +} diff --git a/client/loadBalance/load_balance.go b/client/loadBalance/load_balance.go index 3e8137a7d8..61a5473d3f 100644 --- a/client/loadBalance/load_balance.go +++ b/client/loadBalance/load_balance.go @@ -6,7 +6,7 @@ import ( ) type Selector interface { - Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) + Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) } ////////////////////////////////////////// @@ -37,5 +37,3 @@ type Selector interface { // // return "" //} - - diff --git a/dubbo/client.go b/dubbo/client.go index 08e0a72a36..958706a6b7 100644 --- a/dubbo/client.go +++ b/dubbo/client.go @@ -16,7 +16,7 @@ import ( import ( "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" + svc "github.com/dubbo/dubbo-go/service" ) var ( @@ -104,7 +104,7 @@ func NewClient(conf *ClientConfig) (*Client, error) { } // call one way -func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method string, args interface{}, opts ...CallOption) error { +func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, args interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -115,7 +115,7 @@ func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method stri } // if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { +func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -130,7 +130,7 @@ func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, ar return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) } -func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method string, args interface{}, +func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, args interface{}, callback AsyncCallback, reply interface{}, opts ...CallOption) error { var copts CallOptions @@ -141,7 +141,7 @@ func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method strin return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } -func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, method string, +func (c *Client) call(ct CallType, addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { if opts.RequestTimeout == 0 { diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index d0b092c38d..43c3bae1e0 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -129,8 +129,12 @@ func initClient(clientConfig *examples.ClientConfig) { SessionName: "client", }, }) + if err != nil { + log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err)) + return + } clientInvoker, err = invoker.NewInvoker(clientRegistry, - invoker.WithClientTransport(cltD), + invoker.WithDubboClient(cltD), invoker.WithLBSelector(configClientLB)) } diff --git a/examples/dubbo/go-client/app/test.go b/examples/dubbo/go-client/app/test.go index 5e78c983eb..804b294fa3 100644 --- a/examples/dubbo/go-client/app/test.go +++ b/examples/dubbo/go-client/app/test.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/dubbo/dubbo-go/examples" + "github.com/dubbo/dubbo-go/service" "github.com/dubbogo/hessian2" _ "net/http/pprof" ) @@ -16,53 +17,21 @@ import ( import ( "github.com/dubbo/dubbo-go/dubbo" "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" ) -func testDubborpc(clientConfig *examples.ClientConfig,userKey string) { +func testDubborpc(clientConfig *examples.ClientConfig, userKey string) { var ( err error - service string + svc string method string serviceIdx int user *DubboUser - conf registry.ServiceConfig - serviceURL *registry.ServiceURL - cltD *dubbo.Client + conf service.ServiceConfig ) - - cltD, err = dubbo.NewClient(&dubbo.ClientConfig{ - PoolSize: 64, - PoolTTL: 600, - ConnectionNum: 2, // 不能太大 - FailFastTimeout: "5s", - SessionTimeout: "20s", - HeartbeatPeriod: "5s", - GettySessionParam: dubbo.GettySessionParam{ - CompressEncoding: false, // 必须false - TcpNoDelay: true, - KeepAlivePeriod: "120s", - TcpRBufSize: 262144, - TcpKeepAlive: true, - TcpWBufSize: 65536, - PkgRQSize: 1024, - PkgWQSize: 512, - TcpReadTimeout: "1s", - TcpWriteTimeout: "5s", - WaitTimeout: "1s", - MaxMsgLen: 1024, - SessionName: "client", - }, - }) - if err != nil { - log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err)) - return - } - defer cltD.Close() serviceIdx = -1 - service = "com.ikurento.user.UserProvider" + svc = "com.ikurento.user.UserProvider" for i := range clientConfig.Service_List { - if clientConfig.Service_List[i].Service == service && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() { + if clientConfig.Service_List[i].Service == svc && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() { serviceIdx = i break } @@ -73,20 +42,13 @@ func testDubborpc(clientConfig *examples.ClientConfig,userKey string) { // Create request method = string("GetUser") - conf = registry.ServiceConfig{ + conf = service.ServiceConfig{ Group: clientConfig.Service_List[serviceIdx].Group, Protocol: public.CodecType(public.CODECTYPE_DUBBO).String(), Version: clientConfig.Service_List[serviceIdx].Version, Service: clientConfig.Service_List[serviceIdx].Service, } - serviceURL, err = clientRegistry.Filter(conf, 1) - if err != nil { - log.Error("registry.Filter(conf:%#v) = error:%s", conf, jerrors.ErrorStack(err)) - return - } - log.Debug("got serviceURL: %s", serviceURL) - // registry pojo hessian.RegisterJavaEnum(Gender(MAN)) hessian.RegisterJavaEnum(Gender(WOMAN)) @@ -94,7 +56,8 @@ func testDubborpc(clientConfig *examples.ClientConfig,userKey string) { hessian.RegisterPOJO(&Response{}) user = new(DubboUser) - err = cltD.Call(serviceURL.Ip+":"+serviceURL.Port, *serviceURL, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default)) + defer clientInvoker.DubboClient.Close() + err = clientInvoker.DubboCall(1, &conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default)) // Call service if err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index d3332bade9..de296c7c05 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -13,6 +13,7 @@ connect_timeout : "100ms" selector : "cache" selector_ttl : "10m" registry : "zookeeper" +client_load_balance: "round_robin" # application config application_config: @@ -23,7 +24,7 @@ application_config: owner : "ZX" environment : "dev" -registry_config: +zk_registry_config: timeout : "3s" address: - "127.0.0.1:2181" diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index bb3e853e2d..28f4f267e0 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -115,7 +115,7 @@ func initClient(clientConfig *examples.ClientConfig) { ) clientInvoker, err = invoker.NewInvoker(clientRegistry, - invoker.WithClientTransport(clt), + invoker.WithHttpClient(clt), invoker.WithLBSelector(configClientLB)) } diff --git a/examples/jsonrpc/go-client/app/test.go b/examples/jsonrpc/go-client/app/test.go index c649e6ea7f..fb3d008b04 100644 --- a/examples/jsonrpc/go-client/app/test.go +++ b/examples/jsonrpc/go-client/app/test.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - "github.com/dubbo/dubbo-go/client" _ "net/http/pprof" ) @@ -13,6 +12,7 @@ import ( import ( "github.com/dubbo/dubbo-go/examples" + "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/public" "github.com/dubbo/dubbo-go/service" ) @@ -25,7 +25,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str user *JsonRPCUser ctx context.Context conf service.ServiceConfig - req client.Request + req jsonrpc.Request ) serviceIdx = -1 @@ -49,7 +49,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str Service: clientConfig.Service_List[serviceIdx].Service, } // Attention the last parameter : []UserKey{userKey} - req = clientInvoker.Transport.NewRequest(conf, method, []string{userKey}) + req = clientInvoker.HttpClient.NewRequest(conf, method, []string{userKey}) ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", @@ -59,7 +59,7 @@ func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method str user = new(JsonRPCUser) - err = clientInvoker.Call(ctx, 1, &conf, req, user) + err = clientInvoker.HttpCall(ctx, 1, &conf, req, user) if err != nil { panic(err) } else { diff --git a/jsonrpc/http.go b/jsonrpc/http.go index fb6050cf60..053a135bd0 100644 --- a/jsonrpc/http.go +++ b/jsonrpc/http.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "fmt" - "github.com/dubbo/dubbo-go/client" "github.com/dubbo/dubbo-go/service" "io/ioutil" "net" @@ -25,6 +24,30 @@ import ( "github.com/dubbo/dubbo-go/public" ) +////////////////////////////////////////////// +// Request +////////////////////////////////////////////// + +type Request struct { + ID int64 + group string + protocol string + version string + service string + method string + args interface{} + contentType string +} + +func (r *Request) ServiceConfig() service.ServiceConfigIf { + return &service.ServiceConfig{ + Protocol: r.protocol, + Service: r.service, + Group: r.group, + Version: r.version, + } +} + ////////////////////////////////////////////// // HTTP Client ////////////////////////////////////////////// @@ -63,19 +86,19 @@ func NewHTTPClient(opt *HTTPOptions) *HTTPClient { } } -func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) client.Request { - return client.Request{ +func (c *HTTPClient) NewRequest(conf service.ServiceConfig, method string, args interface{}) Request { + return Request{ ID: atomic.AddInt64(&c.ID, 1), - Group: conf.Group, - Protocol: conf.Protocol, - Version: conf.Version, - Service: conf.Service, - Method: method, - Args: args, + group: conf.Group, + protocol: conf.Protocol, + version: conf.Version, + service: conf.Service, + method: method, + args: args, } } -func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req client.Request, rsp interface{}) error { +func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req Request, rsp interface{}) error { // header httpHeader := http.Header{} httpHeader.Set("Content-Type", "application/json") @@ -99,8 +122,8 @@ func (c *HTTPClient) Call(ctx context.Context, service *service.ServiceURL, req codec := newJsonClientCodec() codecData := CodecData{ ID: req.ID, - Method: req.Method, - Args: req.Args, + Method: req.method, + Args: req.args, } reqBody, err := codec.Write(&codecData) if err != nil { From ea0925935806d5ca0f21eedcbdd8fc3ae20d826a Mon Sep 17 00:00:00 2001 From: "vito.he" Date: Sat, 30 Mar 2019 23:02:40 +0800 Subject: [PATCH 3/3] rollback add transport interface in client & success to integrate to call dubbo~ --- examples/dubbo/go-client/profiles/release/client.yml | 3 ++- examples/dubbo/go-client/profiles/test/client.yml | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/dubbo/go-client/profiles/release/client.yml b/examples/dubbo/go-client/profiles/release/client.yml index c7f7e427a2..b91ae0ea12 100644 --- a/examples/dubbo/go-client/profiles/release/client.yml +++ b/examples/dubbo/go-client/profiles/release/client.yml @@ -13,6 +13,7 @@ connect_timeout : "100ms" selector : "cache" selector_ttl : "10m" registry : "zookeeper" +client_load_balance: "round_robin" # application config application_config: @@ -23,7 +24,7 @@ application_config: owner : "ZX" environment : "product" -registry_config: +zk_registry_config: timeout : "3s" address: - "127.0.0.1:2181" diff --git a/examples/dubbo/go-client/profiles/test/client.yml b/examples/dubbo/go-client/profiles/test/client.yml index 22ea30eb00..e7ada98290 100644 --- a/examples/dubbo/go-client/profiles/test/client.yml +++ b/examples/dubbo/go-client/profiles/test/client.yml @@ -13,6 +13,7 @@ connect_timeout : "100ms" selector : "cache" selector_ttl : "10m" registry : "zookeeper" +client_load_balance: "round_robin" # application config application_config: @@ -23,7 +24,7 @@ application_config: owner : "ZX" environment : "test" -registry_config: +zk_registry_config: timeout : "3s" address: - "127.0.0.1:2181"