Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the dubbo example #18

Merged
merged 4 commits into from
Mar 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions client/client_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package client

//
//type Transport interface {
// Call(ctx context.Context, url *service.ServiceURL, request Request, resp interface{}) error
// NewRequest(conf service.ServiceConfig, method string, args interface{}) Request
//}
//
////////////////////////////////////////////////
//// Request
////////////////////////////////////////////////
//
//type Request struct {
// ID int64
// Group string
// Protocol string
// Version string
// Service string
// Method string
// Args interface{}
// ContentType string
//}
//
//func (r *Request) ServiceConfig() service.ServiceConfigIf {
// return &service.ServiceConfig{
// Protocol: r.Protocol,
// Service: r.Service,
// Group: r.Group,
// Version: r.Version,
// }
//}
52 changes: 45 additions & 7 deletions client/invoker/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package invoker

import (
"context"
"github.com/dubbo/dubbo-go/dubbo"
"sync"
"time"
)
Expand All @@ -21,6 +22,9 @@ import (
type Options struct {
ServiceTTL time.Duration
selector loadBalance.Selector
//TODO:we should provider a transport client interface
HttpClient *jsonrpc.HTTPClient
DubboClient *dubbo.Client
}
type Option func(*Options)

Expand All @@ -29,6 +33,18 @@ func WithServiceTTL(ttl time.Duration) Option {
o.ServiceTTL = ttl
}
}

func WithHttpClient(client *jsonrpc.HTTPClient) Option {
return func(o *Options) {
o.HttpClient = client
}
}
func WithDubboClient(client *dubbo.Client) Option {
return func(o *Options) {
o.DubboClient = client
}
}

