diff --git a/service/message_selector.go b/service/message_selector.go index 731ac19d..c4a1ce20 100644 --- a/service/message_selector.go +++ b/service/message_selector.go @@ -347,37 +347,6 @@ func (messageSelector *MessageSelector) getNonceInTipset(ctx context.Context, ts return applied, nil } -func (messageSelector *MessageSelector) GasEstimateMessageGas(ctx context.Context, msg *venusTypes.Message, meta *types.SendSpec, tsk venusTypes.TipSetKey) (*venusTypes.Message, error) { - if msg.GasLimit == 0 { - gasLimitI, err := handleTimeout(ctx, messageSelector.nodeClient.GasEstimateGasLimit, []interface{}{msg, venusTypes.EmptyTSK}) - if err != nil { - return nil, fmt.Errorf("estimating gas used: %w", err) - } - gasLimit := gasLimitI.(int64) - //GasOverEstimation default value should be 1.25 - msg.GasLimit = int64(float64(gasLimit) * meta.GasOverEstimation) - } - - if msg.GasPremium == venusTypes.EmptyInt || venusTypes.BigCmp(msg.GasPremium, venusTypes.NewInt(0)) == 0 { - gasPremiumI, err := handleTimeout(ctx, messageSelector.nodeClient.GasEstimateGasPremium, []interface{}{uint64(10), msg.From, msg.GasLimit, venusTypes.EmptyTSK}) - if err != nil { - return nil, fmt.Errorf("estimating gas price: %w", err) - } - msg.GasPremium = gasPremiumI.(big.Int) - } - - if msg.GasFeeCap == venusTypes.EmptyInt || venusTypes.BigCmp(msg.GasFeeCap, venusTypes.NewInt(0)) == 0 { - feeCapI, err := handleTimeout(ctx, messageSelector.nodeClient.GasEstimateFeeCap, []interface{}{msg, int64(20), venusTypes.EmptyTSK}) - if err != nil { - return nil, fmt.Errorf("estimating fee cap: %w", err) - } - msg.GasFeeCap = feeCapI.(big.Int) - } - - CapGasFee(msg, meta.MaxFee) - - return msg, nil -} func (messageSelector *MessageSelector) uniqAddresses(addrList []*types.Address) []*types.Address { uniqAddr := make(map[address.Address]struct{}, len(addrList)) diff --git a/service/message_selector_test.go b/service/message_selector_test.go index 8f372021..de7ba0e0 100644 --- a/service/message_selector_test.go +++ b/service/message_selector_test.go @@ -8,8 +8,8 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/venus/pkg/constants" "github.com/filecoin-project/venus/venus-shared/testutil" + shared "github.com/filecoin-project/venus/venus-shared/types" types "github.com/filecoin-project/venus/venus-shared/types/messager" "github.com/stretchr/testify/assert" "go.uber.org/fx/fxtest" @@ -148,6 +148,7 @@ func TestAddrSelectMsgNum(t *testing.T) { } func TestSelectMessage(t *testing.T) { + // stm: @MESSENGER_SELECTOR_SELECT_MESSAGE_001, @MESSENGER_SELECTOR_SELECT_MESSAGE_002 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -177,6 +178,10 @@ func TestSelectMessage(t *testing.T) { assert.NoError(t, err) assert.Equal(t, &MsgSelectResult{}, res) + // If an error occurs retrieving nonce in tipset, return that error + _, err = ms.messageSelector.SelectMessage(ctx, &shared.TipSet{}) + assert.Error(t, err) + totalMsg := len(addrs) * 10 msgs := genMessages(addrs, account, totalMsg) assert.NoError(t, pushMessage(ctx, ms, msgs)) @@ -276,6 +281,7 @@ func TestSelectNum(t *testing.T) { } func TestEstimateMessageGas(t *testing.T) { + // stm: @MESSENGER_SELECTOR_ESTIMATE_MESSAGE_GAS_001 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -362,6 +368,25 @@ func TestEstimateMessageGas(t *testing.T) { assert.NoError(t, saveAndPushMsgs(ctx, ms, selectResult)) } +func TestCapGasFee(t *testing.T) { + // stm: @MESSENGER_SELECTOR_CAP_MESSAGE_GAS_001 + msg := testhelper.NewMessage().Message + maxfee := func(msg *shared.Message) big.Int { + return big.Mul(big.NewInt(msg.GasLimit), msg.GasFeeCap) + } + oldFeeCap := big.NewInt(1000) + oldGasPremium := oldFeeCap + msg.GasLimit = 10000 + msg.GasFeeCap = oldFeeCap + msg.GasPremium = oldGasPremium + oldMaxFee := maxfee(&msg) + descedMaxFee := big.Div(oldMaxFee, big.NewInt(10)) + CapGasFee(&msg, descedMaxFee) + newMaxFee := maxfee(&msg) + assert.Less(t, big.Cmp(msg.GasPremium, oldGasPremium), 0) + assert.Less(t, big.Cmp(newMaxFee, oldMaxFee), 0) +} + func TestSignMessageFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -474,7 +499,7 @@ func newMessageServiceHelper(ctx context.Context, cfg *config.Config, blockDelay func pushMessage(ctx context.Context, ms *MessageService, msgs []*types.Message) error { for _, msg := range msgs { - // avoid being modified + // avoid been modified msgCopy := *msg if err := ms.pushMessage(ctx, &msgCopy); err != nil { return err @@ -509,7 +534,7 @@ func checkMsgs(ctx context.Context, t *testing.T, ms *MessageService, srcMsgs [] addrInfos := make(map[address.Address]*types.Address) idMsgMap := testhelper.SliceToMap(srcMsgs) for _, msg := range selectedMsgs { - res := waitMsgAndCheck(ctx, t, msg, ms) + res := waitMsgAndCheck(ctx, t, msg.ID, ms) addrInfo, ok := addrInfos[msg.From] if !ok { @@ -531,7 +556,7 @@ func waitMsgWithTimeout(ctx context.Context, ms *MessageService, msgID string) ( resChan := make(chan *waitMsgRes) go func() { - res, err := ms.WaitMessage(ctx, msgID, constants.MessageConfidence) + res, err := ms.WaitMessage(ctx, msgID, 2) resChan <- &waitMsgRes{ msg: res, err: err, @@ -547,10 +572,10 @@ func waitMsgWithTimeout(ctx context.Context, ms *MessageService, msgID string) ( } } -func waitMsgAndCheck(ctx context.Context, t *testing.T, msg *types.Message, ms *MessageService) *types.Message { - res, err := waitMsgWithTimeout(ctx, ms, msg.ID) +func waitMsgAndCheck(ctx context.Context, t *testing.T, msgUID string, ms *MessageService) *types.Message { + res, err := waitMsgWithTimeout(ctx, ms, msgUID) assert.NoError(t, err) - assert.Equal(t, msg.ID, res.ID) + assert.Equal(t, msgUID, res.ID) assert.Equal(t, types.OnChainMsg, res.State) assert.Greater(t, res.Height, int64(0)) assert.NotEmpty(t, res.TipSetKey.String()) diff --git a/service/message_service.go b/service/message_service.go index 3d9cb1f1..579e3af9 100644 --- a/service/message_service.go +++ b/service/message_service.go @@ -200,20 +200,7 @@ func (ms *MessageService) pushMessage(ctx context.Context, msg *types.Message) e } func (ms *MessageService) PushMessage(ctx context.Context, account string, msg *venusTypes.Message, meta *types.SendSpec) (string, error) { - newId := venusTypes.NewUUID() - if err := ms.pushMessage(ctx, &types.Message{ - ID: newId.String(), - Message: *msg, - Meta: meta, - State: types.UnFillMsg, - WalletName: account, - FromUser: account, - }); err != nil { - ms.log.Errorf("push message %s failed %v", newId.String(), err) - return newId.String(), err - } - - return newId.String(), nil + return ms.PushMessageWithId(ctx, account, venusTypes.NewUUID().String(), msg, meta) } func (ms *MessageService) PushMessageWithId(ctx context.Context, account string, id string, msg *venusTypes.Message, meta *types.SendSpec) (string, error) { diff --git a/service/message_service_test.go b/service/message_service_test.go index 343e9df8..27ff1bab 100644 --- a/service/message_service_test.go +++ b/service/message_service_test.go @@ -1,13 +1,18 @@ +//stm: #unit package service import ( "context" "fmt" + "sync" "testing" "time" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/venus/pkg/constants" + "github.com/filecoin-project/venus/venus-shared/testutil" shared "github.com/filecoin-project/venus/venus-shared/types" types "github.com/filecoin-project/venus/venus-shared/types/messager" "github.com/stretchr/testify/assert" @@ -48,6 +53,7 @@ func TestVerifyNetworkName(t *testing.T) { } func TestReplaceMessage(t *testing.T) { + // stm: @MESSENGER_SERVICE_REPLACE_MESSAGE_001, @MESSENGER_SERVICE_REPLACE_MESSAGE_002, @MESSENGER_SERVICE_REPLACE_MESSAGE_003, @MESSENGER_SERVICE_REPLACE_MESSAGE_004 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -128,6 +134,17 @@ func TestReplaceMessage(t *testing.T) { assert.Equal(t, msg.GasFeeCap, res.GasFeeCap) assert.Equal(t, msg.GasPremium, res.GasPremium) } + + failedMessageReplace := func(*types.ReplacMessageParams) { + _, err = ms.ReplaceMessage(ctx, nil) + assert.Error(t, err) + } + // param is nil, expect an error + failedMessageReplace(nil) + // message can't find, expect an error + failedMessageReplace(&types.ReplacMessageParams{ID: shared.NewUUID().String(), Auto: true}) + // message is already on chain, expect an error + failedMessageReplace(&types.ReplacMessageParams{ID: replacedMsgs[0].ID, Auto: true}) } func TestReconnectCheck(t *testing.T) { @@ -244,6 +261,7 @@ func TestReconnectCheck(t *testing.T) { } func TestMessageService_ProcessNewHead(t *testing.T) { + // stm: @MESSENGER_SERVICE_LIST_MESSAGE_BY_ADDRESS_001 ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -395,6 +413,103 @@ func TestMessageService_ProcessNewHead(t *testing.T) { }) } +func TestMessageService_PushMessage(t *testing.T) { + // stm: @MESSENGER_SERVICE_PUSH_MESSAGE_001, @MESSENGER_SERVICE_PUSH_MESSAGE_002, + // stm: @MESSENGER_SERVICE_PUSH_MESSAGE_WITH_ID_001, @MESSENGER_SERVICE_PUSH_MESSAGE_WITH_ID_002 + // stm: @MESSENGER_SERVICE_GET_MESSAGE_BY_UID_001, @MESSENGER_SERVICE_GET_MESSAGE_BY_UID_002 + // stm: @MESSENGER_SERVICE_WAIT_MESSAGE_001, @MESSENGER_SERVICE_WAIT_MESSAGE_002 + // stm: @MESSENGER_SERVICE_LIST_MESSAGE_001 + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfg := config.DefaultConfig() + cfg.MessageService.WaitingChainHeadStableDuration = time.Second * 2 + blockDelay := cfg.MessageService.WaitingChainHeadStableDuration * 2 + fsRepo := filestore.NewMockFileStore(t.TempDir()) + msh, err := newMessageServiceHelper(ctx, cfg, blockDelay, fsRepo) + assert.NoError(t, err) + + account := defaultLocalToken + addr := testutil.BlsAddressProvider()(t) + assert.NoError(t, msh.fullNode.AddActors([]address.Address{addr})) + assert.NoError(t, msh.walletProxy.AddAddress(account, []address.Address{addr})) + + lc := fxtest.NewLifecycle(t) + _ = StartNodeEvents(lc, msh.fullNode, msh.ms, msh.ms.log) + assert.NoError(t, lc.Start(ctx)) + defer lc.RequireStop() + + var pushedMsg *types.Message + + t.Run("push message:", func(t *testing.T) { + rawMsg := testhelper.NewUnsignedMessage() + rawMsg.From = addr + uidStr, err := msh.ms.PushMessage(ctx, account, &rawMsg, nil) + assert.NoError(t, err) + _, err = shared.ParseUUID(uidStr) + assert.NoError(t, err) + + { + // pushing message would be failed + pushFailedMsg := testhelper.NewUnsignedMessage() + _, err = msh.ms.PushMessage(ctx, "invalid account", &pushFailedMsg, nil) + assert.Error(t, err) + // msg with uuid not exists, expect an error + _, err = msh.ms.GetMessageByUid(ctx, shared.NewUUID().String()) + assert.Error(t, err) + } + + pushedMsg, err = msh.ms.GetMessageByUid(ctx, uidStr) + assert.NoError(t, err) + assert.Equal(t, pushedMsg.ID, uidStr) + + { // list messages + msgs, err := msh.ms.ListMessage(ctx) + assert.NoError(t, err) + assert.Equal(t, len(msgs), 1) + assert.Equal(t, msgs[0].ID, uidStr) + + msgs, err = msh.ms.ListMessageByAddress(ctx, addr) + assert.NoError(t, err) + assert.Equal(t, len(msgs), 1) + assert.Equal(t, msgs[0].ID, uidStr) + } + + }) + + t.Run("wait message:", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*3) + defer cancel() + _, err := waitMsgWithTimeout(ctx, msh.ms, shared.NewUUID().String()) + assert.Error(t, err) + + wg := sync.WaitGroup{} + + waitOneMsg := func(msgID string, expectErr bool) { + wg.Add(1) + go func() { + defer wg.Done() + res, err := waitMsgWithTimeout(ctx, msh.ms, msgID) + if expectErr { + assert.Error(t, err) + return + } + assert.Equal(t, res.ID, msgID) + msgLookup, err := msh.fullNode.StateSearchMsg(ctx, shared.EmptyTSK, *res.SignedCid, constants.LookbackNoLimit, true) + assert.NoError(t, err) + assert.Equal(t, msgLookup.Height, abi.ChainEpoch(res.Height)) + assert.Equal(t, msgLookup.TipSet, res.TipSetKey) + assert.Equal(t, msgLookup.Receipt, *res.Receipt) + }() + } + + waitOneMsg(pushedMsg.ID, false) + waitOneMsg(shared.NewUUID().String(), true) + wg.Wait() + }) +} + func newMessageService(msh *messageServiceHelper, fsRepo filestore.FSRepo) *MessageService { return &MessageService{ repo: msh.ms.repo, diff --git a/service/message_state_refresh_test.go b/service/message_state_refresh_test.go index 3c41365a..15ffde7f 100644 --- a/service/message_state_refresh_test.go +++ b/service/message_state_refresh_test.go @@ -20,7 +20,7 @@ import ( "github.com/filecoin-project/venus-messager/testhelper" ) -func TestDoRefershMessageState(t *testing.T) { +func TestDoRefreshMessageState(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -54,7 +54,7 @@ func TestDoRefershMessageState(t *testing.T) { defer wg.Done() for _, msg := range msgs { - res := waitMsgAndCheck(ctx, t, msg, msh.ms) + res := waitMsgAndCheck(ctx, t, msg.ID, msh.ms) msgLookup, err := msh.fullNode.StateSearchMsg(ctx, shared.EmptyTSK, *res.SignedCid, constants.LookbackNoLimit, true) assert.NoError(t, err) diff --git a/service/message_state_test.go b/service/message_state_test.go index c29e0556..0b54010f 100644 --- a/service/message_state_test.go +++ b/service/message_state_test.go @@ -1,3 +1,4 @@ +//stm: #unit package service import ( @@ -14,6 +15,9 @@ import ( ) func TestMessageStateCache(t *testing.T) { + //stm: @MESSENGER_STATE_GET_MESSAGE_001, @MESSENGER_STATE_SET_MESSAGE_ID_001, @MESSENGER_STATE_GET_MESSAGE_STATE_BY_CID_001 + //stm: @MESSENGER_STATE_UPDATE_MESSAGE_BY_CID_001, @MESSENGER_STATE_SET_MESSAGE_ID_001, @MESSENGER_STATE_DELETE_MESSAGE_001 + //stm: @MESSENGER_STATE_MUTATE_MESSAGE_001 fs := filestore.NewMockFileStore(t.TempDir()) db, err := sqlite.OpenSqlite(fs) assert.NoError(t, err) @@ -51,4 +55,11 @@ func TestMessageStateCache(t *testing.T) { state, flag = msgState.GetMessageStateByCid(msgs[1].Cid().String()) assert.True(t, flag) assert.Equal(t, types.OnChainMsg, state) + + msgState.DeleteMessage(msgs[0].ID) + + // Since `msg[0]` has already been removed, `GetMessageStateByCid` should returns `(Unknown, false)` + state, flag = msgState.GetMessageStateByCid(msgs[0].Cid().String()) + assert.Equal(t, state, types.UnKnown) + assert.False(t, flag) }