Skip to content

Commit

Permalink
fix: correctly prune peers from the seenTxSet when disconnecting (bac…
Browse files Browse the repository at this point in the history
…kport #1215) (#1217)
  • Loading branch information
mergify[bot] authored Feb 6, 2024
1 parent 1d221bc commit 6ea7f18
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
24 changes: 11 additions & 13 deletions mempool/cat/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,6 @@ func (s *SeenTxSet) Add(txKey types.TxKey, peer uint16) {
}
}

func (s *SeenTxSet) Pop(txKey types.TxKey) uint16 {
s.mtx.Lock()
defer s.mtx.Unlock()
seenSet, exists := s.set[txKey]
if exists {
for peer := range seenSet.peers {
delete(seenSet.peers, peer)
return peer
}
}
return 0
}

func (s *SeenTxSet) RemoveKey(txKey types.TxKey) {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -162,6 +149,17 @@ func (s *SeenTxSet) Remove(txKey types.TxKey, peer uint16) {
}
}

func (s *SeenTxSet) RemovePeer(peer uint16) {
s.mtx.Lock()
defer s.mtx.Unlock()
for key, seenSet := range s.set {
delete(seenSet.peers, peer)
if len(seenSet.peers) == 0 {
delete(s.set, key)
}
}
}

func (s *SeenTxSet) Prune(limit time.Time) {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions mempool/cat/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestSeenTxSet(t *testing.T) {
)

seenSet := NewSeenTxSet()
require.Zero(t, seenSet.Pop(tx1Key))
require.Nil(t, seenSet.Get(tx1Key))

seenSet.Add(tx1Key, peer1)
seenSet.Add(tx1Key, peer1)
Expand All @@ -36,8 +36,8 @@ func TestSeenTxSet(t *testing.T) {
require.Equal(t, 3, seenSet.Len())
seenSet.RemoveKey(tx2Key)
require.Equal(t, 2, seenSet.Len())
require.Zero(t, seenSet.Pop(tx2Key))
require.Equal(t, peer1, seenSet.Pop(tx3Key))
require.Nil(t, seenSet.Get(tx2Key))
require.True(t, seenSet.Has(tx3Key, peer1))
}

func TestLRUTxCacheRemove(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
// peer it will find a new peer to rerequest the same transactions.
func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
peerID := memR.ids.Reclaim(peer.ID())
// clear all memory of seen txs by that peer
memR.mempool.seenByPeersSet.RemovePeer(peerID)

// remove and rerequest all pending outbound requests to that peer since we know
// we won't receive any responses from them.
outboundRequests := memR.requests.ClearAllRequestsFrom(peerID)
Expand Down
43 changes: 43 additions & 0 deletions mempool/cat/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,49 @@ func TestReactorBroadcastsSeenTxAfterReceivingTx(t *testing.T) {
peers[1].AssertExpectations(t)
}

func TestRemovePeerRequestFromOtherPeer(t *testing.T) {
reactor, _ := setupReactor(t)

tx := newDefaultTx("hello")
key := tx.Key()
peers := genPeers(2)
reactor.InitPeer(peers[0])
reactor.InitPeer(peers[1])

seenMsg := &protomem.SeenTx{TxKey: key[:]}

wantMsg := &protomem.Message{
Sum: &protomem.Message_WantTx{WantTx: &protomem.WantTx{TxKey: key[:]}},
}
wantMsgBytes, err := wantMsg.Marshal()
require.NoError(t, err)
peers[0].On("Send", MempoolStateChannel, wantMsgBytes).Return(true)
peers[1].On("Send", MempoolStateChannel, wantMsgBytes).Return(true)

reactor.ReceiveEnvelope(p2p.Envelope{
Src: peers[0],
Message: seenMsg,
ChannelID: MempoolStateChannel,
})
time.Sleep(100 * time.Millisecond)
reactor.ReceiveEnvelope(p2p.Envelope{
Src: peers[1],
Message: seenMsg,
ChannelID: MempoolStateChannel,
})

reactor.RemovePeer(peers[0], "test")

peers[0].AssertExpectations(t)
peers[1].AssertExpectations(t)

require.True(t, reactor.mempool.seenByPeersSet.Has(key, 2))
// we should have automatically sent another request out for peer 2
require.EqualValues(t, 2, reactor.requests.ForTx(key))
require.True(t, reactor.requests.Has(2, key))
require.False(t, reactor.mempool.seenByPeersSet.Has(key, 1))
}

func TestMempoolVectors(t *testing.T) {
testCases := []struct {
testName string
Expand Down
3 changes: 2 additions & 1 deletion mempool/cat/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ func (r *requestScheduler) ClearAllRequestsFrom(peer uint16) requestSet {
if !ok {
return requestSet{}
}
for _, timer := range requests {
for tx, timer := range requests {
timer.Stop()
delete(r.requestsByTx, tx)
}
delete(r.requestsByPeer, peer)
return requests
Expand Down

0 comments on commit 6ea7f18

Please sign in to comment.