func WithLBSelector(selector loadBalance.Selector) Option {
return func(o *Options) {
o.selector = selector
Expand All @@ -37,14 +53,12 @@ func WithLBSelector(selector loadBalance.Selector) Option {

type Invoker struct {
Options
//TODO:we should provider a transport client interface
Client *jsonrpc.HTTPClient
cacheServiceMap map[string]*ServiceArray
registry registry.Registry
listenerLock sync.Mutex
}

func NewInvoker(registry registry.Registry, client *jsonrpc.HTTPClient, opts ...Option) *Invoker {
func NewInvoker(registry registry.Registry, opts ...Option) (*Invoker, error) {
options := Options{
//default 300s
ServiceTTL: time.Duration(300e9),
Expand All @@ -53,14 +67,16 @@ func NewInvoker(registry registry.Registry, client *jsonrpc.HTTPClient, opts ...
for _, opt := range opts {
opt(&options)
}
if options.HttpClient == nil && options.DubboClient == nil {
return nil, jerrors.New("Must specify the transport client!")
}
invoker := &Invoker{
Options: options,
Client: client,
cacheServiceMap: make(map[string]*ServiceArray),
registry: registry,
}
invoker.Listen()
return invoker
return invoker, nil
}

func (ivk *Invoker) Listen() {
Expand Down Expand Up @@ -144,7 +160,7 @@ func (ivk *Invoker) getService(serviceConf *service.ServiceConfig) (*ServiceArra
return newSvcArr, nil
}

func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error {
func (ivk *Invoker) HttpCall(ctx context.Context, reqId int64, serviceConf *service.ServiceConfig, req jsonrpc.Request, resp interface{}) error {

serviceArray, err := ivk.getService(serviceConf)
if err != nil {
Expand All @@ -157,10 +173,32 @@ func (ivk *Invoker) Call(ctx context.Context, reqId int64, serviceConf *service.
if err != nil {
return err
}
if err = ivk.Client.Call(ctx, url, req, resp); err != nil {
if err = ivk.HttpClient.Call(ctx, url, req, resp); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
return err
}
log.Info("response result:%s", resp)
return nil
}

func (ivk *Invoker) DubboCall(reqId int64, serviceConf *service.ServiceConfig, method string, args, reply interface{}, opts ...dubbo.CallOption) error {

serviceArray, err := ivk.getService(serviceConf)
if err != nil {
return err
}
if len(serviceArray.arr) == 0 {
return jerrors.New("cannot find svc " + serviceConf.String())
}
url, err := ivk.selector.Select(reqId, serviceArray)
if err != nil {
return err
}
//TODO:这里要改一下call方法改为接收指针类型
if err = ivk.DubboClient.Call(url.Ip+":"+url.Port, *url, method, args, reply, opts...); err != nil {
log.Error("client.Call() return error:%+v", jerrors.ErrorStack(err))
return err
}
log.Info("response result:%s", reply)
return nil
}
4 changes: 1 addition & 3 deletions client/loadBalance/load_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Selector interface {
Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error)
Select(ID int64, array client.ServiceArrayIf) (*service.ServiceURL, error)
}

//////////////////////////////////////////
Expand Down Expand Up @@ -37,5 +37,3 @@ type Selector interface {
//
// return ""
//}


10 changes: 5 additions & 5 deletions dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

import (
"github.com/dubbo/dubbo-go/public"
"github.com/dubbo/dubbo-go/registry"
svc "github.com/dubbo/dubbo-go/service"
)

var (
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewClient(conf *ClientConfig) (*Client, error) {
}

// call one way
func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method string, args interface{}, opts ...CallOption) error {
func (c *Client) CallOneway(addr string, svcUrl svc.ServiceURL, method string, args interface{}, opts ...CallOption) error {
var copts CallOptions

for _, o := range opts {
Expand All @@ -115,7 +115,7 @@ func (c *Client) CallOneway(addr string, svcUrl registry.ServiceURL, method stri
}

// if @reply is nil, the transport layer will get the response without notify the invoker.
func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error {
func (c *Client) Call(addr string, svcUrl svc.ServiceURL, method string, args, reply interface{}, opts ...CallOption) error {
var copts CallOptions

for _, o := range opts {
Expand All @@ -130,7 +130,7 @@ func (c *Client) Call(addr string, svcUrl registry.ServiceURL, method string, ar
return jerrors.Trace(c.call(ct, addr, svcUrl, method, args, reply, nil, copts))
}

func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method string, args interface{},
func (c *Client) AsyncCall(addr string, svcUrl svc.ServiceURL, method string, args interface{},
callback AsyncCallback, reply interface{}, opts ...CallOption) error {

var copts CallOptions
Expand All @@ -141,7 +141,7 @@ func (c *Client) AsyncCall(addr string, svcUrl registry.ServiceURL, method strin
return jerrors.Trace(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts))
}

func (c *Client) call(ct CallType, addr string, svcUrl registry.ServiceURL, method string,
func (c *Client) call(ct CallType, addr string, svcUrl svc.ServiceURL, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {

if opts.RequestTimeout == 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package examples

import (
"fmt"
Expand All @@ -17,20 +17,16 @@ import (
)

import (
"github.com/dubbo/dubbo-go/registry"
"github.com/dubbo/dubbo-go/registry/zookeeper"
"github.com/dubbo/dubbo-go/service"
"github.com/dubbo/dubbo-go/registry"
)

const (
APP_CONF_FILE = "APP_CONF_FILE"
APP_LOG_CONF_FILE = "APP_LOG_CONF_FILE"
)

var (
clientConfig *ClientConfig
)

type (
// Client holds supported types by the multiconfig package
ClientConfig struct {
Expand All @@ -40,28 +36,30 @@ type (

// client
Connect_Timeout string `default:"100ms" yaml:"connect_timeout" json:"connect_timeout,omitempty"`
connectTimeout time.Duration
ConnectTimeout time.Duration

Request_Timeout string `yaml:"request_timeout" default:"5s" json:"request_timeout,omitempty"` // 500ms, 1m
requestTimeout time.Duration
RequestTimeout time.Duration

// codec & selector & transport & registry
Selector string `default:"cache" yaml:"selector" json:"selector,omitempty"`
Selector_TTL string `default:"10m" yaml:"selector_ttl" json:"selector_ttl,omitempty"`
//client load balance algorithm
ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"`
Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"`
ClientLoadBalance string `default:"round_robin" yaml:"client_load_balance" json:"client_load_balance,omitempty"`
Registry string `default:"zookeeper" yaml:"registry" json:"registry,omitempty"`
// application
Application_Config registry.ApplicationConfig `yaml:"application_config" json:"application_config,omitempty"`
ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"`
ZkRegistryConfig zookeeper.ZkRegistryConfig `yaml:"zk_registry_config" json:"zk_registry_config,omitempty"`
// 一个客户端只允许使用一个service的其中一个group和其中一个version
Service_List []service.ServiceConfig `yaml:"service_list" json:"service_list,omitempty"`
}
)

func initClientConfig() error {
func InitClientConfig() *ClientConfig {

var (
confFile string
clientConfig *ClientConfig
confFile string
)

// configure
Expand Down Expand Up @@ -105,5 +103,5 @@ func initClientConfig() error {
}
log.LoadConfiguration(confFile)

return nil
return clientConfig
}
Loading