Skip to content

Commit

Permalink
Merge pull request #331 from filecoin-project/chore/more-error-info
Browse files Browse the repository at this point in the history
chore: more detailed error information
  • Loading branch information
diwufeiwen authored and simlecode committed Mar 22, 2023
1 parent b811a8b commit de1b1bb
Showing 1 changed file with 31 additions and 37 deletions.
68 changes: 31 additions & 37 deletions service/message_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ const (

var msgSelectLog = logging.Logger("msg-select")

func logWithAddress(addr address.Address) *zap.SugaredLogger {
return msgSelectLog.With("address", addr.String())
}

type MsgSelectMgr struct {
ctx context.Context
repo repo.Repo
Expand Down Expand Up @@ -206,7 +202,7 @@ func recordMetric(ctx context.Context, addr address.Address, selectResult *MsgSe
stats.Record(ctx, metrics.ErrMsgNumOfLastRound.M(int64(len(selectResult.ErrMsg))))
}

var errSingMessage = errors.New("sign message faield")
var errSingMessage = errors.New("sign message failed")

type MsgSelectResult struct {
Address *types.Address
Expand Down Expand Up @@ -234,6 +230,8 @@ type work struct {

start time.Time
controlChan chan struct{}

log *zap.SugaredLogger
}

func newWork(ctx context.Context,
Expand All @@ -257,6 +255,7 @@ func newWork(ctx context.Context,
walletClient: walletClient,
msgReceiver: msgReceiver,
controlChan: make(chan struct{}, 1),
log: msgSelectLog.With("address", addr),
}
}

Expand All @@ -270,15 +269,15 @@ func (w *work) startSelectMessage(
// first check w.ctx, avoid w.controlChan closed
select {
case <-w.ctx.Done():
msgSelectLog.Infof("context done: %s, %s skip select message", w.ctx.Err(), w.addr)
w.log.Infof("context done: %s, skip select message", w.ctx.Err())
return
default:
}

select {
case w.controlChan <- struct{}{}:
default:
msgSelectLog.Infof("%s is already selecting message, had took %v", w.addr, time.Since(w.start))
w.log.Infof("already selecting message, had took %v", time.Since(w.start))
return
}

Expand All @@ -287,19 +286,18 @@ func (w *work) startSelectMessage(
defer w.finish()
defer cancel()

log := logWithAddress(w.addr)
selectResult, err := w.selectMessage(ctx, appliedNonce, addrInfo, ts, maxAllowPendingMessage, sharedParams)
if err != nil {
log.Errorf("select message failed %v", err)
w.log.Errorf("select message failed: %v", err)
return
}
log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg),
w.log.Infof("select message result | SelectMsg: %d | ToPushMsg: %d | ErrMsg: %d | took: %v", len(selectResult.SelectMsg),
len(selectResult.ToPushMsg), len(selectResult.ErrMsg), time.Since(w.start))

recordMetric(ctx, w.addr, selectResult)

if err := w.saveSelectedMessages(ctx, selectResult); err != nil {
log.Errorf("failed to save selected messages to db %v", err)
w.log.Errorf("failed to save selected messages to db %v", err)
return
}

Expand All @@ -315,19 +313,16 @@ func (w *work) startSelectMessage(
select {
case w.msgReceiver <- selectResult.ToPushMsg:
default:
log.Errorf("message receiver channel is full, skip message %v %v", w.addr, len(selectResult.ToPushMsg))
w.log.Errorf("message receiver channel is full, skip %d messages", len(selectResult.ToPushMsg))
}
}
}

func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap, addrInfo *types.Address, ts *venusTypes.TipSet, maxAllowPendingMessage uint64, sharedParams *types.SharedSpec) (*MsgSelectResult, error) {
log := logWithAddress(addrInfo.Addr)

// 没有绑定账号肯定无法签名
accounts, err := w.addressService.GetAccountsOfSigner(ctx, addrInfo.Addr)
if err != nil {
log.Errorf("get account failed %v", err)
return nil, err
return nil, fmt.Errorf("get account failed: %v", err)
}

// 判断是否需要推送消息
Expand All @@ -336,12 +331,12 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
return nil, err
}
if nonceInLatestTs > addrInfo.Nonce {
log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs)
w.log.Warnf("nonce in db %d is smaller than nonce on chain %d, update to latest", addrInfo.Nonce, nonceInLatestTs)
addrInfo.Nonce = nonceInLatestTs
addrInfo.UpdatedAt = time.Now()
err := w.repo.AddressRepo().UpdateNonce(ctx, addrInfo.Addr, addrInfo.Nonce)
if err != nil {
return nil, fmt.Errorf("update nonce failed %v", err)
return nil, fmt.Errorf("update nonce failed: %v", err)
}
}

Expand All @@ -350,24 +345,24 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
// calc the message needed
nonceGap := addrInfo.Nonce - nonceInLatestTs
if nonceGap >= maxAllowPendingMessage {
log.Errorf("there are %d message not to be package", len(toPushMessage), nonceGap)
w.log.Errorf("there are %d message not to be package, nonce gap: %d", len(toPushMessage), nonceGap)
return &MsgSelectResult{
ToPushMsg: toPushMessage,
Address: addrInfo,
}, nil
}
wantCount := maxAllowPendingMessage - nonceGap
log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount)
w.log.Infof("state actor nonce %d, latest nonce in ts %d, assigned nonce %d, nonce gap %d, want %d", actorNonce, nonceInLatestTs, addrInfo.Nonce, nonceGap, wantCount)

