diff --git a/config/config.go b/config/config.go index fbe73de9..be3c13b1 100644 --- a/config/config.go +++ b/config/config.go @@ -83,7 +83,7 @@ type MessageStateConfig struct { type GatewayConfig struct { RemoteEnable bool `toml:"remoteEnable"` Token string `toml:"token"` - Url string `toml:"url"` + Url []string `toml:"url"` Cfg gatewayTypes.Config `toml:"cfg"` } @@ -133,7 +133,7 @@ func DefaultConfig() *Config { Gateway: GatewayConfig{ RemoteEnable: true, Token: "", - Url: "/ip4/127.0.0.1/tcp/45132", + Url: []string{"/ip4/127.0.0.1/tcp/45132"}, Cfg: gatewayTypes.Config{ RequestQueueSize: 30, RequestTimeout: time.Minute * 5, diff --git a/gateway/gateway_client.go b/gateway/gateway_client.go index 30b4f901..e276cf31 100644 --- a/gateway/gateway_client.go +++ b/gateway/gateway_client.go @@ -2,14 +2,17 @@ package gateway import ( "context" + "fmt" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/venus-messager/config" + "github.com/filecoin-project/venus-messager/log" "github.com/filecoin-project/venus-wallet/core" "github.com/ipfs-force-community/venus-common-utils/apiinfo" "github.com/ipfs-force-community/venus-gateway/walletevent" - - "github.com/filecoin-project/venus-messager/config" + "golang.org/x/xerrors" + "sync" ) type IWalletClient interface { @@ -33,6 +36,125 @@ type WalletClient struct { } } +type WalletProxy struct { + clients map[string]*WalletClient + logger *log.Logger + + mutx sync.RWMutex + avaliabeClientCache map[cacheKey]*WalletClient +} + +type cacheKey string + +func newCacheKey(account string, addr address.Address) cacheKey { + return cacheKey("walletClientCache:" + account + addr.String()) +} + +func (w *WalletProxy) putCache(account string, addr address.Address, client *WalletClient) { + w.mutx.Lock() + defer w.mutx.Unlock() + w.avaliabeClientCache[newCacheKey(account, addr)] = client +} + +func (w *WalletProxy) delCache(account string, addr address.Address) bool { + key := newCacheKey(account, addr) + w.mutx.Lock() + defer w.mutx.Unlock() + _, exist := w.avaliabeClientCache[key] + if exist { + delete(w.avaliabeClientCache, key) + } + return exist +} + +func (w *WalletProxy) getCachedClient(account string, addr address.Address) *WalletClient { + key := newCacheKey(account, addr) + w.mutx.RLock() + defer w.mutx.RUnlock() + return w.avaliabeClientCache[key] +} + +// todo: think about 'fastSelectAvaClient' was called parallelly, +// input the same params('account', 'address') +func (w *WalletProxy) fastSelectAvaClient(ctx context.Context, account string, addr address.Address) (*WalletClient, error) { + var g = &sync.WaitGroup{} + var ch = make(chan *WalletClient, 1) + for url, c := range w.clients { + g.Add(1) + go func(url string, c *WalletClient) { + has, err := c.WalletHas(ctx, account, addr) + if err != nil { + w.logger.Errorf("fastSelectAvaClient, call %s:'WalletHas' failed:%s", url, err) + } + if has { + ch <- c + } + g.Done() + }(url, c) + } + + go func() { + g.Wait() + close(ch) + }() + + c, isok := <-ch + if !isok || c == nil { + return nil, fmt.Errorf("can't find a wallet with(account:%s wallet:%s)", account, addr.String()) + } + + w.putCache(account, addr, c) + return c, nil +} + +func (w *WalletProxy) WalletHas(ctx context.Context, supportAccount string, addr address.Address) (bool, error) { + c := w.getCachedClient(supportAccount, addr) + if c != nil { + return true, nil + } + c, err := w.fastSelectAvaClient(ctx, supportAccount, addr) + return c != nil, err +} + +func (w *WalletProxy) WalletSign(ctx context.Context, account string, + addr address.Address, toSign []byte, meta core.MsgMeta) (*crypto.Signature, error) { + var err error + var useCachedClient bool + + c := w.getCachedClient(account, addr) + + if c == nil { + if c, err = w.fastSelectAvaClient(ctx, account, addr); err != nil { + return nil, err + } + } else { + useCachedClient = true + } + + var s *crypto.Signature + if s, err = c.WalletSign(ctx, account, addr, toSign, meta); err != nil { + if useCachedClient { + + w.logger.Warnf("sign with cached client failed:%s, will re-SelectAvaliableClient, and retry", + err.Error()) + + w.delCache(account, addr) + + if c, err = w.fastSelectAvaClient(ctx, account, addr); err != nil { + return nil, err + } + + s, err = c.WalletSign(ctx, account, addr, toSign, meta) + } + } + + return s, err +} + +func (w *WalletProxy) ListWalletInfo(ctx context.Context) ([]*walletevent.WalletDetail, error) { + return nil, fmt.Errorf("to implement") +} + func (w *WalletClient) WalletHas(ctx context.Context, supportAccount string, addr address.Address) (bool, error) { return w.Internal.WalletHas(ctx, supportAccount, addr) } @@ -45,8 +167,37 @@ func (w *WalletClient) ListWalletInfo(ctx context.Context) ([]*walletevent.Walle return w.Internal.ListWalletInfo(ctx) } -func NewWalletClient(cfg *config.GatewayConfig) (IWalletClient, jsonrpc.ClientCloser, error) { - return newWalletClient(context.Background(), cfg.Token, cfg.Url) +func NewWalletClient(cfg *config.GatewayConfig, logger *log.Logger) (*WalletProxy, jsonrpc.ClientCloser, error) { + var proxy = &WalletProxy{ + clients: make(map[string]*WalletClient), + avaliabeClientCache: make(map[cacheKey]*WalletClient), + logger: logger} + var ctx = context.Background() + var closer jsonrpc.ClientCloser + + for _, url := range cfg.Url { + c, cls, err := newWalletClient(ctx, cfg.Token, url) + + if err != nil { + return nil, nil, xerrors.Errorf("create wallet client with url:%s failed:%w", url, err) + } + + proxy.clients[url] = c + + if closer == nil { + closer = cls + } else { + closer = func() { + closer() + cls() + } + } + } + + if len(proxy.clients) == 0 { + return nil, nil, fmt.Errorf("can't create any gateway client, please check 'GatewayConfig'") + } + return proxy, closer, nil } func newWalletClient(ctx context.Context, token, url string) (*WalletClient, jsonrpc.ClientCloser, error) { diff --git a/main.go b/main.go index 3ca5da35..fa396e53 100644 --- a/main.go +++ b/main.go @@ -187,7 +187,7 @@ func runAction(ctx *cli.Context) error { walletClient = &gateway.IWalletCli{IWalletClient: gatewayService} gatewayProvider = fx.Options(fx.Supply(gatewayService)) } else { - walletCli, walletCliCloser, err := gateway.NewWalletClient(&cfg.Gateway) + walletCli, walletCliCloser, err := gateway.NewWalletClient(&cfg.Gateway, log) walletClient = &gateway.IWalletCli{IWalletClient: walletCli} if err != nil { return err @@ -269,7 +269,7 @@ func updateFlag(cfg *config.Config, ctx *cli.Context) error { if ctx.IsSet("gateway-url") { cfg.Gateway.RemoteEnable = true - cfg.Gateway.Url = ctx.String("gateway-url") + cfg.Gateway.Url = ctx.StringSlice("gateway-url") } if ctx.IsSet("auth-token") { diff --git a/messager.toml b/messager.toml index 0f111134..53036a17 100644 --- a/messager.toml +++ b/messager.toml @@ -19,7 +19,7 @@ [gateway] remoteEnable = true token = "" - url = "/ip4/127.0.0.1/tcp/45132" + url = ["/ip4/127.0.0.1/tcp/45132"] [gateway.cfg] RequestQueueSize = 30