Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: more detailed error information #331

Merged
merged 1 commit into from
Mar 22, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 33 additions & 39 deletions service/message_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ const (

var msgSelectLog = logging.Logger("msg-select")

func logWithAddress(addr address.Address) *zap.SugaredLogger {
return msgSelectLog.With("address", addr.String())
}

type MsgSelectMgr struct {
ctx context.Context
repo repo.Repo
Expand Down Expand Up @@ -212,7 +208,7 @@ func recordMetric(ctx context.Context, addr address.Address, selectResult *MsgSe
stats.Record(ctx, metrics.ErrMsgNumOfLastRound.M(int64(len(selectResult.ErrMsg))))
}

var errSingMessage = errors.New("sign message faield")
var errSingMessage = errors.New("sign message failed")

type MsgSelectResult struct {
Address *types.Address
Expand Down Expand Up @@ -242,6 +238,8 @@ type work struct {
controlChan chan struct{}

actorCache *lru.ARCCache

log *zap.SugaredLogger
}

func newWork(ctx context.Context,
Expand All @@ -267,6 +265,7 @@ func newWork(ctx context.Context,
msgReceiver: msgReceiver,
controlChan: make(chan struct{}, 1),
actorCache: cache,
log: msgSelectLog.With("address", addr),
}
}

Expand All @@ -280,15 +279,15 @@ func (w *work) startSelectMessage(
// first check w.ctx, avoid w.controlChan closed
select {
case <-w.ctx.Done():
msgSelectLog.Infof("context done: %s, %s skip select message", w.ctx.Err(), w.addr)
w.log.Infof("context done: %s, skip select message", w.ctx.Err())
return
default:
}

select {
case w.controlChan <- struct{}{}:
default:
msgSelectLog.Infof("%s is already selecting message, had took %v", w.addr, time.Since(w.start))
w.log.Infof("already selecting message, had took %v", time.Since(w.start))
return
}

Expand All @@ -297,19 +296,18 @@ func (w *work) startSelectMessage(
defer w.finish()
defer cancel()

log := logWithAddress(w.addr)
selectResult, err := w.selectMessage(ctx, appliedNonce, addrInfo, ts, maxAllowPendingMessage, sharedParams)
if err != nil {
log.Errorf("select message failed %v", err)
w.log.Errorf("select message failed: %v", err)
return
}
log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg),
w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg),
len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start))

recordMetric(ctx, w.addr, selectResult)

if err := w.saveSelectedMessages(ctx, selectResult); err != nil {
log.Errorf("failed to save selected messages to db %v", err)
w.log.Errorf("failed to save selected messages to db %v", err)
return
}

Expand All @@ -325,19 +323,16 @@ func (w *work) startSelectMessage(
select {
case w.msgReceiver <- selectResult.ToPushMsg:
default:
log.Errorf("message receiver channel is full, skip message %v %v", w.addr, len(selectResult.ToPushMsg))
w.log.Errorf("message receiver channel is full, skip %d messages", len(selectResult.ToPushMsg))
}
}
}

func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, addrInfo *types.Address, ts *venusTypes.TipSet, maxAllowPendingMessage uint64, sharedParams *types.SharedSpec) (*MsgSelectResult, error) {
log := logWithAddress(addrInfo.Addr)

// 没有绑定账号肯定无法签名
accounts, err := w.addressService.GetAccountsOfSigner(ctx, addrInfo.Addr)
if err != nil {
log.Errorf("get account failed %v", err)
return nil, err
return nil, fmt.Errorf("get account failed: %v", err)
}

// 判断是否需要推送消息
Expand All @@ -346,12 +341,12 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
return nil, err
}
if nonceInLatestTs > addrInfo.Nonce {
log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs)
w.log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs)
addrInfo.Nonce = nonceInLatestTs
addrInfo.UpdatedAt = time.Now()
err := w.repo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce)
if err != nil {
return nil, fmt.Errorf("update nonce failed %v", err)
return nil, fmt.Errorf("update nonce failed: %v", err)
}
}

Expand All @@ -360,24 +355,24 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
// calc the message needed
nonceGap := addrInfo.Nonce - nonceInLatestTs
if nonceGap >= maxAllowPendingMessage {
log.Errorf("there are %d message not to be package", len(toPushMessage), nonceGap)
w.log.Errorf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap)
return &MsgSelectResult{
ToPushMsg: toPushMessage,
Address: addrInfo,
}, nil
}
wantCount := maxAllowPendingMessage - nonceGap
log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount)
w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount)

// get unfill message
selectCount := mathutil.MinUint64(wantCount*2, 100)
messages, err := w.repo.MessageRepo().ListUnChainMessageByAddress(addrInfo.Addr, int(selectCount))
if err != nil {
return nil, fmt.Errorf("list unfill message error %v", err)
return nil, fmt.Errorf("list unfill message error: %v", err)
}

