diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index cc26be0b56..79ab572ba9 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -432,9 +432,14 @@ func (mp *MessagePool) runLoop() { } } -func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error { +func (mp *MessagePool) addLocal(m *types.SignedMessage) error { mp.localAddrs[m.Message.From] = struct{}{} + msgb, err := m.Serialize() + if err != nil { + return xerrors.Errorf("error serializing message: %w", err) + } + if err := mp.localMsgs.Put(datastore.NewKey(string(m.Cid().Bytes())), msgb); err != nil { return xerrors.Errorf("persisting local message: %w", err) } @@ -507,11 +512,6 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { <-mp.addSema }() - msgb, err := m.Serialize() - if err != nil { - return cid.Undef, err - } - mp.curTsLk.Lock() publish, err := mp.addTs(m, mp.curTs, true, false) if err != nil { @@ -520,18 +520,19 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { } mp.curTsLk.Unlock() - mp.lk.Lock() - if err := mp.addLocal(m, msgb); err != nil { - mp.lk.Unlock() - return cid.Undef, err - } - mp.lk.Unlock() - if publish { + msgb, err := m.Serialize() + if err != nil { + return cid.Undef, xerrors.Errorf("error serializing message: %w", err) + } + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + if err != nil { + return cid.Undef, xerrors.Errorf("error publishing message: %w", err) + } } - return m.Cid(), err + return m.Cid(), nil } func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { @@ -670,7 +671,19 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, return false, err } - return publish, mp.addLocked(m, !local, untrusted) + err = mp.addLocked(m, !local, untrusted) + if err != nil { + return false, err + } + + if local { + err = mp.addLocal(m) + if err != nil { + return false, xerrors.Errorf("error persisting local message: %w", err) + } + } + + return publish, nil } func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { @@ -837,11 +850,6 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) { <-mp.addSema }() - msgb, err := m.Serialize() - if err != nil { - return cid.Undef, err - } - mp.curTsLk.Lock() publish, err := mp.addTs(m, mp.curTs, false, true) if err != nil { @@ -850,18 +858,19 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) { } mp.curTsLk.Unlock() - mp.lk.Lock() - if err := mp.addLocal(m, msgb); err != nil { - mp.lk.Unlock() - return cid.Undef, err - } - mp.lk.Unlock() - if publish { + msgb, err := m.Serialize() + if err != nil { + return cid.Undef, xerrors.Errorf("error serializing message: %w", err) + } + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + if err != nil { + return cid.Undef, xerrors.Errorf("error publishing message: %w", err) + } } - return m.Cid(), err + return m.Cid(), nil } func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {