From 9105afdc71b1769d9e23114bd88f66c90cb052fb Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Wed, 1 Nov 2023 18:18:41 +0000 Subject: [PATCH 01/26] Add refresh on shorter timer if discovery table is empty --- erigon-lib/diagnostics/provider.go | 49 ++++++++++++++++++------------ p2p/discover/table.go | 15 +++++++++ 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/erigon-lib/diagnostics/provider.go b/erigon-lib/diagnostics/provider.go index c1c2ae756c7..71431e92890 100644 --- a/erigon-lib/diagnostics/provider.go +++ b/erigon-lib/diagnostics/provider.go @@ -6,6 +6,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "github.com/ledgerwatch/erigon-lib/common/dbg" "github.com/ledgerwatch/log/v3" @@ -50,7 +51,7 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) { providerMutex.Lock() defer providerMutex.Unlock() - reg, _ := providers[infoType] + reg := providers[infoType] if reg != nil { for _, p := range reg.providers { @@ -73,13 +74,10 @@ func RegisterProvider(provider Provider, infoType Type, logger log.Logger) { func StartProviders(ctx context.Context, infoType Type, logger log.Logger) { providerMutex.Lock() - reg, _ := providers[infoType] + reg := providers[infoType] toStart := make([]Provider, len(reg.providers)) - - for i, provider := range reg.providers { - toStart[i] = provider - } + copy(toStart, reg.providers) reg.context = ctx @@ -107,16 +105,23 @@ func startProvider(ctx context.Context, infoType Type, provider Provider, logger func Send[I Info](ctx context.Context, info I) error { if ctx.Err() != nil { + if !errors.Is(ctx.Err(), context.Canceled) { + return nil + } + return ctx.Err() } cval := ctx.Value(ckChan) - if c, ok := cval.(chan I); ok { - select { - case c <- info: - default: - // drop the diagnostic message if the receiver is busy - // so the sender is not blocked on non critcal actions + + if cp, ok := cval.(*atomic.Pointer[chan I]); ok { + if c := (*cp).Load(); c != nil { + select { + case *c <- info: + default: + // drop the diagnostic message if the receiver is busy + // so the sender is not blocked on non critcal actions + } } } else { return fmt.Errorf("unexpected channel type: %T", cval) @@ -126,16 +131,20 @@ func Send[I Info](ctx context.Context, info I) error { } func Context[I Info](ctx context.Context, buffer int) (context.Context, <-chan I, context.CancelFunc) { - ch := make(chan I, buffer) - ctx = context.WithValue(ctx, ckChan, ch) + c := make(chan I, buffer) + cp := atomic.Pointer[chan I]{} + cp.Store(&c) + + ctx = context.WithValue(ctx, ckChan, &cp) ctx, cancel := context.WithCancel(ctx) - return ctx, ch, func() { - if ch != nil { - toClose := ch - ch = nil - close(toClose) - } + return ctx, *cp.Load(), func() { cancel() + + if cp.CompareAndSwap(&c, nil) { + ch := c + c = nil + close(ch) + } } } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index eaa79403447..222ce319dc6 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -55,6 +55,7 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 + minRefreshInterval = 30 * time.Second refreshInterval = 30 * time.Minute revalidateInterval = 5 * time.Second copyNodesInterval = 30 * time.Second @@ -245,6 +246,14 @@ func (tab *Table) loop() { // Start initial refresh. 
go tab.doRefresh(refreshDone) + var minRefreshTimer *time.Timer + + defer func() { + if minRefreshTimer != nil { + minRefreshTimer.Stop() + } + }() + loop: for { select { @@ -270,6 +279,12 @@ loop: go tab.doRevalidate(revalidateDone) case <-revalidateDone: revalidate.Reset(tab.revalidateInterval) + if tab.len() == 0 && len(waiting) == 0 && minRefreshTimer == nil { + minRefreshTimer = time.AfterFunc(minRefreshInterval, func() { + minRefreshTimer = nil + tab.refresh() + }) + } revalidateDone = nil case <-copyNodes.C: go tab.copyLiveNodes() From df47dbc7c40a10c9328e4d742f40809120ca67b8 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Wed, 1 Nov 2023 20:29:46 +0000 Subject: [PATCH 02/26] added header request logging --- turbo/stages/headerdownload/header_algos.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index e7b8a29e8fe..f561bd6b035 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -8,13 +8,14 @@ import ( "encoding/base64" "errors" "fmt" - "github.com/ledgerwatch/erigon-lib/kv/dbutils" "io" "math/big" "sort" "strings" "time" + "github.com/ledgerwatch/erigon-lib/kv/dbutils" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/etl" "github.com/ledgerwatch/erigon-lib/kv" @@ -432,6 +433,8 @@ func (hd *HeaderDownload) requestMoreHeadersForPOS(currentTime time.Time) (timeo return } + hd.logger.Debug("[downloader] Request header", "numer", anchor.blockHeight-1, "length", 192) + // Request ancestors request = &HeaderRequest{ Anchor: anchor, @@ -482,7 +485,7 @@ func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime time.T func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { hd.lock.RLock() defer hd.lock.RUnlock() - hd.logger.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb) + var stride uint64 if hd.initialCycle { stride = 192 @@ -495,6 +498,9 @@ func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { } else { from-- } + + hd.logger.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb, "from", from, "length", length) + return &HeaderRequest{Number: from, Length: length, Skip: stride, Reverse: false} } From 0259fd92031dcc9dc10312cb936ae92f6131c454 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Wed, 1 Nov 2023 20:45:50 +0000 Subject: [PATCH 03/26] Log on request send --- eth/stagedsync/stage_headers.go | 2 ++ turbo/stages/headerdownload/header_algos.go | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index ea99b6c3943..e68fbc49e59 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -204,6 +204,7 @@ Loop: if req != nil { peer, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { + logger.Debug(fmt.Sprintf("[%s] Requested header", logPrefix), "from", req.Number, "length", req.Length) cfg.hd.UpdateStats(req, false /* skeleton */, peer) cfg.hd.UpdateRetryTime(req, currentTime, 5*time.Second /* timeout */) } @@ -233,6 +234,7 @@ Loop: if req != nil { peer, sentToPeer = cfg.headerReqSend(ctx, req) if sentToPeer { + logger.Debug(fmt.Sprintf("[%s] Requested skeleton", logPrefix), "from", req.Number, "length", req.Length) cfg.hd.UpdateStats(req, true /* skeleton */, peer) lastSkeletonTime = time.Now() } diff --git 
a/turbo/stages/headerdownload/header_algos.go b/turbo/stages/headerdownload/header_algos.go index f561bd6b035..c98b0a969e4 100644 --- a/turbo/stages/headerdownload/header_algos.go +++ b/turbo/stages/headerdownload/header_algos.go @@ -499,8 +499,6 @@ func (hd *HeaderDownload) RequestSkeleton() *HeaderRequest { from-- } - hd.logger.Debug("[downloader] Request skeleton", "anchors", len(hd.anchors), "highestInDb", hd.highestInDb, "from", from, "length", length) - return &HeaderRequest{Number: from, Length: length, Skip: stride, Reverse: false} } From 1ab225be10777fdf84766b7fd7266cb10bb732ac Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Wed, 1 Nov 2023 22:02:00 +0000 Subject: [PATCH 04/26] trace log peer messages --- p2p/peer.go | 19 +++++++++++++++---- p2p/peer_test.go | 2 +- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 0adf711d765..dfb40e034f7 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -223,7 +223,9 @@ func (p *Peer) Inbound() bool { } func newPeer(logger log.Logger, conn *conn, protocols []Protocol, pubkey [64]byte, metricsEnabled bool) *Peer { - protomap := matchProtocols(protocols, conn.caps, conn) + log := logger.New("id", conn.node.ID(), "conn", conn.flags) + + protomap := matchProtocols(protocols, conn.caps, conn, log) p := &Peer{ rw: conn, running: protomap, @@ -232,7 +234,7 @@ func newPeer(logger log.Logger, conn *conn, protocols []Protocol, pubkey [64]byt protoErr: make(chan *PeerError, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), pingRecv: make(chan struct{}, 16), - log: logger.New("id", conn.node.ID(), "conn", conn.flags), + log: log, pubkey: pubkey, metricsEnabled: metricsEnabled, CapBytesIn: make(map[string]uint64), @@ -438,7 +440,7 @@ func countMatchingProtocols(protocols []Protocol, caps []Cap) int { } // matchProtocols creates structures for matching named subprotocols. -func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter) map[string]*protoRW { +func matchProtocols(protocols []Protocol, caps []Cap, rw MsgReadWriter, logger log.Logger) map[string]*protoRW { sort.Sort(capsByNameAndVersion(caps)) offset := baseProtocolLength result := make(map[string]*protoRW) @@ -452,7 +454,7 @@ outer: offset -= old.Length } // Assign the new match - result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw} + result[cap.Name] = &protoRW{Protocol: proto, offset: offset, in: make(chan Msg), w: rw, logger: logger} offset += proto.Length continue outer @@ -506,6 +508,7 @@ type protoRW struct { werr chan<- error // for write results offset uint64 w MsgWriter + logger log.Logger } func (rw *protoRW) WriteMsg(msg Msg) (err error) { @@ -520,6 +523,13 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { select { case <-rw.wstart: err = rw.w.WriteMsg(msg) + + if err != nil { + rw.logger.Trace("Write failed", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size, "err", err) + } else { + rw.logger.Trace("Wrote", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size) + } + // Report write status back to Peer.run. It will initiate // shutdown if the error is non-nil and unblock the next write // otherwise. 
The calling protocol code should exit for errors @@ -536,6 +546,7 @@ func (rw *protoRW) ReadMsg() (Msg, error) { select { case msg := <-rw.in: msg.Code -= rw.offset + rw.logger.Trace("Read", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size) return msg, nil case <-rw.closed: return Msg{}, io.EOF diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 45b0e89f655..c8409764592 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -331,7 +331,7 @@ func TestMatchProtocols(t *testing.T) { } for i, tt := range tests { - result := matchProtocols(tt.Local, tt.Remote, nil) + result := matchProtocols(tt.Local, tt.Remote, nil, log.Root()) if len(result) != len(tt.Match) { t.Errorf("test %d: negotiation mismatch: have %v, want %v", i, len(result), len(tt.Match)) continue From fb2d2ab883901cc374d45362ecea4f715d6875c3 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Wed, 1 Nov 2023 22:13:42 +0000 Subject: [PATCH 05/26] sub message offset --- p2p/peer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index dfb40e034f7..734d0c19f45 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -525,9 +525,9 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { err = rw.w.WriteMsg(msg) if err != nil { - rw.logger.Trace("Write failed", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size, "err", err) + rw.logger.Trace("Write failed", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size, "err", err) } else { - rw.logger.Trace("Wrote", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size) + rw.logger.Trace("Wrote", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size) } // Report write status back to Peer.run. It will initiate From 13bd1d02581ecdf6f1e719a378d997f7377f9564 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Wed, 1 Nov 2023 23:57:56 +0000 Subject: [PATCH 06/26] Extend reply timeout for discovery --- p2p/discover/v4_udp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 38687292d3f..293d98127c6 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -48,7 +48,7 @@ var ( ) const ( - respTimeout = 500 * time.Millisecond + respTimeout = 1500 * time.Millisecond expiration = 20 * time.Second bondExpiration = 24 * time.Hour From b2a0c9825c11c2a514a864270831e06a56550885 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 13:04:20 +0000 Subject: [PATCH 07/26] Handle unsolicited responnses in v4 table --- p2p/discover/common.go | 5 +++++ p2p/discover/v4_udp.go | 44 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/p2p/discover/common.go b/p2p/discover/common.go index 6ee5c4c0bd1..f222bb36c53 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -96,3 +96,8 @@ type ReadPacket struct { Data []byte Addr *net.UDPAddr } + +type UnhandledPacket struct { + ReadPacket + Reason error +} diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 293d98127c6..5f71b431229 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -48,7 +48,7 @@ var ( ) const ( - respTimeout = 1500 * time.Millisecond + respTimeout = 750 * time.Millisecond expiration = 20 * time.Second bondExpiration = 24 * time.Hour @@ -77,6 +77,7 @@ type UDPv4 struct { addReplyMatcher chan *replyMatcher gotreply chan reply + unhandled chan UnhandledPacket replyTimeout time.Duration pingBackDelay time.Duration closeCtx context.Context @@ -141,6 +142,7 @@ func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) ( 
db: ln.Database(), gotreply: make(chan reply), addReplyMatcher: make(chan *replyMatcher), + unhandled: make(chan UnhandledPacket), replyTimeout: cfg.ReplyTimeout, pingBackDelay: cfg.PingBackDelay, closeCtx: closeCtx, @@ -159,7 +161,8 @@ func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) ( t.wg.Add(2) go t.loop() - go t.readLoop(cfg.Unhandled) + go t.readLoop() + go t.handleUnhandled(cfg.Unhandled) return t, nil } @@ -175,6 +178,11 @@ func (t *UDPv4) Close() { t.conn.Close() t.wg.Wait() t.tab.close() + if t.unhandled != nil { + ch := t.unhandled + t.unhandled = nil + close(ch) + } }) } @@ -542,12 +550,9 @@ func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet [] } // readLoop runs in its own goroutine. it handles incoming UDP packets. -func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { +func (t *UDPv4) readLoop() { defer t.wg.Done() defer debug.LogPanic() - if unhandled != nil { - defer close(unhandled) - } buf := make([]byte, maxPacketSize) for { @@ -563,15 +568,38 @@ func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { } return } - if t.handlePacket(from, buf[:nbytes]) != nil && unhandled != nil { + if err := t.handlePacket(from, buf[:nbytes]); err != nil && t.unhandled != nil { select { - case unhandled <- ReadPacket{buf[:nbytes], from}: + case t.unhandled <- UnhandledPacket{ReadPacket: ReadPacket{buf[:nbytes], from}, Reason: err}: default: } } } } +func (t *UDPv4) handleUnhandled(unhandled chan<- ReadPacket) { + if unhandled != nil { + defer close(unhandled) + } + + for u := range t.unhandled { + switch { + case errors.Is(u.Reason, errUnsolicitedReply): + _, fromKey, _, err := v4wire.Decode(u.Data) + if err != nil { + fromId := enode.PubkeyEncoded(fromKey).ID() + + t.log.Trace("Unsolicited packet", "from", fromId, "addr", u.Addr) + t.sendPing(fromId, u.Addr, nil) + } + default: + if unhandled != nil { + unhandled <- u.ReadPacket + } + } + } +} + func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error { rawpacket, fromKey, hash, err := v4wire.Decode(buf) if err != nil { From 59e36aea3307c4ad27c9f0903a86453859682900 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 13:42:15 +0000 Subject: [PATCH 08/26] fix error condition --- p2p/discover/v4_udp.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 5f71b431229..dc3b7cf570a 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -586,11 +586,13 @@ func (t *UDPv4) handleUnhandled(unhandled chan<- ReadPacket) { switch { case errors.Is(u.Reason, errUnsolicitedReply): _, fromKey, _, err := v4wire.Decode(u.Data) - if err != nil { + if err == nil { fromId := enode.PubkeyEncoded(fromKey).ID() t.log.Trace("Unsolicited packet", "from", fromId, "addr", u.Addr) t.sendPing(fromId, u.Addr, nil) + } else { + t.log.Trace("Unsolicited packet handling fialed", "addr", u.Addr, "err", err) } default: if unhandled != nil { From 734f52c47ac7823c3f4f5064e792fefeaad02845 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 14:24:26 +0000 Subject: [PATCH 09/26] added lookup for pinged key after unsolcited --- p2p/discover/v4_udp.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index dc3b7cf570a..8b1f9cc818a 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -590,7 +590,11 @@ func (t *UDPv4) handleUnhandled(unhandled chan<- ReadPacket) { fromId := enode.PubkeyEncoded(fromKey).ID() 
t.log.Trace("Unsolicited packet", "from", fromId, "addr", u.Addr) - t.sendPing(fromId, u.Addr, nil) + t.sendPing(fromId, u.Addr, func() { + if key, err := v4wire.DecodePubkey(crypto.S256(), fromKey); err == nil { + t.LookupPubkey(key) + } + }) } else { t.log.Trace("Unsolicited packet handling fialed", "addr", u.Addr, "err", err) } From 1c577a4b1604888ad467ef84b50ca763465bbd7a Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 16:53:03 +0000 Subject: [PATCH 10/26] print discovery stats --- p2p/discover/table.go | 46 +++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 222ce319dc6..cd7e4fa0640 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -55,13 +55,13 @@ const ( bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24 tableIPLimit, tableSubnet = 10, 24 - minRefreshInterval = 30 * time.Second - refreshInterval = 30 * time.Minute - revalidateInterval = 5 * time.Second - copyNodesInterval = 30 * time.Second - seedMinTableTime = 5 * time.Minute - seedCount = 30 - seedMaxAge = 5 * 24 * time.Hour + minRefreshInterval = 30 * time.Second + refreshInterval = 30 * time.Minute + revalidateInterval = 5 * time.Second + maintenanceInterval = 30 * time.Second + seedMinTableTime = 5 * time.Minute + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) // Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps @@ -231,17 +231,17 @@ func (tab *Table) refresh() <-chan struct{} { // loop schedules runs of doRefresh, doRevalidate and copyLiveNodes. func (tab *Table) loop() { var ( - revalidate = time.NewTimer(tab.revalidateInterval) - refresh = time.NewTicker(refreshInterval) - copyNodes = time.NewTicker(copyNodesInterval) - refreshDone = make(chan struct{}) // where doRefresh reports completion - revalidateDone chan struct{} // where doRevalidate reports completion - waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs + revalidate = time.NewTimer(tab.revalidateInterval) + refresh = time.NewTicker(refreshInterval) + tableMainenance = time.NewTicker(maintenanceInterval) + refreshDone = make(chan struct{}) // where doRefresh reports completion + revalidateDone chan struct{} // where doRevalidate reports completion + waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs ) defer debug.LogPanic() defer refresh.Stop() defer revalidate.Stop() - defer copyNodes.Stop() + defer tableMainenance.Stop() // Start initial refresh. go tab.doRefresh(refreshDone) @@ -286,7 +286,8 @@ loop: }) } revalidateDone = nil - case <-copyNodes.C: + case <-tableMainenance.C: + tab.log.Trace("[p2p] V4 Discovery table", "len", tab.len(), tab.live()) go tab.copyLiveNodes() case <-tab.closeReq: break loop @@ -459,6 +460,21 @@ func (tab *Table) len() (n int) { return n } +func (tab *Table) live() (n int) { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + for _, b := range &tab.buckets { + for _, e := range b.entries { + if e.livenessChecks > 0 { + n++ + } + } + } + + return n +} + // bucketLen returns the number of nodes in the bucket for the given ID. 
func (tab *Table) bucketLen(id enode.ID) int { tab.mutex.Lock() From 29646557a8a16359e23ee6440950dda570e41de3 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 17:16:52 +0000 Subject: [PATCH 11/26] adjust logs --- p2p/discover/table.go | 2 +- p2p/peer.go | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index cd7e4fa0640..29e1c89273c 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -287,7 +287,7 @@ loop: } revalidateDone = nil case <-tableMainenance.C: - tab.log.Trace("[p2p] V4 Discovery table", "len", tab.len(), tab.live()) + tab.log.Debug("[p2p] V4 Discovery table", "len", tab.len(), tab.live()) go tab.copyLiveNodes() case <-tab.closeReq: break loop diff --git a/p2p/peer.go b/p2p/peer.go index 734d0c19f45..43767f42786 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -511,6 +511,8 @@ type protoRW struct { logger log.Logger } +var traceMsg = false + func (rw *protoRW) WriteMsg(msg Msg) (err error) { if msg.Code >= rw.Length { return NewPeerError(PeerErrorInvalidMessageCode, DiscProtocolError, nil, fmt.Sprintf("not handled code=%d", msg.Code)) @@ -524,10 +526,12 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { case <-rw.wstart: err = rw.w.WriteMsg(msg) - if err != nil { - rw.logger.Trace("Write failed", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size, "err", err) - } else { - rw.logger.Trace("Wrote", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size) + if traceMsg { + if err != nil { + rw.logger.Trace("Write failed", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size, "err", err) + } else { + rw.logger.Trace("Wrote", "cap", rw.cap(), "msg", msg.Code-rw.offset, "size", msg.Size) + } } // Report write status back to Peer.run. 
It will initiate @@ -546,7 +550,9 @@ func (rw *protoRW) ReadMsg() (Msg, error) { select { case msg := <-rw.in: msg.Code -= rw.offset - rw.logger.Trace("Read", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size) + if traceMsg { + rw.logger.Trace("Read", "cap", rw.cap(), "msg", msg.Code, "size", msg.Size) + } return msg, nil case <-rw.closed: return Msg{}, io.EOF From d799a5c4a40627d3a04fd95cb91c2173a64c380c Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 17:30:18 +0000 Subject: [PATCH 12/26] add log desc --- p2p/discover/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 29e1c89273c..fc26cc917b5 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -287,7 +287,7 @@ loop: } revalidateDone = nil case <-tableMainenance.C: - tab.log.Debug("[p2p] V4 Discovery table", "len", tab.len(), tab.live()) + tab.log.Debug("[p2p] Discovery table", "len", tab.len(), "live", tab.live()) go tab.copyLiveNodes() case <-tab.closeReq: break loop From 4ce93da7da51adc769ac25d25b81a37e52c1c1f3 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 17:36:36 +0000 Subject: [PATCH 13/26] added refresh on maint --- p2p/discover/table.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index fc26cc917b5..a80d07c4afd 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -287,8 +287,14 @@ loop: } revalidateDone = nil case <-tableMainenance.C: + live := tab.live() tab.log.Debug("[p2p] Discovery table", "len", tab.len(), "live", tab.live()) - go tab.copyLiveNodes() + if live == 0 { + tab.seedRand() + tab.refresh() + } else { + go tab.copyLiveNodes() + } case <-tab.closeReq: break loop } From b65b5cfd3fe4ce7b615069faf1739c86f72c00fe Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 17:50:20 +0000 Subject: [PATCH 14/26] revalidate on maint --- p2p/discover/table.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index a80d07c4afd..a46494312af 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -275,13 +275,16 @@ loop: } waiting, refreshDone = nil, nil case <-revalidate.C: - revalidateDone = make(chan struct{}) - go tab.doRevalidate(revalidateDone) + if revalidateDone == nil { + revalidateDone = make(chan struct{}) + go tab.doRevalidate(revalidateDone) + } case <-revalidateDone: revalidate.Reset(tab.revalidateInterval) - if tab.len() == 0 && len(waiting) == 0 && minRefreshTimer == nil { + if tab.live() == 0 && len(waiting) == 0 && minRefreshTimer == nil { minRefreshTimer = time.AfterFunc(minRefreshInterval, func() { minRefreshTimer = nil + tab.seedRand() tab.refresh() }) } @@ -289,9 +292,11 @@ loop: case <-tableMainenance.C: live := tab.live() tab.log.Debug("[p2p] Discovery table", "len", tab.len(), "live", tab.live()) - if live == 0 { - tab.seedRand() - tab.refresh() + if live != 0 { + if revalidateDone == nil { + revalidateDone = make(chan struct{}) + go tab.doRevalidate(revalidateDone) + } } else { go tab.copyLiveNodes() } From f5c74be9219cdcb05748e95d2c46d3cbecdbf302 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 18:03:47 +0000 Subject: [PATCH 15/26] lookup rand in maint --- p2p/discover/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index a46494312af..c0d60fdea4f 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -284,7 
+284,7 @@ loop: if tab.live() == 0 && len(waiting) == 0 && minRefreshTimer == nil { minRefreshTimer = time.AfterFunc(minRefreshInterval, func() { minRefreshTimer = nil - tab.seedRand() + tab.net.lookupRandom() tab.refresh() }) } From c65148cd764515e852325d30a4ee8c94f7df0393 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 19:36:29 +0000 Subject: [PATCH 16/26] remove tmp wriggle --- consensus/bor/bor.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/consensus/bor/bor.go b/consensus/bor/bor.go index 103e9d32aa3..b9434fa9e2b 100644 --- a/consensus/bor/bor.go +++ b/consensus/bor/bor.go @@ -1214,12 +1214,6 @@ func (c *Bor) Seal(chain consensus.ChainHeaderReader, block *types.Block, result // wiggle was already accounted for in header.Time, this is just for logging wiggle := time.Duration(successionNumber) * time.Duration(c.config.CalculateBackupMultiplier(number)) * time.Second - // temp for testing - if wiggle > 0 { - wiggle = 500 * time.Millisecond - } - // temp for testing - // Sign all the things! sighash, err := signFn(signer, accounts.MimetypeBor, BorRLP(header, c.config)) if err != nil { From 796b8b943a86f726adfc253a2ef1b77e3c8a45d2 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Thu, 2 Nov 2023 21:02:22 +0000 Subject: [PATCH 17/26] added node error counts --- p2p/discover/node.go | 8 +++++++- p2p/discover/table.go | 44 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 59c20b1bdb3..7ec70336789 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -29,10 +29,11 @@ type node struct { enode.Node addedAt time.Time // time when the node was added to the table livenessChecks uint // how often liveness was checked + errors map[string]int } func wrapNode(n *enode.Node) *node { - return &node{Node: *n} + return &node{Node: *n, errors: map[string]int{}} } func wrapNodes(ns []*enode.Node) []*node { @@ -59,6 +60,11 @@ func (n *node) addr() *net.UDPAddr { return &net.UDPAddr{IP: n.IP(), Port: n.UDP()} } +func (n *node) addError(err error) { + str := err.Error() + n.errors[str] = n.errors[str] + 1 +} + func (n *node) String() string { return n.Node.String() } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index c0d60fdea4f..862b89d121f 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -291,7 +291,16 @@ loop: revalidateDone = nil case <-tableMainenance.C: live := tab.live() - tab.log.Debug("[p2p] Discovery table", "len", tab.len(), "live", tab.live()) + errs := tab.errors() + + vals := []interface{}{"len", tab.len(), "live", tab.live()} + + for err, count := range errs { + vals = append(vals, err, count) + } + + tab.log.Debug("[p2p] Discovery table", vals...) + if live != 0 { if revalidateDone == nil { revalidateDone = make(chan struct{}) @@ -370,11 +379,14 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { remoteSeq, rErr := tab.net.ping(unwrapNode(last)) // Also fetch record if the node replied and returned a higher sequence number. 
- if last.Seq() < remoteSeq { - if n, err := tab.net.RequestENR(unwrapNode(last)); err != nil { - tab.log.Trace("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) - } else { - last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} + if rErr == nil { + if last.Seq() < remoteSeq { + if n, err := tab.net.RequestENR(unwrapNode(last)); err != nil { + rErr = err + tab.log.Trace("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) + } else { + last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks, errors: last.errors} + } } } @@ -387,7 +399,10 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { tab.log.Trace("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks) tab.bumpInBucket(b, last) return + } else { + last.addError(rErr) } + // No reply received, pick a replacement or delete the node if there aren't // any replacements. if r := tab.replace(b, last); r != nil { @@ -486,6 +501,23 @@ func (tab *Table) live() (n int) { return n } +func (tab *Table) errors() map[string]int { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + errs := map[string]int{} + + for _, b := range &tab.buckets { + for _, e := range b.entries { + for err, count := range e.errors { + errs[err] = errs[err] + count + } + } + } + + return errs +} + // bucketLen returns the number of nodes in the bucket for the given ID. func (tab *Table) bucketLen(id enode.ID) int { tab.mutex.Lock() From ad2ab7d90c2cc268eb7609c9acadea7a1e53aebb Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Fri, 3 Nov 2023 20:37:09 +0000 Subject: [PATCH 18/26] added summary discovery logging --- cl/sentinel/sentinel.go | 2 +- cmd/bootnode/main.go | 4 +- cmd/observer/observer/server.go | 2 +- p2p/dial.go | 33 +++++++++----- p2p/discover/common.go | 4 +- p2p/discover/node.go | 8 +--- p2p/discover/table.go | 77 +++++++++++++++++-------------- p2p/discover/table_util_test.go | 4 +- p2p/discover/v4_udp.go | 80 ++++++++++++++++++++++----------- p2p/discover/v4_udp_test.go | 4 +- p2p/discover/v5_udp.go | 18 ++++++-- p2p/discover/v5_udp_test.go | 4 +- p2p/server.go | 42 +++++++++++++++-- 13 files changed, 186 insertions(+), 96 deletions(-) diff --git a/cl/sentinel/sentinel.go b/cl/sentinel/sentinel.go index 51fab5101fb..839906fb1bb 100644 --- a/cl/sentinel/sentinel.go +++ b/cl/sentinel/sentinel.go @@ -168,7 +168,7 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) { // Start stream handlers handlers.NewConsensusHandlers(s.ctx, s.db, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start() - net, err := discover.ListenV5(s.ctx, conn, localNode, discCfg) + net, err := discover.ListenV5(s.ctx, "any", conn, localNode, discCfg) if err != nil { return nil, err } diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index 7339cf06ab1..eedde266ad4 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -131,11 +131,11 @@ func main() { } if *runv5 { - if _, err := discover.ListenV5(ctx, conn, ln, cfg); err != nil { + if _, err := discover.ListenV5(ctx, "any", conn, ln, cfg); err != nil { utils.Fatalf("%v", err) } } else { - if _, err := discover.ListenUDP(ctx, conn, ln, cfg); err != nil { + if _, err := discover.ListenUDP(ctx, "any", conn, ln, cfg); err != nil { utils.Fatalf("%v", err) } } diff --git a/cmd/observer/observer/server.go b/cmd/observer/observer/server.go index 4c017f3379d..99c2cb4bbc2 100644 --- a/cmd/observer/observer/server.go +++ b/cmd/observer/observer/server.go @@ -183,5 +183,5 @@ func 
(server *Server) Listen(ctx context.Context) (*discover.UDPv4, error) { server.logger.Debug("Discovery UDP listener is up", "addr", realAddr) - return discover.ListenV4(ctx, conn, server.localNode, server.discConfig) + return discover.ListenV4(ctx, "any", conn, server.localNode, server.discConfig) } diff --git a/p2p/dial.go b/p2p/dial.go index cadb821d5ef..83053bce19a 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -43,7 +43,6 @@ const ( // Config for the "Looking for peers" message. dialStatsLogInterval = 60 * time.Second // printed at most this often - dialStatsPeerLimit = 20 // but not if more than this many dialed peers // Endpoint resolution is throttled with bounded backoff. initialResolveDelay = 60 * time.Second @@ -126,8 +125,8 @@ type dialScheduler struct { historyTimerTime mclock.AbsTime // for logStats - lastStatsLog mclock.AbsTime doneSinceLastLog int + errors map[string]uint } type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error @@ -177,8 +176,9 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF remPeerCh: make(chan *conn), subProtocolVersion: subProtocolVersion, + errors: map[string]uint{}, } - d.lastStatsLog = d.clock.Now() + d.ctx, d.cancel = context.WithCancel(context.Background()) d.wg.Add(2) go d.readNodes(it) @@ -232,6 +232,9 @@ func (d *dialScheduler) loop(it enode.Iterator) { historyExp = make(chan struct{}, 1) ) + logTimer := time.NewTicker(dialStatsLogInterval) + defer logTimer.Stop() + loop: for { // Launch new dials if slots are available. @@ -243,13 +246,15 @@ loop: nodesCh = nil } d.rearmHistoryTimer(historyExp) - //d.logStats() select { case <-d.ctx.Done(): it.Close() break loop + case <-logTimer.C: + d.logStats() + case node := <-nodesCh: if err := d.checkDial(node); err != nil { d.log.Trace("Discarding dial candidate", "id", node.ID(), "ip", node.IP(), "reason", err) @@ -337,15 +342,16 @@ func (d *dialScheduler) readNodes(it enode.Iterator) { // or comes back online. // nolint func (d *dialScheduler) logStats() { - now := d.clock.Now() - if d.lastStatsLog.Add(dialStatsLogInterval) > now { - return - } - if d.dialPeers < dialStatsPeerLimit && d.dialPeers < d.maxDialPeers { - d.log.Info("[p2p] Looking for peers", "protocol", d.subProtocolVersion, "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.doneSinceLastLog, "static", len(d.static)) + vals := []interface{}{"protocol", d.subProtocolVersion, + "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.doneSinceLastLog, "static", len(d.static)} + + for err, count := range d.errors { + vals = append(vals, err, count) } + + d.log.Debug("[p2p] Dial scheduler", vals...) 
+ d.doneSinceLastLog = 0 - d.lastStatsLog = now } // rearmHistoryTimer configures d.historyTimer to fire when the @@ -543,7 +549,10 @@ func (t *dialTask) resolve(d *dialScheduler) bool { func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error { fd, err := d.dialer.Dial(d.ctx, t.dest) if err != nil { - d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err)) + cleanErr := cleanupDialErr(err) + d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanErr) + + d.errors[cleanErr.Error()] = d.errors[cleanErr.Error()] + 1 return &dialError{err} } mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}) diff --git a/p2p/discover/common.go b/p2p/discover/common.go index f222bb36c53..da45e7b6d0d 100644 --- a/p2p/discover/common.go +++ b/p2p/discover/common.go @@ -86,8 +86,8 @@ func (cfg Config) withDefaults(defaultReplyTimeout time.Duration) Config { } // ListenUDP starts listening for discovery packets on the given UDP socket. -func ListenUDP(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { - return ListenV4(ctx, c, ln, cfg) +func ListenUDP(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { + return ListenV4(ctx, protocol, c, ln, cfg) } // ReadPacket is a packet that couldn't be handled. Those packets are sent to the unhandled diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 7ec70336789..59c20b1bdb3 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -29,11 +29,10 @@ type node struct { enode.Node addedAt time.Time // time when the node was added to the table livenessChecks uint // how often liveness was checked - errors map[string]int } func wrapNode(n *enode.Node) *node { - return &node{Node: *n, errors: map[string]int{}} + return &node{Node: *n} } func wrapNodes(ns []*enode.Node) []*node { @@ -60,11 +59,6 @@ func (n *node) addr() *net.UDPAddr { return &net.UDPAddr{IP: n.IP(), Port: n.UDP()} } -func (n *node) addError(err error) { - str := err.Error() - n.errors[str] = n.errors[str] + 1 -} - func (n *node) String() string { return n.Node.String() } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 862b89d121f..f26f5adbe58 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -58,7 +58,7 @@ const ( minRefreshInterval = 30 * time.Second refreshInterval = 30 * time.Minute revalidateInterval = 5 * time.Second - maintenanceInterval = 30 * time.Second + maintenanceInterval = 60 * time.Second seedMinTableTime = 5 * time.Minute seedCount = 30 seedMaxAge = 5 * 24 * time.Hour @@ -85,6 +85,12 @@ type Table struct { closed chan struct{} nodeAddedHook func(*node) // for testing + + // diagnostics + errors map[string]uint + dbseeds int + revalidates int + protocol string } // transport is implemented by the UDP transports. @@ -94,6 +100,8 @@ type transport interface { lookupRandom() []*enode.Node lookupSelf() []*enode.Node ping(*enode.Node) (seq uint64, err error) + Version() string + Errors() map[string]uint } // bucket contains nodes, ordered by their last activity. 
the entry @@ -106,24 +114,25 @@ type bucket struct { func newTable( t transport, + protocol string, db *enode.DB, bootnodes []*enode.Node, revalidateInterval time.Duration, logger log.Logger, ) (*Table, error) { tab := &Table{ - net: t, - db: db, - refreshReq: make(chan chan struct{}), - initDone: make(chan struct{}), - closeReq: make(chan struct{}), - closed: make(chan struct{}), - rand: mrand.New(mrand.NewSource(0)), // nolint: gosec - ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, - + net: t, + db: db, + refreshReq: make(chan chan struct{}), + initDone: make(chan struct{}), + closeReq: make(chan struct{}), + closed: make(chan struct{}), + rand: mrand.New(mrand.NewSource(0)), // nolint: gosec + ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit}, + errors: map[string]uint{}, revalidateInterval: revalidateInterval, - - log: logger, + protocol: protocol, + log: logger, } if err := tab.setFallbackNodes(bootnodes); err != nil { return nil, err @@ -291,13 +300,20 @@ loop: revalidateDone = nil case <-tableMainenance.C: live := tab.live() - errs := tab.errors() - vals := []interface{}{"len", tab.len(), "live", tab.live()} + vals := []interface{}{"protocol", tab.protocol, "version", tab.net.Version(), "len", tab.len(), "live", tab.live(), "ips", tab.ips.Len(), "db", tab.dbseeds, "reval", tab.revalidates} - for err, count := range errs { - vals = append(vals, err, count) - } + func() { + tab.mutex.Lock() + defer tab.mutex.Unlock() + for err, count := range tab.errors { + vals = append(vals, err, count) + } + + for err, count := range tab.net.Errors() { + vals = append(vals, err, count) + } + }() tab.log.Debug("[p2p] Discovery table", vals...) @@ -352,7 +368,10 @@ func (tab *Table) doRefresh(done chan struct{}) { } func (tab *Table) loadSeedNodes() { - seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge)) + dbseeds := tab.db.QuerySeeds(seedCount, seedMaxAge) + tab.dbseeds = len(dbseeds) + + seeds := wrapNodes(dbseeds) tab.log.Debug("QuerySeeds read nodes from the node DB", "count", len(seeds)) seeds = append(seeds, tab.nursery...) for i := range seeds { @@ -369,6 +388,8 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { defer debug.LogPanic() defer func() { done <- struct{}{} }() + tab.revalidates++ + last, bi := tab.nodeToRevalidate() if last == nil { // No non-empty bucket found. @@ -385,7 +406,7 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { rErr = err tab.log.Trace("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err) } else { - last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks, errors: last.errors} + last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks} } } } @@ -400,7 +421,7 @@ func (tab *Table) doRevalidate(done chan<- struct{}) { tab.bumpInBucket(b, last) return } else { - last.addError(rErr) + tab.addError(rErr) } // No reply received, pick a replacement or delete the node if there aren't @@ -501,21 +522,11 @@ func (tab *Table) live() (n int) { return n } -func (tab *Table) errors() map[string]int { +func (tab *Table) addError(err error) { tab.mutex.Lock() defer tab.mutex.Unlock() - - errs := map[string]int{} - - for _, b := range &tab.buckets { - for _, e := range b.entries { - for err, count := range e.errors { - errs[err] = errs[err] + count - } - } - } - - return errs + str := err.Error() + tab.errors[str] = tab.errors[str] + 1 } // bucketLen returns the number of nodes in the bucket for the given ID. 
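The maintenance log added above reports table size, seed and revalidation counters, and per-error counts by appending alternating key/value pairs to a single vals slice before handing it to the structured logger. The following is an illustrative, standalone sketch (not Erigon code, names are made up) of that flattening pattern; note that Go map iteration order is random, so the error keys appear in a different order on each log line.

package main

import "fmt"

// flattenCounters appends each counter as a key/value pair after the base
// fields, the shape expected by log15-style structured loggers. The base
// slice is copied so the caller's slice is left untouched.
func flattenCounters(base []interface{}, counters map[string]uint) []interface{} {
    vals := append([]interface{}{}, base...)
    for err, count := range counters {
        vals = append(vals, err, count)
    }
    return vals
}

func main() {
    errs := map[string]uint{"i/o timeout": 7, "too many peers": 2}
    vals := flattenCounters([]interface{}{"len", 42, "live", 17}, errs)
    fmt.Println(vals...) // e.g. len 42 live 17 i/o timeout 7 too many peers 2
}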
diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 50cff8aebe4..14b473e40fa 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -48,7 +48,7 @@ func newTestTable(t transport, tmpDir string) (*Table, *enode.DB) { if err != nil { panic(err) } - tab, _ := newTable(t, db, nil, time.Hour, log.Root()) + tab, _ := newTable(t, "test", db, nil, time.Hour, log.Root()) go tab.loop() return tab, db } @@ -156,6 +156,8 @@ func (t *pingRecorder) updateRecord(n *enode.Node) { // Stubs to satisfy the transport interface. func (t *pingRecorder) Self() *enode.Node { return nullNode } +func (t *pingRecorder) Version() string { return "none" } +func (t *pingRecorder) Errors() map[string]uint { return nil } func (t *pingRecorder) lookupSelf() []*enode.Node { return nil } func (t *pingRecorder) lookupRandom() []*enode.Node { return nil } diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 8b1f9cc818a..04351576899 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -47,6 +47,12 @@ var ( errLowPort = errors.New("low port") ) +var ( + errExpiredStr = errExpired.Error() + errUnsolicitedReplyStr = errUnsolicitedReply.Error() + errUnknownNodeStr = errUnknownNode.Error() +) + const ( respTimeout = 750 * time.Millisecond expiration = 20 * time.Second @@ -75,14 +81,14 @@ type UDPv4 struct { closeOnce sync.Once wg sync.WaitGroup - addReplyMatcher chan *replyMatcher - gotreply chan reply - unhandled chan UnhandledPacket - replyTimeout time.Duration - pingBackDelay time.Duration - closeCtx context.Context - cancelCloseCtx context.CancelFunc - + addReplyMatcher chan *replyMatcher + gotreply chan reply + unhandled chan UnhandledPacket + replyTimeout time.Duration + pingBackDelay time.Duration + closeCtx context.Context + cancelCloseCtx context.CancelFunc + errors map[string]uint privateKeyGenerator func() (*ecdsa.PrivateKey, error) } @@ -131,28 +137,28 @@ type reply struct { matched chan<- bool } -func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { +func ListenV4(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { cfg = cfg.withDefaults(respTimeout) closeCtx, cancel := context.WithCancel(ctx) t := &UDPv4{ - conn: c, - priv: cfg.PrivateKey, - netrestrict: cfg.NetRestrict, - localNode: ln, - db: ln.Database(), - gotreply: make(chan reply), - addReplyMatcher: make(chan *replyMatcher), - unhandled: make(chan UnhandledPacket), - replyTimeout: cfg.ReplyTimeout, - pingBackDelay: cfg.PingBackDelay, - closeCtx: closeCtx, - cancelCloseCtx: cancel, - log: cfg.Log, - + conn: c, + priv: cfg.PrivateKey, + netrestrict: cfg.NetRestrict, + localNode: ln, + db: ln.Database(), + gotreply: make(chan reply), + addReplyMatcher: make(chan *replyMatcher), + unhandled: make(chan UnhandledPacket), + replyTimeout: cfg.ReplyTimeout, + pingBackDelay: cfg.PingBackDelay, + closeCtx: closeCtx, + cancelCloseCtx: cancel, + log: cfg.Log, + errors: map[string]uint{}, privateKeyGenerator: cfg.PrivateKeyGenerator, } - tab, err := newTable(t, ln.Database(), cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) + tab, err := newTable(t, protocol, ln.Database(), cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) if err != nil { return nil, err } @@ -171,6 +177,14 @@ func (t *UDPv4) Self() *enode.Node { return t.localNode.Node() } +func (t *UDPv4) Version() string { + return "v4" +} + +func (t *UDPv4) Errors() map[string]uint { + return t.errors +} + // Close shuts down the 
socket and aborts any running queries. func (t *UDPv4) Close() { t.closeOnce.Do(func() { @@ -711,9 +725,11 @@ func (t *UDPv4) verifyPing(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I senderKey, err := v4wire.DecodePubkey(crypto.S256(), fromKey) if err != nil { + t.errors[err.Error()] = t.errors[err.Error()] + 1 return err } if v4wire.Expired(req.Expiration) { + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 return errExpired } h.senderKey = senderKey @@ -753,9 +769,11 @@ func (t *UDPv4) verifyPong(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I req := h.Packet.(*v4wire.Pong) if v4wire.Expired(req.Expiration) { + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 return errExpired } if !t.handleReply(fromID, from.IP, req) { + t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 return errUnsolicitedReply } t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) @@ -769,6 +787,7 @@ func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno req := h.Packet.(*v4wire.Findnode) if v4wire.Expired(req.Expiration) { + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 return errExpired } if !t.checkBond(fromID, from.IP) { @@ -778,6 +797,7 @@ func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno // and UDP port of the target as the source address. The recipient of the findnode // packet would then send a neighbors packet (which is a much bigger packet than // findnode) to the victim. + t.errors[errUnknownNodeStr] = t.errors[errUnknownNodeStr] + 1 return errUnknownNode } return nil @@ -815,9 +835,11 @@ func (t *UDPv4) verifyNeighbors(h *packetHandlerV4, from *net.UDPAddr, fromID en req := h.Packet.(*v4wire.Neighbors) if v4wire.Expired(req.Expiration) { + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 return errExpired } if !t.handleReply(fromID, from.IP, h.Packet) { + t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 return errUnsolicitedReply } return nil @@ -829,26 +851,32 @@ func (t *UDPv4) verifyENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e req := h.Packet.(*v4wire.ENRRequest) if v4wire.Expired(req.Expiration) { + t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 return errExpired } if !t.checkBond(fromID, from.IP) { + t.errors[errUnknownNodeStr] = t.errors[errUnknownNodeStr] + 1 return errUnknownNode } return nil } func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, mac []byte) { - //nolint:errcheck - t.send(from, fromID, &v4wire.ENRResponse{ + _, err := t.send(from, fromID, &v4wire.ENRResponse{ ReplyTok: mac, Record: *t.localNode.Node().Record(), }) + + if err != nil { + t.errors[err.Error()] = t.errors[err.Error()] + 1 + } } // ENRRESPONSE/v4 func (t *UDPv4) verifyENRResponse(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { if !t.handleReply(fromID, from.IP, h.Packet) { + t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 return errUnsolicitedReply } return nil diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 289bd2715e0..44118a58858 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -87,7 +87,7 @@ func newUDPTestContext(ctx context.Context, t *testing.T, logger log.Logger) *ud panic(err) } ln := enode.NewLocalNode(test.db, test.localkey, logger) - test.udp, err = ListenV4(ctx, test.pipe, ln, Config{ + test.udp, err = ListenV4(ctx, "test", test.pipe, ln, Config{ 
PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError), @@ -643,7 +643,7 @@ func startLocalhostV4(ctx context.Context, t *testing.T, cfg Config, logger log. realaddr := socket.LocalAddr().(*net.UDPAddr) ln.SetStaticIP(realaddr.IP) ln.SetFallbackUDP(realaddr.Port) - udp, err := ListenV4(ctx, socket, ln, cfg) + udp, err := ListenV4(ctx, "test", socket, ln, cfg) if err != nil { t.Fatal(err) } diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 686bd267879..c5c44bb074b 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -97,6 +97,7 @@ type UDPv5 struct { closeCtx context.Context cancelCloseCtx context.CancelFunc wg sync.WaitGroup + errors map[string]uint } // TalkRequestHandler callback processes a talk request and optionally returns a reply @@ -125,8 +126,8 @@ type callTimeout struct { } // ListenV5 listens on the given connection. -func ListenV5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { - t, err := newUDPv5(ctx, conn, ln, cfg) +func ListenV5(ctx context.Context, protocol string, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { + t, err := newUDPv5(ctx, protocol, conn, ln, cfg) if err != nil { return nil, err } @@ -138,7 +139,7 @@ func ListenV5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config } // newUDPv5 creates a UDPv5 transport, but doesn't start any goroutines. -func newUDPv5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { +func newUDPv5(ctx context.Context, protocol string, conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { closeCtx, cancelCloseCtx := context.WithCancel(ctx) cfg = cfg.withDefaults(respTimeoutV5) t := &UDPv5{ @@ -167,8 +168,9 @@ func newUDPv5(ctx context.Context, conn UDPConn, ln *enode.LocalNode, cfg Config // shutdown closeCtx: closeCtx, cancelCloseCtx: cancelCloseCtx, + errors: map[string]uint{}, } - tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) + tab, err := newTable(t, protocol, t.db, cfg.Bootnodes, cfg.TableRevalidateInterval, cfg.Log) if err != nil { return nil, err } @@ -181,6 +183,14 @@ func (t *UDPv5) Self() *enode.Node { return t.localNode.Node() } +func (t *UDPv5) Version() string { + return "v5" +} + +func (t *UDPv5) Errors() map[string]uint { + return t.errors +} + // Close shuts down packet processing. 
func (t *UDPv5) Close() { t.closeOnce.Do(func() { diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index 09c8a21107c..5ca080e0435 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -67,7 +67,7 @@ func startLocalhostV5(t *testing.T, cfg Config, logger log.Logger) *UDPv5 { ln.SetFallbackUDP(realaddr.Port) ctx := context.Background() ctx = disableLookupSlowdown(ctx) - udp, err := ListenV5(ctx, socket, ln, cfg) + udp, err := ListenV5(ctx, "test", socket, ln, cfg) if err != nil { t.Fatal(err) } @@ -581,7 +581,7 @@ func newUDPV5TestContext(ctx context.Context, t *testing.T, logger log.Logger) * ln := enode.NewLocalNode(test.db, test.localkey, logger) ln.SetStaticIP(net.IP{10, 0, 0, 1}) ln.Set(enr.UDP(30303)) - test.udp, err = ListenV5(ctx, test.pipe, ln, Config{ + test.udp, err = ListenV5(ctx, "test", test.pipe, ln, Config{ PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError), ValidSchemes: enode.ValidSchemesForTesting, diff --git a/p2p/server.go b/p2p/server.go index b9b3ed456a5..3066cb0de6e 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -27,6 +27,7 @@ import ( "net" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -68,6 +69,8 @@ const ( // Maximum amount of time allowed for writing a complete message. frameWriteTimeout = 20 * time.Second + + serverStatsLogInterval = 60 * time.Second ) var errServerStopped = errors.New("server stopped") @@ -229,6 +232,7 @@ type Server struct { // State of run loop and listenLoop. inboundHistory expHeap + errors map[string]uint } type peerOpFunc func(map[enode.ID]*Peer) @@ -651,7 +655,7 @@ func (srv *Server) setupDiscovery(ctx context.Context) error { Unhandled: unhandled, Log: srv.logger, } - ntab, err := discover.ListenV4(ctx, conn, srv.localnode, cfg) + ntab, err := discover.ListenV4(ctx, fmt.Sprint(srv.Config.Protocols[0].Version), conn, srv.localnode, cfg) if err != nil { return err } @@ -669,9 +673,9 @@ func (srv *Server) setupDiscovery(ctx context.Context) error { } var err error if sconn != nil { - srv.DiscV5, err = discover.ListenV5(ctx, sconn, srv.localnode, cfg) + srv.DiscV5, err = discover.ListenV5(ctx, fmt.Sprint(srv.Config.Protocols[0].Version), sconn, srv.localnode, cfg) } else { - srv.DiscV5, err = discover.ListenV5(ctx, conn, srv.localnode, cfg) + srv.DiscV5, err = discover.ListenV5(ctx, fmt.Sprint(srv.Config.Protocols[0].Version), conn, srv.localnode, cfg) } if err != nil { return err @@ -789,6 +793,9 @@ func (srv *Server) run() { trusted[n.ID()] = true } + logTimer := time.NewTicker(serverStatsLogInterval) + defer logTimer.Stop() + running: for { select { @@ -852,6 +859,18 @@ running: if pd.Inbound() { inboundCount-- } + case <-logTimer.C: + vals := []interface{}{"protocol", srv.Config.Protocols[0].Version, "peers", len(peers), "trusted", len(trusted), "inbound", inboundCount} + + func() { + srv.lock.Lock() + defer srv.lock.Unlock() + for err, count := range srv.errors { + vals = append(vals, err, count) + } + }() + + srv.logger.Debug("[p2p] Server", vals...) } } @@ -903,6 +922,8 @@ func (srv *Server) listenLoop(ctx context.Context) { // The slots limit accepts of new connections. slots := semaphore.NewWeighted(int64(srv.MaxPendingPeers)) + srv.errors = map[string]uint{} + // Wait for slots to be returned on exit. This ensures all connection goroutines // are down before listenLoop returns. 
defer func() { @@ -1005,6 +1026,17 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) return err } +func cleanError(err string) string { + switch { + case strings.HasSuffix(err, "i/o timeout"): + return "i/o timeout" + case strings.HasSuffix(err, "closed by the remote host."): + return "closed by remote" + default: + return err + } +} + func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() @@ -1028,6 +1060,8 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro // Run the RLPx handshake. remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { + errStr := cleanError(err.Error()) + srv.errors[errStr] = srv.errors[errStr] + 1 srv.logger.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } @@ -1047,6 +1081,8 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro // Run the capability negotiation handshake. phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { + errStr := cleanError(err.Error()) + srv.errors[errStr] = srv.errors[errStr] + 1 clog.Trace("Failed p2p handshake", "err", err) return err } From 4241fec5f536b338013566cc08e9a945b4a42fd6 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Sun, 5 Nov 2023 12:46:52 +0000 Subject: [PATCH 19/26] updated v4_udp to handle timeouts without hanging --- p2p/discover/lookup.go | 2 + p2p/discover/table.go | 8 +- p2p/discover/v4_udp.go | 335 ++++++++++++++++++++++++------------ p2p/discover/v4_udp_test.go | 2 +- p2p/server.go | 2 + 5 files changed, 237 insertions(+), 112 deletions(-) diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index 0e03daa30f2..87ba2c2d55e 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -155,6 +155,7 @@ func (it *lookup) slowdown() { func (it *lookup) query(n *node, reply chan<- []*node) { fails := it.tab.db.FindFails(n.ID(), n.IP()) r, err := it.queryfunc(n) + if err == errClosed { // Avoid recording failures on shutdown. reply <- nil @@ -180,6 +181,7 @@ func (it *lookup) query(n *node, reply chan<- []*node) { for _, n := range r { it.tab.addSeenNode(n) } + reply <- r } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index f26f5adbe58..c0011727a5c 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -157,8 +157,8 @@ func (tab *Table) seedRand() { crand.Read(b[:]) tab.mutex.Lock() + defer tab.mutex.Unlock() tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:]))) - tab.mutex.Unlock() } // ReadRandomNodes fills the given slice with random nodes from the table. The results @@ -167,6 +167,7 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) { if !tab.isInitDone() { return 0 } + tab.mutex.Lock() defer tab.mutex.Unlock() @@ -306,6 +307,7 @@ loop: func() { tab.mutex.Lock() defer tab.mutex.Unlock() + for err, count := range tab.errors { vals = append(vals, err, count) } @@ -523,8 +525,6 @@ func (tab *Table) live() (n int) { } func (tab *Table) addError(err error) { - tab.mutex.Lock() - defer tab.mutex.Unlock() str := err.Error() tab.errors[str] = tab.errors[str] + 1 } @@ -562,6 +562,7 @@ func (tab *Table) addSeenNode(n *node) { tab.mutex.Lock() defer tab.mutex.Unlock() + b := tab.bucket(n.ID()) if contains(b.entries, n.ID()) { // Already in bucket, don't add. 
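The server and dial-scheduler changes above count failures in a map keyed by error text, with cleanError first trimming address-specific detail down to a stable suffix. The standalone sketch below uses hypothetical inputs and a hypothetical normalize helper to show why that normalization matters: without it each peer address would get its own counter and the periodic summary log would never aggregate anything.

package main

import (
    "fmt"
    "strings"
)

// normalize collapses address-specific error strings onto a shared key
// (hypothetical helper; the patch does the same by suffix matching).
func normalize(err string) string {
    switch {
    case strings.HasSuffix(err, "i/o timeout"):
        return "i/o timeout"
    case strings.HasSuffix(err, "connection reset by peer"):
        return "connection reset by peer"
    default:
        return err
    }
}

func main() {
    counts := map[string]uint{}
    for _, e := range []string{
        "dial tcp 10.0.0.1:30303: i/o timeout",
        "dial tcp 10.0.0.2:30303: i/o timeout",
        "read tcp 10.0.0.3:30303: read: connection reset by peer",
    } {
        counts[normalize(e)]++
    }
    fmt.Println(counts) // map[connection reset by peer:1 i/o timeout:2]
}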
@@ -604,6 +605,7 @@ func (tab *Table) addVerifiedNode(n *node) { tab.mutex.Lock() defer tab.mutex.Unlock() + b := tab.bucket(n.ID()) if tab.bumpInBucket(b, n) { // Already in bucket, moved to front. diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 04351576899..faf75bc2f93 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -28,6 +28,7 @@ import ( "sync" "time" + lru "github.com/hashicorp/golang-lru/v2" "github.com/ledgerwatch/erigon/common/debug" "github.com/ledgerwatch/erigon/crypto" "github.com/ledgerwatch/erigon/p2p/discover/v4wire" @@ -83,12 +84,14 @@ type UDPv4 struct { addReplyMatcher chan *replyMatcher gotreply chan reply - unhandled chan UnhandledPacket + gotkey chan v4wire.Pubkey + gotnodes chan nodes replyTimeout time.Duration pingBackDelay time.Duration closeCtx context.Context cancelCloseCtx context.CancelFunc errors map[string]uint + unsolicitedNodes *lru.Cache[enode.ID, *enode.Node] privateKeyGenerator func() (*ecdsa.PrivateKey, error) } @@ -105,6 +108,7 @@ type replyMatcher struct { // these fields must match in the reply. from enode.ID ip net.IP + port int ptype byte // time when the request must complete @@ -131,30 +135,40 @@ type replyMatchFunc func(v4wire.Packet) (matched bool, requestDone bool) type reply struct { from enode.ID ip net.IP + port int data v4wire.Packet // loop indicates whether there was // a matching request by sending on this channel. matched chan<- bool } +type nodes struct { + addr *net.UDPAddr + nodes []v4wire.Node +} + func ListenV4(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { cfg = cfg.withDefaults(respTimeout) closeCtx, cancel := context.WithCancel(ctx) + unsolicitedNodes, _ := lru.New[enode.ID, *enode.Node](500) + t := &UDPv4{ conn: c, priv: cfg.PrivateKey, netrestrict: cfg.NetRestrict, localNode: ln, db: ln.Database(), - gotreply: make(chan reply), - addReplyMatcher: make(chan *replyMatcher), - unhandled: make(chan UnhandledPacket), + gotreply: make(chan reply, 10), + addReplyMatcher: make(chan *replyMatcher, 10), + gotkey: make(chan v4wire.Pubkey, 10), + gotnodes: make(chan nodes, 10), replyTimeout: cfg.ReplyTimeout, pingBackDelay: cfg.PingBackDelay, closeCtx: closeCtx, cancelCloseCtx: cancel, log: cfg.Log, errors: map[string]uint{}, + unsolicitedNodes: unsolicitedNodes, privateKeyGenerator: cfg.PrivateKeyGenerator, } @@ -167,8 +181,7 @@ func ListenV4(ctx context.Context, protocol string, c UDPConn, ln *enode.LocalNo t.wg.Add(2) go t.loop() - go t.readLoop() - go t.handleUnhandled(cfg.Unhandled) + go t.readLoop(cfg.Unhandled) return t, nil } @@ -192,11 +205,6 @@ func (t *UDPv4) Close() { t.conn.Close() t.wg.Wait() t.tab.close() - if t.unhandled != nil { - ch := t.unhandled - t.unhandled = nil - close(ch) - } }) } @@ -263,7 +271,7 @@ func (t *UDPv4) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) *r } // Add a matcher for the reply to the pending reply queue. Pongs are matched if they // reference the ping we're about to send. 
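The unsolicitedNodes field introduced above is a fixed-size LRU cache from github.com/hashicorp/golang-lru/v2, capped at 500 entries so unsolicited discovery traffic cannot grow memory without bound; the reworked readLoop further below keeps a similar cache of unknown keys. A tiny stand-alone demonstration of that cache API, using string keys and a capacity of 2 purely for illustration.

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	cache, err := lru.New[string, int](2)
	if err != nil {
		panic(err)
	}

	cache.Add("a", 1)
	cache.Add("b", 2)
	cache.Add("c", 3) // evicts "a", the least recently used entry

	if _, ok := cache.Get("a"); !ok {
		fmt.Println("a evicted; cached entries:", cache.Len())
	}
}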
- rm := t.pending(toid, toaddr.IP, v4wire.PongPacket, func(p v4wire.Packet) (matched bool, requestDone bool) { + rm := t.pending(toid, toaddr.IP, toaddr.Port, v4wire.PongPacket, func(p v4wire.Packet) (matched bool, requestDone bool) { matched = bytes.Equal(p.(*v4wire.Pong).ReplyTok, hash) if matched && callback != nil { callback() @@ -323,6 +331,7 @@ func (t *UDPv4) newRandomLookup(ctx context.Context) *lookup { func (t *UDPv4) newLookup(ctx context.Context, targetKey *ecdsa.PublicKey) *lookup { targetKeyEnc := v4wire.EncodePubkey(targetKey) target := enode.PubkeyEncoded(targetKeyEnc).ID() + it := newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) { return t.findnode(n.ID(), n.addr(), targetKeyEnc) }) @@ -344,7 +353,7 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke // active until enough nodes have been received. nodes := make([]*node, 0, bucketSize) nreceived := 0 - rm := t.pending(toid, toaddr.IP, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) { + rm := t.pending(toid, toaddr.IP, toaddr.Port, v4wire.NeighborsPacket, func(r v4wire.Packet) (matched bool, requestDone bool) { reply := r.(*v4wire.Neighbors) for _, rn := range reply.Nodes { nreceived++ @@ -396,7 +405,7 @@ func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) { // Add a matcher for the reply to the pending reply queue. Responses are matched if // they reference the request we're about to send. - rm := t.pending(n.ID(), addr.IP, v4wire.ENRResponsePacket, func(r v4wire.Packet) (matched bool, requestDone bool) { + rm := t.pending(n.ID(), addr.IP, addr.Port, v4wire.ENRResponsePacket, func(r v4wire.Packet) (matched bool, requestDone bool) { matched = bytes.Equal(r.(*v4wire.ENRResponse).ReplyTok, hash) return matched, matched }) @@ -428,9 +437,10 @@ func (t *UDPv4) RequestENR(n *enode.Node) (*enode.Node, error) { // pending adds a reply matcher to the pending reply queue. // see the documentation of type replyMatcher for a detailed explanation. -func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchFunc) *replyMatcher { +func (t *UDPv4) pending(id enode.ID, ip net.IP, port int, ptype byte, callback replyMatchFunc) *replyMatcher { ch := make(chan error, 1) - p := &replyMatcher{from: id, ip: ip, ptype: ptype, callback: callback, errc: ch} + p := &replyMatcher{from: id, ip: ip, port: port, ptype: ptype, callback: callback, errc: ch} + select { case t.addReplyMatcher <- p: // loop will handle it @@ -442,10 +452,10 @@ func (t *UDPv4) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchF // handleReply dispatches a reply packet, invoking reply matchers. It returns // whether any matcher considered the packet acceptable. 
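Threading toaddr.Port into pending(), as above, lets the reply loop fall back to endpoint-based matching when nothing matches by node ID, which tolerates a remote node whose key has changed; the two-pass scan itself appears in the reworked loop further below. A condensed sketch of that matching logic, with simplified stand-ins for the replyMatcher and reply types.

package sketch

import "net"

type pendingReply struct {
	from  string
	ip    net.IP
	port  int
	ptype byte
}

type incoming struct {
	from  string
	ip    net.IP
	port  int
	ptype byte
}

// candidates first matches on (node ID, IP, packet type); only if nothing
// matched does it retry on (IP, port, packet type).
func candidates(pending []*pendingReply, r incoming) []*pendingReply {
	var hits []*pendingReply
	for _, p := range pending {
		if p.from == r.from && p.ptype == r.ptype && p.ip.Equal(r.ip) {
			hits = append(hits, p)
		}
	}
	if len(hits) == 0 {
		for _, p := range pending {
			if p.ptype == r.ptype && p.ip.Equal(r.ip) && p.port == r.port {
				hits = append(hits, p)
			}
		}
	}
	return hits
}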
-func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, req v4wire.Packet) bool { +func (t *UDPv4) handleReply(from enode.ID, fromIP net.IP, port int, req v4wire.Packet) bool { matched := make(chan bool, 1) select { - case t.gotreply <- reply{from, fromIP, req, matched}: + case t.gotreply <- reply{from, fromIP, port, req, matched}: // loop will handle it return <-matched case <-t.closeCtx.Done(): @@ -461,88 +471,195 @@ func (t *UDPv4) loop() { var ( plist = list.New() - timeout = time.NewTimer(0) - nextTimeout *replyMatcher // head of plist when timeout was last reset - contTimeouts = 0 // number of continuous timeouts to do NTP checks + mutex = sync.Mutex{} + contTimeouts = 0 // number of continuous timeouts to do NTP checks ntpWarnTime = time.Unix(0, 0) ) - <-timeout.C // ignore first timeout - defer timeout.Stop() - resetTimeout := func() { - if plist.Front() == nil || nextTimeout == plist.Front().Value { - return - } - // Start the timer so it fires when the next pending reply has expired. - now := time.Now() - for el := plist.Front(); el != nil; el = el.Next() { - nextTimeout = el.Value.(*replyMatcher) - if dist := nextTimeout.deadline.Sub(now); dist < 2*t.replyTimeout { - timeout.Reset(dist) + listUpdate := make(chan *list.Element, 10) + + go func() { + var ( + timeout = time.NewTimer(0) + nextTimeout *replyMatcher // head of plist when timeout was last reset + ) + + <-timeout.C // ignore first timeout + defer timeout.Stop() + + resetTimeout := func() { + mutex.Lock() + defer mutex.Unlock() + + if plist.Front() == nil || nextTimeout == plist.Front().Value { return } - // Remove pending replies whose deadline is too far in the - // future. These can occur if the system clock jumped - // backwards after the deadline was assigned. - nextTimeout.errc <- errClockWarp - plist.Remove(el) + + // Start the timer so it fires when the next pending reply has expired. + now := time.Now() + for el := plist.Front(); el != nil; el = el.Next() { + nextTimeout = el.Value.(*replyMatcher) + if dist := nextTimeout.deadline.Sub(now); dist < 2*t.replyTimeout { + timeout.Reset(dist) + return + } + // Remove pending replies whose deadline is too far in the + // future. These can occur if the system clock jumped + // backwards after the deadline was assigned. + nextTimeout.errc <- errClockWarp + plist.Remove(el) + } + + nextTimeout = nil + timeout.Stop() } - nextTimeout = nil - timeout.Stop() - } - for { - resetTimeout() + for { + select { + case now := <-timeout.C: + func() { + mutex.Lock() + defer mutex.Unlock() + + nextTimeout = nil + // Notify and remove callbacks whose deadline is in the past. 
+ for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*replyMatcher) + if !now.Before(p.deadline) { + p.errc <- errTimeout + plist.Remove(el) + contTimeouts++ + } + } + // If we've accumulated too many timeouts, do an NTP time sync check + if contTimeouts > ntpFailureThreshold { + if time.Since(ntpWarnTime) >= ntpWarningCooldown { + ntpWarnTime = time.Now() + go checkClockDrift() + } + contTimeouts = 0 + } + }() + + resetTimeout() + + case el := <-listUpdate: + if el == nil { + return + } + + resetTimeout() + } + } + }() + for { select { case <-t.closeCtx.Done(): - for el := plist.Front(); el != nil; el = el.Next() { - el.Value.(*replyMatcher).errc <- errClosed - } + listUpdate <- nil + func() { + mutex.Lock() + defer mutex.Unlock() + for el := plist.Front(); el != nil; el = el.Next() { + el.Value.(*replyMatcher).errc <- errClosed + } + }() return case p := <-t.addReplyMatcher: - p.deadline = time.Now().Add(t.replyTimeout) - plist.PushBack(p) + func() { + mutex.Lock() + defer mutex.Unlock() + p.deadline = time.Now().Add(t.replyTimeout) + listUpdate <- plist.PushBack(p) + }() case r := <-t.gotreply: - var matched bool // whether any replyMatcher considered the reply acceptable. + + type matchCandidate struct { + el *list.Element + errc chan error + } + + var matchCandidates []matchCandidate + + mutex.Lock() for el := plist.Front(); el != nil; el = el.Next() { p := el.Value.(*replyMatcher) if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) { + candidate := matchCandidate{el, p.errc} + p.errc = make(chan error, 1) + matchCandidates = append(matchCandidates, candidate) + } + } + mutex.Unlock() + + if len(matchCandidates) == 0 { + // if there are no matched candidates try again matching against + // ip & port to handle node key changes + mutex.Lock() + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*replyMatcher) + if p.ptype == r.data.Kind() && p.ip.Equal(r.ip) && p.port == r.port { + candidate := matchCandidate{el, p.errc} + p.errc = make(chan error, 1) + matchCandidates = append(matchCandidates, candidate) + } + } + mutex.Unlock() + + if len(matchCandidates) == 0 { + r.matched <- false + } + } + + go func(r reply) { + var matched bool // whether any replyMatcher considered the reply acceptable. + for _, candidate := range matchCandidates { + p := candidate.el.Value.(*replyMatcher) ok, requestDone := p.callback(r.data) matched = matched || ok p.reply = r.data + // Remove the matcher if callback indicates that all replies have been received. if requestDone { - p.errc <- nil - plist.Remove(el) + mutex.Lock() + plist.Remove(candidate.el) + mutex.Unlock() + candidate.errc <- nil + listUpdate <- candidate.el + } else { + select { + case err := <-p.errc: + candidate.errc <- err + default: + p.errc = candidate.errc + } } - // Reset the continuous timeout counter (time drift detection) - contTimeouts = 0 } - } - r.matched <- matched - case now := <-timeout.C: - nextTimeout = nil + r.matched <- matched + }(r) - // Notify and remove callbacks whose deadline is in the past. 
- for el := plist.Front(); el != nil; el = el.Next() { - p := el.Value.(*replyMatcher) - if now.After(p.deadline) || now.Equal(p.deadline) { - p.errc <- errTimeout - plist.Remove(el) - contTimeouts++ + // Reset the continuous timeout counter (time drift detection) + contTimeouts = 0 + case key := <-t.gotkey: + go func() { + if key, err := v4wire.DecodePubkey(crypto.S256(), key); err == nil { + for _, n := range t.LookupPubkey(key) { + t.unsolicitedNodes.Add(n.ID(), n) + } } - } - // If we've accumulated too many timeouts, do an NTP time sync check - if contTimeouts > ntpFailureThreshold { - if time.Since(ntpWarnTime) >= ntpWarningCooldown { - ntpWarnTime = time.Now() - go checkClockDrift() + }() + + case nodes := <-t.gotnodes: + for _, rn := range nodes.nodes { + n, err := t.nodeFromRPC(nodes.addr, rn) + if err != nil { + t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", nodes.addr, "err", err) + continue } - contTimeouts = 0 + t.unsolicitedNodes.Add(n.ID(), &n.Node) } } } @@ -564,10 +681,16 @@ func (t *UDPv4) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet [] } // readLoop runs in its own goroutine. it handles incoming UDP packets. -func (t *UDPv4) readLoop() { +func (t *UDPv4) readLoop(unhandled chan<- ReadPacket) { defer t.wg.Done() defer debug.LogPanic() + if unhandled != nil { + defer close(unhandled) + } + + unknownKeys, _ := lru.New[v4wire.Pubkey, any](100) + buf := make([]byte, maxPacketSize) for { nbytes, from, err := t.conn.ReadFromUDP(buf) @@ -582,40 +705,35 @@ func (t *UDPv4) readLoop() { } return } - if err := t.handlePacket(from, buf[:nbytes]); err != nil && t.unhandled != nil { - select { - case t.unhandled <- UnhandledPacket{ReadPacket: ReadPacket{buf[:nbytes], from}, Reason: err}: - default: - } - } - } -} - -func (t *UDPv4) handleUnhandled(unhandled chan<- ReadPacket) { - if unhandled != nil { - defer close(unhandled) - } - - for u := range t.unhandled { - switch { - case errors.Is(u.Reason, errUnsolicitedReply): - _, fromKey, _, err := v4wire.Decode(u.Data) - if err == nil { - fromId := enode.PubkeyEncoded(fromKey).ID() - - t.log.Trace("Unsolicited packet", "from", fromId, "addr", u.Addr) - t.sendPing(fromId, u.Addr, func() { - if key, err := v4wire.DecodePubkey(crypto.S256(), fromKey); err == nil { - t.LookupPubkey(key) + if err := t.handlePacket(from, buf[:nbytes]); err != nil { + func() { + switch { + case errors.Is(err, errUnsolicitedReply): + if packet, fromKey, _, err := v4wire.Decode(buf[:nbytes]); err == nil { + switch packet.Kind() { + case v4wire.PongPacket: + if _, ok := unknownKeys.Get(fromKey); !ok { + fromId := enode.PubkeyEncoded(fromKey).ID() + t.log.Trace("Unsolicited packet", "type", packet.Name(), "from", fromId, "addr", from) + unknownKeys.Add(fromKey, nil) + t.gotkey <- fromKey + } + case v4wire.NeighborsPacket: + neighbors := packet.(*v4wire.Neighbors) + t.gotnodes <- nodes{from, neighbors.Nodes} + default: + fromId := enode.PubkeyEncoded(fromKey).ID() + t.log.Trace("Unsolicited packet", "type", packet.Name(), "from", fromId, "addr", from) + } + } else { + t.log.Trace("Unsolicited packet handling failed", "addr", from, "err", err) } - }) - } else { - t.log.Trace("Unsolicited packet handling fialed", "addr", u.Addr, "err", err) - } - default: - if unhandled != nil { - unhandled <- u.ReadPacket - } + default: + if unhandled != nil { + unhandled <- ReadPacket{buf[:nbytes], from} + } + } + }() } } } @@ -628,6 +746,7 @@ func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error { } packet := t.wrapPacket(rawpacket) 
fromID := enode.PubkeyEncoded(fromKey).ID() + if packet.preverify != nil { err = packet.preverify(packet, from, fromID, fromKey) } @@ -772,7 +891,7 @@ func (t *UDPv4) verifyPong(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 return errExpired } - if !t.handleReply(fromID, from.IP, req) { + if !t.handleReply(fromID, from.IP, from.Port, req) { t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 return errUnsolicitedReply } @@ -838,7 +957,7 @@ func (t *UDPv4) verifyNeighbors(h *packetHandlerV4, from *net.UDPAddr, fromID en t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 return errExpired } - if !t.handleReply(fromID, from.IP, h.Packet) { + if !t.handleReply(fromID, from.IP, from.Port, h.Packet) { t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 return errUnsolicitedReply } @@ -875,7 +994,7 @@ func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e // ENRRESPONSE/v4 func (t *UDPv4) verifyENRResponse(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { - if !t.handleReply(fromID, from.IP, h.Packet) { + if !t.handleReply(fromID, from.IP, from.Port, h.Packet) { t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 return errUnsolicitedReply } diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 44118a58858..5e2a9df92b6 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -237,7 +237,7 @@ func TestUDPv4_responseTimeouts(t *testing.T) { p.errc = nilErr test.udp.addReplyMatcher <- p time.AfterFunc(randomDuration(60*time.Millisecond), func() { - if !test.udp.handleReply(p.from, p.ip, testPacket(p.ptype)) { + if !test.udp.handleReply(p.from, p.ip, p.port, testPacket(p.ptype)) { t.Logf("not matched: %v", p) } }) diff --git a/p2p/server.go b/p2p/server.go index 3066cb0de6e..8802fff414f 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -1032,6 +1032,8 @@ func cleanError(err string) string { return "i/o timeout" case strings.HasSuffix(err, "closed by the remote host."): return "closed by remote" + case strings.HasSuffix(err, "connection reset by peer"): + return "closed by remote" default: return err } From d14e9f147edb3aaae3879f99720dc066d9e726e5 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Sun, 5 Nov 2023 13:52:04 +0000 Subject: [PATCH 20/26] fixed error map r/w clashes --- p2p/dial.go | 15 +++++++++------ p2p/discover/v4_udp.go | 35 ++++++++++++++++++++++++++++++++++- p2p/server.go | 4 ++++ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/p2p/dial.go b/p2p/dial.go index 83053bce19a..8bb3934ebe4 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -93,6 +93,7 @@ var ( // to create peer connections to nodes arriving through the iterator. 
type dialScheduler struct { dialConfig + mutex sync.Mutex setupFunc dialSetupFunc wg sync.WaitGroup cancel context.CancelFunc @@ -125,8 +126,8 @@ type dialScheduler struct { historyTimerTime mclock.AbsTime // for logStats - doneSinceLastLog int - errors map[string]uint + dialed int + errors map[string]uint } type dialSetupFunc func(net.Conn, connFlag, *enode.Node) error @@ -266,7 +267,7 @@ loop: id := task.dest.ID() delete(d.dialing, id) d.updateStaticPool(id) - d.doneSinceLastLog++ + d.dialed++ case c := <-d.addPeerCh: if c.is(dynDialedConn) || c.is(staticDialedConn) { @@ -343,15 +344,15 @@ func (d *dialScheduler) readNodes(it enode.Iterator) { // nolint func (d *dialScheduler) logStats() { vals := []interface{}{"protocol", d.subProtocolVersion, - "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.doneSinceLastLog, "static", len(d.static)} + "peers", fmt.Sprintf("%d/%d", len(d.peers), d.maxDialPeers), "tried", d.dialed, "static", len(d.static)} + d.mutex.Lock() for err, count := range d.errors { vals = append(vals, err, count) } + d.mutex.Unlock() d.log.Debug("[p2p] Dial scheduler", vals...) - - d.doneSinceLastLog = 0 } // rearmHistoryTimer configures d.historyTimer to fire when the @@ -552,7 +553,9 @@ func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error { cleanErr := cleanupDialErr(err) d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanErr) + d.mutex.Lock() d.errors[cleanErr.Error()] = d.errors[cleanErr.Error()] + 1 + d.mutex.Unlock() return &dialError{err} } mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()}) diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index faf75bc2f93..76c7cdeb1bb 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -72,6 +72,7 @@ const ( // UDPv4 implements the v4 wire protocol. type UDPv4 struct { + mutex sync.Mutex conn UDPConn log log.Logger netrestrict *netutil.Netlist @@ -195,7 +196,15 @@ func (t *UDPv4) Version() string { } func (t *UDPv4) Errors() map[string]uint { - return t.errors + errors := map[string]uint{} + + t.mutex.Lock() + for key, value := range t.errors { + errors[key] = value + } + t.mutex.Unlock() + + return errors } // Close shuts down the socket and aborts any running queries. 
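Errors() above now returns a copy assembled under t.mutex rather than the live map, so callers such as the table maintenance logger can range over the result without racing the transport's own writes. A reduced sketch of that accessor pattern, using a stand-in udp type.

package sketch

import "sync"

type udp struct {
	mu     sync.Mutex
	errors map[string]uint
}

// Errors hands back a snapshot; the live map never escapes the lock.
func (t *udp) Errors() map[string]uint {
	t.mu.Lock()
	defer t.mu.Unlock()

	out := make(map[string]uint, len(t.errors))
	for k, v := range t.errors {
		out[k] = v
	}
	return out
}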
@@ -844,11 +853,15 @@ func (t *UDPv4) verifyPing(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I senderKey, err := v4wire.DecodePubkey(crypto.S256(), fromKey) if err != nil { + t.mutex.Lock() t.errors[err.Error()] = t.errors[err.Error()] + 1 + t.mutex.Unlock() return err } if v4wire.Expired(req.Expiration) { + t.mutex.Lock() t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } h.senderKey = senderKey @@ -888,11 +901,15 @@ func (t *UDPv4) verifyPong(h *packetHandlerV4, from *net.UDPAddr, fromID enode.I req := h.Packet.(*v4wire.Pong) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } if !t.handleReply(fromID, from.IP, from.Port, req) { + t.mutex.Lock() t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 + t.mutex.Unlock() return errUnsolicitedReply } t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)}) @@ -906,7 +923,9 @@ func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno req := h.Packet.(*v4wire.Findnode) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } if !t.checkBond(fromID, from.IP) { @@ -916,7 +935,9 @@ func (t *UDPv4) verifyFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno // and UDP port of the target as the source address. The recipient of the findnode // packet would then send a neighbors packet (which is a much bigger packet than // findnode) to the victim. + t.mutex.Lock() t.errors[errUnknownNodeStr] = t.errors[errUnknownNodeStr] + 1 + t.mutex.Unlock() return errUnknownNode } return nil @@ -954,11 +975,15 @@ func (t *UDPv4) verifyNeighbors(h *packetHandlerV4, from *net.UDPAddr, fromID en req := h.Packet.(*v4wire.Neighbors) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } if !t.handleReply(fromID, from.IP, from.Port, h.Packet) { + t.mutex.Lock() t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 + t.mutex.Unlock() return errUnsolicitedReply } return nil @@ -970,11 +995,15 @@ func (t *UDPv4) verifyENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e req := h.Packet.(*v4wire.ENRRequest) if v4wire.Expired(req.Expiration) { + t.mutex.Lock() t.errors[errExpiredStr] = t.errors[errExpiredStr] + 1 + t.mutex.Unlock() return errExpired } if !t.checkBond(fromID, from.IP) { + t.mutex.Lock() t.errors[errUnknownNodeStr] = t.errors[errUnknownNodeStr] + 1 + t.mutex.Unlock() return errUnknownNode } return nil @@ -987,7 +1016,9 @@ func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e }) if err != nil { + t.mutex.Lock() t.errors[err.Error()] = t.errors[err.Error()] + 1 + t.mutex.Unlock() } } @@ -995,7 +1026,9 @@ func (t *UDPv4) handleENRRequest(h *packetHandlerV4, from *net.UDPAddr, fromID e func (t *UDPv4) verifyENRResponse(h *packetHandlerV4, from *net.UDPAddr, fromID enode.ID, fromKey v4wire.Pubkey) error { if !t.handleReply(fromID, from.IP, from.Port, h.Packet) { + t.mutex.Lock() t.errors[errUnsolicitedReplyStr] = t.errors[errUnsolicitedReplyStr] + 1 + t.mutex.Unlock() return errUnsolicitedReply } return nil diff --git a/p2p/server.go b/p2p/server.go index 8802fff414f..c8201fed9ec 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -1063,7 +1063,9 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest 
*enode.Node) erro remotePubkey, err := c.doEncHandshake(srv.PrivateKey) if err != nil { errStr := cleanError(err.Error()) + srv.lock.Lock() srv.errors[errStr] = srv.errors[errStr] + 1 + srv.lock.Unlock() srv.logger.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err) return err } @@ -1084,7 +1086,9 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro phs, err := c.doProtoHandshake(srv.ourHandshake) if err != nil { errStr := cleanError(err.Error()) + srv.lock.Lock() srv.errors[errStr] = srv.errors[errStr] + 1 + srv.lock.Unlock() clog.Trace("Failed p2p handshake", "err", err) return err } From 99c92166cbfb1000bab22c8cac44e31e9570ed21 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Sun, 5 Nov 2023 14:38:06 +0000 Subject: [PATCH 21/26] exit timer loop on close --- p2p/discover/v4_udp.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 76c7cdeb1bb..9037d72d878 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -525,6 +525,9 @@ func (t *UDPv4) loop() { for { select { + case <-t.closeCtx.Done(): + return + case now := <-timeout.C: func() { mutex.Lock() From 0cfaec51f7974a248f1368cfa77724e518c611c6 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Sun, 5 Nov 2023 19:23:49 +0000 Subject: [PATCH 22/26] log unsolicited --- p2p/discover/table.go | 4 +++- p2p/discover/v4_udp.go | 31 +++++++++++++++++++++++-------- p2p/discover/v5_udp.go | 4 ++++ 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index c0011727a5c..feaf5d39788 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -102,6 +102,7 @@ type transport interface { ping(*enode.Node) (seq uint64, err error) Version() string Errors() map[string]uint + LenUnsolicited() int } // bucket contains nodes, ordered by their last activity. the entry @@ -302,7 +303,8 @@ loop: case <-tableMainenance.C: live := tab.live() - vals := []interface{}{"protocol", tab.protocol, "version", tab.net.Version(), "len", tab.len(), "live", tab.live(), "ips", tab.ips.Len(), "db", tab.dbseeds, "reval", tab.revalidates} + vals := []interface{}{"protocol", tab.protocol, "version", tab.net.Version(), + "len", tab.len(), "live", tab.live(), "unsol", tab.net.LenUnsolicited(), "ips", tab.ips.Len(), "db", tab.dbseeds, "reval", tab.revalidates} func() { tab.mutex.Lock() diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 9037d72d878..2c3b314e8c2 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -207,6 +207,12 @@ func (t *UDPv4) Errors() map[string]uint { return errors } +func (t *UDPv4) LenUnslocited() int { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.unsolicitedNodes.Len() +} + // Close shuts down the socket and aborts any running queries. 
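Patch 21 above adds a <-t.closeCtx.Done() case to the timer goroutine started by loop(), so Close() no longer leaves that goroutine parked on the timer forever. A minimal sketch of a shutdown-aware expiry loop; expireLoop and its parameters are illustrative names rather than the real API.

package sketch

import (
	"context"
	"time"
)

func expireLoop(ctx context.Context, timeout *time.Timer, expire func(now time.Time)) {
	defer timeout.Stop()

	for {
		select {
		case <-ctx.Done():
			// transport closed: exit instead of waiting for the next tick
			return
		case now := <-timeout.C:
			// expire is expected to re-arm the timer, as the real code
			// does via resetTimeout
			expire(now)
		}
	}
}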
func (t *UDPv4) Close() { t.closeOnce.Do(func() { @@ -658,21 +664,30 @@ func (t *UDPv4) loop() { case key := <-t.gotkey: go func() { if key, err := v4wire.DecodePubkey(crypto.S256(), key); err == nil { - for _, n := range t.LookupPubkey(key) { + nodes := t.LookupPubkey(key) + mutex.Lock() + defer mutex.Unlock() + + for _, n := range nodes { t.unsolicitedNodes.Add(n.ID(), n) } } }() case nodes := <-t.gotnodes: - for _, rn := range nodes.nodes { - n, err := t.nodeFromRPC(nodes.addr, rn) - if err != nil { - t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", nodes.addr, "err", err) - continue + + func() { + mutex.Lock() + defer mutex.Unlock() + for _, rn := range nodes.nodes { + n, err := t.nodeFromRPC(nodes.addr, rn) + if err != nil { + t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", nodes.addr, "err", err) + continue + } + t.unsolicitedNodes.Add(n.ID(), &n.Node) } - t.unsolicitedNodes.Add(n.ID(), &n.Node) - } + }() } } } diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index c5c44bb074b..88001776aff 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -191,6 +191,10 @@ func (t *UDPv5) Errors() map[string]uint { return t.errors } +func (t *UDPv5) LenUnslocited() int { + return 0 +} + // Close shuts down packet processing. func (t *UDPv5) Close() { t.closeOnce.Do(func() { From c5cd704eca8be3a5ac02c187a68e3e4c253f98ef Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Mon, 6 Nov 2023 09:35:28 +0000 Subject: [PATCH 23/26] Added accessors and delayed provider test --- erigon-lib/diagnostics/network.go | 4 +-- erigon-lib/diagnostics/provider.go | 43 +++++++++++++++++++++++-- erigon-lib/diagnostics/provider_test.go | 23 +++++++++++-- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/erigon-lib/diagnostics/network.go b/erigon-lib/diagnostics/network.go index 08bfaed8d31..7436a4b9166 100644 --- a/erigon-lib/diagnostics/network.go +++ b/erigon-lib/diagnostics/network.go @@ -16,8 +16,6 @@ package diagnostics -import "reflect" - func (p PeerStatistics) Type() Type { - return Type(reflect.TypeOf(p)) + return TypeOf(p) } diff --git a/erigon-lib/diagnostics/provider.go b/erigon-lib/diagnostics/provider.go index 71431e92890..2982df16338 100644 --- a/erigon-lib/diagnostics/provider.go +++ b/erigon-lib/diagnostics/provider.go @@ -18,7 +18,35 @@ const ( ckChan ctxKey = iota ) -type Type reflect.Type +type Type interface { + reflect.Type + Context() context.Context + Err() error +} + +type diagType struct { + reflect.Type +} + +var cancelled = func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx +}() + +func (t diagType) Context() context.Context { + providerMutex.Lock() + defer providerMutex.Unlock() + if reg := providers[t]; reg != nil { + return reg.context + } + + return cancelled +} + +func (t diagType) Err() error { + return t.Context().Err() +} type Info interface { Type() Type @@ -26,7 +54,7 @@ type Info interface { func TypeOf(i Info) Type { t := reflect.TypeOf(i) - return Type(t) + return diagType{t} } type Provider interface { @@ -76,6 +104,11 @@ func StartProviders(ctx context.Context, infoType Type, logger log.Logger) { reg := providers[infoType] + if reg == nil { + reg = ®istry{} + providers[infoType] = reg + } + toStart := make([]Provider, len(reg.providers)) copy(toStart, reg.providers) @@ -103,9 +136,13 @@ func startProvider(ctx context.Context, infoType Type, provider Provider, logger } } -func Send[I Info](ctx context.Context, info I) error { +func Send[I Info](info I) 
error { + ctx := info.Type().Context() + if ctx.Err() != nil { if !errors.Is(ctx.Err(), context.Canceled) { + // drop the diagnostic message if there is + // no active diagnostic context for the type return nil } diff --git a/erigon-lib/diagnostics/provider_test.go b/erigon-lib/diagnostics/provider_test.go index 7d8ea6b10ec..b5f2fefc7f4 100644 --- a/erigon-lib/diagnostics/provider_test.go +++ b/erigon-lib/diagnostics/provider_test.go @@ -31,7 +31,7 @@ func (t *testProvider) StartDiagnostics(ctx context.Context) error { case <-ctx.Done(): return nil case <-timer.C: - diagnostics.Send(ctx, testInfo{count}) + diagnostics.Send(testInfo{count}) count++ } } @@ -54,6 +54,25 @@ func TestProviderRegistration(t *testing.T) { } } +func TestDelayedProviderRegistration(t *testing.T) { + + time.AfterFunc(1*time.Second, func() { + // diagnostics provider + provider := &testProvider{} + diagnostics.RegisterProvider(provider, diagnostics.TypeOf(testInfo{}), log.Root()) + }) + + // diagnostics receiver + ctx, ch, cancel := diagnostics.Context[testInfo](context.Background(), 1) + diagnostics.StartProviders(ctx, diagnostics.TypeOf(testInfo{}), log.Root()) + + for info := range ch { + if info.count == 3 { + cancel() + } + } +} + func TestProviderFuncRegistration(t *testing.T) { // diagnostics provider @@ -68,7 +87,7 @@ func TestProviderFuncRegistration(t *testing.T) { case <-ctx.Done(): return nil case <-timer.C: - diagnostics.Send(ctx, testInfo{count}) + diagnostics.Send(testInfo{count}) count++ } } From 5a687b599b807ee51a2658495d8feed30223a8d9 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Mon, 6 Nov 2023 09:43:48 +0000 Subject: [PATCH 24/26] fix logging typos --- p2p/discover/table_util_test.go | 1 + p2p/discover/v4_udp.go | 2 +- p2p/discover/v5_udp.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go index 14b473e40fa..e4613192884 100644 --- a/p2p/discover/table_util_test.go +++ b/p2p/discover/table_util_test.go @@ -158,6 +158,7 @@ func (t *pingRecorder) updateRecord(n *enode.Node) { func (t *pingRecorder) Self() *enode.Node { return nullNode } func (t *pingRecorder) Version() string { return "none" } func (t *pingRecorder) Errors() map[string]uint { return nil } +func (t *pingRecorder) LenUnsolicited() int { return 0 } func (t *pingRecorder) lookupSelf() []*enode.Node { return nil } func (t *pingRecorder) lookupRandom() []*enode.Node { return nil } diff --git a/p2p/discover/v4_udp.go b/p2p/discover/v4_udp.go index 2c3b314e8c2..9d962df0414 100644 --- a/p2p/discover/v4_udp.go +++ b/p2p/discover/v4_udp.go @@ -207,7 +207,7 @@ func (t *UDPv4) Errors() map[string]uint { return errors } -func (t *UDPv4) LenUnslocited() int { +func (t *UDPv4) LenUnsolicited() int { t.mutex.Lock() defer t.mutex.Unlock() return t.unsolicitedNodes.Len() diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 88001776aff..d66d44e36f0 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -191,7 +191,7 @@ func (t *UDPv5) Errors() map[string]uint { return t.errors } -func (t *UDPv5) LenUnslocited() int { +func (t *UDPv5) LenUnsolicited() int { return 0 } From 30bcb9cb26ed99cf6dc9e787a73a757f3d338ac3 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Mon, 6 Nov 2023 10:39:06 +0000 Subject: [PATCH 25/26] fix test hang due to unintialized errors --- p2p/server.go | 5 +++++ p2p/server_test.go | 1 + 2 files changed, 6 insertions(+) diff --git a/p2p/server.go b/p2p/server.go index a7c8129419d..c51d82e990b 100644 --- a/p2p/server.go +++ 
b/p2p/server.go @@ -1046,6 +1046,11 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running + + if srv.errors == nil { + srv.errors = map[string]uint{} + } + srv.lock.Unlock() if !running { return errServerStopped diff --git a/p2p/server_test.go b/p2p/server_test.go index dfa972af640..4c44cfc5c51 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -458,6 +458,7 @@ func TestServerSetupConn(t *testing.T) { srv := &Server{ Config: cfg, newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return test.tt }, //nolint:scopelint + errors: map[string]uint{}, } if !test.dontstart { if err := srv.TestStart(logger); err != nil { From f13376124f677abbbdf5ce3e46136d9c6dbbab42 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Mon, 6 Nov 2023 10:53:57 +0000 Subject: [PATCH 26/26] clear errors on setup --- p2p/server.go | 7 ++----- p2p/server_test.go | 1 - 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index c51d82e990b..7ba83014a3e 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -1046,11 +1046,8 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running - - if srv.errors == nil { - srv.errors = map[string]uint{} - } - + // reset error counts + srv.errors = map[string]uint{} srv.lock.Unlock() if !running { return errServerStopped diff --git a/p2p/server_test.go b/p2p/server_test.go index 4c44cfc5c51..dfa972af640 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -458,7 +458,6 @@ func TestServerSetupConn(t *testing.T) { srv := &Server{ Config: cfg, newTransport: func(fd net.Conn, dialDest *ecdsa.PublicKey) transport { return test.tt }, //nolint:scopelint - errors: map[string]uint{}, } if !test.dontstart { if err := srv.TestStart(logger); err != nil {
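Patches 25 and 26 guard the same hazard in setupConn: srv.errors starts out nil when a Server is constructed directly, as the tests do, and assigning to an entry in a nil map panics, so the counters must exist before the first increment. Patch 25 initializes the map lazily; patch 26 simplifies that to an unconditional reset each time a connection is set up, which also clears stale counts between attempts. A small reminder of the failure mode and the guard:

package main

import "fmt"

func main() {
	var counts map[string]uint // nil until initialized

	// counts["eof"]++ // would panic: assignment to entry in nil map

	counts = map[string]uint{} // what setupConn now guarantees before counting
	counts["eof"]++
	fmt.Println(counts)
}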