if len(messages) == 0 {
log.Infof("have no unfill message")
w.log.Infof("have no unfill message")
return &MsgSelectResult{
ToPushMsg: toPushMessage,
Address: addrInfo,
Expand All @@ -390,15 +385,15 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,

estimateResult, candidateMessages, err := w.estimateMessage(ctx, ts, messages, sharedParams, addrInfo)
if err != nil {
return nil, err
return nil, fmt.Errorf("estimate message failed: %v", err)
}

// sign
for index, msg := range candidateMessages {
// if error print error message
if len(estimateResult[index].Err) != 0 {
errMsg = append(errMsg, msgErrInfo{id: msg.ID, err: gasEstimate + estimateResult[index].Err})
log.Errorf("estimate message %s fail %s", msg.ID, estimateResult[index].Err)
w.log.Errorf("estimate message %s fail %s", msg.ID, estimateResult[index].Err)
continue
}
estimateMsg := estimateResult[index].Msg
Expand All @@ -420,10 +415,10 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
if err != nil {
if errors.Is(err, errSingMessage) {
errMsg = append(errMsg, msgErrInfo{id: msg.ID, err: fmt.Sprintf("%v%v", signMsg, errors.Unwrap(err))})
log.Errorf("sign message %s failed %v", msg.ID, err)
w.log.Errorf("sign message %s failed %v", msg.ID, err)
break
}
log.Error(err)
w.log.Error("signed message failed: %v", err)
continue
}

Expand Down Expand Up @@ -456,13 +451,13 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce
defer cancel()
actorI, err := handleTimeout(timeoutCtx, w.fullNode.StateGetActor, []interface{}{w.addr, ts.Key()})
if err != nil {
return 0, 0, err
return 0, 0, fmt.Errorf("get actor failed: %v", err)
}
actor := actorI.(*venusTypes.Actor)
nonceInLatestTs := actor.Nonce
// todo actor nonce maybe the latest ts. not need appliedNonce
if nonceInTs, ok := appliedNonce.Get(w.addr); ok {
msgSelectLog.Infof("update address %s nonce in ts %d nonce in actor %d", w.addr, nonceInTs, nonceInLatestTs)
w.log.Infof("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs)
nonceInLatestTs = nonceInTs
}

Expand All @@ -472,7 +467,7 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce
func (w *work) getFilledMessage(nonceInLatestTs uint64) []*venusTypes.SignedMessage {
filledMessage, err := w.repo.MessageRepo().ListFilledMessageByAddress(w.addr)
if err != nil {
msgSelectLog.Warnf("list filled message %v", err)
w.log.Warnf("list filled message %v", err)
}
msgs := make([]*venusTypes.SignedMessage, 0, len(filledMessage))
for _, msg := range filledMessage {
Expand All @@ -499,12 +494,12 @@ func (w *work) estimateMessage(ctx context.Context,

nv, err := w.fullNode.StateNetworkVersion(ctx, venusTypes.EmptyTSK)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("get network version failed: %v", err)
}
for _, msg := range msgs {
actorCfg, err := w.getActorCfg(ctx, msg, nv)
if err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("get actor config failed: %v", err)
}
newMsgMeta := mergeMsgSpec(sharedParams, msg.Meta, addrInfo, actorCfg, msg)

Expand All @@ -514,7 +509,7 @@ func (w *work) estimateMessage(ctx context.Context,

baseFee := ts.At(0).ParentBaseFee
if !newMsgMeta.BaseFee.NilOrZero() && baseFee.GreaterThan(newMsgMeta.BaseFee) {
msgSelectLog.Infof("skip msg %v, base fee too height %v(local) < %v(chain), height %v", msg.ID, newMsgMeta.BaseFee, baseFee, ts.Height())
w.log.Infof("skip msg %v, base fee too height %v(local) < %v(chain), height %v", msg.ID, newMsgMeta.BaseFee, baseFee, ts.Height())
continue
}

Expand All @@ -528,9 +523,9 @@ func (w *work) estimateMessage(ctx context.Context,
},
})

msgSelectLog.Infof("estimate message %s, gas fee cap %s, gas limit %v, gas premium: %s, "+
"meta maxfee %s, over estimation %f, gas over premium %f", msg.ID, msg.GasFeeCap, msg.GasLimit, msg.GasPremium,
newMsgMeta.MaxFee, newMsgMeta.GasOverEstimation, newMsgMeta.GasOverPremium)
w.log.Infof("estimate message %s, gas fee cap %s, gas limit %v, gas premium: %s, "+
"meta maxfee %s, over estimation %f, gas over premium %f, gas fee cap %v", msg.ID, msg.GasFeeCap, msg.GasLimit, msg.GasPremium,
newMsgMeta.MaxFee, newMsgMeta.GasOverEstimation, newMsgMeta.GasOverPremium, newMsgMeta.GasFeeCap)
}

estimateMsgCtx, estimateMsgCancel := context.WithTimeout(ctx, w.cfg.EstimateMessageTimeout)
Expand Down Expand Up @@ -567,8 +562,7 @@ func (w *work) signMessage(ctx context.Context, msg *types.Message, accounts []s

func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelectResult) error {
startSaveDB := time.Now()
log := msgSelectLog.With("address", selectResult.Address.Addr.String())
log.Infof("start save messages to database")
w.log.Infof("start save messages to database")
err := w.repo.Transaction(func(txRepo repo.TxRepo) error {
if len(selectResult.SelectMsg) > 0 {
if err := txRepo.MessageRepo().BatchSaveMessage(selectResult.SelectMsg); err != nil {
Expand All @@ -582,14 +576,14 @@ func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelect
}

for _, m := range selectResult.ErrMsg {
msgSelectLog.Infof("update message %s error info with error %s", m.id, m.err)
w.log.Infof("update message %s error info with error %s", m.id, m.err)
if err := txRepo.MessageRepo().UpdateErrMsg(m.id, m.err); err != nil {
return err
}
}
return nil
})
log.Infof("end save messages to database, took %v, err %v", time.Since(startSaveDB), err)
w.log.Infof("end save messages to database, took %v, err %v", time.Since(startSaveDB), err)

return err
}
Expand Down