Skip to content

Commit

Permalink
add reset address command
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Jun 7, 2021
1 parent dd17f5f commit 07e17b2
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 42 deletions.
6 changes: 6 additions & 0 deletions api/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type IMessager interface {
ActiveAddress(ctx context.Context, addr address.Address) (address.Address, error) //perm:admin
SetSelectMsgNum(ctx context.Context, addr address.Address, num uint64) (address.Address, error) //perm:admin
SetFeeParams(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error) //perm:admin
ResetAddress(ctx context.Context, addr address.Address, nonce uint64) (uint64, error) //perm:admin

GetSharedParams(ctx context.Context) (*types.SharedParams, error) //perm:admin
SetSharedParams(ctx context.Context, params *types.SharedParams) (struct{}, error) //perm:admin
Expand Down Expand Up @@ -99,6 +100,7 @@ type Message struct {
ActiveAddress func(ctx context.Context, addr address.Address) (address.Address, error)
SetSelectMsgNum func(ctx context.Context, addr address.Address, num uint64) (address.Address, error)
SetFeeParams func(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error)
ResetAddress func(ctx context.Context, addr address.Address, nonce uint64) (uint64, error)

GetSharedParams func(context.Context) (*types.SharedParams, error)
SetSharedParams func(context.Context, *types.SharedParams) (struct{}, error)
Expand Down Expand Up @@ -238,6 +240,10 @@ func (message *Message) SetSelectMsgNum(ctx context.Context, addr address.Addres
return message.Internal.SetSelectMsgNum(ctx, addr, num)
}

func (message *Message) ResetAddress(ctx context.Context, addr address.Address, nonce uint64) (uint64, error) {
return message.Internal.ResetAddress(ctx, addr, nonce)
}

func (message *Message) SetFeeParams(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error) {
return message.Internal.SetFeeParams(ctx, addr, gasOverEstimation, maxFee, maxFeeCap)
}
Expand Down
59 changes: 30 additions & 29 deletions api/controller/auth_map.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,47 @@
package controller

var AuthMap = map[string]string{
"DeleteNode": "admin",
"PushMessage": "write",
"GetMessageBySignedCid": "read",
"ListBlockedMessage": "admin",
"HasAddress": "read",
"UpdateNonce": "admin",
"GetNode": "admin",
"ListMessage": "admin",
"HasNode": "admin",
"ResponseEvent": "write",
"HasMessageByUid": "read",
"GetMessageByUid": "read",
"ListFailedMessage": "admin",
"ReplaceMessage": "admin",
"RepublishMessage": "admin",
"GetSharedParams": "admin",
"ForbiddenAddress": "admin",
"SetSharedParams": "admin",
"SupportNewAccount": "write",
"DeleteNode": "admin",
"WaitMessage": "read",
"PushMessageWithId": "write",
"UpdateFilledMessageByID": "admin",
"SaveAddress": "admin",
"RefreshSharedParams": "admin",
"GetMessageByUid": "read",
"GetMessageByFromAndNonce": "read",
"WalletHas": "read",
"UpdateNonce": "admin",
"ListNode": "admin",
"GetMessageByUnsignedCid": "read",
"SaveAddress": "admin",
"GetMessageByCid": "read",
"GetNode": "admin",
"ResponseEvent": "write",
"HasMessageByUid": "read",
"ListMessageByFromState": "admin",
"UpdateMessageStateByID": "admin",
"GetAddress": "admin",
"ForbiddenAddress": "admin",
"ResetAddress": "admin",
"ListMessage": "admin",
"ListMessageByAddress": "admin",
"GetSharedParams": "admin",
"HasNode": "admin",
"DeleteAddress": "admin",
"SetSelectMsgNum": "admin",
"GetMessageByFromAndNonce": "read",
"PushMessageWithId": "write",
"GetMessageBySignedCid": "read",
"ListBlockedMessage": "admin",
"UpdateAllFilledMessage": "admin",
"UpdateFilledMessageByID": "admin",
"MarkBadMessage": "admin",
"ActiveAddress": "admin",
"ReplaceMessage": "admin",
"SaveNode": "admin",
"SetFeeParams": "admin",
"SupportNewAccount": "write",
"ListenWalletEvent": "write",
"GetMessageByUnsignedCid": "read",
"ListMessageByAddress": "admin",
"UpdateMessageStateByID": "admin",
"ListFailedMessage": "admin",
"HasAddress": "read",
"ListAddress": "admin",
"DeleteAddress": "admin",
"SetFeeParams": "admin",
"WalletHas": "read",
"ActiveAddress": "admin",
"RefreshSharedParams": "admin",
"SaveNode": "admin",
}
46 changes: 44 additions & 2 deletions cli/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ var AddrCmds = &cli.Command{
listAddrCmd,
deleteAddrCmd,
//updateNonceCmd,
forbiddenAddrCmd,
activeAddrCmd,
//forbiddenAddrCmd,
//activeAddrCmd,
setAddrSelMsgNumCmd,
setFeeParamsCmd,
resetAddrCmd,
},
}

Expand Down Expand Up @@ -145,6 +146,7 @@ var deleteAddrCmd = &cli.Command{
},
}

