Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add asc and by update at to msg query params #357

Merged
merged 4 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 86 additions & 86 deletions go.mod

Large diffs are not rendered by default.

811 changes: 599 additions & 212 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions integration_test/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ func testListMessageByFromState(ctx context.Context, t *testing.T, api, apiSign
assert.NoError(t, err)
assert.Len(t, msgs, idsLen)
checkCreatedAt(msgs, isAsc)
for i, msg := range msgs {
assert.Equal(t, ids[idsLen-1-i], msg.ID)
for _, msg := range msgs {
assert.Contains(t, ids, msg.ID)
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions metrics/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ func SetupJaeger(lc fx.Lifecycle, tcfg *metrics.TraceConfig) error {
return nil
}

exporter, err := metrics.RegisterJaeger(tcfg.ServerName, tcfg)
exporter, err := metrics.SetupJaegerTracing(tcfg.ServerName, tcfg)
if err != nil {
return err
}
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
metrics.UnregisterJaeger(exporter)
return nil
return metrics.ShutdownJaeger(ctx, exporter)
},
})
return nil
Expand Down
64 changes: 49 additions & 15 deletions models/mysql/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,10 @@ func (m *mysqlMessageRepo) ListMessage() ([]*types.Message, error) {

func (m *mysqlMessageRepo) ListMessageByParams(params *repo.MsgQueryParams) ([]*types.Message, error) {
var sqlMsgs []*mysqlMessage
paramsMap := params.ToMap()
var err error
if params.IsPaged() {
err = m.DB.Where(paramsMap).Limit(params.Limit()).Offset(params.Offset()).Find(&sqlMsgs).Error
} else {
err = m.DB.Where(paramsMap).Find(&sqlMsgs).Error
}

query := parseQueryParams(m.DB, params)

err := query.Find(&sqlMsgs).Error
if err != nil {
return nil, err
}
Expand All @@ -425,30 +422,41 @@ func (m *mysqlMessageRepo) ListMessageByAddress(addr address.Address) ([]*types.
return result, nil
}

func (m *mysqlMessageRepo) ListFailedMessage(p *repo.MsgQueryParams) ([]*types.Message, error) {
func (m *mysqlMessageRepo) ListFailedMessage(params *repo.MsgQueryParams) ([]*types.Message, error) {
var sqlMsgs []*mysqlMessage
p.State = []types.MessageState{types.UnFillMsg}
paramsMap := p.ToMap()
err := m.DB.Order("created_at").Where(paramsMap).Find(&sqlMsgs, "error_msg != ?", "").Error
params.State = []types.MessageState{types.UnFillMsg}

query := m.DB.Order("created_at")
query.Where("error_msg != ?", "")
query = parseQueryParams(query, params)

err := query.Find(&sqlMsgs).Error
if err != nil {
return nil, err
}

result := make([]*types.Message, len(sqlMsgs))
for index, sqlMsg := range sqlMsgs {
result[index] = sqlMsg.Message()
}
return result, nil
}

func (m *mysqlMessageRepo) ListBlockedMessage(p *repo.MsgQueryParams, d time.Duration) ([]*types.Message, error) {
func (m *mysqlMessageRepo) ListBlockedMessage(params *repo.MsgQueryParams, d time.Duration) ([]*types.Message, error) {
var sqlMsgs []*mysqlMessage
t := time.Now().Add(-d)
p.State = []types.MessageState{types.FillMsg, types.UnFillMsg}
paramsMap := p.ToMap()
err := m.DB.Order("created_at").Where(paramsMap).Find(&sqlMsgs, "created_at < ?", t).Error
params.State = []types.MessageState{types.FillMsg, types.UnFillMsg}

query := parseQueryParams(m.DB, params)

query.Order("created_at")
query.Where("created_at < ?", t)

err := query.Find(&sqlMsgs).Error
if err != nil {
return nil, err
}

result := make([]*types.Message, len(sqlMsgs))
for index, sqlMsg := range sqlMsgs {
result[index] = sqlMsg.Message()
Expand Down Expand Up @@ -541,3 +549,29 @@ func (m *mysqlMessageRepo) UpdateErrMsg(id string, errMsg string) error {
}
return m.DB.Model((*mysqlMessage)(nil)).Where("id = ?", id).UpdateColumns(updateColumns).Error
}

func parseQueryParams(query *gorm.DB, params *repo.MsgQueryParams) *gorm.DB {
if !params.Asc {
query = query.Order("updated_at desc")
}
if len(params.From) > 0 {
temp := make([]string, len(params.From))
for i, addr := range params.From {
temp[i] = addr.String()
}
query = query.Where("from_addr IN ?", temp)
}
if len(params.State) > 0 {
query = query.Where("state IN ?", params.State)
}
if params.Offset != 0 {
query = query.Offset(int(params.Offset))
}
if params.Limit != 0 {
query = query.Limit(int(params.Limit))
}
if params.ByUpdateAt != nil {
query = query.Where("updated_at >= ?", params.ByUpdateAt)
}
return query
}
26 changes: 13 additions & 13 deletions models/mysql/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestListMessageByParams(t *testing.T) {
state := types.OnChainMsg

t.Run("by from", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `from_addr` = ?")).
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE from_addr IN (?) ORDER BY updated_at desc")).
WithArgs(from1.String()).
WillReturnRows(sqlmock.NewRows([]string{"id"}))

Expand All @@ -38,7 +38,7 @@ func TestListMessageByParams(t *testing.T) {
})

t.Run("by state", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `state` = ?")).
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE state IN (?) ORDER BY updated_at desc")).
WithArgs(state).
WillReturnRows(sqlmock.NewRows([]string{"id"}))

Expand All @@ -47,7 +47,7 @@ func TestListMessageByParams(t *testing.T) {
})

t.Run("by from and state", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `from_addr` = ? AND `state` = ?")).
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE from_addr IN (?) AND state IN (?) ORDER BY updated_at desc")).
WithArgs(from1.String(), state).
WillReturnRows(sqlmock.NewRows([]string{"id"}))

Expand All @@ -56,7 +56,7 @@ func TestListMessageByParams(t *testing.T) {
})

t.Run("by multi address", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `from_addr` IN (?,?)")).
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE from_addr IN (?,?) ORDER BY updated_at desc")).
WithArgs(from1.String(), from2.String()).
WillReturnRows(sqlmock.NewRows([]string{"id"}))

Expand Down Expand Up @@ -399,17 +399,17 @@ func TestListFailedMessage(t *testing.T) {
ids := []string{"msg1", "msg2"}

t.Run("no param", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `state` = ? AND (error_msg != ?) ORDER BY created_at")).
WithArgs(types.UnFillMsg, "").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE (error_msg != ?) AND state IN (?) ORDER BY created_at,updated_at desc")).
WithArgs("", types.UnFillMsg).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))

res, err := r.MessageRepo().ListFailedMessage(&repo.MsgQueryParams{})
assert.NoError(t, err)
checkMsgWithIDs(t, res, ids)
})

t.Run("state cover", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `state` = ? AND (error_msg != ?) ORDER BY created_at")).
WithArgs(types.UnFillMsg, "").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE (error_msg != ?) AND state IN (?) ORDER BY created_at,updated_at desc")).
WithArgs("", types.UnFillMsg).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))

res, err := r.MessageRepo().ListFailedMessage(&repo.MsgQueryParams{State: []types.MessageState{types.OnChainMsg}})
assert.NoError(t, err)
Expand All @@ -418,8 +418,8 @@ func TestListFailedMessage(t *testing.T) {

t.Run("indicate from", func(t *testing.T) {
addr := testutil.AddressProvider()(t)
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `from_addr` = ? AND `state` = ? AND (error_msg != ?) ORDER BY created_at")).
WithArgs(addr.String(), types.UnFillMsg, "").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE (error_msg != ?) AND from_addr IN (?) AND state IN (?) ORDER BY created_at,updated_at desc")).
WithArgs("", addr.String(), types.UnFillMsg).WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))

res, err := r.MessageRepo().ListFailedMessage(&repo.MsgQueryParams{From: []address.Address{addr}})
assert.NoError(t, err)
Expand All @@ -436,7 +436,7 @@ func TestListBlockedMessage(t *testing.T) {
blocked := time.Second * 3

t.Run("no param", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `state` IN (?,?) AND created_at < ? ORDER BY created_at")).
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE state IN (?,?) AND created_at < ? ORDER BY updated_at desc,created_at")).
WithArgs(types.FillMsg, types.UnFillMsg, anyTime{}).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))

Expand All @@ -446,7 +446,7 @@ func TestListBlockedMessage(t *testing.T) {
})

t.Run("param with address", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `from_addr` = ? AND `state` IN (?,?) AND created_at < ? ORDER BY created_at")).
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE from_addr IN (?) AND state IN (?,?) AND created_at < ? ORDER BY updated_at desc,created_at")).
WithArgs(from.String(), types.FillMsg, types.UnFillMsg, anyTime{}).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))

Expand All @@ -456,7 +456,7 @@ func TestListBlockedMessage(t *testing.T) {
})

t.Run("param with addresses", func(t *testing.T) {
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE `from_addr` IN (?,?) AND `state` IN (?,?) AND created_at < ? ORDER BY created_at")).
mock.ExpectQuery(regexp.QuoteMeta("SELECT * FROM `messages` WHERE from_addr IN (?,?) AND state IN (?,?) AND created_at < ? ORDER BY updated_at desc,created_at")).
WithArgs(from.String(), from.String(), types.FillMsg, types.UnFillMsg, anyTime{}).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(ids[0]).AddRow(ids[1]))

Expand Down
62 changes: 47 additions & 15 deletions models/sqlite/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,10 @@ func (m *sqliteMessageRepo) ExpireMessage(msgs []*types.Message) error {

func (m *sqliteMessageRepo) ListMessageByParams(params *repo.MsgQueryParams) ([]*types.Message, error) {
var sqlMsgs []*sqliteMessage
paramsMap := params.ToMap()
var err error
if params.IsPaged() {
err = m.DB.Where(paramsMap).Limit(params.Limit()).Offset(params.Offset()).Find(&sqlMsgs).Error
} else {
err = m.DB.Where(paramsMap).Find(&sqlMsgs).Error
}

query := parseQueryParams(m.DB, params)

err := query.Find(&sqlMsgs).Error
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -427,11 +424,15 @@ func (m *sqliteMessageRepo) ListMessageByFromState(from address.Address, state t
return result, err
}

func (m *sqliteMessageRepo) ListFailedMessage(p *repo.MsgQueryParams) ([]*types.Message, error) {
func (m *sqliteMessageRepo) ListFailedMessage(params *repo.MsgQueryParams) ([]*types.Message, error) {
var sqlMsgs []*sqliteMessage
p.State = []types.MessageState{types.UnFillMsg}
paramsMap := p.ToMap()
err := m.DB.Order("created_at").Where(paramsMap).Find(&sqlMsgs, "error_msg != ?", "").Error
params.State = []types.MessageState{types.UnFillMsg}

query := m.DB.Order("created_at")
query.Where("error_msg != ?", "")
query = parseQueryParams(query, params)

err := query.Find(&sqlMsgs).Error
if err != nil {
return nil, err
}
Expand All @@ -442,12 +443,17 @@ func (m *sqliteMessageRepo) ListFailedMessage(p *repo.MsgQueryParams) ([]*types.
return result, nil
}

func (m *sqliteMessageRepo) ListBlockedMessage(p *repo.MsgQueryParams, d time.Duration) ([]*types.Message, error) {
func (m *sqliteMessageRepo) ListBlockedMessage(params *repo.MsgQueryParams, d time.Duration) ([]*types.Message, error) {
var sqlMsgs []*sqliteMessage
t := time.Now().Add(-d)
p.State = []types.MessageState{types.FillMsg, types.UnFillMsg}
paramsMap := p.ToMap()
err := m.DB.Order("created_at").Where(paramsMap).Find(&sqlMsgs, "created_at < ?", t).Error
params.State = []types.MessageState{types.FillMsg, types.UnFillMsg}

query := parseQueryParams(m.DB, params)

query.Order("created_at")
query.Where("created_at < ?", t)

err := query.Find(&sqlMsgs).Error
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -543,3 +549,29 @@ func (m *sqliteMessageRepo) UpdateErrMsg(id string, errMsg string) error {
}
return m.DB.Model(&sqliteMessage{}).Where("id = ?", id).UpdateColumns(updateColumns).Error
}

func parseQueryParams(query *gorm.DB, params *repo.MsgQueryParams) *gorm.DB {
if !params.Asc {
query = query.Order("updated_at desc")
}
if len(params.From) > 0 {
temp := make([]string, len(params.From))
for i, addr := range params.From {
temp[i] = addr.String()
}
query = query.Where("from_addr IN ?", temp)
}
if len(params.State) > 0 {
query = query.Where("state IN ?", params.State)
}
if params.Offset != 0 {
query = query.Offset(int(params.Offset))
}
if params.Limit != 0 {
query = query.Limit(int(params.Limit))
}
if params.ByUpdateAt != nil {
query = query.Where("updated_at >= ?", params.ByUpdateAt)
}
return query
}
35 changes: 17 additions & 18 deletions models/sqlite/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,21 @@ func TestListMessageByParams(t *testing.T) {
addrCases = append(addrCases, addr)
}

msgList, err := messageRepo.ListMessageByParams(&repo.MsgQueryParams{State: []types.MessageState{types.UnFillMsg}, PageIndex: 1, PageSize: 100})
assert.NoError(t, err)
assert.Len(t, msgList, 0)

msgList, err = messageRepo.ListMessageByParams(&repo.MsgQueryParams{State: []types.MessageState{types.UnFillMsg}, PageIndex: 0, PageSize: 100})
assert.NoError(t, err)
assert.Len(t, msgList, 0)

msgCount := 100
onChainMsgCount := 0
unFillMsgCount := 0
addr0Count := 0
addr1Count := 0
addr0onChainMsgCount := 0
stamp := time.Now()
beforeStamp := 50

msgs := testhelper.NewMessages(msgCount)
for _, msg := range msgs {
for i, msg := range msgs {
if i == beforeStamp {
stamp = time.Now()
}

msg.State = types.MessageState(rand.Intn(7))
msg.From = addrCases[rand.Intn(len(addrCases))]
if msg.State == types.OnChainMsg {
Expand All @@ -272,16 +270,12 @@ func TestListMessageByParams(t *testing.T) {
assert.NoError(t, messageRepo.CreateMessage(msg))
}

msgList, err = messageRepo.ListMessageByParams(&repo.MsgQueryParams{PageIndex: 1, PageSize: msgCount})
msgList, err := messageRepo.ListMessageByParams(&repo.MsgQueryParams{Limit: uint(msgCount)})
assert.NoError(t, err)
assert.Len(t, msgList, msgCount)

// invalid page index (page size) will be ignored
msgList, err = messageRepo.ListMessageByParams(&repo.MsgQueryParams{PageIndex: 0, PageSize: msgCount / 2})
assert.NoError(t, err)
assert.Len(t, msgList, msgCount)

msgList, err = messageRepo.ListMessageByParams(&repo.MsgQueryParams{PageIndex: 1, PageSize: msgCount / 2})
// limit
msgList, err = messageRepo.ListMessageByParams(&repo.MsgQueryParams{Limit: uint(msgCount / 2)})
assert.NoError(t, err)
assert.Len(t, msgList, msgCount/2)

Expand Down Expand Up @@ -310,6 +304,11 @@ func TestListMessageByParams(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, msgList, addr0onChainMsgCount)

// by UpdateAt
msgList, err = messageRepo.ListMessageByParams(&repo.MsgQueryParams{ByUpdateAt: &stamp})
assert.NoError(t, err)
assert.Len(t, msgList, msgCount-beforeStamp)

}

func TestListMessageByFromState(t *testing.T) {
Expand Down Expand Up @@ -459,8 +458,8 @@ func TestListBlockedMessage(t *testing.T) {
&repo.MsgQueryParams{From: []address.Address{msgState[types.FillMsg][0].From, msgState[types.UnFillMsg][0].From}}, time.Second*2)
assert.NoError(t, err)
assert.Equal(t, 2, len(msgList))
testhelper.Equal(t, msgState[types.FillMsg][0], msgList[0])
testhelper.Equal(t, msgState[types.UnFillMsg][0], msgList[1])
testhelper.Equal(t, msgState[types.UnFillMsg][0], msgList[0])
testhelper.Equal(t, msgState[types.FillMsg][0], msgList[1])
}

func TestListUnChainMessageByAddress(t *testing.T) {
Expand Down