// get unfill message
selectCount := mathutil.MinUint64(wantCount*2, 100)
messages, err := w.repo.MessageRepo().ListUnChainMessageByAddress(addrInfo.Addr, int(selectCount))
if err != nil {
return nil, fmt.Errorf("list unfill message error %v", err)
return nil, fmt.Errorf("list unfill message error: %v", err)
}

if len(messages) == 0 {
log.Infof("have no unfill message")
w.log.Infof("have no unfill message")
return &MsgSelectResult{
ToPushMsg: toPushMessage,
Address: addrInfo,
Expand All @@ -380,15 +375,15 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,

estimateResult, candidateMessages, err := w.estimateMessage(ctx, ts, messages, sharedParams, addrInfo)
if err != nil {
return nil, err
return nil, fmt.Errorf("estimate message failed: %v", err)
}

// sign
for index, msg := range candidateMessages {
// if error print error message
if len(estimateResult[index].Err) != 0 {
errMsg = append(errMsg, msgErrInfo{id: msg.ID, err: gasEstimate + estimateResult[index].Err})
log.Errorf("estimate message %s fail %s", msg.ID, estimateResult[index].Err)
w.log.Errorf("estimate message %s fail %s", msg.ID, estimateResult[index].Err)
continue
}
estimateMsg := estimateResult[index].Msg
Expand All @@ -410,10 +405,10 @@ func (w *work) selectMessage(ctx context.Context, appliedNonce *utils.NonceMap,
if err != nil {
if errors.Is(err, errSingMessage) {
errMsg = append(errMsg, msgErrInfo{id: msg.ID, err: fmt.Sprintf("%v%v", signMsg, errors.Unwrap(err))})
log.Errorf("sign message %s failed %v", msg.ID, err)
w.log.Errorf("sign message %s failed %v", msg.ID, err)
break
}
log.Error(err)
w.log.Error("signed message failed: %v", err)
continue
}

Expand Down Expand Up @@ -446,13 +441,13 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce
defer cancel()
actorI, err := handleTimeout(timeoutCtx, w.fullNode.StateGetActor, []interface{}{w.addr, ts.Key()})
if err != nil {
return 0, 0, err
return 0, 0, fmt.Errorf("get actor failed: %v", err)
}
actor := actorI.(*venusTypes.Actor)
nonceInLatestTs := actor.Nonce
// todo actor nonce maybe the latest ts. not need appliedNonce
if nonceInTs, ok := appliedNonce.Get(w.addr); ok {
msgSelectLog.Infof("update address %s nonce in ts %d nonce in actor %d", w.addr, nonceInTs, nonceInLatestTs)
w.log.Infof("nonce in ts %d, nonce in actor %d", nonceInTs, nonceInLatestTs)
nonceInLatestTs = nonceInTs
}

Expand All @@ -462,7 +457,7 @@ func (w *work) getNonce(ctx context.Context, ts *venusTypes.TipSet, appliedNonce
func (w *work) getFilledMessage(nonceInLatestTs uint64) []*venusTypes.SignedMessage {
filledMessage, err := w.repo.MessageRepo().ListFilledMessageByAddress(w.addr)
if err != nil {
msgSelectLog.Warnf("list filled message %v", err)
w.log.Warnf("list filled message %v", err)
}
msgs := make([]*venusTypes.SignedMessage, 0, len(filledMessage))
for _, msg := range filledMessage {
Expand Down Expand Up @@ -497,7 +492,7 @@ func (w *work) estimateMessage(ctx context.Context,

baseFee := ts.At(0).ParentBaseFee
if !newMsgMeta.BaseFee.NilOrZero() && baseFee.GreaterThan(newMsgMeta.BaseFee) {
msgSelectLog.Infof("skip msg %v, base fee too height %v(local) < %v(chain), height %v", msg.ID, newMsgMeta.BaseFee, baseFee, ts.Height())
w.log.Infof("skip msg %v, base fee too height %v(local) < %v(chain), height %v", msg.ID, newMsgMeta.BaseFee, baseFee, ts.Height())
continue
}

Expand All @@ -511,9 +506,9 @@ func (w *work) estimateMessage(ctx context.Context,
},
})

msgSelectLog.Infof("estimate message %s, gas fee cap %s, gas limit %v, gas premium: %s, "+
"meta maxfee %s, over estimation %f, gas over premium %f", msg.ID, msg.GasFeeCap, msg.GasLimit, msg.GasPremium,
newMsgMeta.MaxFee, newMsgMeta.GasOverEstimation, newMsgMeta.GasOverPremium)
w.log.Infof("estimate message %s, gas fee cap %s, gas limit %v, gas premium: %s, "+
"meta maxfee %s, over estimation %f, gas over premium %f, gas fee cap %v", msg.ID, msg.GasFeeCap, msg.GasLimit, msg.GasPremium,
newMsgMeta.MaxFee, newMsgMeta.GasOverEstimation, newMsgMeta.GasOverPremium, newMsgMeta.GasFeeCap)
}

estimateMsgCtx, estimateMsgCancel := context.WithTimeout(ctx, w.cfg.EstimateMessageTimeout)
Expand Down Expand Up @@ -550,8 +545,7 @@ func (w *work) signMessage(ctx context.Context, msg *types.Message, accounts []s

func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelectResult) error {
startSaveDB := time.Now()
log := msgSelectLog.With("address", selectResult.Address.Addr.String())
log.Infof("start save messages to database")
w.log.Infof("start save messages to database")
err := w.repo.Transaction(func(txRepo repo.TxRepo) error {
if len(selectResult.SelectMsg) > 0 {
if err := txRepo.MessageRepo().BatchSaveMessage(selectResult.SelectMsg); err != nil {
Expand All @@ -565,14 +559,14 @@ func (w *work) saveSelectedMessages(ctx context.Context, selectResult *MsgSelect
}

for _, m := range selectResult.ErrMsg {
msgSelectLog.Infof("update message %s error info with error %s", m.id, m.err)
w.log.Infof("update message %s error info with error %s", m.id, m.err)
if err := txRepo.MessageRepo().UpdateErrMsg(m.id, m.err); err != nil {
return err
}
}
return nil
})
log.Infof("end save messages to database, took %v, err %v", time.Since(startSaveDB), err)
w.log.Infof("end save messages to database, took %v, err %v", time.Since(startSaveDB), err)

return err
}
Expand Down

0 comments on commit de1b1bb

Please sign in to comment.