Skip to content

Commit

Permalink
Fix/batch push message (#192)
Browse files Browse the repository at this point in the history
* batch push messages
  • Loading branch information
hunjixin authored Apr 26, 2022
1 parent ea31a39 commit e596bc8
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions service/message_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,16 +646,26 @@ func (ms *MessageService) pushMessageToPool(ctx context.Context, ts *venusTypes.
//broad cast push to node in config ,push to multi node in db config
go func() {
tPush := time.Now()
ms.log.Infof("start to push message %d to mpool", len(selectResult.ToPushMsg))
pushMsgByAddr := make(map[address.Address][]*venusTypes.SignedMessage)
for _, msg := range selectResult.ToPushMsg {
if _, pushErr := ms.nodeClient.MpoolPush(ctx, msg); pushErr != nil {
if val, ok := pushMsgByAddr[msg.Message.From]; ok {
pushMsgByAddr[msg.Message.From] = append(val, msg)
} else {
pushMsgByAddr[msg.Message.From] = []*venusTypes.SignedMessage{msg}
}
}
ms.log.Infof("start to push message %d to mpool", len(selectResult.ToPushMsg))
for addr, msgs := range pushMsgByAddr {
//use batchpush instead of push one by one, push single may cause messsage send to different nodes when through chain-co
//issue https://github.com/filecoin-project/venus/issues/4860
if _, pushErr := ms.nodeClient.MpoolBatchPush(ctx, msgs); pushErr != nil {
if !strings.Contains(pushErr.Error(), errMinimumNonce.Error()) && !strings.Contains(pushErr.Error(), errAlreadyInMpool.Error()) {
ms.log.Errorf("push message %s to node failed %v", msg.Message.Cid().String(), pushErr)
ms.log.Errorf("push message in address %s to node failed %v", addr, pushErr)
}
}
}

ms.multiNodeToPush(ctx, selectResult.ToPushMsg)
ms.multiNodeToPush(ctx, pushMsgByAddr)

ms.log.Infof("Push message select spent:%d , save db spent:%d ,update cache spent:%d, push to node spent: %d",
time.Since(tSelect).Milliseconds(),
Expand All @@ -673,8 +683,8 @@ type nodeClient struct {
close jsonrpc.ClientCloser
}

func (ms *MessageService) multiNodeToPush(ctx context.Context, msgs []*venusTypes.SignedMessage) {
if len(msgs) == 0 {
func (ms *MessageService) multiNodeToPush(ctx context.Context, msgsByAddr map[address.Address][]*venusTypes.SignedMessage) {
if len(msgsByAddr) == 0 {
return
}

Expand All @@ -699,24 +709,19 @@ func (ms *MessageService) multiNodeToPush(ctx context.Context, msgs []*venusType
return
}

fromMap := make(map[address.Address]struct{})
for _, msg := range msgs {
if _, ok := fromMap[msg.Message.From]; !ok {
fromMap[msg.Message.From] = struct{}{}
}
}

for _, node := range nc {
for _, msg := range msgs {
if _, err := node.cli.MpoolPush(ctx, msg); err != nil {
for addr, msgs := range msgsByAddr {
//use batchpush instead of push one by one, push single may cause messsage send to different nodes when through chain-co
//issue https://github.com/filecoin-project/venus/issues/4860
if _, err := node.cli.MpoolBatchPush(ctx, msgs); err != nil {
//skip error
if !strings.Contains(err.Error(), errMinimumNonce.Error()) && !strings.Contains(err.Error(), errAlreadyInMpool.Error()) {
ms.log.Errorf("push message %s to node %s %v", msg.Cid(), node.name, err)
ms.log.Errorf("push message from %s to node %s %v", addr, node.name, err)
}
}
}
ms.log.Infof("start to broadcast message of address")
for fromAddr := range fromMap {
for fromAddr := range msgsByAddr {
if err := node.cli.MpoolPublishByAddr(ctx, fromAddr); err != nil {
ms.log.Errorf("publish message of address %s to node %s failed %v", fromAddr, node.name, err)
}
Expand Down Expand Up @@ -959,8 +964,9 @@ func (ms *MessageService) RepublishMessage(ctx context.Context, id string) error
if _, err := ms.nodeClient.MpoolPush(ctx, signedMsg); err != nil {
return err
}
ms.multiNodeToPush(ctx, []*venusTypes.SignedMessage{signedMsg})

toPush := make(map[address.Address][]*venusTypes.SignedMessage)
toPush[signedMsg.Message.From] = []*venusTypes.SignedMessage{signedMsg}
ms.multiNodeToPush(ctx, toPush)
return nil
}

Expand Down

0 comments on commit e596bc8

Please sign in to comment.