diff --git a/floodsub_test.go b/floodsub_test.go index cc15ead3..952855fc 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -610,7 +610,7 @@ func TestPeerTopicReporting(t *testing.T) { t.Fatal(err) } - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 200) peers := psubs[0].ListPeers("ipfs") assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID()) diff --git a/score.go b/score.go index 6c5738b6..b22f4e9b 100644 --- a/score.go +++ b/score.go @@ -78,6 +78,7 @@ type peerScore struct { // debugging inspection inspect PeerScoreInspectFn + inspectEx ExtendedPeerScoreInspectFn inspectPeriod time.Duration } @@ -113,13 +114,34 @@ const ( deliveryThrottled // we can't tell if it is valid because validation throttled ) -type PeerScoreInspectFn func(map[peer.ID]float64) +type PeerScoreInspectFn = func(map[peer.ID]float64) +type ExtendedPeerScoreInspectFn = func(map[peer.ID]*PeerScoreSnapshot) + +type PeerScoreSnapshot struct { + Score float64 + Topics map[string]*TopicScoreSnapshot + AppSpecificScore float64 + IPColocationFactor float64 + BehaviourPenalty float64 +} + +type TopicScoreSnapshot struct { + TimeInMesh time.Duration + FirstMessageDeliveries float64 + MeshMessageDeliveries float64 + InvalidMessageDeliveries float64 +} // WithPeerScoreInspect is a gossipsub router option that enables peer score debugging. // When this option is enabled, the supplied function will be invoked periodically to allow -// the application to inspec or dump the scores for connected peers. +// the application to inspect or dump the scores for connected peers. +// The supplied function can have one of two signatures: +// - PeerScoreInspectFn, which takes a map of peer IDs to score. +// - ExtendedPeerScoreInspectFn, which takes a map of peer IDs to +// PeerScoreSnapshots and allows inspection of individual score +// components for debugging peer scoring. // This option must be passed _after_ the WithPeerScore option. -func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Option { +func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option { return func(ps *PubSub) error { gs, ok := ps.rt.(*GossipSubRouter) if !ok { @@ -130,7 +152,19 @@ func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Opti return fmt.Errorf("peer scoring is not enabled") } - gs.score.inspect = inspect + if gs.score.inspect != nil || gs.score.inspectEx != nil { + return fmt.Errorf("duplicate peer score inspector") + } + + switch i := inspect.(type) { + case PeerScoreInspectFn: + gs.score.inspect = i + case ExtendedPeerScoreInspectFn: + gs.score.inspectEx = i + default: + return fmt.Errorf("unknown peer score insector type: %v", inspect) + } + gs.score.inspectPeriod = period return nil @@ -236,6 +270,23 @@ func (ps *peerScore) score(p peer.ID) float64 { score += p5 * ps.params.AppSpecificWeight // P6: IP collocation factor + p6 := ps.ipColocationFactor(p) + score += p6 * ps.params.IPColocationFactorWeight + + // P7: behavioural pattern penalty + p7 := pstats.behaviourPenalty * pstats.behaviourPenalty + score += p7 * ps.params.BehaviourPenaltyWeight + + return score +} + +func (ps *peerScore) ipColocationFactor(p peer.ID) float64 { + pstats, ok := ps.peerStats[p] + if !ok { + return 0 + } + + var result float64 for _, ip := range pstats.ips { _, whitelisted := ps.params.IPColocationFactorWhitelist[ip] if whitelisted { @@ -249,16 +300,11 @@ func (ps *peerScore) score(p peer.ID) float64 { peersInIP := len(ps.peerIPs[ip]) if peersInIP > ps.params.IPColocationFactorThreshold { surpluss := float64(peersInIP - ps.params.IPColocationFactorThreshold) - p6 := surpluss * surpluss - score += p6 * ps.params.IPColocationFactorWeight + result += surpluss * surpluss } } - // P7: behavioural pattern penalty - p7 := pstats.behaviourPenalty * pstats.behaviourPenalty - score += p7 * ps.params.BehaviourPenaltyWeight - - return score + return result } // behavioural pattern penalties @@ -290,7 +336,7 @@ func (ps *peerScore) background(ctx context.Context) { defer gcDeliveryRecords.Stop() var inspectScores <-chan time.Time - if ps.inspect != nil { + if ps.inspect != nil || ps.inspectEx != nil { ticker := time.NewTicker(ps.inspectPeriod) defer ticker.Stop() // also dump at exit for one final sample @@ -320,6 +366,15 @@ func (ps *peerScore) background(ctx context.Context) { // inspectScores dumps all tracked scores into the inspect function. func (ps *peerScore) inspectScores() { + if ps.inspect != nil { + ps.inspectScoresSimple() + } + if ps.inspectEx != nil { + ps.inspectScoresExtended() + } +} + +func (ps *peerScore) inspectScoresSimple() { ps.Lock() scores := make(map[peer.ID]float64, len(ps.peerStats)) for p := range ps.peerStats { @@ -334,6 +389,36 @@ func (ps *peerScore) inspectScores() { go ps.inspect(scores) } +func (ps *peerScore) inspectScoresExtended() { + ps.Lock() + scores := make(map[peer.ID]*PeerScoreSnapshot, len(ps.peerStats)) + for p, pstats := range ps.peerStats { + pss := new(PeerScoreSnapshot) + pss.Score = ps.score(p) + if len(pstats.topics) > 0 { + pss.Topics = make(map[string]*TopicScoreSnapshot, len(pstats.topics)) + for t, ts := range pstats.topics { + tss := &TopicScoreSnapshot{ + FirstMessageDeliveries: ts.firstMessageDeliveries, + MeshMessageDeliveries: ts.meshMessageDeliveries, + InvalidMessageDeliveries: ts.invalidMessageDeliveries, + } + if ts.inMesh { + tss.TimeInMesh = ts.meshTime + } + pss.Topics[t] = tss + } + } + pss.AppSpecificScore = ps.params.AppSpecificScore(p) + pss.IPColocationFactor = ps.ipColocationFactor(p) + pss.BehaviourPenalty = pstats.behaviourPenalty + scores[p] = pss + } + ps.Unlock() + + go ps.inspectEx(scores) +} + // refreshScores decays scores, and purges score records for disconnected peers, // once their expiry has elapsed. func (ps *peerScore) refreshScores() {