diff --git a/api/messager_impl.go b/api/messager_impl.go index 65e925ee..c043b540 100644 --- a/api/messager_impl.go +++ b/api/messager_impl.go @@ -179,8 +179,8 @@ func (m *MessageImp) ListMessage(ctx context.Context, p *types.MsgQueryParams) ( return m.MessageSrv.ListMessage(ctx, p) } -func (m *MessageImp) ListMessageByFromState(ctx context.Context, from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int) ([]*types.Message, error) { - return m.MessageSrv.ListMessageByFromState(ctx, from, state, isAsc, pageIndex, pageSize) +func (m *MessageImp) ListMessageByFromState(ctx context.Context, from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int, d time.Duration) ([]*types.Message, error) { + return m.MessageSrv.ListMessageByFromState(ctx, from, state, isAsc, pageIndex, pageSize, d) } func (m *MessageImp) ListMessageByAddress(ctx context.Context, addr address.Address) ([]*types.Message, error) { diff --git a/cli/msg.go b/cli/msg.go index aaeb7842..8b575b92 100644 --- a/cli/msg.go +++ b/cli/msg.go @@ -177,6 +177,11 @@ var listCmd = &cli.Command{ Usage: "pagination size, default tob 100", Value: 100, }, + &cli.StringFlag{ + Name: "time", + Usage: "exceeding residence time, eg. 3s,3m,3h", + Aliases: []string{"t"}, + }, FromFlag, outputTypeFlag, verboseFlag, @@ -220,11 +225,18 @@ state: } state := types.MessageState(ctx.Int("state")) + var d time.Duration + if timeStr := ctx.String("time"); len(timeStr) > 0 { + d, err = time.ParseDuration(timeStr) + if err != nil { + return err + } + } pageIndex := ctx.Int("page-index") pageSize := ctx.Int("page-size") - msgs, err := client.ListMessageByFromState(ctx.Context, from, state, false, pageIndex, pageSize) + msgs, err := client.ListMessageByFromState(ctx.Context, from, state, false, pageIndex, pageSize, d) if err != nil { return err } diff --git a/go.mod b/go.mod index 1959c93a..cc977b50 100644 --- a/go.mod +++ b/go.mod @@ -11,9 +11,9 @@ require ( github.com/filecoin-project/go-address v1.1.0 github.com/filecoin-project/go-bitfield v0.2.4 github.com/filecoin-project/go-jsonrpc v0.1.5 - github.com/filecoin-project/go-state-types v0.12.5 + github.com/filecoin-project/go-state-types v0.12.8 github.com/filecoin-project/specs-actors/v5 v5.0.6 - github.com/filecoin-project/venus v1.14.0 + github.com/filecoin-project/venus v1.14.1-0.20231214030417-05ff684e265b github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 github.com/hunjixin/automapper v0.0.0-20191127090318-9b979ce72ce2 diff --git a/go.sum b/go.sum index b4a7a8ea..02b9d197 100644 --- a/go.sum +++ b/go.sum @@ -250,8 +250,8 @@ github.com/filecoin-project/go-state-types v0.0.0-20201102161440-c8033295a1fc/go github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g= github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= -github.com/filecoin-project/go-state-types v0.12.5 h1:VQ2N2T3JeUDdIHEo/xhjnT7Q218Wl0UYIyglqT7Z9Ck= -github.com/filecoin-project/go-state-types v0.12.5/go.mod h1:iJTqGdWDvzXhuVf64Lw0hzt4TIoitMo0VgHdxdjNDZI= +github.com/filecoin-project/go-state-types v0.12.8 h1:W/UObdAsv+LbB9EfyLg92DSYoatzUWmlfV8FGyh30VA= +github.com/filecoin-project/go-state-types v0.12.8/go.mod h1:gR2NV0CSGSQwopxF+3In9nDh1sqvoYukLcs5vK0AHCA= github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4= github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= github.com/filecoin-project/specs-actors v0.9.15-0.20220514164640-94e0d5e123bd/go.mod h1:pjGEe3QlWtK20ju/aFRsiArbMX6Cn8rqEhhsiCM9xYE= @@ -269,8 +269,8 @@ github.com/filecoin-project/specs-actors/v6 v6.0.2 h1:K1xPRJoW5PBvb08QF9+4w1Ajcn github.com/filecoin-project/specs-actors/v6 v6.0.2/go.mod h1:wnfVvPnYmzPZilNvSqCSSA/ZQX3rdV/U/Vf9EIoQhrI= github.com/filecoin-project/specs-actors/v7 v7.0.1 h1:w72xCxijK7xs1qzmJiw+WYJaVt2EPHN8oiwpA1Ay3/4= github.com/filecoin-project/specs-actors/v7 v7.0.1/go.mod h1:tPLEYXoXhcpyLh69Ccq91SOuLXsPWjHiY27CzawjUEk= -github.com/filecoin-project/venus v1.14.0 h1:h2m5D+cpXJ618PmqbWeLcZmSLOaLx2bRBKRzAB+Fqyo= -github.com/filecoin-project/venus v1.14.0/go.mod h1:8dsvkGM89g+3tDduyWDEFxp8Fzc0cOAief2Riv6Q1ZY= +github.com/filecoin-project/venus v1.14.1-0.20231214030417-05ff684e265b h1:QEiYUW1o4IhszZpd6+ETM+3gobsJfAjmYNvn55LJP/g= +github.com/filecoin-project/venus v1.14.1-0.20231214030417-05ff684e265b/go.mod h1:tfYDNY0fKpB81KFl1h8TdmY08TudaArT8G5cptyj63U= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= diff --git a/integration_test/message_test.go b/integration_test/message_test.go index a3839b18..214e9f50 100644 --- a/integration_test/message_test.go +++ b/integration_test/message_test.go @@ -352,14 +352,14 @@ func testListMessageByFromState(ctx context.Context, t *testing.T, api, apiSign } tmpMsgs := make([]*types.Message, pageSize*2) - msgs, err := api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize) + msgs, err := api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize, 0) assert.NoError(t, err) assert.Len(t, msgs, pageSize) checkCreatedAt(msgs, isAsc) copy(tmpMsgs, msgs) pageIndex = 2 - msgs, err = api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize) + msgs, err = api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize, 0) assert.NoError(t, err) assert.Len(t, msgs, pageSize) checkCreatedAt(msgs, isAsc) @@ -368,7 +368,7 @@ func testListMessageByFromState(ctx context.Context, t *testing.T, api, apiSign pageSize = 40 pageIndex = 1 - msgs, err = api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize) + msgs, err = api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize, 0) assert.NoError(t, err) assert.Len(t, msgs, pageSize) checkCreatedAt(msgs, isAsc) @@ -380,7 +380,7 @@ func testListMessageByFromState(ctx context.Context, t *testing.T, api, apiSign } isAsc = false - msgs, err = api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize) + msgs, err = api.ListMessageByFromState(ctx, address.Undef, state, isAsc, pageIndex, pageSize, 0) assert.NoError(t, err) assert.Len(t, msgs, pageSize) checkCreatedAt(msgs, isAsc) @@ -393,7 +393,7 @@ func testListMessageByFromState(ctx context.Context, t *testing.T, api, apiSign } for addr, ids := range msgIDs { idsLen := len(ids) - msgs, err = api.ListMessageByFromState(ctx, addr, state, isAsc, pageIndex, idsLen) + msgs, err = api.ListMessageByFromState(ctx, addr, state, isAsc, pageIndex, idsLen, 0) assert.NoError(t, err) assert.Len(t, msgs, idsLen) checkCreatedAt(msgs, isAsc) diff --git a/models/mysql/message.go b/models/mysql/message.go index 1f299402..bf100050 100644 --- a/models/mysql/message.go +++ b/models/mysql/message.go @@ -152,7 +152,7 @@ func newMysqlMessageRepo(db *gorm.DB) *mysqlMessageRepo { return &mysqlMessageRepo{DB: db} } -func (m *mysqlMessageRepo) ListMessageByFromState(from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int) ([]*types.Message, error) { +func (m *mysqlMessageRepo) ListMessageByFromState(from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int, d time.Duration) ([]*types.Message, error) { query := m.DB.Table("messages").Offset((pageIndex - 1) * pageSize).Limit(pageSize) if from != address.Undef { @@ -165,6 +165,11 @@ func (m *mysqlMessageRepo) ListMessageByFromState(from address.Address, state ty query = query.Order("created_at DESC") } + if d != 0 { + t := time.Now().Add(-d) + query = query.Where("created_at < ?", t) + } + query = query.Where("state = ?", state) var sqlMsgs []*mysqlMessage diff --git a/models/mysql/message_test.go b/models/mysql/message_test.go index da085864..0ac8481b 100644 --- a/models/mysql/message_test.go +++ b/models/mysql/message_test.go @@ -367,16 +367,16 @@ func testListMessageByFromState(t *testing.T, r repo.Repo, mock sqlmock.Sqlmock) WithArgs(from.String(), state). WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg1")) - _, err := r.MessageRepo().ListMessageByFromState(from, state, isAsc, pageIndex, pageSize) + _, err := r.MessageRepo().ListMessageByFromState(from, state, isAsc, pageIndex, pageSize, 0) assert.NoError(t, err) - _, err = r.MessageRepo().ListMessageByFromState(address.Undef, state, isAsc, pageIndex, pageSize) + _, err = r.MessageRepo().ListMessageByFromState(address.Undef, state, isAsc, pageIndex, pageSize, 0) assert.NoError(t, err) - _, err = r.MessageRepo().ListMessageByFromState(from, state, true, pageIndex, pageSize) + _, err = r.MessageRepo().ListMessageByFromState(from, state, true, pageIndex, pageSize, 0) assert.NoError(t, err) - res, err := r.MessageRepo().ListMessageByFromState(from, state, isAsc, 2, 2) + res, err := r.MessageRepo().ListMessageByFromState(from, state, isAsc, 2, 2, 0) assert.NoError(t, err) checkMsgWithIDs(t, res, []string{"msg1"}) } diff --git a/models/repo/message_repo.go b/models/repo/message_repo.go index 861c275b..8b535840 100644 --- a/models/repo/message_repo.go +++ b/models/repo/message_repo.go @@ -33,7 +33,7 @@ type MessageRepo interface { GetSignedMessageFromFailedMsg(addr address.Address) ([]*types.Message, error) ListMessage() ([]*types.Message, error) - ListMessageByFromState(from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int) ([]*types.Message, error) + ListMessageByFromState(from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int, d time.Duration) ([]*types.Message, error) ListMessageByAddress(addr address.Address) ([]*types.Message, error) ListFailedMessage(*MsgQueryParams) ([]*types.Message, error) // ListBlockedMessage returns filled messages and unfill messages diff --git a/models/sqlite/message.go b/models/sqlite/message.go index e1fdbb46..75d20f03 100644 --- a/models/sqlite/message.go +++ b/models/sqlite/message.go @@ -398,7 +398,7 @@ func (m *sqliteMessageRepo) ListMessageByAddress(addr address.Address) ([]*types return result, nil } -func (m *sqliteMessageRepo) ListMessageByFromState(from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int) ([]*types.Message, error) { +func (m *sqliteMessageRepo) ListMessageByFromState(from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int, d time.Duration) ([]*types.Message, error) { query := m.DB.Debug().Table("messages").Offset((pageIndex - 1) * pageSize).Limit(pageSize) if from != address.Undef { @@ -409,6 +409,12 @@ func (m *sqliteMessageRepo) ListMessageByFromState(from address.Address, state t } else { query = query.Order("created_at DESC") } + + if d != 0 { + t := time.Now().Add(-d) + query = query.Where("created_at < ?", t) + } + query = query.Where("state=?", state) var sqlMsgs []*sqliteMessage diff --git a/models/sqlite/message_test.go b/models/sqlite/message_test.go index c403ada7..ee31ce15 100644 --- a/models/sqlite/message_test.go +++ b/models/sqlite/message_test.go @@ -317,11 +317,11 @@ func TestListMessageByFromState(t *testing.T) { addr, err := address.NewActorAddress(uuid.New().NodeID()) assert.NoError(t, err) - msgList, err := messageRepo.ListMessageByFromState(addr, types.UnFillMsg, false, 1, 100) + msgList, err := messageRepo.ListMessageByFromState(addr, types.UnFillMsg, false, 1, 100, 0) assert.NoError(t, err) assert.Len(t, msgList, 0) - msgList, err = messageRepo.ListMessageByFromState(addr, types.UnFillMsg, false, 0, 100) + msgList, err = messageRepo.ListMessageByFromState(addr, types.UnFillMsg, false, 0, 100, 0) assert.NoError(t, err) assert.Len(t, msgList, 0) @@ -338,7 +338,7 @@ func TestListMessageByFromState(t *testing.T) { assert.NoError(t, messageRepo.CreateMessage(msg)) } - msgList, err = messageRepo.ListMessageByFromState(addr, types.OnChainMsg, isAsc, 1, onChainMsgCount) + msgList, err = messageRepo.ListMessageByFromState(addr, types.OnChainMsg, isAsc, 1, onChainMsgCount, 0) assert.NoError(t, err) assert.Equal(t, onChainMsgCount, len(msgList)) sorted := sort.SliceIsSorted(msgList, func(i, j int) bool { @@ -347,7 +347,7 @@ func TestListMessageByFromState(t *testing.T) { assert.True(t, sorted) checkMsgList(t, msgList, testhelper.SliceToMap(msgs)) - msgList, err = messageRepo.ListMessageByFromState(addr, types.OnChainMsg, isAsc, 1, onChainMsgCount/2) + msgList, err = messageRepo.ListMessageByFromState(addr, types.OnChainMsg, isAsc, 1, onChainMsgCount/2, 0) assert.NoError(t, err) assert.Equal(t, onChainMsgCount/2, len(msgList)) } diff --git a/service/message_service.go b/service/message_service.go index d405a906..45c4b581 100644 --- a/service/message_service.go +++ b/service/message_service.go @@ -47,7 +47,7 @@ type IMessageService interface { GetMessageBySignedCid(ctx context.Context, signedCid cid.Cid) (*types.Message, error) GetMessageByUnsignedCid(ctx context.Context, unsignedCid cid.Cid) (*types.Message, error) ListMessage(ctx context.Context, params *repo.MsgQueryParams) ([]*types.Message, error) - ListMessageByFromState(ctx context.Context, from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int) ([]*types.Message, error) + ListMessageByFromState(ctx context.Context, from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int, t time.Duration) ([]*types.Message, error) ListMessageByAddress(ctx context.Context, addr address.Address) ([]*types.Message, error) ListFailedMessage(ctx context.Context, params *repo.MsgQueryParams) ([]*types.Message, error) ListBlockedMessage(ctx context.Context, params *repo.MsgQueryParams, d time.Duration) ([]*types.Message, error) @@ -385,12 +385,12 @@ func (ms *MessageService) GetMessageByFromAndNonce(ctx context.Context, from add return msg, nil } -func (ms *MessageService) ListMessageByFromState(ctx context.Context, from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int) ([]*types.Message, error) { +func (ms *MessageService) ListMessageByFromState(ctx context.Context, from address.Address, state types.MessageState, isAsc bool, pageIndex, pageSize int, d time.Duration) ([]*types.Message, error) { ts, err := ms.nodeClient.ChainHead(ctx) if err != nil { return nil, err } - msgs, err := ms.repo.MessageRepo().ListMessageByFromState(from, state, isAsc, pageIndex, pageSize) + msgs, err := ms.repo.MessageRepo().ListMessageByFromState(from, state, isAsc, pageIndex, pageSize, d) if err != nil { return nil, err }