Skip to content

Commit

Permalink
fix: incorrect message state (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode authored Sep 8, 2022
1 parent 9a72617 commit 78e65c9
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 5 deletions.
2 changes: 1 addition & 1 deletion integration_test/address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ func testSetFeeParams(ctx context.Context, t *testing.T, api messager.IMessager,
for _, addr := range allAddrs {
_, ok := usedAddrs[addr]
params.Address = addr
usedAddr = addr
if ok {
usedAddr = addr
assert.NoError(t, api.SetFeeParams(ctx, &params))
addrInfo, err := api.GetAddress(ctx, addr)
assert.NoError(t, err)
Expand Down
14 changes: 13 additions & 1 deletion service/message_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type MessageService struct {

cleanUnFillMsgFunc chan func() (int, error)
cleanUnFillMsgRes chan cleanUnFillMsgResult

blockDelay time.Duration
}

type headChan struct {
Expand Down Expand Up @@ -119,6 +121,12 @@ func NewMessageService(ctx context.Context,
go ms.recordMetricsProc(ctx)
}

networkParams, err := ms.nodeClient.StateGetNetworkParams(ctx)
if err != nil {
return nil, fmt.Errorf("get network params failed %v", err)
}
ms.blockDelay = time.Duration(networkParams.BlockDelaySecs) * time.Second

return ms, ms.verifyNetworkName()
}

Expand Down Expand Up @@ -234,7 +242,11 @@ func (ms *MessageService) PushMessageWithId(ctx context.Context, account string,
}

func (ms *MessageService) WaitMessage(ctx context.Context, id string, confidence uint64) (*types.Message, error) {
tm := time.NewTicker(time.Second * 30)
d := time.Second * 30
if ms.blockDelay > 0 {
d = ms.blockDelay
}
tm := time.NewTicker(d)
defer tm.Stop()

doneCh := make(chan struct{}, 1)
Expand Down
8 changes: 5 additions & 3 deletions service/message_state_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ func (ms *MessageService) updateMessageState(ctx context.Context, applyMsgs []ap
}

for _, msg := range applyMsgs {
localMsg, err := txRepo.MessageRepo().GetMessageByFromAndNonce(msg.msg.From, msg.msg.Nonce)
// 两个 `nonce` 都为 `0` 的消息,第一条消息预估gas失败了,第二条消息成功上链,
// 若只按 `from` 和 `nonce` 查询,查到的是第一条消息,这样第二条消息一直是 `FillMsg`
localMsg, err := txRepo.MessageRepo().GetMessageByFromNonceAndState(msg.msg.From, msg.msg.Nonce, types.FillMsg)
if err != nil {
ms.log.Warnf("msg not exit in local db maybe address %s send out of messager", msg.msg.From)
ms.log.Warnf("msg %s not exist in local db maybe address %s send out of messager", msg.signedCID, msg.msg.From)
continue
}
if localMsg.SignedCid == nil || *localMsg.SignedCid != msg.signedCID {
if localMsg.SignedCid != nil && !(*localMsg.SignedCid).Equals(msg.signedCID) {
ms.log.Warnf("replace message old msg cid %s, new msg cid %s, id %s", localMsg.SignedCid, msg.signedCID, localMsg.ID)
// replace msg
localMsg.State = types.ReplacedMsg
Expand Down
58 changes: 58 additions & 0 deletions service/message_state_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,64 @@ func TestDoRefershMessageState(t *testing.T) {
assert.Equal(t, msgLookup.Receipt, *res.Receipt)
}
})

t.Run("tow message nonce is zero", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

cfg := config.DefaultConfig()
cfg.MessageService.SkipPushMessage = true
cfg.MessageService.WaitingChainHeadStableDuration = time.Second * 2
blockDelay := cfg.MessageService.WaitingChainHeadStableDuration * 2
fsRepo := filestore.NewMockFileStore(t.TempDir())
msh, err := newMessageServiceHelper(ctx, cfg, blockDelay, fsRepo)
assert.NoError(t, err)
ms := msh.ms

account := defaultLocalToken
addrCount := 1
addrs := testhelper.ResolveAddrs(t, testhelper.RandAddresses(t, addrCount))
assert.NoError(t, msh.walletProxy.AddAddress(account, addrs))
assert.NoError(t, msh.fullNode.AddActors(addrs))

lc := fxtest.NewLifecycle(t)
_ = StartNodeEvents(lc, msh.fullNode, ms, ms.log)
assert.NoError(t, lc.Start(ctx))
defer lc.RequireStop()

// first message will estimate gas failed
// second message will on chain
// both messages nonce is 0
msgs := genMessages(addrs, defaultLocalToken, 2)
msg := msgs[0]
msg.GasLimit = -1
assert.NoError(t, pushMessage(ctx, ms, msgs))

ts, err := msh.fullNode.ChainHead(ctx)
assert.NoError(t, err)
selectResult, err := ms.messageSelector.SelectMessage(ctx, ts)
assert.NoError(t, err)
assert.Len(t, selectResult.SelectMsg, 1)
assert.Equal(t, msgs[1].ID, selectResult.SelectMsg[0].ID)
assert.Len(t, selectResult.ErrMsg, 1)
assert.Equal(t, msgs[0].ID, selectResult.ErrMsg[0].id)

assert.NoError(t, saveMsgsAndUpdateCache(ctx, ms, selectResult))

ctx, calcel := context.WithTimeout(ctx, time.Minute*3)
defer calcel()
go func() {
ms.multiPushMessages(ctx, selectResult)
}()

fillMsg := selectResult.SelectMsg[0]
res, err := waitMsgWithTimeout(ctx, msh.ms, fillMsg.ID)
assert.NoError(t, err)
assert.Equal(t, fillMsg.ID, res.ID)
assert.Equal(t, types.OnChainMsg, res.State)
assert.Equal(t, fillMsg.UnsignedCid, res.UnsignedCid)
assert.Equal(t, fillMsg.SignedCid, res.SignedCid)
})
}

func TestUpdateMessageState(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions testhelper/mock_full_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,13 @@ func (f *MockFullNode) StateNetworkName(ctx context.Context) (types.NetworkName,
return types.NetworkNameMain, nil
}

func (f *MockFullNode) StateGetNetworkParams(ctx context.Context) (*types.NetworkParams, error) {
return &types.NetworkParams{
NetworkName: types.NetworkNameMain,
BlockDelaySecs: uint64(f.blockDelay / time.Second),
}, nil
}

func (f *MockFullNode) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]types.MessageCID, error) {
f.l.Lock()
defer f.l.Unlock()
Expand Down

0 comments on commit 78e65c9

Please sign in to comment.