Skip to content

Commit

Permalink
support multi-gateway client (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
zl03jsj authored Sep 8, 2021
1 parent 2f8400c commit b5754d8
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 9 deletions.
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type MessageStateConfig struct {
type GatewayConfig struct {
RemoteEnable bool `toml:"remoteEnable"`
Token string `toml:"token"`
Url string `toml:"url"`
Url []string `toml:"url"`
Cfg gatewayTypes.Config `toml:"cfg"`
}

Expand Down Expand Up @@ -133,7 +133,7 @@ func DefaultConfig() *Config {
Gateway: GatewayConfig{
RemoteEnable: true,
Token: "",
Url: "/ip4/127.0.0.1/tcp/45132",
Url: []string{"/ip4/127.0.0.1/tcp/45132"},
Cfg: gatewayTypes.Config{
RequestQueueSize: 30,
RequestTimeout: time.Minute * 5,
Expand Down
159 changes: 155 additions & 4 deletions gateway/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package gateway

import (
"context"
"fmt"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/venus-messager/config"
"github.com/filecoin-project/venus-messager/log"
"github.com/filecoin-project/venus-wallet/core"
"github.com/ipfs-force-community/venus-common-utils/apiinfo"
"github.com/ipfs-force-community/venus-gateway/walletevent"

"github.com/filecoin-project/venus-messager/config"
"golang.org/x/xerrors"
"sync"
)

type IWalletClient interface {
Expand All @@ -33,6 +36,125 @@ type WalletClient struct {
}
}

type WalletProxy struct {
clients map[string]*WalletClient
logger *log.Logger

mutx sync.RWMutex
avaliabeClientCache map[cacheKey]*WalletClient
}

type cacheKey string

func newCacheKey(account string, addr address.Address) cacheKey {
return cacheKey("walletClientCache:" + account + addr.String())
}

func (w *WalletProxy) putCache(account string, addr address.Address, client *WalletClient) {
w.mutx.Lock()
defer w.mutx.Unlock()
w.avaliabeClientCache[newCacheKey(account, addr)] = client
}

func (w *WalletProxy) delCache(account string, addr address.Address) bool {
key := newCacheKey(account, addr)
w.mutx.Lock()
defer w.mutx.Unlock()
_, exist := w.avaliabeClientCache[key]
if exist {
delete(w.avaliabeClientCache, key)
}
return exist
}

func (w *WalletProxy) getCachedClient(account string, addr address.Address) *WalletClient {
key := newCacheKey(account, addr)
w.mutx.RLock()
defer w.mutx.RUnlock()
return w.avaliabeClientCache[key]
}

// todo: think about 'fastSelectAvaClient' was called parallelly,
// input the same params('account', 'address')
func (w *WalletProxy) fastSelectAvaClient(ctx context.Context, account string, addr address.Address) (*WalletClient, error) {
var g = &sync.WaitGroup{}
var ch = make(chan *WalletClient, 1)
for url, c := range w.clients {
g.Add(1)
go func(url string, c *WalletClient) {
has, err := c.WalletHas(ctx, account, addr)
if err != nil {
w.logger.Errorf("fastSelectAvaClient, call %s:'WalletHas' failed:%s", url, err)
}
if has {
ch <- c
}
g.Done()
}(url, c)
}

go func() {
g.Wait()
close(ch)
}()

c, isok := <-ch
if !isok || c == nil {
return nil, fmt.Errorf("can't find a wallet with(account:%s wallet:%s)", account, addr.String())
}

w.putCache(account, addr, c)
return c, nil
}

func (w *WalletProxy) WalletHas(ctx context.Context, supportAccount string, addr address.Address) (bool, error) {
c := w.getCachedClient(supportAccount, addr)
if c != nil {
return true, nil
}
c, err := w.fastSelectAvaClient(ctx, supportAccount, addr)
return c != nil, err
}

func (w *WalletProxy) WalletSign(ctx context.Context, account string,
addr address.Address, toSign []byte, meta core.MsgMeta) (*crypto.Signature, error) {
var err error
var useCachedClient bool

c := w.getCachedClient(account, addr)

if c == nil {
if c, err = w.fastSelectAvaClient(ctx, account, addr); err != nil {
return nil, err
}
} else {
useCachedClient = true
}

var s *crypto.Signature
if s, err = c.WalletSign(ctx, account, addr, toSign, meta); err != nil {
if useCachedClient {

w.logger.Warnf("sign with cached client failed:%s, will re-SelectAvaliableClient, and retry",
err.Error())

w.delCache(account, addr)

if c, err = w.fastSelectAvaClient(ctx, account, addr); err != nil {
return nil, err
}

s, err = c.WalletSign(ctx, account, addr, toSign, meta)
}
}

return s, err
}

func (w *WalletProxy) ListWalletInfo(ctx context.Context) ([]*walletevent.WalletDetail, error) {
return nil, fmt.Errorf("to implement")
}

func (w *WalletClient) WalletHas(ctx context.Context, supportAccount string, addr address.Address) (bool, error) {
return w.Internal.WalletHas(ctx, supportAccount, addr)
}
Expand All @@ -45,8 +167,37 @@ func (w *WalletClient) ListWalletInfo(ctx context.Context) ([]*walletevent.Walle
return w.Internal.ListWalletInfo(ctx)
}

func NewWalletClient(cfg *config.GatewayConfig) (IWalletClient, jsonrpc.ClientCloser, error) {
return newWalletClient(context.Background(), cfg.Token, cfg.Url)
func NewWalletClient(cfg *config.GatewayConfig, logger *log.Logger) (*WalletProxy, jsonrpc.ClientCloser, error) {
var proxy = &WalletProxy{
clients: make(map[string]*WalletClient),
avaliabeClientCache: make(map[cacheKey]*WalletClient),
logger: logger}
var ctx = context.Background()
var closer jsonrpc.ClientCloser

for _, url := range cfg.Url {
c, cls, err := newWalletClient(ctx, cfg.Token, url)

if err != nil {
return nil, nil, xerrors.Errorf("create wallet client with url:%s failed:%w", url, err)
}

proxy.clients[url] = c

if closer == nil {
closer = cls
} else {
closer = func() {
closer()
cls()
}
}
}

if len(proxy.clients) == 0 {
return nil, nil, fmt.Errorf("can't create any gateway client, please check 'GatewayConfig'")
}
return proxy, closer, nil
}

func newWalletClient(ctx context.Context, token, url string) (*WalletClient, jsonrpc.ClientCloser, error) {
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func runAction(ctx *cli.Context) error {
walletClient = &gateway.IWalletCli{IWalletClient: gatewayService}
gatewayProvider = fx.Options(fx.Supply(gatewayService))
} else {
walletCli, walletCliCloser, err := gateway.NewWalletClient(&cfg.Gateway)
walletCli, walletCliCloser, err := gateway.NewWalletClient(&cfg.Gateway, log)
walletClient = &gateway.IWalletCli{IWalletClient: walletCli}
if err != nil {
return err
Expand Down Expand Up @@ -269,7 +269,7 @@ func updateFlag(cfg *config.Config, ctx *cli.Context) error {

if ctx.IsSet("gateway-url") {
cfg.Gateway.RemoteEnable = true
cfg.Gateway.Url = ctx.String("gateway-url")
cfg.Gateway.Url = ctx.StringSlice("gateway-url")
}

if ctx.IsSet("auth-token") {
Expand Down
2 changes: 1 addition & 1 deletion messager.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
[gateway]
remoteEnable = true
token = ""
url = "/ip4/127.0.0.1/tcp/45132"
url = ["/ip4/127.0.0.1/tcp/45132"]

[gateway.cfg]
RequestQueueSize = 30
Expand Down

0 comments on commit b5754d8

Please sign in to comment.