// nolint
var forbiddenAddrCmd = &cli.Command{
Name: "forbidden",
Usage: "forbidden address",
Expand Down Expand Up @@ -182,6 +184,7 @@ var forbiddenAddrCmd = &cli.Command{
},
}

// nolint
var activeAddrCmd = &cli.Command{
Name: "active",
Usage: "activate a frozen address",
Expand Down Expand Up @@ -290,3 +293,42 @@ var setFeeParamsCmd = &cli.Command{
return err
},
}

var resetAddrCmd = &cli.Command{
Name: "reset",
Usage: "reset address nonce",
ArgsUsage: "address",
Flags: []cli.Flag{
ReallyDoItFlag,
&cli.Uint64Flag{
Name: "nonce",
Usage: "The nonce you want to set",
},
},
Action: func(ctx *cli.Context) error {
client, closer, err := getAPI(ctx)
if err != nil {
return err
}
defer closer()

if !ctx.Bool("really-do-it") {
return xerrors.New("confirm to exec this command, specify --really-do-it")
}
if !ctx.Args().Present() {
return xerrors.Errorf("must pass address")
}

addr, err := address.NewFromString(ctx.Args().First())
if err != nil {
return err
}
currentNonce, err := client.ResetAddress(ctx.Context, addr, ctx.Uint64("nonce"))
if err != nil {
return err
}
fmt.Printf("address %s current nonce %d \n", addr.String(), currentNonce)

return nil
},
}
10 changes: 6 additions & 4 deletions cli/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"github.com/filecoin-project/venus-messager/types"
)

var ReallyDoItFlag = &cli.BoolFlag{
Name: "really-do-it",
Usage: "specify this flag to confirm mark-bad",
}

