From f1379d63c20c8c801708f5ef725c78e7900ee6f6 Mon Sep 17 00:00:00 2001 From: dvovk Date: Mon, 15 Jul 2024 18:00:08 +0100 Subject: [PATCH 1/3] refactor --- erigon-lib/diagnostics/network.go | 40 +++++++++++++-------- erigon-lib/diagnostics/network_test.go | 49 ++++++++++---------------- 2 files changed, 44 insertions(+), 45 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index e69e8e3c9ec..03bae0282b0 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -43,6 +43,13 @@ func NewPeerStats(peerLimit int) *PeerStats { } func (p *PeerStats) AddOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) { + p.mu.Lock() + defer p.mu.Unlock() + + p.addOrUpdatePeer(peerID, peerInfo) +} + +func (p *PeerStats) addOrUpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate) { if value, ok := p.peersInfo.Load(peerID); ok { p.updatePeer(peerID, peerInfo, value) } else { @@ -61,7 +68,7 @@ func (p *PeerStats) AddPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { } func (p *PeerStats) addPeer(peerID string, peerInfo PeerStatisticMsgUpdate) { - pv := PeerStatisticsFromMsgUpdate(peerInfo, nil) + pv := peerStatisticsFromMsgUpdate(peerInfo, nil) p.peersInfo.Store(peerID, pv) p.recordsCount++ p.lastUpdateMap[peerID] = time.Now() @@ -74,13 +81,17 @@ func (p *PeerStats) UpdatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, p } func (p *PeerStats) updatePeer(peerID string, peerInfo PeerStatisticMsgUpdate, prevValue any) { - pv := PeerStatisticsFromMsgUpdate(peerInfo, prevValue) + pv := peerStatisticsFromMsgUpdate(peerInfo, prevValue) p.peersInfo.Store(peerID, pv) p.lastUpdateMap[peerID] = time.Now() } func PeerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) PeerStatistics { + return peerStatisticsFromMsgUpdate(msg, prevValue) +} + +func peerStatisticsFromMsgUpdate(msg PeerStatisticMsgUpdate, prevValue any) PeerStatistics { ps := PeerStatistics{ PeerType: msg.PeerType, BytesIn: 0, @@ -131,6 +142,10 @@ func (p *PeerStats) GetPeers() map[string]PeerStatistics { p.mu.Lock() defer p.mu.Unlock() + return p.getPeers() +} + +func (p *PeerStats) getPeers() map[string]PeerStatistics { stats := make(map[string]PeerStatistics) p.peersInfo.Range(func(key, value interface{}) bool { loadedKey, ok := key.(string) @@ -156,6 +171,10 @@ func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { p.mu.Lock() defer p.mu.Unlock() + return p.getPeerStatistics(peerID) +} + +func (p *PeerStats) getPeerStatistics(peerID string) PeerStatistics { if value, ok := p.peersInfo.Load(peerID); ok { if peerStats, ok := value.(PeerStatistics); ok { return peerStats.Clone() @@ -165,21 +184,14 @@ func (p *PeerStats) GetPeerStatistics(peerID string) PeerStatistics { return PeerStatistics{} } -func (p *PeerStats) GetLastUpdate(peerID string) time.Time { +func (p *PeerStats) RemovePeer(peerID string) { p.mu.Lock() defer p.mu.Unlock() - if lastUpdate, ok := p.lastUpdateMap[peerID]; ok { - return lastUpdate - } - - return time.Time{} + p.removePeer(peerID) } -func (p *PeerStats) RemovePeer(peerID string) { - p.mu.Lock() - defer p.mu.Unlock() - +func (p *PeerStats) removePeer(peerID string) { p.peersInfo.Delete(peerID) p.recordsCount-- delete(p.lastUpdateMap, peerID) @@ -224,7 +236,7 @@ func (p *PeerStats) removePeersWhichExceedLimit(limit int) { if peersToRemove > 0 { peers := p.getOldestUpdatedPeersWithSize(peersToRemove) for _, peer := range peers { - p.RemovePeer(peer.PeerID) + p.removePeer(peer.PeerID) } } } @@ -251,5 +263,5 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { } func (d *DiagnosticClient) Peers() map[string]PeerStatistics { - return d.peersStats.GetPeers() + return d.peersStats.getPeers() } diff --git a/erigon-lib/diagnostics/network_test.go b/erigon-lib/diagnostics/network_test.go index cce3c25bfbb..a2c121e339c 100644 --- a/erigon-lib/diagnostics/network_test.go +++ b/erigon-lib/diagnostics/network_test.go @@ -19,7 +19,6 @@ package diagnostics_test import ( "strconv" "testing" - "time" "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/stretchr/testify/require" @@ -162,36 +161,6 @@ func TestGetPeers(t *testing.T) { require.True(t, peers["test1"].Equal(mockInboundPeerStats)) } -func TestLastUpdated(t *testing.T) { - peerStats := diagnostics.NewPeerStats(1000) - - peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg) - require.NotEmpty(t, peerStats.GetLastUpdate("test1")) - - for i := 1; i < 20; i++ { - pid := "test" + strconv.Itoa(i) - peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) - //wait for 1 milisecond to make sure that the last update time is different - time.Sleep(10 * time.Millisecond) - } - - require.True(t, peerStats.GetLastUpdate("test2").After(peerStats.GetLastUpdate("test1"))) - - oldestPeers := peerStats.GetOldestUpdatedPeersWithSize(10) - - // we have 100 peers, but we should get only 10 oldest - require.Equal(t, len(oldestPeers), 10) - // the oldest peer should be test1 - require.Equal(t, "test1", oldestPeers[0].PeerID) - - // update test1 to - peerStats.AddOrUpdatePeer("test1", mockInboundUpdMsg) - oldestPeers = peerStats.GetOldestUpdatedPeersWithSize(10) - - // the oldest peer should not be test1 - require.NotEqual(t, "test1", oldestPeers[0].PeerID) -} - func TestRemovePeersWhichExceedLimit(t *testing.T) { limit := 100 peerStats := diagnostics.NewPeerStats(limit) @@ -212,6 +181,24 @@ func TestRemovePeersWhichExceedLimit(t *testing.T) { require.Equal(t, 100, peerStats.GetPeersCount()) } +func TestRemovePeer(t *testing.T) { + limit := 10 + peerStats := diagnostics.NewPeerStats(limit) + + for i := 1; i < 11; i++ { + pid := "test" + strconv.Itoa(i) + peerStats.AddOrUpdatePeer(pid, mockInboundUpdMsg) + } + require.Equal(t, 10, peerStats.GetPeersCount()) + + peerStats.RemovePeer("test1") + + require.Equal(t, limit-1, peerStats.GetPeersCount()) + + firstPeerStats := peerStats.GetPeerStatistics("test1") + require.True(t, firstPeerStats.Equal(diagnostics.PeerStatistics{})) +} + func TestAddingPeersAboveTheLimit(t *testing.T) { limit := 100 peerStats := diagnostics.NewPeerStats(limit) From 9bc04b0e42ada859cfa7a830a5775878c82429e8 Mon Sep 17 00:00:00 2001 From: dvovk Date: Tue, 16 Jul 2024 09:23:00 +0100 Subject: [PATCH 2/3] fix --- erigon-lib/diagnostics/network.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 03bae0282b0..40eaf3c511e 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -263,5 +263,7 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { } func (d *DiagnosticClient) Peers() map[string]PeerStatistics { + d.mu.Lock() + defer d.mu.Unlock() return d.peersStats.getPeers() } From 2a11fd325d3e0a783acd694ed7cbf9c64e205c3c Mon Sep 17 00:00:00 2001 From: dvovk Date: Tue, 16 Jul 2024 14:37:12 +0100 Subject: [PATCH 3/3] fix --- erigon-lib/diagnostics/network.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 40eaf3c511e..be998de435f 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -263,7 +263,5 @@ func (d *DiagnosticClient) runCollectPeersStatistics(rootCtx context.Context) { } func (d *DiagnosticClient) Peers() map[string]PeerStatistics { - d.mu.Lock() - defer d.mu.Unlock() - return d.peersStats.getPeers() + return d.peersStats.GetPeers() }