From 07e17b2ad86e0040dd223345a5724df0889bbdca Mon Sep 17 00:00:00 2001 From: simlecode Date: Mon, 31 May 2021 17:45:48 +0800 Subject: [PATCH] add reset address command --- api/client/api.go | 6 +++ api/controller/auth_map.go | 59 +++++++++++----------- cli/address.go | 46 ++++++++++++++++- cli/msg.go | 10 ++-- models/mysql/message.go | 8 +++ models/repo/message_repo.go | 1 + models/sqlite/message.go | 8 +++ service/address_service.go | 98 ++++++++++++++++++++++++++++++++++++- service/message_service.go | 21 +++++--- 9 files changed, 215 insertions(+), 42 deletions(-) diff --git a/api/client/api.go b/api/client/api.go index cdc2a8fe..55e7d494 100644 --- a/api/client/api.go +++ b/api/client/api.go @@ -47,6 +47,7 @@ type IMessager interface { ActiveAddress(ctx context.Context, addr address.Address) (address.Address, error) //perm:admin SetSelectMsgNum(ctx context.Context, addr address.Address, num uint64) (address.Address, error) //perm:admin SetFeeParams(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error) //perm:admin + ResetAddress(ctx context.Context, addr address.Address, nonce uint64) (uint64, error) //perm:admin GetSharedParams(ctx context.Context) (*types.SharedParams, error) //perm:admin SetSharedParams(ctx context.Context, params *types.SharedParams) (struct{}, error) //perm:admin @@ -99,6 +100,7 @@ type Message struct { ActiveAddress func(ctx context.Context, addr address.Address) (address.Address, error) SetSelectMsgNum func(ctx context.Context, addr address.Address, num uint64) (address.Address, error) SetFeeParams func(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error) + ResetAddress func(ctx context.Context, addr address.Address, nonce uint64) (uint64, error) GetSharedParams func(context.Context) (*types.SharedParams, error) SetSharedParams func(context.Context, *types.SharedParams) (struct{}, error) @@ -238,6 +240,10 @@ func (message *Message) SetSelectMsgNum(ctx context.Context, addr address.Addres return message.Internal.SetSelectMsgNum(ctx, addr, num) } +func (message *Message) ResetAddress(ctx context.Context, addr address.Address, nonce uint64) (uint64, error) { + return message.Internal.ResetAddress(ctx, addr, nonce) +} + func (message *Message) SetFeeParams(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error) { return message.Internal.SetFeeParams(ctx, addr, gasOverEstimation, maxFee, maxFeeCap) } diff --git a/api/controller/auth_map.go b/api/controller/auth_map.go index 7a4c3503..b2b3d94d 100644 --- a/api/controller/auth_map.go +++ b/api/controller/auth_map.go @@ -1,46 +1,47 @@ package controller var AuthMap = map[string]string{ - "DeleteNode": "admin", "PushMessage": "write", - "GetMessageBySignedCid": "read", - "ListBlockedMessage": "admin", - "HasAddress": "read", - "UpdateNonce": "admin", - "GetNode": "admin", - "ListMessage": "admin", - "HasNode": "admin", - "ResponseEvent": "write", - "HasMessageByUid": "read", - "GetMessageByUid": "read", - "ListFailedMessage": "admin", + "ReplaceMessage": "admin", "RepublishMessage": "admin", - "GetSharedParams": "admin", + "ForbiddenAddress": "admin", "SetSharedParams": "admin", - "SupportNewAccount": "write", + "DeleteNode": "admin", "WaitMessage": "read", - "PushMessageWithId": "write", - "UpdateFilledMessageByID": "admin", - "SaveAddress": "admin", - "RefreshSharedParams": "admin", + "GetMessageByUid": "read", + "GetMessageByFromAndNonce": "read", + "WalletHas": "read", + "UpdateNonce": "admin", "ListNode": "admin", + "GetMessageByUnsignedCid": "read", + "SaveAddress": "admin", "GetMessageByCid": "read", + "GetNode": "admin", + "ResponseEvent": "write", + "HasMessageByUid": "read", "ListMessageByFromState": "admin", + "UpdateMessageStateByID": "admin", "GetAddress": "admin", - "ForbiddenAddress": "admin", + "ResetAddress": "admin", + "ListMessage": "admin", + "ListMessageByAddress": "admin", + "GetSharedParams": "admin", + "HasNode": "admin", + "DeleteAddress": "admin", "SetSelectMsgNum": "admin", - "GetMessageByFromAndNonce": "read", + "PushMessageWithId": "write", + "GetMessageBySignedCid": "read", + "ListBlockedMessage": "admin", "UpdateAllFilledMessage": "admin", + "UpdateFilledMessageByID": "admin", "MarkBadMessage": "admin", - "ActiveAddress": "admin", - "ReplaceMessage": "admin", - "SaveNode": "admin", + "SetFeeParams": "admin", + "SupportNewAccount": "write", "ListenWalletEvent": "write", - "GetMessageByUnsignedCid": "read", - "ListMessageByAddress": "admin", - "UpdateMessageStateByID": "admin", + "ListFailedMessage": "admin", + "HasAddress": "read", "ListAddress": "admin", - "DeleteAddress": "admin", - "SetFeeParams": "admin", - "WalletHas": "read", + "ActiveAddress": "admin", + "RefreshSharedParams": "admin", + "SaveNode": "admin", } diff --git a/cli/address.go b/cli/address.go index 72a67ac2..bd0199fc 100644 --- a/cli/address.go +++ b/cli/address.go @@ -17,10 +17,11 @@ var AddrCmds = &cli.Command{ listAddrCmd, deleteAddrCmd, //updateNonceCmd, - forbiddenAddrCmd, - activeAddrCmd, + //forbiddenAddrCmd, + //activeAddrCmd, setAddrSelMsgNumCmd, setFeeParamsCmd, + resetAddrCmd, }, } @@ -145,6 +146,7 @@ var deleteAddrCmd = &cli.Command{ }, } +// nolint var forbiddenAddrCmd = &cli.Command{ Name: "forbidden", Usage: "forbidden address", @@ -182,6 +184,7 @@ var forbiddenAddrCmd = &cli.Command{ }, } +// nolint var activeAddrCmd = &cli.Command{ Name: "active", Usage: "activate a frozen address", @@ -290,3 +293,42 @@ var setFeeParamsCmd = &cli.Command{ return err }, } + +var resetAddrCmd = &cli.Command{ + Name: "reset", + Usage: "reset address nonce", + ArgsUsage: "address", + Flags: []cli.Flag{ + ReallyDoItFlag, + &cli.Uint64Flag{ + Name: "nonce", + Usage: "The nonce you want to set", + }, + }, + Action: func(ctx *cli.Context) error { + client, closer, err := getAPI(ctx) + if err != nil { + return err + } + defer closer() + + if !ctx.Bool("really-do-it") { + return xerrors.New("confirm to exec this command, specify --really-do-it") + } + if !ctx.Args().Present() { + return xerrors.Errorf("must pass address") + } + + addr, err := address.NewFromString(ctx.Args().First()) + if err != nil { + return err + } + currentNonce, err := client.ResetAddress(ctx.Context, addr, ctx.Uint64("nonce")) + if err != nil { + return err + } + fmt.Printf("address %s current nonce %d \n", addr.String(), currentNonce) + + return nil + }, +} diff --git a/cli/msg.go b/cli/msg.go index 15c9853b..fd330460 100644 --- a/cli/msg.go +++ b/cli/msg.go @@ -21,6 +21,11 @@ import ( "github.com/filecoin-project/venus-messager/types" ) +var ReallyDoItFlag = &cli.BoolFlag{ + Name: "really-do-it", + Usage: "specify this flag to confirm mark-bad", +} + var MsgCmds = &cli.Command{ Name: "msg", Usage: "message commands", @@ -545,10 +550,7 @@ var markBadCmd = &cli.Command{ Name: "mark-bad", Usage: "mark bad message", Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "really-do-it", - Usage: "specify this flag to confirm mark-bad", - }, + ReallyDoItFlag, &cli.StringFlag{ Name: "from", Usage: "mark unfill message as bad message if specify this flag", diff --git a/models/mysql/message.go b/models/mysql/message.go index 249b9b53..6d3f34bf 100644 --- a/models/mysql/message.go +++ b/models/mysql/message.go @@ -394,6 +394,14 @@ func (m *mysqlMessageRepo) GetMessageByFromAndNonce(from address.Address, nonce return msg.Message(), nil } +func (m *mysqlMessageRepo) GetMessageByFromNonceAndState(from address.Address, nonce uint64, state types.MessageState) (*types.Message, error) { + var msg mysqlMessage + if err := m.DB.Where("from_addr = ? and nonce = ? and state = ?", from.String(), nonce, state).Take(&msg).Error; err != nil { + return nil, err + } + return msg.Message(), nil +} + func (m *mysqlMessageRepo) ListMessage() ([]*types.Message, error) { var sqlMsgs []*mysqlMessage if err := m.DB.Find(&sqlMsgs).Error; err != nil { diff --git a/models/repo/message_repo.go b/models/repo/message_repo.go index 6fb5ccbc..1be9e3ff 100644 --- a/models/repo/message_repo.go +++ b/models/repo/message_repo.go @@ -19,6 +19,7 @@ type MessageRepo interface { SaveMessage(msg *types.Message) error GetMessageByFromAndNonce(from address.Address, nonce uint64) (*types.Message, error) + GetMessageByFromNonceAndState(from address.Address, nonce uint64, state types.MessageState) (*types.Message, error) GetMessageByUid(id string) (*types.Message, error) HasMessageByUid(id string) (bool, error) GetMessageState(id string) (types.MessageState, error) diff --git a/models/sqlite/message.go b/models/sqlite/message.go index 45d3ea48..c6ee7ab8 100644 --- a/models/sqlite/message.go +++ b/models/sqlite/message.go @@ -371,6 +371,14 @@ func (m *sqliteMessageRepo) GetMessageByFromAndNonce(from address.Address, nonce return msg.Message(), nil } +func (m *sqliteMessageRepo) GetMessageByFromNonceAndState(from address.Address, nonce uint64, state types.MessageState) (*types.Message, error) { + var msg sqliteMessage + if err := m.DB.Where("from_addr = ? and nonce = ? and state = ?", from.String(), nonce, state).Take(&msg).Error; err != nil { + return nil, err + } + return msg.Message(), nil +} + func (m *sqliteMessageRepo) ListMessage() ([]*types.Message, error) { var sqlMsgs []*sqliteMessage if err := m.DB.Find(&sqlMsgs).Error; err != nil { diff --git a/service/address_service.go b/service/address_service.go index e47ec075..4e2628c5 100644 --- a/service/address_service.go +++ b/service/address_service.go @@ -3,6 +3,8 @@ package service import ( "context" + "gorm.io/gorm" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-address" @@ -22,16 +24,29 @@ type AddressService struct { log *logrus.Logger sps *SharedParamsService + nodeClient *NodeClient walletClient *gateway.IWalletCli + + resetAddressFunc chan func() (uint64, error) + resetAddressRes chan resetAddressResult + isResetAddress bool } -func NewAddressService(repo repo.Repo, logger *logrus.Logger, sps *SharedParamsService, walletClient *gateway.IWalletCli) *AddressService { +func NewAddressService(repo repo.Repo, + logger *logrus.Logger, + sps *SharedParamsService, + walletClient *gateway.IWalletCli, + nodeClient *NodeClient) *AddressService { addressService := &AddressService{ repo: repo, log: logger, sps: sps, + nodeClient: nodeClient, walletClient: walletClient, + + resetAddressFunc: make(chan func() (uint64, error)), + resetAddressRes: make(chan resetAddressResult), } return addressService @@ -136,6 +151,87 @@ func (addressService *AddressService) SetFeeParams(ctx context.Context, addr add return addr, addressService.repo.AddressRepo().UpdateFeeParams(ctx, addr, gasOverEstimation, maxFee, maxFeeCap) } +type resetAddressResult struct { + latestNonce uint64 + err error +} + +func (addressService *AddressService) resetAddress(ctx context.Context, addr address.Address, targetNonce uint64) (uint64, error) { + addrInfo, err := addressService.GetAddress(ctx, addr) + if err != nil { + return 0, err + } + actor, err := addressService.nodeClient.StateGetActor(ctx, addr, venusTypes.EmptyTSK) + if err != nil { + return 0, err + } + + if targetNonce != 0 { + if targetNonce < actor.Nonce { + return 0, xerrors.Errorf("target nonce(%d) smaller than chain nonce(%d)", targetNonce, actor.Nonce) + } + } else { + targetNonce = actor.Nonce + } + addressService.log.Infof("reset address target nonce %d, chain nonce %d", targetNonce, actor.Nonce) + + latestNonce := addrInfo.Nonce + if err := addressService.repo.Transaction(func(txRepo repo.TxRepo) error { + for nonce := addrInfo.Nonce - 1; nonce >= targetNonce; nonce-- { + msg, err := txRepo.MessageRepo().GetMessageByFromNonceAndState(addr, nonce, types.FillMsg) + if err != nil { + if xerrors.Is(err, gorm.ErrRecordNotFound) { + continue + } + return xerrors.Errorf("found message by address(%s) and nonce(%d) failed %v", addr.String(), nonce, err) + } + if msg.State == types.FillMsg { + if _, err := txRepo.MessageRepo().MarkBadMessage(msg.ID); err != nil { + return xerrors.Errorf("mark bad message %s failed %v", msg.ID, err) + } + latestNonce = nonce + } else if msg.State == types.OnChainMsg { + break + } + } + if latestNonce < addrInfo.Nonce { + return txRepo.AddressRepo().UpdateNonce(ctx, addr, latestNonce) + } + return nil + }); err != nil { + return 0, err + } + + return latestNonce, nil +} + +func (addressService *AddressService) ResetAddress(ctx context.Context, addr address.Address, targetNonce uint64) (uint64, error) { + if addressService.isResetAddress { + return 0, xerrors.Errorf("resetting the address is already underway") + } + addressService.isResetAddress = true + defer func() { + addressService.isResetAddress = false + }() + + addressService.resetAddressFunc <- func() (uint64, error) { + return addressService.resetAddress(ctx, addr, targetNonce) + } + + for { + select { + case r, ok := <-addressService.resetAddressRes: + if !ok { + return 0, xerrors.Errorf("unexpect error") + } + addressService.log.Infof("reset address %s success, current nonce %d ", addr.String(), r.latestNonce) + return r.latestNonce, r.err + case <-ctx.Done(): + return 0, ctx.Err() + } + } +} + func (addressService *AddressService) Addresses() map[address.Address]struct{} { addrs := make(map[address.Address]struct{}) addrList, err := addressService.ListAddress(context.Background()) diff --git a/service/message_service.go b/service/message_service.go index d9f63eeb..736821cb 100644 --- a/service/message_service.go +++ b/service/message_service.go @@ -761,13 +761,22 @@ func (ms *MessageService) StartPushMessage(ctx context.Context) { // ms.log.Errorf("push message error %v", err) //} case newHead := <-ms.triggerPush: - start := time.Now() - ms.log.Infof("start to push message %s task wait task %d", newHead.String(), len(ms.triggerPush)) - err := ms.pushMessageToPool(ctx, newHead) - if err != nil { - ms.log.Errorf("push message error %v", err) + select { + case f := <-ms.addressService.resetAddressFunc: + nonce, err := f() + ms.addressService.resetAddressRes <- resetAddressResult{ + latestNonce: nonce, + err: err, + } + default: + start := time.Now() + ms.log.Infof("start to push message %s task wait task %d", newHead.String(), len(ms.triggerPush)) + err := ms.pushMessageToPool(ctx, newHead) + if err != nil { + ms.log.Errorf("push message error %v", err) + } + ms.log.Infof("end push message spent %d ms", time.Since(start).Milliseconds()) } - ms.log.Infof("end push message spent %d ms", time.Since(start).Milliseconds()) } } }