Commit
try own ttl map implementation for discovery
iurii-ssv committed Jan 17, 2025
1 parent 59b1abc commit 04f69b3
Showing 5 changed files with 668 additions and 16 deletions.
44 changes: 36 additions & 8 deletions network/p2p/p2p.go
@@ -13,7 +13,6 @@ import (
"sync/atomic"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/libp2p/go-libp2p/core/connmgr"
connmgrcore "github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host"
@@ -283,7 +282,9 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
AddrInfo: proposal,
ConnectRetries: 0,
}
peers.DiscoveredPeersPool.Set(proposal.ID, discoveredPeer, ttlcache.DefaultTTL)
// TODO
peers.DiscoveredPeersPool.Set(proposal.ID, discoveredPeer)
//peers.DiscoveredPeersPool.Set(proposal.ID, discoveredPeer, ttlcache.DefaultTTL)

n.interfaceLogger.Debug(
"got proposal, discovered new peer",
@@ -319,27 +320,49 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {

// peersByPriority keeps track of best peers (by their peer score)
peersByPriority := lane.NewMaxPriorityQueue[peers.DiscoveredPeer, float64]()
peers.DiscoveredPeersPool.Range(func(item *ttlcache.Item[peer.ID, peers.DiscoveredPeer]) bool {
// TODO
peers.DiscoveredPeersPool.Range(func(key peer.ID, value peers.DiscoveredPeer) bool {
const retryLimit = 2
if item.Value().ConnectRetries >= retryLimit {
if value.ConnectRetries >= retryLimit {
// this discovered peer has already been tried several times, so we ignore it, but we don't
// remove it from DiscoveredPeersPool - if we did, discovery might suggest this
// peer again (essentially resetting its retry-attempts counter to 0)
// TODO - consider commenting out the log line below, it can get quite spammy
n.interfaceLogger.Debug(
"Not gonna propose discovered peer: ran out of retries",
zap.String("peer_id", string(item.Key())),
zap.String("peer_id", string(key)),
)
return true
}
proposalScore := n.peerScore(item.Key())
proposalScore := n.peerScore(key)
if proposalScore <= 0 {
return true // we are not interested in this peer at all
}
peersByPriority.Push(item.Value(), proposalScore)
peersByPriority.Push(value, proposalScore)
return true
})
//peers.DiscoveredPeersPool.Range(func(item *ttlcache.Item[peer.ID, peers.DiscoveredPeer]) bool {
// const retryLimit = 2
// if item.Value().ConnectRetries >= retryLimit {
// // this discovered peer has been tried many times already, we'll ignore him but won't
// // remove him from DiscoveredPeersPool since if we do - discovery might suggest this
// // peer again (essentially resetting this peer's retry attempts counter to 0)
// // TODO - comment out ??
// // this log line is commented out as it is too spammy
// n.interfaceLogger.Debug(
// "Not gonna propose discovered peer: ran out of retries",
// zap.String("peer_id", string(item.Key())),
// )
// return true
// }
// proposalScore := n.peerScore(item.Key())
// if proposalScore <= 0 {
// return true // we are not interested in this peer at all
// }
// peersByPriority.Push(item.Value(), proposalScore)
// return true
//})

// propose only half as many peers as we have outbound slots available because this
// leaves some vacant slots for the next iteration - on the next iteration better
@@ -360,10 +383,15 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
// it's better to do this before we send this proposal on `connector` to minimize the chance of
// hitting the undesirable race condition (where we'll successfully connect to this peer but will
// keep retrying until last allowed retry attempt)
// TODO
peers.DiscoveredPeersPool.Set(peerCandidate.ID, peers.DiscoveredPeer{
AddrInfo: peerCandidate.AddrInfo,
ConnectRetries: peerCandidate.ConnectRetries + 1,
}, ttlcache.DefaultTTL)
})
//peers.DiscoveredPeersPool.Set(peerCandidate.ID, peers.DiscoveredPeer{
// AddrInfo: peerCandidate.AddrInfo,
// ConnectRetries: peerCandidate.ConnectRetries + 1,
//}, ttlcache.DefaultTTL)
connector <- peerCandidate.AddrInfo // try to connect to best peer
}

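For context, the hunks above replace ttlcache's item-based callbacks (item.Key()/item.Value(), Set with ttlcache.DefaultTTL) with the plain key/value API of the new utils/ttl map. Below is a minimal, self-contained sketch of that pattern - illustrative only, not part of the commit - using plain string keys and a toy score function in place of peer.ID and n.peerScore:

package main

import (
	"fmt"
	"time"

	"github.com/ssvlabs/ssv/utils/ttl" // import path assumed from the repository layout shown in this diff
)

// discovered is a stand-in for peers.DiscoveredPeer.
type discovered struct {
	Addr           string
	ConnectRetries int
}

func main() {
	// 10-minute entry lifespan, swept once a minute (values are illustrative).
	pool := ttl.New[string, discovered](10*time.Minute, time.Minute)
	pool.Set("peerA", discovered{Addr: "/ip4/1.2.3.4/tcp/13001"})
	pool.Set("peerB", discovered{Addr: "/ip4/5.6.7.8/tcp/13001", ConnectRetries: 2})

	const retryLimit = 2
	score := func(id string) float64 { return 1.0 } // toy stand-in for n.peerScore

	pool.Range(func(id string, d discovered) bool {
		if d.ConnectRetries >= retryLimit {
			// Skip, but keep the entry in the pool so discovery can't effectively reset its counter.
			return true
		}
		if score(id) <= 0 {
			return true // not interested in this peer
		}
		// Bump the retry counter before proposing, mirroring the hunk above, so the same peer
		// isn't proposed again while a connection attempt is still in flight.
		pool.Set(id, discovered{Addr: d.Addr, ConnectRetries: d.ConnectRetries + 1})
		fmt.Println("proposing", id, d.Addr)
		return true
	})
}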
20 changes: 12 additions & 8 deletions network/peers/conn_manager.go
@@ -2,9 +2,9 @@ package peers

import (
"context"
"github.com/ssvlabs/ssv/utils/ttl"
"time"

"github.com/jellydator/ttlcache/v3"
connmgrcore "github.com/libp2p/go-libp2p/core/connmgr"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@@ -26,17 +26,19 @@ type DiscoveredPeer struct {
}

var (
// TODO - which implementation should we use?
// DiscoveredPeersPool keeps track of recently discovered peers so we can rank them and choose
// the best candidates to connect to.
// TODO
DiscoveredPeersPool = ttlcache.New(ttlcache.WithTTL[peer.ID, DiscoveredPeer](10 * time.Minute))
//DiscoveredPeersPool = ttlcache.New(ttlcache.WithTTL[peer.ID, DiscoveredPeer](30 * time.Minute))
TrimmedRecently = ttlcache.New(ttlcache.WithTTL[peer.ID, struct{}](30 * time.Minute))
DiscoveredPeersPool = ttl.New[peer.ID, DiscoveredPeer](15*time.Minute, 5*time.Minute)
//DiscoveredPeersPool = ttlcache.New(ttlcache.WithTTL[peer.ID, DiscoveredPeer](10 * time.Minute))
TrimmedRecently = ttl.New[peer.ID, struct{}](30*time.Minute, 5*time.Minute)
//TrimmedRecently = ttlcache.New(ttlcache.WithTTL[peer.ID, struct{}](30 * time.Minute))
)

func init() {
go TrimmedRecently.Start() // start cleanup go-routine
go DiscoveredPeersPool.Start() // start cleanup go-routine
// TODO - for ttlcache we need to start cleanup go-routines here
//go TrimmedRecently.Start() // start cleanup go-routine
//go DiscoveredPeersPool.Start() // start cleanup go-routine
}

// ConnManager is a wrapper on top of go-libp2p/core/connmgr.ConnManager.
@@ -85,7 +87,9 @@ func (c connManager) TrimPeers(ctx context.Context, logger *zap.Logger, net libp
if err := c.disconnect(pid, net); err != nil {
logger.Debug("error closing peer", fields.PeerID(pid), zap.Error(err))
}
TrimmedRecently.Set(pid, struct{}{}, ttlcache.DefaultTTL) // record stats
// TODO
//TrimmedRecently.Set(pid, struct{}{}, ttlcache.DefaultTTL) // record stats
TrimmedRecently.Set(pid, struct{}{}) // record stats
trimmed = append(trimmed, pid)
if len(trimmed) >= maxTrims {
break
5 changes: 5 additions & 0 deletions utils/hashmap/hashmap.go
@@ -15,6 +15,11 @@ func New[Key comparable, Value any]() *Map[Key, Value] {
return &Map[Key, Value]{}
}

func (m *Map[Key, Value]) Has(key Key) bool {
_, ok := m.m.Load(key)
return ok
}

func (m *Map[Key, Value]) Get(key Key) (Value, bool) {
v, ok := m.m.Load(key)
if !ok {
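The new Has helper is a small convenience over Get for callers that only care about key presence; its call sites aren't shown in this diff, so the usage below is assumed. A minimal sketch:

package main

import (
	"fmt"

	"github.com/ssvlabs/ssv/utils/hashmap" // import path as it appears in the diff
)

func main() {
	m := hashmap.New[string, int]()
	m.Set("x", 42)

	// Has reports presence without making the caller discard the value, which reads nicer
	// for set-like maps such as TrimmedRecently (peer.ID -> struct{}).
	fmt.Println(m.Has("x")) // true
	fmt.Println(m.Has("y")) // false
}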
59 changes: 59 additions & 0 deletions utils/ttl/map.go
@@ -0,0 +1,59 @@
package ttl

import (
"github.com/ssvlabs/ssv/utils/hashmap"
"time"
)

// Map is a thread-safe map whose entries expire after a fixed lifespan, backed by
// hashmap.Map (a sync.Map under the hood).
type Map[Key comparable, Value any] struct {
*hashmap.Map[Key, Value]
idxLastUpdatedAt hashmap.Map[Key, time.Time]
}

func New[Key comparable, Value any](lifespan, cleanupInterval time.Duration) *Map[Key, Value] {
m := &Map[Key, Value]{
Map: hashmap.New[Key, Value](),
idxLastUpdatedAt: hashmap.Map[Key, time.Time]{},
}
go func() {
// TODO: use time.After when Go is updated to 1.23
timer := time.NewTicker(cleanupInterval)
defer timer.Stop()

// TODO - consider terminating with ctx.Done() to make this ttl map garbage-collectable
for range timer.C {
m.idxLastUpdatedAt.Range(func(key Key, t time.Time) bool {
if time.Since(t) > lifespan {
m.idxLastUpdatedAt.Delete(key)
m.Map.Delete(key)
}
return true
})
}
}()
return m
}

func (m *Map[Key, Value]) GetOrSet(key Key, value Value) (Value, bool) {
result, loaded := m.Map.GetOrSet(key, value)
if !loaded {
// gotta update the timestamp since we've just set a value for this key
m.idxLastUpdatedAt.Set(key, time.Now())
}
return result, loaded
}

func (m *Map[Key, Value]) CompareAndSwap(key Key, old, new Value) (swapped bool) {
swapped = m.Map.CompareAndSwap(key, old, new)
if swapped {
// gotta update the timestamp since we've just set a value for this key
m.idxLastUpdatedAt.Set(key, time.Now())
}
return swapped
}

func (m *Map[Key, Value]) Set(key Key, value Value) {
m.Map.Set(key, value)
m.idxLastUpdatedAt.Set(key, time.Now())
}
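A minimal, self-contained usage sketch of the new ttl.Map (not part of the commit; the import path is assumed from the repository layout above). Note that only successful writes - Set, a GetOrSet that actually stores, a CompareAndSwap that swaps - refresh an entry's timestamp; a plain Get does not extend its lifetime, so an entry expires a fixed lifespan after its last write:

package main

import (
	"fmt"
	"time"

	"github.com/ssvlabs/ssv/utils/ttl"
)

func main() {
	// Entries live for ~2s; the goroutine started inside New sweeps expired keys every second.
	m := ttl.New[string, int](2*time.Second, time.Second)

	m.Set("a", 1)
	if v, ok := m.Get("a"); ok {
		fmt.Println("fresh:", v) // fresh: 1
	}

	// Once the lifespan has passed and at least one sweep has run, the entry is gone.
	time.Sleep(4 * time.Second)
	fmt.Println("still present:", m.Has("a")) // false
}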