Skip to content

Commit

Permalink
Merge pull request #370 from ipfs-force-community/feat/record-push-error
Browse files Browse the repository at this point in the history
feat: record push message error info to db
  • Loading branch information
LinZexiao authored Dec 4, 2023
2 parents e5f8371 + 75b0d97 commit a254162
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 16 deletions.
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ jobs:
with:
has_ffi: false
test_timeout: 20
log_level: error
7 changes: 6 additions & 1 deletion integration_test/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"math/rand"
"path/filepath"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -595,6 +596,10 @@ func testUpdateFilledMessageByID(ctx context.Context, t *testing.T, api, apiSign
assert.NoError(t, err)
checkUnsignedMsg(t, &msg.Message, &res.Message)
}

// wait message onchain
time.Sleep(blockDelay * 2)

ctx, cancel := context.WithTimeout(ctx, blockDelay*4)
defer cancel()
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -844,7 +849,7 @@ func prepare(t *testing.T) *testParams {
cfg.MessageService.WaitingChainHeadStableDuration = 1 * time.Second
blockDelay := cfg.MessageService.WaitingChainHeadStableDuration * 2
authClient := testhelper.NewMockAuthClient(t)
ms, err := mockMessagerServer(ctx, t.TempDir(), cfg, authClient)
ms, err := mockMessagerServer(ctx, filepath.Join(t.TempDir(), time.Now().String()), cfg, authClient)
assert.NoError(t, err)

go ms.start(ctx)
Expand Down
12 changes: 8 additions & 4 deletions publisher/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ type MessageReceiver chan []*types.SignedMessage

func Options() fx.Option {
return fx.Options(
fx.Provide(NewMessageReciver),
fx.Provide(NewMessageReceiver),
fx.Provide(NewIMsgPublisher),
fx.Provide(NewP2pPublisher),
fx.Provide(newRpcPublisher),
)
}

func NewMessageReciver(ctx context.Context, p IMsgPublisher) (MessageReceiver, error) {
func NewMessageReceiver(ctx context.Context, p IMsgPublisher) (MessageReceiver, error) {
msgReceiver := make(MessageReceiver, 100)
go func() {
for {
Expand Down Expand Up @@ -87,6 +87,10 @@ func NewIMsgPublisher(ctx context.Context, netParams *types.NetworkParams, cfg *
return ret, nil
}

func newRpcPublisher(ctx context.Context, nodeClient v1.FullNode, nodeProvider repo.INodeProvider, cfg *config.PublisherConfig) *RpcPublisher {
return NewRpcPublisher(ctx, nodeClient, nodeProvider, cfg.EnableMultiNode)
func newRpcPublisher(ctx context.Context,
nodeClient v1.FullNode,
r repo.Repo,
cfg *config.PublisherConfig,
) *RpcPublisher {
return NewRpcPublisher(ctx, nodeClient, r.NodeRepo(), cfg.EnableMultiNode, r.MessageRepo())
}
47 changes: 41 additions & 6 deletions publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type RpcPublisher struct {
ctx context.Context
mainNodeThread *nodeThread
nodeProvider repo.INodeProvider
msgRepo repo.MessageRepo
enableMultiNode bool

nodeThreads map[types.UUID]struct {
Expand All @@ -68,12 +69,18 @@ type RpcPublisher struct {
lk sync.Mutex
}

func NewRpcPublisher(ctx context.Context, nodeClient v1.FullNode, nodeProvider repo.INodeProvider, enableMultiNode bool) *RpcPublisher {
nThread := newNodeThread(ctx, "mainNode", nodeClient)
func NewRpcPublisher(ctx context.Context,
nodeClient v1.FullNode,
nodeProvider repo.INodeProvider,
enableMultiNode bool,
msgRepo repo.MessageRepo,
) *RpcPublisher {
nThread := newNodeThread(ctx, "mainNode", nodeClient, msgRepo)
return &RpcPublisher{
ctx: ctx,
mainNodeThread: nThread,
nodeProvider: nodeProvider,
msgRepo: msgRepo,
enableMultiNode: enableMultiNode,
nodeThreads: make(map[types.UUID]struct {
nodeThread *nodeThread
Expand Down Expand Up @@ -104,10 +111,11 @@ func (p *RpcPublisher) PublishMessages(ctx context.Context, msgs []*types.Signed
threadStruct, ok := p.nodeThreads[node.ID]
nodesRemain[node.ID] = struct{}{}
if !ok {
thrCtx, cancel := context.WithCancel(p.ctx) // nolint ignore lostcancel
thrCtx, cancel := context.WithCancel(p.ctx)
cli, closer, err := v1.DialFullNodeRPC(thrCtx, node.URL, node.Token, nil)
if err != nil {
log.Warnf("connect node(%s) fail %v", node.Name, err)
cancel()
continue
}

Expand All @@ -116,7 +124,7 @@ func (p *RpcPublisher) PublishMessages(ctx context.Context, msgs []*types.Signed
nodeThread *nodeThread
close func()
}{
nodeThread: newNodeThread(thrCtx, nodeName, cli),
nodeThread: newNodeThread(thrCtx, nodeName, cli, p.msgRepo),
close: func() {
cancel()
closer()
Expand All @@ -135,19 +143,21 @@ func (p *RpcPublisher) PublishMessages(ctx context.Context, msgs []*types.Signed
}
}

return nil // nolint ignore lostcancel
return nil
}

type nodeThread struct {
name string
nodeClient v1.FullNode
msgRepo repo.MessageRepo
msgChan chan []*types.SignedMessage
}

func newNodeThread(ctx context.Context, name string, nodeClient v1.FullNode) *nodeThread {
func newNodeThread(ctx context.Context, name string, nodeClient v1.FullNode, msgRepo repo.MessageRepo) *nodeThread {
t := &nodeThread{
name: name,
nodeClient: nodeClient,
msgRepo: msgRepo,
msgChan: make(chan []*types.SignedMessage, 30),
}
go t.run(ctx)
Expand All @@ -170,6 +180,10 @@ func (n *nodeThread) run(ctx context.Context) {
}
log.Errorf("failed to push message to node, address: %v, error: %v, msgs: %v",
msgs[0].Message.From, err, failedMsg)

for _, msg := range msgs {
n.recordPushMessageError(msg.Cid(), err)
}
} else {
log.Debugf("failed to push message: %v", err)
}
Expand All @@ -183,6 +197,27 @@ func (n *nodeThread) HandleMsg(msgs []*types.SignedMessage) {
n.msgChan <- msgs
}

func (n *nodeThread) recordPushMessageError(msgCid cid.Cid, err error) {
msg, dbErr := n.msgRepo.GetMessageByCid(msgCid)
if dbErr != nil {
log.Warnf("failed to get message from db, cid: %v, error: %v", msgCid, dbErr)
return
}

if len(msg.ErrorMsg) != 0 {
// already recorded
if msg.ErrorMsg == err.Error() {
return
}
log.Infof("update message error info, msg id: %v, old error: %s, new error: %v", msg.ID, msg.ErrorMsg, err)
}

dbErr = n.msgRepo.UpdateErrMsg(msg.ID, err.Error())
if dbErr != nil {
log.Warnf("failed to update message error info, msg id: %v, error: %v", msg.ID, dbErr)
}
}

type MergePublisher struct {
ctx context.Context
subPublishers []IMsgPublisher
Expand Down
65 changes: 62 additions & 3 deletions publisher/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"testing"
"time"

"github.com/ipfs-force-community/sophon-messager/filestore"
"github.com/ipfs-force-community/sophon-messager/mocks"
"github.com/ipfs-force-community/sophon-messager/models/sqlite"
"github.com/ipfs-force-community/sophon-messager/testhelper"

mockV1 "github.com/filecoin-project/venus/venus-shared/api/chain/v1/mock"
Expand All @@ -25,12 +27,17 @@ func TestMainNodePublishMessage(t *testing.T) {
ctrl := gomock.NewController(t)
mainNode := mockV1.NewMockFullNode(ctrl)

rpcPublisher := NewRpcPublisher(ctx, mainNode, nil, false)
fs := filestore.NewMockFileStore(t.TempDir())
sqliteRepo, err := sqlite.OpenSqlite(fs)
assert.NoError(t, err)
assert.NoError(t, sqliteRepo.AutoMigrate())

rpcPublisher := NewRpcPublisher(ctx, mainNode, nil, false, sqliteRepo.MessageRepo())
publisher := NewMergePublisher(ctx, rpcPublisher)
msgs := testhelper.NewShareSignedMessages(10)

mainNode.EXPECT().MpoolBatchPushUntrusted(ctx, msgs).Return(nil, nil).Times(1)
err := publisher.PublishMessages(ctx, msgs)
err = publisher.PublishMessages(ctx, msgs)
assert.NoError(t, err)
runtime.Gosched()
time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -62,8 +69,12 @@ func TestMultiNodePublishMessage(t *testing.T) {
}
}

fs := filestore.NewMockFileStore(t.TempDir())
sqliteRepo, err := sqlite.OpenSqlite(fs)
assert.NoError(t, err)
assert.NoError(t, sqliteRepo.AutoMigrate())
nodeProvider := mocks.NewMockNodeRepo(ctrl)
rpcPublisher := NewRpcPublisher(ctx, mainNode, nodeProvider, true)
rpcPublisher := NewRpcPublisher(ctx, mainNode, nodeProvider, true, sqliteRepo.MessageRepo())

t.Run("publish message to multi node", func(t *testing.T) {
nodeProvider.EXPECT().ListNode().Return(nodes[:3], nil).Times(1)
Expand Down Expand Up @@ -192,3 +203,51 @@ func TestIntergrate(t *testing.T) {
runtime.Gosched()
time.Sleep(1 * time.Second)
}

func TestPublishMessageFailed(t *testing.T) {
ctx := context.Background()
// mock api
ctrl := gomock.NewController(t)
mainNode := mockV1.NewMockFullNode(ctrl)

fs := filestore.NewMockFileStore(t.TempDir())
sqliteRepo, err := sqlite.OpenSqlite(fs)
assert.NoError(t, err)
assert.NoError(t, sqliteRepo.AutoMigrate())

rpcPublisher := NewRpcPublisher(ctx, mainNode, nil, false, sqliteRepo.MessageRepo())
publisher := NewMergePublisher(ctx, rpcPublisher)

msgs := testhelper.NewShareSignedMessages(10)
form := msgs[0].Message.From
for _, msg := range msgs {
msgCid := msg.Cid()
msg.Message.From = form
m := &mtypes.Message{
ID: types.NewUUID().String(),
UnsignedCid: &msgCid,
SignedCid: &msgCid,
Message: msg.Message,
}
assert.NoError(t, sqliteRepo.MessageRepo().CreateMessage(m))
}

balanceToLowErr := fmt.Errorf("not enough funds (required: 0.08343657656301909 FIL, balance: 0.003413734154635385 FIL): not enough funds to execute transaction")
mainNode.EXPECT().MpoolBatchPushUntrusted(ctx, msgs).Return(nil, balanceToLowErr).Times(1)
err = publisher.PublishMessages(ctx, msgs)
assert.NoError(t, err)

for i := 0; i < 10; i++ {
msg, err := sqliteRepo.MessageRepo().GetMessageByCid(msgs[0].Cid())
if err == nil && len(msg.ErrorMsg) != 0 {
assert.Equal(t, balanceToLowErr.Error(), msg.ErrorMsg)
break
}

if i == 9 {
assert.Fail(t, "failed to get message error")
}

time.Sleep(1 * time.Second)
}
}
4 changes: 2 additions & 2 deletions service/message_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,12 +619,12 @@ func newMessageServiceHelper(ctx context.Context, t *testing.T, opts ...opt) *me
sharedParamsService, err := NewSharedParamsService(ctx, repo)
assert.NoError(t, err)

rpcPublisher := publisher.NewRpcPublisher(ctx, fullNode, repo.NodeRepo(), false)
rpcPublisher := publisher.NewRpcPublisher(ctx, fullNode, repo.NodeRepo(), false, repo.MessageRepo())
networkParams := &shared.NetworkParams{BlockDelaySecs: 30}
msgPublisher, err := publisher.NewIMsgPublisher(ctx, networkParams, cfg.Publisher, nil, rpcPublisher)
assert.NoError(t, err)

msgReceiver, err := publisher.NewMessageReciver(ctx, msgPublisher)
msgReceiver, err := publisher.NewMessageReceiver(ctx, msgPublisher)
assert.NoError(t, err)
ms, err := NewMessageService(ctx, repo, fullNode, fsRepo, addressService, sharedParamsService,
walletProxy, msgReceiver)
Expand Down

0 comments on commit a254162

Please sign in to comment.