Skip to content

Commit

Permalink
rmove reset address cmd, add clear unfill message cmd (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode authored Aug 5, 2021
1 parent 51ff15a commit 39be72c
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 177 deletions.
30 changes: 15 additions & 15 deletions api/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type IMessager interface {
ActiveAddress(ctx context.Context, addr address.Address) (address.Address, error) //perm:admin
SetSelectMsgNum(ctx context.Context, addr address.Address, num uint64) (address.Address, error) //perm:admin
SetFeeParams(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error) //perm:admin
ResetAddress(ctx context.Context, addr address.Address, nonce uint64) (uint64, error) //perm:admin
ClearUnFillMessage(ctx context.Context, addr address.Address) (int, error) //perm:admin

GetSharedParams(ctx context.Context) (*types.SharedParams, error) //perm:admin
SetSharedParams(ctx context.Context, params *types.SharedParams) (struct{}, error) //perm:admin
Expand Down Expand Up @@ -93,18 +93,18 @@ type Message struct {
RepublishMessage func(ctx context.Context, id string) (struct{}, error)
MarkBadMessage func(ctx context.Context, id string) (struct{}, error)

SaveAddress func(ctx context.Context, address *types.Address) (types.UUID, error)
GetAddress func(ctx context.Context, addr address.Address) (*types.Address, error)
HasAddress func(ctx context.Context, addr address.Address) (bool, error)
WalletHas func(ctx context.Context, addr address.Address) (bool, error)
ListAddress func(ctx context.Context) ([]*types.Address, error)
UpdateNonce func(ctx context.Context, addr address.Address, nonce uint64) (address.Address, error)
DeleteAddress func(ctx context.Context, addr address.Address) (address.Address, error)
ForbiddenAddress func(ctx context.Context, addr address.Address) (address.Address, error)
ActiveAddress func(ctx context.Context, addr address.Address) (address.Address, error)
SetSelectMsgNum func(ctx context.Context, addr address.Address, num uint64) (address.Address, error)
SetFeeParams func(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error)
ResetAddress func(ctx context.Context, addr address.Address, nonce uint64) (uint64, error)
SaveAddress func(ctx context.Context, address *types.Address) (types.UUID, error)
GetAddress func(ctx context.Context, addr address.Address) (*types.Address, error)
HasAddress func(ctx context.Context, addr address.Address) (bool, error)
WalletHas func(ctx context.Context, addr address.Address) (bool, error)
ListAddress func(ctx context.Context) ([]*types.Address, error)
UpdateNonce func(ctx context.Context, addr address.Address, nonce uint64) (address.Address, error)
DeleteAddress func(ctx context.Context, addr address.Address) (address.Address, error)
ForbiddenAddress func(ctx context.Context, addr address.Address) (address.Address, error)
ActiveAddress func(ctx context.Context, addr address.Address) (address.Address, error)
SetSelectMsgNum func(ctx context.Context, addr address.Address, num uint64) (address.Address, error)
SetFeeParams func(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error)
ClearUnFillMessage func(ctx context.Context, addr address.Address) (int, error)

GetSharedParams func(context.Context) (*types.SharedParams, error)
SetSharedParams func(context.Context, *types.SharedParams) (struct{}, error)
Expand Down Expand Up @@ -248,8 +248,8 @@ func (message *Message) SetSelectMsgNum(ctx context.Context, addr address.Addres
return message.Internal.SetSelectMsgNum(ctx, addr, num)
}

func (message *Message) ResetAddress(ctx context.Context, addr address.Address, nonce uint64) (uint64, error) {
return message.Internal.ResetAddress(ctx, addr, nonce)
func (message *Message) ClearUnFillMessage(ctx context.Context, addr address.Address) (int, error) {
return message.Internal.ClearUnFillMessage(ctx, addr)
}

func (message *Message) SetFeeParams(ctx context.Context, addr address.Address, gasOverEstimation float64, maxFee, maxFeeCap string) (address.Address, error) {
Expand Down
2 changes: 1 addition & 1 deletion api/controller/auth_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var AuthMap = map[string]string{
"ListMessageByFromState": "admin",
"RepublishMessage": "admin",
"MarkBadMessage": "admin",
"ResetAddress": "admin",
"ClearUnFillMessage": "admin",
"SetSharedParams": "admin",
"GetNode": "admin",
"HasMessageByUid": "read",
Expand Down
42 changes: 0 additions & 42 deletions cli/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ var AddrCmds = &cli.Command{
activeAddrCmd,
setAddrSelMsgNumCmd,
setFeeParamsCmd,
resetAddrCmd,
},
}

Expand Down Expand Up @@ -292,44 +291,3 @@ var setFeeParamsCmd = &cli.Command{
return err
},
}

var resetAddrCmd = &cli.Command{
Name: "reset",
Usage: "reset address nonce",
ArgsUsage: "address",
Flags: []cli.Flag{
ReallyDoItFlag,
&cli.Uint64Flag{
Name: "nonce",
Usage: "The nonce you want to set",
},
},
Action: func(ctx *cli.Context) error {
client, closer, err := getAPI(ctx)
if err != nil {
return err
}
defer closer()

if !ctx.Bool("really-do-it") {
return xerrors.New("confirm to exec this command, specify --really-do-it")
}
if !ctx.Args().Present() {
return xerrors.Errorf("must pass address")
}

addr, err := address.NewFromString(ctx.Args().First())
if err != nil {
return err
}
fmt.Println("It will take dozens of seconds.")

currentNonce, err := client.ResetAddress(ctx.Context, addr, ctx.Uint64("nonce"))
if err != nil {
return err
}
fmt.Printf("address %s current nonce %d \n", addr.String(), currentNonce)

return nil
},
}
38 changes: 38 additions & 0 deletions cli/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var MsgCmds = &cli.Command{
waitMessagerCmd,
republishCmd,
markBadCmd,
clearUnFillMessageCmd,
},
}

Expand Down Expand Up @@ -666,3 +667,40 @@ func transformMessage(msg *types.Message) *message {

return m
}

var clearUnFillMessageCmd = &cli.Command{
Name: "clear-unfill-msg",
Usage: "clear unfill messages by address",
ArgsUsage: "address",
Flags: []cli.Flag{
ReallyDoItFlag,
},
Action: func(ctx *cli.Context) error {
client, closer, err := getAPI(ctx)
if err != nil {
return err
}
defer closer()

if !ctx.Bool("really-do-it") {
return xerrors.New("confirm to exec this command, specify --really-do-it")
}
if !ctx.Args().Present() {
return xerrors.Errorf("must pass address")
}

addr, err := address.NewFromString(ctx.Args().First())
if err != nil {
return err
}
fmt.Println("It will take dozens of seconds.")

count, err := client.ClearUnFillMessage(ctx.Context, addr)
if err != nil {
return err
}
fmt.Printf("clear %d unfill messages \n", count)

return nil
},
}
4 changes: 2 additions & 2 deletions cli/shared_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ var setSharedParamsCmd = &cli.Command{
Value: 1.25,
},
&cli.StringFlag{
Name: "max-fee",
Name: "max-fee",
Value: "7000000000000000",
},
&cli.StringFlag{
Name: "max-feecap",
Name: "max-feecap",
Value: "0",
},
&cli.Uint64Flag{
Expand Down
8 changes: 4 additions & 4 deletions models/mysql/shared_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
type mysqlSharedParams struct {
ID uint `gorm:"primary_key;column:id;type:SMALLINT(2) unsigned AUTO_INCREMENT;NOT NULL" json:"id"`

GasOverEstimation float64 `gorm:"column:gas_over_estimation;type:DOUBLE;NOT NULL"`
MaxFee types.Int `gorm:"column:max_fee;type:varchar(256);NOT NULL"`
MaxFeeCap types.Int `gorm:"column:max_fee_cap;type:varchar(256);NOT NULL"`
SelMsgNum uint64 `gorm:"column:sel_msg_num;type:BIGINT(20) UNSIGNED;NOT NULL"`
GasOverEstimation float64 `gorm:"column:gas_over_estimation;type:DOUBLE;NOT NULL"`
MaxFee types.Int `gorm:"column:max_fee;type:varchar(256);NOT NULL"`
MaxFeeCap types.Int `gorm:"column:max_fee_cap;type:varchar(256);NOT NULL"`
SelMsgNum uint64 `gorm:"column:sel_msg_num;type:BIGINT(20) UNSIGNED;NOT NULL"`
}

func FromSharedParams(sp types.SharedParams) *mysqlSharedParams {
Expand Down
6 changes: 3 additions & 3 deletions models/sqlite/shared_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
type sqliteSharedParams struct {
ID uint `gorm:"primary_key;column:id;type:INT unsigned AUTO_INCREMENT;NOT NULL" json:"id"`

GasOverEstimation float64 `gorm:"column:gas_over_estimation;type:REAL;NOT NULL"`
MaxFee types.Int `gorm:"column:max_fee;type:varchar(256);NOT NULL"`
MaxFeeCap types.Int `gorm:"column:max_fee_cap;type:varchar(256);NOT NULL"`
GasOverEstimation float64 `gorm:"column:gas_over_estimation;type:REAL;NOT NULL"`
MaxFee types.Int `gorm:"column:max_fee;type:varchar(256);NOT NULL"`
MaxFeeCap types.Int `gorm:"column:max_fee_cap;type:varchar(256);NOT NULL"`

SelMsgNum uint64 `gorm:"column:sel_msg_num;type:UNSIGNED BIG INT;NOT NULL"`
}
Expand Down
94 changes: 2 additions & 92 deletions service/address_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
venusTypes "github.com/filecoin-project/venus/pkg/types"
"golang.org/x/xerrors"
"gorm.io/gorm"

"github.com/filecoin-project/venus-messager/gateway"
"github.com/filecoin-project/venus-messager/log"
"github.com/filecoin-project/venus-messager/models/repo"
"github.com/filecoin-project/venus-messager/types"
venusTypes "github.com/filecoin-project/venus/pkg/types"
"golang.org/x/xerrors"
)

var errAddressNotExists = xerrors.New("address not exists")
Expand All @@ -24,9 +22,6 @@ type AddressService struct {
sps *SharedParamsService
nodeClient *NodeClient
walletClient *gateway.IWalletCli

resetAddressFunc chan func() (uint64, error)
resetAddressRes chan resetAddressResult
}

func NewAddressService(repo repo.Repo,
Expand All @@ -41,9 +36,6 @@ func NewAddressService(repo repo.Repo,
sps: sps,
nodeClient: nodeClient,
walletClient: walletClient,

resetAddressFunc: make(chan func() (uint64, error)),
resetAddressRes: make(chan resetAddressResult),
}

return addressService
Expand Down Expand Up @@ -148,88 +140,6 @@ func (addressService *AddressService) SetFeeParams(ctx context.Context, addr add
return addr, addressService.repo.AddressRepo().UpdateFeeParams(ctx, addr, gasOverEstimation, maxFee, maxFeeCap)
}

type resetAddressResult struct {
latestNonce uint64
err error
}

func (addressService *AddressService) resetAddress(ctx context.Context, addr address.Address, targetNonce uint64) (uint64, error) {
addrInfo, err := addressService.GetAddress(ctx, addr)
if err != nil {
return 0, err
}
actor, err := addressService.nodeClient.StateGetActor(ctx, addr, venusTypes.EmptyTSK)
if err != nil {
return 0, err
}

if targetNonce != 0 {
if targetNonce < actor.Nonce {
return 0, xerrors.Errorf("target nonce(%d) smaller than chain nonce(%d)", targetNonce, actor.Nonce)
}
} else {
targetNonce = actor.Nonce
}
addressService.log.Infof("reset address target nonce %d, chain nonce %d", targetNonce, actor.Nonce)

latestNonce := addrInfo.Nonce
if err := addressService.repo.Transaction(func(txRepo repo.TxRepo) error {
for nonce := addrInfo.Nonce - 1; nonce >= targetNonce; nonce-- {
msg, err := txRepo.MessageRepo().GetMessageByFromNonceAndState(addr, nonce, types.FillMsg)
if err != nil {
if xerrors.Is(err, gorm.ErrRecordNotFound) {
continue
}
return xerrors.Errorf("found message by address(%s) and nonce(%d) failed %v", addr.String(), nonce, err)
}
if msg.State == types.FillMsg {
if _, err := txRepo.MessageRepo().MarkBadMessage(msg.ID); err != nil {
return xerrors.Errorf("mark bad message %s failed %v", msg.ID, err)
}
latestNonce = nonce
} else if msg.State == types.OnChainMsg {
break
}
}

unFillMsgs, err := txRepo.MessageRepo().ListUnFilledMessage(addr)
if err != nil {
return err
}
for _, msg := range unFillMsgs {
if _, err := txRepo.MessageRepo().MarkBadMessage(msg.ID); err != nil {
return xerrors.Errorf("mark bad message %s failed %v", msg.ID, err)
}
}

if latestNonce < addrInfo.Nonce {
return txRepo.AddressRepo().UpdateNonce(ctx, addr, latestNonce)
}
return nil
}); err != nil {
return 0, err
}

return latestNonce, nil
}

func (addressService *AddressService) ResetAddress(ctx context.Context, addr address.Address, targetNonce uint64) (uint64, error) {
addressService.resetAddressFunc <- func() (uint64, error) {
return addressService.resetAddress(ctx, addr, targetNonce)
}

select {
case r, ok := <-addressService.resetAddressRes:
if !ok {
return 0, xerrors.Errorf("unexpect error")
}
addressService.log.Infof("reset address %s success, current nonce %d ", addr.String(), r.latestNonce)
return r.latestNonce, r.err
case <-ctx.Done():
return 0, ctx.Err()
}
}

func (addressService *AddressService) Addresses() map[address.Address]struct{} {
addrs := make(map[address.Address]struct{})
addrList, err := addressService.ListAddress(context.Background())
Expand Down
Loading

0 comments on commit 39be72c

Please sign in to comment.