var MsgCmds = &cli.Command{
Name: "msg",
Usage: "message commands",
Expand Down Expand Up @@ -545,10 +550,7 @@ var markBadCmd = &cli.Command{
Name: "mark-bad",
Usage: "mark bad message",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "really-do-it",
Usage: "specify this flag to confirm mark-bad",
},
ReallyDoItFlag,
&cli.StringFlag{
Name: "from",
Usage: "mark unfill message as bad message if specify this flag",
Expand Down
8 changes: 8 additions & 0 deletions models/mysql/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,14 @@ func (m *mysqlMessageRepo) GetMessageByFromAndNonce(from address.Address, nonce
return msg.Message(), nil
}

func (m *mysqlMessageRepo) GetMessageByFromNonceAndState(from address.Address, nonce uint64, state types.MessageState) (*types.Message, error) {
var msg mysqlMessage
if err := m.DB.Where("from_addr = ? and nonce = ? and state = ?", from.String(), nonce, state).Take(&msg).Error; err != nil {
return nil, err
}
return msg.Message(), nil
}

func (m *mysqlMessageRepo) ListMessage() ([]*types.Message, error) {
var sqlMsgs []*mysqlMessage
if err := m.DB.Find(&sqlMsgs).Error; err != nil {
Expand Down
1 change: 1 addition & 0 deletions models/repo/message_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type MessageRepo interface {
SaveMessage(msg *types.Message) error

GetMessageByFromAndNonce(from address.Address, nonce uint64) (*types.Message, error)
GetMessageByFromNonceAndState(from address.Address, nonce uint64, state types.MessageState) (*types.Message, error)
GetMessageByUid(id string) (*types.Message, error)
HasMessageByUid(id string) (bool, error)
GetMessageState(id string) (types.MessageState, error)
Expand Down
8 changes: 8 additions & 0 deletions models/sqlite/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,14 @@ func (m *sqliteMessageRepo) GetMessageByFromAndNonce(from address.Address, nonce
return msg.Message(), nil
}

func (m *sqliteMessageRepo) GetMessageByFromNonceAndState(from address.Address, nonce uint64, state types.MessageState) (*types.Message, error) {
var msg sqliteMessage
if err := m.DB.Where("from_addr = ? and nonce = ? and state = ?", from.String(), nonce, state).Take(&msg).Error; err != nil {
return nil, err
}
return msg.Message(), nil
}

func (m *sqliteMessageRepo) ListMessage() ([]*types.Message, error) {
var sqlMsgs []*sqliteMessage
if err := m.DB.Find(&sqlMsgs).Error; err != nil {
Expand Down
98 changes: 97 additions & 1 deletion service/address_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package service
import (
"context"

"gorm.io/gorm"

"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/go-address"
Expand All @@ -22,16 +24,29 @@ type AddressService struct {
log *logrus.Logger

sps *SharedParamsService
nodeClient *NodeClient
walletClient *gateway.IWalletCli

resetAddressFunc chan func() (uint64, error)
resetAddressRes chan resetAddressResult
isResetAddress bool
}

func NewAddressService(repo repo.Repo, logger *logrus.Logger, sps *SharedParamsService, walletClient *gateway.IWalletCli) *AddressService {
func NewAddressService(repo repo.Repo,
logger *logrus.Logger,
sps *SharedParamsService,
walletClient *gateway.IWalletCli,
nodeClient *NodeClient) *AddressService {
addressService := &AddressService{
repo: repo,
log: logger,

sps: sps,
nodeClient: nodeClient,
walletClient: walletClient,

resetAddressFunc: make(chan func() (uint64, error)),
resetAddressRes: make(chan resetAddressResult),
}

return addressService
Expand Down Expand Up @@ -136,6 +151,87 @@ func (addressService *AddressService) SetFeeParams(ctx context.Context, addr add
return addr, addressService.repo.AddressRepo().UpdateFeeParams(ctx, addr, gasOverEstimation, maxFee, maxFeeCap)
}

type resetAddressResult struct {
latestNonce uint64
err error
}

func (addressService *AddressService) resetAddress(ctx context.Context, addr address.Address, targetNonce uint64) (uint64, error) {
addrInfo, err := addressService.GetAddress(ctx, addr)
if err != nil {
return 0, err
}
actor, err := addressService.nodeClient.StateGetActor(ctx, addr, venusTypes.EmptyTSK)
if err != nil {
return 0, err
}

if targetNonce != 0 {
if targetNonce < actor.Nonce {
return 0, xerrors.Errorf("target nonce(%d) smaller than chain nonce(%d)", targetNonce, actor.Nonce)
}
} else {
targetNonce = actor.Nonce
}
addressService.log.Infof("reset address target nonce %d, chain nonce %d", targetNonce, actor.Nonce)

latestNonce := addrInfo.Nonce
if err := addressService.repo.Transaction(func(txRepo repo.TxRepo) error {
for nonce := addrInfo.Nonce - 1; nonce >= targetNonce; nonce-- {
msg, err := txRepo.MessageRepo().GetMessageByFromNonceAndState(addr, nonce, types.FillMsg)
if err != nil {
if xerrors.Is(err, gorm.ErrRecordNotFound) {
continue
}
return xerrors.Errorf("found message by address(%s) and nonce(%d) failed %v", addr.String(), nonce, err)
}
if msg.State == types.FillMsg {
if _, err := txRepo.MessageRepo().MarkBadMessage(msg.ID); err != nil {
return xerrors.Errorf("mark bad message %s failed %v", msg.ID, err)
}
latestNonce = nonce
} else if msg.State == types.OnChainMsg {
break
}
}
if latestNonce < addrInfo.Nonce {
return txRepo.AddressRepo().UpdateNonce(ctx, addr, latestNonce)
}
return nil
}); err != nil {
return 0, err
}

return latestNonce, nil
}

func (addressService *AddressService) ResetAddress(ctx context.Context, addr address.Address, targetNonce uint64) (uint64, error) {
if addressService.isResetAddress {
return 0, xerrors.Errorf("resetting the address is already underway")
}
addressService.isResetAddress = true
defer func() {
addressService.isResetAddress = false
}()

addressService.resetAddressFunc <- func() (uint64, error) {
return addressService.resetAddress(ctx, addr, targetNonce)
}

for {
select {
case r, ok := <-addressService.resetAddressRes:
if !ok {
return 0, xerrors.Errorf("unexpect error")
}
addressService.log.Infof("reset address %s success, current nonce %d ", addr.String(), r.latestNonce)
return r.latestNonce, r.err
case <-ctx.Done():
return 0, ctx.Err()
}
}
}

func (addressService *AddressService) Addresses() map[address.Address]struct{} {
addrs := make(map[address.Address]struct{})
addrList, err := addressService.ListAddress(context.Background())
Expand Down
21 changes: 15 additions & 6 deletions service/message_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,13 +761,22 @@ func (ms *MessageService) StartPushMessage(ctx context.Context) {
// ms.log.Errorf("push message error %v", err)
//}
case newHead := <-ms.triggerPush:
start := time.Now()
ms.log.Infof("start to push message %s task wait task %d", newHead.String(), len(ms.triggerPush))
err := ms.pushMessageToPool(ctx, newHead)
if err != nil {
ms.log.Errorf("push message error %v", err)
select {
case f := <-ms.addressService.resetAddressFunc:
nonce, err := f()
ms.addressService.resetAddressRes <- resetAddressResult{
latestNonce: nonce,
err: err,
}
default:
start := time.Now()
ms.log.Infof("start to push message %s task wait task %d", newHead.String(), len(ms.triggerPush))
err := ms.pushMessageToPool(ctx, newHead)
if err != nil {
ms.log.Errorf("push message error %v", err)
}
ms.log.Infof("end push message spent %d ms", time.Since(start).Milliseconds())
}
ms.log.Infof("end push message spent %d ms", time.Since(start).Milliseconds())
}
}
}
Expand Down

0 comments on commit 07e17b2

Please sign in to comment.