Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discovery zero refresh timer #8661

Merged
merged 27 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cl/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) {
// Start stream handlers
handlers.NewConsensusHandlers(s.ctx, s.db, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start()

net, err := discover.ListenV5(s.ctx, conn, localNode, discCfg)
net, err := discover.ListenV5(s.ctx, "any", conn, localNode, discCfg)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/bootnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ func main() {
}

if *runv5 {
if _, err := discover.ListenV5(ctx, conn, ln, cfg); err != nil {
if _, err := discover.ListenV5(ctx, "any", conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
} else {
if _, err := discover.ListenUDP(ctx, conn, ln, cfg); err != nil {
if _, err := discover.ListenUDP(ctx, "any", conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/observer/observer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,5 @@ func (server *Server) Listen(ctx context.Context) (*discover.UDPv4, error) {

server.logger.Debug("Discovery UDP listener is up", "addr", realAddr)

return discover.ListenV4(ctx, conn, server.localNode, server.discConfig)
return discover.ListenV4(ctx, "any", conn, server.localNode, server.discConfig)
}
4 changes: 1 addition & 3 deletions erigon-lib/diagnostics/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package diagnostics

import "reflect"

func (p PeerStatistics) Type() Type {
return Type(reflect.TypeOf(p))
return TypeOf(p)
}
92 changes: 69 additions & 23 deletions erigon-lib/diagnostics/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"reflect"
"sync"
"sync/atomic"

"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"
Expand All @@ -17,15 +18,43 @@ const (
ckChan ctxKey = iota
)

type Type reflect.Type
type Type interface {
reflect.Type
Context() context.Context
Err() error
}

type diagType struct {
reflect.Type
}

var cancelled = func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}()

func (t diagType) Context() context.Context {
providerMutex.Lock()
defer providerMutex.Unlock()
if reg := providers[t]; reg != nil {
return reg.context
}

return cancelled
}

func (t diagType) Err() error {
return t.Context().Err()
}

type Info interface {
Type() Type
}

func TypeOf(i Info) Type {
t := reflect.TypeOf(i)
return Type(t)
return diagType{t}
}

type Provider interface {
Expand All @@ -50,7 +79,7 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) {
providerMutex.Lock()
defer providerMutex.Unlock()

reg, _ := providers[infoType]
reg := providers[infoType]

if reg != nil {
for _, p := range reg.providers {
Expand All @@ -73,14 +102,16 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) {
func StartProviders(ctx context.Context, infoType Type, logger log.Logger) {
providerMutex.Lock()

reg, _ := providers[infoType]

toStart := make([]Provider, len(reg.providers))
reg := providers[infoType]

for i, provider := range reg.providers {
toStart[i] = provider
if reg == nil {
reg = &registry{}
providers[infoType] = reg
}

toStart := make([]Provider, len(reg.providers))
copy(toStart, reg.providers)

reg.context = ctx

providerMutex.Unlock()
Expand All @@ -105,18 +136,29 @@ func startProvider(ctx context.Context, infoType Type, provider Provider, logger
}
}

func Send[I Info](ctx context.Context, info I) error {
func Send[I Info](info I) error {
ctx := info.Type().Context()

if ctx.Err() != nil {
if !errors.Is(ctx.Err(), context.Canceled) {
// drop the diagnostic message if there is
// no active diagnostic context for the type
return nil
}

return ctx.Err()
}

cval := ctx.Value(ckChan)
if c, ok := cval.(chan I); ok {
select {
case c <- info:
default:
// drop the diagnostic message if the receiver is busy
// so the sender is not blocked on non critcal actions

if cp, ok := cval.(*atomic.Pointer[chan I]); ok {
if c := (*cp).Load(); c != nil {
select {
case *c <- info:
default:
// drop the diagnostic message if the receiver is busy
// so the sender is not blocked on non critcal actions
}
}
} else {
return fmt.Errorf("unexpected channel type: %T", cval)
Expand All @@ -126,16 +168,20 @@ func Send[I Info](ctx context.Context, info I) error {
}

func Context[I Info](ctx context.Context, buffer int) (context.Context, <-chan I, context.CancelFunc) {
ch := make(chan I, buffer)
ctx = context.WithValue(ctx, ckChan, ch)
c := make(chan I, buffer)
cp := atomic.Pointer[chan I]{}
cp.Store(&c)

ctx = context.WithValue(ctx, ckChan, &cp)
ctx, cancel := context.WithCancel(ctx)

return ctx, ch, func() {
if ch != nil {
toClose := ch
ch = nil
close(toClose)
}
return ctx, *cp.Load(), func() {
cancel()

if cp.CompareAndSwap(&c, nil) {
ch := c
c = nil
close(ch)
}
}
}
23 changes: 21 additions & 2 deletions erigon-lib/diagnostics/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (t *testProvider) StartDiagnostics(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-timer.C:
diagnostics.Send(ctx, testInfo{count})
diagnostics.Send(testInfo{count})
count++
}
}
Expand All @@ -54,6 +54,25 @@ func TestProviderRegistration(t *testing.T) {
}
}

func TestDelayedProviderRegistration(t *testing.T) {

time.AfterFunc(1*time.Second, func() {
// diagnostics provider
provider := &testProvider{}
diagnostics.RegisterProvider(provider, diagnostics.TypeOf(testInfo{}), log.Root())
})

// diagnostics receiver
ctx, ch, cancel := diagnostics.Context[testInfo](context.Background(), 1)
diagnostics.StartProviders(ctx, diagnostics.TypeOf(testInfo{}), log.Root())

for info := range ch {
if info.count == 3 {
cancel()
}
}
}

func TestProviderFuncRegistration(t *testing.T) {

// diagnostics provider
Expand All @@ -68,7 +87,7 @@ func TestProviderFuncRegistration(t *testing.T) {
case <-ctx.Done():
return nil
case <-timer.C:
diagnostics.Send(ctx, testInfo{count})
diagnostics.Send(testInfo{count})
count++
}
}
Expand Down
2 changes: 2 additions & 0 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ Loop:
if req != nil {
peer, sentToPeer = cfg.headerReqSend(ctx, req)
if sentToPeer {
logger.Debug(fmt.Sprintf("[%s] Requested header", logPrefix), "from", req.Number, "length", req.Length)
cfg.hd.UpdateStats(req, false /* skeleton */, peer)
cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */)
}
Expand Down Expand Up @@ -233,6 +234,7 @@ Loop:
if req != nil {
peer, sentToPeer = cfg.headerReqSend(ctx, req)
if sentToPeer {
logger.Debug(fmt.Sprintf("[%s] Requested skeleton", logPrefix), "from", req.Number, "length", req.Length)
cfg.hd.UpdateStats(req, true /* skeleton */, peer)
lastSkeletonTime = time.Now()
}
Expand Down
42 changes: 27 additions & 15 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (

// Config for the "Looking for peers" message.
dialStatsLogInterval = 60 * time.Second // printed at most this often
dialStatsPeerLimit = 20 // but not if more than this many dialed peers

// Endpoint resolution is throttled with bounded backoff.
initialResolveDelay = 60 * time.Second
Expand Down Expand Up @@ -94,6 +93,7 @@ var (
// to create peer connections to nodes arriving through the iterator.
type dialScheduler struct {
dialConfig
mutex sync.Mutex
setupFunc dialSetupFunc
wg sync.WaitGroup
cancel context.CancelFunc
Expand Down Expand Up @@ -126,8 +126,8 @@ type dialScheduler struct {
historyTimerTime mclock.AbsTime

// for logStats
lastStatsLog mclock.AbsTime
doneSinceLastLog int
dialed int
errors map[string]uint
}

type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error
Expand Down Expand Up @@ -177,8 +177,9 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF
remPeerCh: make(chan *conn),

subProtocolVersion: subProtocolVersion,
errors: map[string]uint{},
}
d.lastStatsLog = d.clock.Now()

d.ctx, d.cancel = context.WithCancel(context.Background())
d.wg.Add(2)
go d.readNodes(it)
Expand Down Expand Up @@ -232,6 +233,9 @@ func (d *dialScheduler) loop(it enode.Iterator) {
historyExp = make(chan struct{}, 1)
)

logTimer := time.NewTicker(dialStatsLogInterval)
defer logTimer.Stop()

loop:
for {
// Launch new dials if slots are available.
Expand All @@ -243,13 +247,15 @@ loop:
nodesCh = nil
}
d.rearmHistoryTimer(historyExp)
//d.logStats()

select {
case <-d.ctx.Done():
it.Close()
break loop

case <-logTimer.C:
d.logStats()

case node := <-nodesCh:
if err := d.checkDial(node); err != nil {
d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err)
Expand All @@ -261,7 +267,7 @@ loop:
id := task.dest.ID()
delete(d.dialing, id)
d.updateStaticPool(id)
d.doneSinceLastLog++
d.dialed++

case c := <-d.addPeerCh:
if c.is(dynDialedConn) || c.is(staticDialedConn) {
Expand Down Expand Up @@ -337,15 +343,16 @@ func (d *dialScheduler) readNodes(it enode.Iterator) {
// or comes back online.
// nolint
func (d *dialScheduler) logStats() {
now := d.clock.Now()
if d.lastStatsLog.Add(dialStatsLogInterval) > now {
return
}
if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers {
d.log.Info("[p2p] Looking for peers", "protocol", d.subProtocolVersion, "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.doneSinceLastLog, "static", len(d.static))
vals := []interface{}{"protocol", d.subProtocolVersion,
"peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.dialed, "static", len(d.static)}

d.mutex.Lock()
for err, count := range d.errors {
vals = append(vals, err, count)
}
d.doneSinceLastLog = 0
d.lastStatsLog = now
d.mutex.Unlock()

d.log.Debug("[p2p] Dial scheduler", vals...)
}

// rearmHistoryTimer configures d.historyTimer to fire when the
Expand Down Expand Up @@ -543,7 +550,12 @@ func (t *dialTask) resolve(d *dialScheduler) bool {
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
fd, err := d.dialer.Dial(d.ctx, t.dest)
if err != nil {
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
cleanErr := cleanupDialErr(err)
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanErr)

d.mutex.Lock()
d.errors[cleanErr.Error()] = d.errors[cleanErr.Error()] + 1
d.mutex.Unlock()
return &dialError{err}
}
mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
Expand Down
9 changes: 7 additions & 2 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func (cfg Config) withDefaults(defaultReplyTimeout time.Duration) Config {
}

// ListenUDP starts listening for discovery packets on the given UDP socket.
func ListenUDP(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return ListenV4(ctx, c, ln, cfg)
func ListenUDP(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return ListenV4(ctx, protocol, c, ln, cfg)
}

// ReadPacket is a packet that couldn't be handled. Those packets are sent to the unhandled
Expand All @@ -96,3 +96,8 @@ type ReadPacket struct {
Data []byte
Addr *net.UDPAddr
}

type UnhandledPacket struct {
ReadPacket
Reason error
}
2 changes: 2 additions & 0 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (it *lookup) slowdown() {
func (it *lookup) query(n *node, reply chan<- []*node) {
fails := it.tab.db.FindFails(n.ID(), n.IP())
r, err := it.queryfunc(n)

if err == errClosed {
// Avoid recording failures on shutdown.
reply <- nil
Expand All @@ -180,6 +181,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
for _, n := range r {
it.tab.addSeenNode(n)
}

reply <- r
}

Expand Down
Loading
Loading