diff --git a/client/client_transport.go b/client/client_transport.go new file mode 100644 index 0000000000..e3d31b60ef --- /dev/null +++ b/client/client_transport.go @@ -0,0 +1,31 @@ +package client + +// +//type Transport interface { +// Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error +// NewRequest(conf service.ServiceConfig, method string, args interface{}) Request +//} +// +//////////////////////////////////////////////// +//// Request +//////////////////////////////////////////////// +// +//type Request struct { +// ID int64 +// Group string +// Protocol string +// Version string +// Service string +// Method string +// Args interface{} +// ContentType string +//} +// +//func (r *Request) ServiceConfig() service.ServiceConfigIf { +// return &service.ServiceConfig{ +// Protocol: r.Protocol, +// Service: r.Service, +// Group: r.Group, +// Version: r.Version, +// } +//} diff --git a/client/invoker/invoker.go b/client/invoker/invoker.go index f91d0de8c7..704de9231b 100644 --- a/client/invoker/invoker.go +++ b/client/invoker/invoker.go @@ -2,6 +2,7 @@ package invoker import ( "context" + "github.com/dubbo/dubbo-go/dubbo" "sync" "time" ) @@ -21,6 +22,9 @@ import ( type Options struct { ServiceTTL time.Duration selector loadBalance.Selector + //TODO:we should provider a transport client interface + HttpClient *jsonrpc.HTTPClient + DubboClient *dubbo.Client } type Option func(*Options) @@ -29,6 +33,18 @@ func WithServiceTTL(ttl time.Duration) Option { o.ServiceTTL = ttl } } + +func WithHttpClient(client *jsonrpc.HTTPClient) Option { + return func(o *Options) { + o.HttpClient = client + } +} +func WithDubboClient(client *dubbo.Client) Option { + return func(o *Options) { + o.DubboClient = client + } +} + func WithLBSelector(selector loadBalance.Selector) Option { return func(o *Options) { o.selector = selector @@ -37,14 +53,12 @@ func WithLBSelector(selector loadBalance.Selector) Option { type Invoker struct { Options - //TODO:we should provider a transport client interface - Client *jsonrpc.HTTPClient cacheServiceMap map[string]*ServiceArray registry registry.Registry listenerLock sync.Mutex } -func NewInvoker(registry registry.Registry, client *jsonrpc.HTTPClient, opts ...Option) *Invoker { +func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) { options := Options{ //default 300s ServiceTTL: time.Duration(300e9), @@ -53,14 +67,16 @@ func NewInvoker(registry registry.Registry, client *jsonrpc.HTTPClient, opts ... for _, opt := range opts { opt(&options) } + if options.HttpClient == nil && options.DubboClient == nil { + return nil, jerrors.New("Must specify the transport client!") + } invoker := &Invoker{ Options: options, - Client: client, cacheServiceMap: make(map[string]*ServiceArray), registry: registry, } invoker.Listen() - return invoker + return invoker, nil } func (ivk *Invoker) Listen() { @@ -144,7 +160,7 @@ func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArra return newSvcArr, nil } -func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error { +func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error { serviceArray, err := ivk.getService(serviceConf) if err != nil { @@ -157,10 +173,32 @@ func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service. if err != nil { return err } - if err = ivk.Client.Call(ctx, url, req, resp); err != nil { + if err = ivk.HttpClient.Call(ctx, url, req, resp); err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) return err } log.Info("response result:%s", resp) return nil } + +func (ivk *Invoker) DubboCall(reqId int64, serviceConf *service.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error { + + serviceArray, err := ivk.getService(serviceConf) + if err != nil { + return err + } + if len(serviceArray.arr) == 0 { + return jerrors.New("cannot find svc " + serviceConf.String()) + } + url, err := ivk.selector.Select(reqId, serviceArray) + if err != nil { + return err + } + //TODO:这里要改一下call方法改为接收指针类型 + if err = ivk.DubboClient.Call(url.Ip+":"+url.Port, *url, method, args, reply, opts...); err != nil { + log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) + return err + } + log.Info("response result:%s", reply) + return nil +} diff --git a/client/loadBalance/load_balance.go b/client/loadBalance/load_balance.go index 3e8137a7d8..61a5473d3f 100644 --- a/client/loadBalance/load_balance.go +++ b/client/loadBalance/load_balance.go @@ -6,7 +6,7 @@ import ( ) type Selector interface { - Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) + Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error) } ////////////////////////////////////////// @@ -37,5 +37,3 @@ type Selector interface { // // return "" //} - - diff --git a/dubbo/client.go b/dubbo/client.go index 08e0a72a36..958706a6b7 100644 --- a/dubbo/client.go +++ b/dubbo/client.go @@ -16,7 +16,7 @@ import ( import ( "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" + svc "github.com/dubbo/dubbo-go/service" ) var ( @@ -104,7 +104,7 @@ func NewClient(conf *ClientConfig) (*Client, error) { } // call one way -func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method string, args interface{}, opts ...CallOption) error { +func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, args interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -115,7 +115,7 @@ func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method stri } // if @reply is nil, the transport layer will get the response without notify the invoker. -func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { +func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error { var copts CallOptions for _, o := range opts { @@ -130,7 +130,7 @@ func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, ar return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts)) } -func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method string, args interface{}, +func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, args interface{}, callback AsyncCallback, reply interface{}, opts ...CallOption) error { var copts CallOptions @@ -141,7 +141,7 @@ func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method strin return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts)) } -func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, method string, +func (c *Client) call(ct CallType, addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, callback AsyncCallback, opts CallOptions) error { if opts.RequestTimeout == 0 { diff --git a/examples/jsonrpc/go-client/app/config.go b/examples/client_config.go similarity index 84% rename from examples/jsonrpc/go-client/app/config.go rename to examples/client_config.go index 3d706ed4bf..63bd492e65 100644 --- a/examples/jsonrpc/go-client/app/config.go +++ b/examples/client_config.go @@ -1,4 +1,4 @@ -package main +package examples import ( "fmt" @@ -17,9 +17,9 @@ import ( ) import ( + "github.com/dubbo/dubbo-go/registry" "github.com/dubbo/dubbo-go/registry/zookeeper" "github.com/dubbo/dubbo-go/service" - "github.com/dubbo/dubbo-go/registry" ) const ( @@ -27,10 +27,6 @@ const ( APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" ) -var ( - clientConfig *ClientConfig -) - type ( // Client holds supported types by the multiconfig package ClientConfig struct { @@ -40,28 +36,30 @@ type ( // client Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"` - connectTimeout time.Duration + ConnectTimeout time.Duration Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` // 500ms, 1m - requestTimeout time.Duration + RequestTimeout time.Duration // codec & selector & transport & registry Selector string `default:"cache" yaml:"selector" json:"selector,omitempty"` Selector_TTL string `default:"10m" yaml:"selector_ttl" json:"selector_ttl,omitempty"` //client load balance algorithm - ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"` - Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"` + ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"` + Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"` // application Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` - ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"` + ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"` // 一个客户端只允许使用一个service的其中一个group和其中一个version Service_List []service.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"` } ) -func initClientConfig() error { +func InitClientConfig() *ClientConfig { + var ( - confFile string + clientConfig *ClientConfig + confFile string ) // configure @@ -105,5 +103,5 @@ func initClientConfig() error { } log.LoadConfiguration(confFile) - return nil + return clientConfig } diff --git a/examples/dubbo/go-client/app/client.go b/examples/dubbo/go-client/app/client.go index 00f03669c7..43c3bae1e0 100644 --- a/examples/dubbo/go-client/app/client.go +++ b/examples/dubbo/go-client/app/client.go @@ -2,6 +2,9 @@ package main import ( "fmt" + "github.com/dubbo/dubbo-go/dubbo" + "github.com/dubbo/dubbo-go/plugins" + "github.com/dubbo/dubbo-go/registry/zookeeper" "net/http" _ "net/http/pprof" "os" @@ -19,39 +22,34 @@ import ( ) import ( + "github.com/dubbo/dubbo-go/client/invoker" + "github.com/dubbo/dubbo-go/examples" "github.com/dubbo/dubbo-go/public" "github.com/dubbo/dubbo-go/registry" ) var ( survivalTimeout int = 10e9 - clientRegistry *registry.ZkConsumerRegistry + clientInvoker *invoker.Invoker ) func main() { - var ( - err error - ) - err = initClientConfig() - if err != nil { - log.Error("initClientConfig() = error{%#v}", err) - return - } - initProfiling() - initClient() + clientConfig := examples.InitClientConfig() + initProfiling(clientConfig) + initClient(clientConfig) time.Sleep(3e9) gxlog.CInfo("\n\n\nstart to test dubbo") - testDubborpc("A003") + testDubborpc(clientConfig, "A003") time.Sleep(3e9) initSignal() } -func initClient() { +func initClient(clientConfig *examples.ClientConfig) { var ( err error codecType public.CodecType @@ -63,32 +61,24 @@ func initClient() { } // registry - clientRegistry, err = registry.NewZkConsumerRegistry( - registry.ApplicationConf(clientConfig.Application_Config), - registry.RegistryConf(clientConfig.Registry_Config), - registry.BalanceMode(registry.SM_RoundRobin), - registry.ServiceTTL(300e9), + clientRegistry, err := plugins.PluggableRegistries[clientConfig.Registry]( + registry.WithDubboType(registry.CONSUMER), + registry.WithApplicationConf(clientConfig.Application_Config), + zookeeper.WithRegistryConf(clientConfig.ZkRegistryConfig), ) if err != nil { panic(fmt.Sprintf("fail to init registry.Registy, err:%s", jerrors.ErrorStack(err))) return } - for _, service := range clientConfig.Service_List { - err = clientRegistry.Register(service) - if err != nil { - panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err))) - return - } - } // consumer - clientConfig.requestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) + clientConfig.RequestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Request_Timeout{%#v}) = error{%v}", clientConfig.Request_Timeout, err)) return } - clientConfig.connectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) + clientConfig.ConnectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Connect_Timeout{%#v}) = error{%v}", clientConfig.Connect_Timeout, err)) @@ -101,13 +91,58 @@ func initClient() { panic(fmt.Sprintf("unknown protocol %s", clientConfig.Service_List[idx].Protocol)) } } + + for _, service := range clientConfig.Service_List { + err = clientRegistry.ConsumerRegister(&service) + if err != nil { + panic(fmt.Sprintf("registry.Register(service{%#v}) = error{%v}", service, jerrors.ErrorStack(err))) + return + } + } + + //read the client lb config in config.yml + configClientLB := plugins.PluggableLoadbalance[clientConfig.ClientLoadBalance]() + + //init dubbo rpc client & init invoker + var cltD *dubbo.Client + + cltD, err = dubbo.NewClient(&dubbo.ClientConfig{ + PoolSize: 64, + PoolTTL: 600, + ConnectionNum: 2, // 不能太大 + FailFastTimeout: "5s", + SessionTimeout: "20s", + HeartbeatPeriod: "5s", + GettySessionParam: dubbo.GettySessionParam{ + CompressEncoding: false, // 必须false + TcpNoDelay: true, + KeepAlivePeriod: "120s", + TcpRBufSize: 262144, + TcpKeepAlive: true, + TcpWBufSize: 65536, + PkgRQSize: 1024, + PkgWQSize: 512, + TcpReadTimeout: "1s", + TcpWriteTimeout: "5s", + WaitTimeout: "1s", + MaxMsgLen: 1024, + SessionName: "client", + }, + }) + if err != nil { + log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err)) + return + } + clientInvoker, err = invoker.NewInvoker(clientRegistry, + invoker.WithDubboClient(cltD), + invoker.WithLBSelector(configClientLB)) } func uninitClient() { log.Close() } -func initProfiling() { +func initProfiling(clientConfig *examples.ClientConfig) { if !clientConfig.Pprof_Enabled { return } diff --git a/examples/dubbo/go-client/app/config.go b/examples/dubbo/go-client/app/config.go deleted file mode 100644 index cc02b1bbf5..0000000000 --- a/examples/dubbo/go-client/app/config.go +++ /dev/null @@ -1,108 +0,0 @@ -package main - -import ( - "fmt" - "github.com/dubbo/dubbo-go/registry/zookeeper" - "github.com/dubbo/dubbo-go/service" - "io/ioutil" - "os" - "path" - "time" -) - -import ( - "github.com/AlexStocks/goext/log" - log "github.com/AlexStocks/log4go" - jerrors "github.com/juju/errors" - yaml "gopkg.in/yaml.v2" -) - -import ( - "github.com/dubbo/dubbo-go/registry" -) - -const ( - APP_CONF_FILE = "APP_CONF_FILE" - APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE" -) - -var ( - clientConfig *ClientConfig -) - -type ( - // Client holds supported types by the multiconfig package - ClientConfig struct { - // pprof - Pprof_Enabled bool `default:"false" yaml:"pprof_enabled" json:"pprof_enabled,omitempty"` - Pprof_Port int `default:"10086" yaml:"pprof_port" json:"pprof_port,omitempty"` - - // client - Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"` - connectTimeout time.Duration - - Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` // 500ms, 1m - requestTimeout time.Duration - - // codec & selector & transport & registry - Selector string `default:"cache" yaml:"selector" json:"selector,omitempty"` - Selector_TTL string `default:"10m" yaml:"selector_ttl" json:"selector_ttl,omitempty"` - //client load balance algorithm - ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"` - Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"` - // application - Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"` - ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"` - // 一个客户端只允许使用一个service的其中一个group和其中一个version - Service_List []service.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"` - } -) - -func initClientConfig() error { - var ( - confFile string - ) - - // configure - confFile = os.Getenv(APP_CONF_FILE) - if confFile == "" { - panic(fmt.Sprintf("application configure file name is nil")) - return nil // I know it is of no usage. Just Err Protection. - } - if path.Ext(confFile) != ".yml" { - panic(fmt.Sprintf("application configure file name{%v} suffix must be .yml", confFile)) - return nil - } - clientConfig = new(ClientConfig) - - confFileStream, err := ioutil.ReadFile(confFile) - if err != nil { - panic(fmt.Sprintf("ioutil.ReadFile(file:%s) = error:%s", confFile, jerrors.ErrorStack(err))) - return nil - } - err = yaml.Unmarshal(confFileStream, clientConfig) - if err != nil { - panic(fmt.Sprintf("yaml.Unmarshal() = error:%s", jerrors.ErrorStack(err))) - return nil - } - if clientConfig.ZkRegistryConfig.Timeout, err = time.ParseDuration(clientConfig.ZkRegistryConfig.TimeoutStr); err != nil { - panic(fmt.Sprintf("time.ParseDuration(Registry_Config.Timeout:%#v) = error:%s", clientConfig.ZkRegistryConfig.TimeoutStr, err)) - return nil - } - - gxlog.CInfo("config{%#v}\n", clientConfig) - - // log - confFile = os.Getenv(APP_LOG_CONF_FILE) - if confFile == "" { - panic(fmt.Sprintf("log configure file name is nil")) - return nil - } - if path.Ext(confFile) != ".xml" { - panic(fmt.Sprintf("log configure file name{%v} suffix must be .xml", confFile)) - return nil - } - log.LoadConfiguration(confFile) - - return nil -} diff --git a/examples/dubbo/go-client/app/test.go b/examples/dubbo/go-client/app/test.go index a4471d5c1a..804b294fa3 100644 --- a/examples/dubbo/go-client/app/test.go +++ b/examples/dubbo/go-client/app/test.go @@ -2,6 +2,8 @@ package main import ( "fmt" + "github.com/dubbo/dubbo-go/examples" + "github.com/dubbo/dubbo-go/service" "github.com/dubbogo/hessian2" _ "net/http/pprof" ) @@ -15,53 +17,21 @@ import ( import ( "github.com/dubbo/dubbo-go/dubbo" "github.com/dubbo/dubbo-go/public" - "github.com/dubbo/dubbo-go/registry" ) -func testDubborpc(userKey string) { +func testDubborpc(clientConfig *examples.ClientConfig, userKey string) { var ( err error - service string + svc string method string serviceIdx int user *DubboUser - conf registry.ServiceConfig - serviceURL *registry.ServiceURL - cltD *dubbo.Client + conf service.ServiceConfig ) - - cltD, err = dubbo.NewClient(&dubbo.ClientConfig{ - PoolSize: 64, - PoolTTL: 600, - ConnectionNum: 2, // 不能太大 - FailFastTimeout: "5s", - SessionTimeout: "20s", - HeartbeatPeriod: "5s", - GettySessionParam: dubbo.GettySessionParam{ - CompressEncoding: false, // 必须false - TcpNoDelay: true, - KeepAlivePeriod: "120s", - TcpRBufSize: 262144, - TcpKeepAlive: true, - TcpWBufSize: 65536, - PkgRQSize: 1024, - PkgWQSize: 512, - TcpReadTimeout: "1s", - TcpWriteTimeout: "5s", - WaitTimeout: "1s", - MaxMsgLen: 1024, - SessionName: "client", - }, - }) - if err != nil { - log.Error("hessian.NewClient(conf) = error:%s", jerrors.ErrorStack(err)) - return - } - defer cltD.Close() serviceIdx = -1 - service = "com.ikurento.user.UserProvider" + svc = "com.ikurento.user.UserProvider" for i := range clientConfig.Service_List { - if clientConfig.Service_List[i].Service == service && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() { + if clientConfig.Service_List[i].Service == svc && clientConfig.Service_List[i].Protocol == public.CODECTYPE_DUBBO.String() { serviceIdx = i break } @@ -72,20 +42,13 @@ func testDubborpc(userKey string) { // Create request method = string("GetUser") - conf = registry.ServiceConfig{ + conf = service.ServiceConfig{ Group: clientConfig.Service_List[serviceIdx].Group, Protocol: public.CodecType(public.CODECTYPE_DUBBO).String(), Version: clientConfig.Service_List[serviceIdx].Version, Service: clientConfig.Service_List[serviceIdx].Service, } - serviceURL, err = clientRegistry.Filter(conf, 1) - if err != nil { - log.Error("registry.Filter(conf:%#v) = error:%s", conf, jerrors.ErrorStack(err)) - return - } - log.Debug("got serviceURL: %s", serviceURL) - // registry pojo hessian.RegisterJavaEnum(Gender(MAN)) hessian.RegisterJavaEnum(Gender(WOMAN)) @@ -93,7 +56,8 @@ func testDubborpc(userKey string) { hessian.RegisterPOJO(&Response{}) user = new(DubboUser) - err = cltD.Call(serviceURL.Ip+":"+serviceURL.Port, *serviceURL, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default)) + defer clientInvoker.DubboClient.Close() + err = clientInvoker.DubboCall(1, &conf, method, []interface{}{userKey}, user, dubbo.WithCallRequestTimeout(10e9), dubbo.WithCallResponseTimeout(10e9), dubbo.WithCallSerialID(dubbo.S_Default)) // Call service if err != nil { log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err)) diff --git a/examples/dubbo/go-client/profiles/dev/client.yml b/examples/dubbo/go-client/profiles/dev/client.yml index d3332bade9..de296c7c05 100644 --- a/examples/dubbo/go-client/profiles/dev/client.yml +++ b/examples/dubbo/go-client/profiles/dev/client.yml @@ -13,6 +13,7 @@ connect_timeout : "100ms" selector : "cache" selector_ttl : "10m" registry : "zookeeper" +client_load_balance: "round_robin" # application config application_config: @@ -23,7 +24,7 @@ application_config: owner : "ZX" environment : "dev" -registry_config: +zk_registry_config: timeout : "3s" address: - "127.0.0.1:2181" diff --git a/examples/dubbo/go-client/profiles/release/client.yml b/examples/dubbo/go-client/profiles/release/client.yml index c7f7e427a2..b91ae0ea12 100644 --- a/examples/dubbo/go-client/profiles/release/client.yml +++ b/examples/dubbo/go-client/profiles/release/client.yml @@ -13,6 +13,7 @@ connect_timeout : "100ms" selector : "cache" selector_ttl : "10m" registry : "zookeeper" +client_load_balance: "round_robin" # application config application_config: @@ -23,7 +24,7 @@ application_config: owner : "ZX" environment : "product" -registry_config: +zk_registry_config: timeout : "3s" address: - "127.0.0.1:2181" diff --git a/examples/dubbo/go-client/profiles/test/client.yml b/examples/dubbo/go-client/profiles/test/client.yml index 22ea30eb00..e7ada98290 100644 --- a/examples/dubbo/go-client/profiles/test/client.yml +++ b/examples/dubbo/go-client/profiles/test/client.yml @@ -13,6 +13,7 @@ connect_timeout : "100ms" selector : "cache" selector_ttl : "10m" registry : "zookeeper" +client_load_balance: "round_robin" # application config application_config: @@ -23,7 +24,7 @@ application_config: owner : "ZX" environment : "test" -registry_config: +zk_registry_config: timeout : "3s" address: - "127.0.0.1:2181" diff --git a/examples/jsonrpc/go-client/app/client.go b/examples/jsonrpc/go-client/app/client.go index 0b7bb05d90..28f4f267e0 100644 --- a/examples/jsonrpc/go-client/app/client.go +++ b/examples/jsonrpc/go-client/app/client.go @@ -20,6 +20,7 @@ import ( import ( "github.com/dubbo/dubbo-go/client/invoker" + "github.com/dubbo/dubbo-go/examples" "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/plugins" "github.com/dubbo/dubbo-go/public" @@ -33,32 +34,26 @@ var ( ) func main() { - var ( - err error - ) - err = initClientConfig() - if err != nil { - log.Error("initClientConfig() = error{%#v}", err) - return - } - initProfiling() - initClient() + clientConfig := examples.InitClientConfig() + + initProfiling(clientConfig) + initClient(clientConfig) time.Sleep(3e9) gxlog.CInfo("\n\n\nstart to test jsonrpc") - testJsonrpc("A003", "GetUser") + testJsonrpc(clientConfig, "A003", "GetUser") time.Sleep(3e9) gxlog.CInfo("\n\n\nstart to test jsonrpc illegal method") - testJsonrpc("A003", "GetUser1") + testJsonrpc(clientConfig, "A003", "GetUser1") initSignal() } -func initClient() { +func initClient(clientConfig *examples.ClientConfig) { var ( codecType public.CodecType ) @@ -80,13 +75,13 @@ func initClient() { } // consumer - clientConfig.requestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) + clientConfig.RequestTimeout, err = time.ParseDuration(clientConfig.Request_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Request_Timeout{%#v}) = error{%v}", clientConfig.Request_Timeout, err)) return } - clientConfig.connectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) + clientConfig.ConnectTimeout, err = time.ParseDuration(clientConfig.Connect_Timeout) if err != nil { panic(fmt.Sprintf("time.ParseDuration(Connect_Timeout{%#v}) = error{%v}", clientConfig.Connect_Timeout, err)) @@ -114,12 +109,13 @@ func initClient() { //init http client & init invoker clt := jsonrpc.NewHTTPClient( &jsonrpc.HTTPOptions{ - HandshakeTimeout: clientConfig.connectTimeout, - HTTPTimeout: clientConfig.requestTimeout, + HandshakeTimeout: clientConfig.ConnectTimeout, + HTTPTimeout: clientConfig.RequestTimeout, }, ) - clientInvoker = invoker.NewInvoker(clientRegistry, clt, + clientInvoker, err = invoker.NewInvoker(clientRegistry, + invoker.WithHttpClient(clt), invoker.WithLBSelector(configClientLB)) } @@ -128,7 +124,7 @@ func uninitClient() { log.Close() } -func initProfiling() { +func initProfiling(clientConfig *examples.ClientConfig) { if !clientConfig.Pprof_Enabled { return } diff --git a/examples/jsonrpc/go-client/app/test.go b/examples/jsonrpc/go-client/app/test.go index d0018884d3..fb3d008b04 100644 --- a/examples/jsonrpc/go-client/app/test.go +++ b/examples/jsonrpc/go-client/app/test.go @@ -3,7 +3,6 @@ package main import ( "context" "fmt" - _ "net/http/pprof" ) @@ -12,12 +11,13 @@ import ( ) import ( + "github.com/dubbo/dubbo-go/examples" "github.com/dubbo/dubbo-go/jsonrpc" "github.com/dubbo/dubbo-go/public" "github.com/dubbo/dubbo-go/service" ) -func testJsonrpc(userKey string, method string) { +func testJsonrpc(clientConfig *examples.ClientConfig, userKey string, method string) { var ( err error svc string @@ -49,7 +49,7 @@ func testJsonrpc(userKey string, method string) { Service: clientConfig.Service_List[serviceIdx].Service, } // Attention the last parameter : []UserKey{userKey} - req = clientInvoker.Client.NewRequest(conf, method, []string{userKey}) + req = clientInvoker.HttpClient.NewRequest(conf, method, []string{userKey}) ctx = context.WithValue(context.Background(), public.DUBBOGO_CTX_KEY, map[string]string{ "X-Proxy-Id": "dubbogo", @@ -59,7 +59,7 @@ func testJsonrpc(userKey string, method string) { user = new(JsonRPCUser) - err = clientInvoker.Call(ctx, 1, &conf, req, user) + err = clientInvoker.HttpCall(ctx, 1, &conf, req, user) if err != nil { panic(err) } else { diff --git a/plugins/plugins.go b/plugins/plugins.go index dae9131482..740a1e9fa4 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -6,11 +6,11 @@ import ( "github.com/dubbo/dubbo-go/registry/zookeeper" ) -var PluggableRegistries = map[string]func(...registry.RegistryOptions) (registry.Registry,error){ - "zookeeper":zookeeper.NewZkRegistry, +var PluggableRegistries = map[string]func(...registry.RegistryOption) (registry.Registry, error){ + "zookeeper": zookeeper.NewZkRegistry, } -var PluggableLoadbalance = map[string]func()loadBalance.Selector{ - "round_robin":loadBalance.NewRoundRobinSelector, - "random":loadBalance.NewRandomSelector, +var PluggableLoadbalance = map[string]func() loadBalance.Selector{ + "round_robin": loadBalance.NewRoundRobinSelector, + "random": loadBalance.NewRandomSelector, }