Skip to content

Commit

Permalink
eth/protocols/snap: throttle trie heal requests when peers DoS us (#2…
Browse files Browse the repository at this point in the history
…5666)

* eth/protocols/snap: throttle trie heal requests when peers DoS us

* eth/protocols/snap: lower heal throttle log to debug

Co-authored-by: Martin Holst Swende <martin@swende.se>

* eth/protocols/snap: fix comment

Co-authored-by: Martin Holst Swende <martin@swende.se>
  • Loading branch information
2 people authored and kyrie-yl committed Dec 13, 2022
1 parent 9fb3b67 commit ae7512c
Showing 1 changed file with 100 additions and 7 deletions.
107 changes: 100 additions & 7 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
gomath "math"
"math/big"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -79,6 +81,29 @@ const (
// and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512

// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005

// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
minTrienodeHealThrottle = 1

// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount

// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33

// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
)

var (
Expand Down Expand Up @@ -432,6 +457,11 @@ type Syncer struct {
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running

trienodeHealRate float64 // Average heal rate for processing trie node data
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated

trienodeHealSynced uint64 // Number of state trie nodes downloaded
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
trienodeHealDups uint64 // Number of state trie nodes already processed
Expand Down Expand Up @@ -477,9 +507,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
trienodeHealIdlers: make(map[string]struct{}),
bytecodeHealIdlers: make(map[string]struct{}),

trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
stateWriter: db.NewBatch(),
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(),

extProgress: new(SyncProgress),
}
Expand Down Expand Up @@ -1324,6 +1355,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
if cap > maxTrieRequestCount {
cap = maxTrieRequestCount
}
cap = int(float64(cap) / s.trienodeHealThrottle)
if cap <= 0 {
cap = 1
}
var (
hashes = make([]common.Hash, 0, cap)
paths = make([]trie.SyncPath, 0, cap)
Expand Down Expand Up @@ -2094,6 +2129,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
var (
start = time.Now()
fills int
)
for i, hash := range res.hashes {
node := res.nodes[i]

Expand All @@ -2102,6 +2141,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
res.task.trieTasks[hash] = res.paths[i]
continue
}
fills++

// Push the trie node into the state syncer
s.trienodeHealSynced++
s.trienodeHealBytes += common.StorageSize(len(node))
Expand All @@ -2125,6 +2166,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
log.Crit("Failed to persist healing data", "err", err)
}
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))

// Calculate the processing rate of one filled trie node
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))

// Update the currently measured trienode queueing and processing throughput.
//
// The processing rate needs to be updated uniformly independent if we've
// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
// the face of varying network packets. As such, we cannot just measure the
// time it took to process N trie nodes and update once, we need one update
// per trie node.
//
// Naively, that would be:
//
// for i:=0; i<fills; i++ {
// healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
// }
//
// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
//
// We can expand that formula for the Nth item as:
// HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
//
// The above is a geometric sequence that can be summed to:
// HR(N) = (1-MI)^N*(OR-NR) + NR
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate

pending := atomic.LoadUint64(&s.trienodeHealPend)
if time.Since(s.trienodeHealThrottled) > time.Second {
// Periodically adjust the trie node throttler
if float64(pending) > 2*s.trienodeHealRate {
s.trienodeHealThrottle *= trienodeHealThrottleIncrease
} else {
s.trienodeHealThrottle /= trienodeHealThrottleDecrease
}
if s.trienodeHealThrottle > maxTrienodeHealThrottle {
s.trienodeHealThrottle = maxTrienodeHealThrottle
} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
s.trienodeHealThrottle = minTrienodeHealThrottle
}
s.trienodeHealThrottled = time.Now()

log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
}
}

// processBytecodeHealResponse integrates an already validated bytecode response
Expand Down Expand Up @@ -2659,10 +2744,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error

// Cross reference the requested trienodes with the response to find gaps
// that the serving node is missing
hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash := make([]byte, 32)

nodes := make([][]byte, len(req.hashes))
var (
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
nodes = make([][]byte, len(req.hashes))
fills uint64
)
for i, j := 0, 0; i < len(trienodes); i++ {
// Find the next hash that we've been served, leaving misses with nils
hasher.Reset()
Expand All @@ -2674,16 +2761,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
}
if j < len(req.hashes) {
nodes[j] = trienodes[i]
fills++
j++
continue
}
// We've either ran out of hashes, or got unrequested data
logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)

// Signal this request as failed, and ready for rescheduling
s.scheduleRevertTrienodeHealRequest(req)
return errors.New("unexpected healing trienode")
}
// Response validated, send it to the scheduler for filling
atomic.AddUint64(&s.trienodeHealPend, fills)
defer func() {
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
}()
response := &trienodeHealResponse{
task: req.task,
hashes: req.hashes,
Expand Down

0 comments on commit ae7512c

Please sign in to comment.