Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p, p2p/discover: add dial metrics #829

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions p2p/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,13 @@ type dialError struct {

// dial performs the actual connection attempt.
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
dialMeter.Mark(1)
fd, err := srv.Dialer.Dial(dest)
if err != nil {
dialConnectionError.Mark(1)
return &dialError{err}
}
mfd := newMeteredConn(fd, false)
return srv.SetupConn(mfd, t.flags, dest)
return srv.SetupConn(newMeteredConn(fd), t.flags, dest)
}

func (t *dialTask) String() string {
Expand Down
37 changes: 37 additions & 0 deletions p2p/discover/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package discover

import (
"fmt"

"github.com/XinFinOrg/XDPoSChain/metrics"
)

const (
moduleName = "discover"
)

var (
bucketsCounter []*metrics.Counter
)

func init() {
for i := 0; i < nBuckets; i++ {
bucketsCounter = append(bucketsCounter, metrics.NewRegisteredCounter(fmt.Sprintf("%s/bucket/%d/count", moduleName, i), nil))
}
}
43 changes: 40 additions & 3 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
)

Expand Down Expand Up @@ -86,7 +87,8 @@ type Table struct {
bonding map[NodeID]*bondproc
bondslots chan struct{} // limits total number of active bonding processes

nodeAddedHook func(*Node) // for testing
nodeAddedHook func(*bucket, *Node)
nodeRemovedHook func(*bucket, *Node)

net transport
self *Node // metadata of the local node
Expand Down Expand Up @@ -114,6 +116,7 @@ type bucket struct {
entries []*Node // live entries, sorted by time of last contact
replacements []*Node // recently seen nodes to be used if revalidation fails
ips netutil.DistinctNetSet
index int
}

func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) {
Expand Down Expand Up @@ -145,7 +148,8 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
index: i,
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
tab.seedRand()
Expand All @@ -158,6 +162,22 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
return tab, nil
}

func newMeteredTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) {
tab, err := newTable(t, ourID, ourAddr, nodeDBPath, bootnodes)
if err != nil {
return nil, err
}
if metrics.Enabled() {
tab.nodeAddedHook = func(b *bucket, n *Node) {
bucketsCounter[b.index].Inc(1)
}
tab.nodeRemovedHook = func(b *bucket, n *Node) {
bucketsCounter[b.index].Dec(1)
}
}
return tab, nil
}

func (tab *Table) seedRand() {
var b [8]byte
crand.Read(b[:])
Expand Down Expand Up @@ -814,14 +834,31 @@ func (tab *Table) bumpOrAdd(b *bucket, n *Node) bool {
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
tab.nodeAddedHook(b, n)
}
return true
}

func (tab *Table) deleteInBucket(b *bucket, n *Node) {
// Check if the node is actually in the bucket so the removed hook
// isn't called multiple times for the same node.
if !contains(b.entries, n.ID) {
return
}
b.entries = deleteNode(b.entries, n)
tab.removeIP(b, n.IP)
if tab.nodeRemovedHook != nil {
tab.nodeRemovedHook(b, n)
}
}

func contains(ns []*Node, id NodeID) bool {
for _, n := range ns {
if n.ID == id {
return true
}
}
return false
}

// pushNode adds n to the front of list, keeping at most max items.
Expand Down
9 changes: 0 additions & 9 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,15 +641,6 @@ func sortedByDistanceTo(distbase common.Hash, slice []*Node) bool {
return true
}

func contains(ns []*Node, id NodeID) bool {
for _, n := range ns {
if n.ID == id {
return true
}
}
return false
}

// gen wraps quick.Value so it's easier to use.
// it generates a random value of the given value's type.
func gen(typ interface{}, rand *rand.Rand) interface{} {
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func newUDP(c conn, cfg Config) (*Table, *udp, error) {
}
// TODO: separate TCP port
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
tab, err := newMeteredTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestUDP_findnodeMultiReply(t *testing.T) {
func TestUDP_successfulPing(t *testing.T) {
test := newUDPTest(t)
added := make(chan *Node, 1)
test.table.nodeAddedHook = func(n *Node) { added <- n }
test.table.nodeAddedHook = func(b *bucket, n *Node) { added <- n }
defer test.table.Close()

// The remote side sends a ping packet to initiate the exchange.
Expand Down
55 changes: 46 additions & 9 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,61 @@
package p2p

import (
"errors"
"net"

"github.com/XinFinOrg/XDPoSChain/metrics"
)

var (
ingressConnectMeter = metrics.NewRegisteredMeter("p2p/InboundConnects", nil)
ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil)
egressConnectMeter = metrics.NewRegisteredMeter("p2p/OutboundConnects", nil)
egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil)
)

var (
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil)

serveMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
serveSuccessMeter = metrics.NewRegisteredMeter("p2p/serves/success", nil)
dialMeter = metrics.NewRegisteredMeter("p2p/dials", nil)
dialSuccessMeter = metrics.NewRegisteredMeter("p2p/dials/success", nil)
dialConnectionError = metrics.NewRegisteredMeter("p2p/dials/error/connection", nil)

// handshake error meters
dialTooManyPeers = metrics.NewRegisteredMeter("p2p/dials/error/saturated", nil)
dialAlreadyConnected = metrics.NewRegisteredMeter("p2p/dials/error/known", nil)
dialSelf = metrics.NewRegisteredMeter("p2p/dials/error/self", nil)
dialUselessPeer = metrics.NewRegisteredMeter("p2p/dials/error/useless", nil)
dialUnexpectedIdentity = metrics.NewRegisteredMeter("p2p/dials/error/id/unexpected", nil)
dialEncHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/enc", nil)
dialProtoHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/proto", nil)
)

func markDialError(err error) {
if !metrics.Enabled() {
return
}
if err2 := errors.Unwrap(err); err2 != nil {
err = err2
}
switch err {
case DiscTooManyPeers:
dialTooManyPeers.Mark(1)
case DiscAlreadyConnected:
dialAlreadyConnected.Mark(1)
case DiscSelf:
dialSelf.Mark(1)
case DiscUselessPeer:
dialUselessPeer.Mark(1)
case DiscUnexpectedIdentity:
dialUnexpectedIdentity.Mark(1)
case errEncHandshakeError:
dialEncHandshakeError.Mark(1)
case errProtoHandshakeError:
dialProtoHandshakeError.Mark(1)
}
}

// meteredConn is a wrapper around a network TCP connection that meters both the
// inbound and outbound network traffic.
type meteredConn struct {
Expand All @@ -40,17 +83,11 @@ type meteredConn struct {
// newMeteredConn creates a new metered connection, also bumping the ingress or
// egress connection meter. If the metrics system is disabled, this function
// returns the original object.
func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
func newMeteredConn(conn net.Conn) net.Conn {
// Short circuit if metrics are disabled
if !metrics.Enabled() {
return conn
}
// Otherwise bump the connection counters and wrap the connection
if ingress {
ingressConnectMeter.Mark(1)
} else {
egressConnectMeter.Mark(1)
}
return &meteredConn{conn.(*net.TCPConn)}
}

Expand Down
17 changes: 15 additions & 2 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ const (
frameWriteTimeout = 20 * time.Second
)

var errServerStopped = errors.New("server stopped")
var (
errServerStopped = errors.New("server stopped")
errEncHandshakeError = errors.New("rlpx enc error")
errProtoHandshakeError = errors.New("rlpx proto error")
)

// Config holds Server options.
type Config struct {
Expand Down Expand Up @@ -702,7 +706,11 @@ running:
}
if p.Inbound() {
inboundCount++
serveSuccessMeter.Mark(1)
} else {
dialSuccessMeter.Mark(1)
}
activePeerGauge.Inc(1)
}
// The dialer logic relies on the assumption that
// dial tasks complete after the peer has been added or
Expand All @@ -720,6 +728,7 @@ running:
if pd.Inbound() {
inboundCount--
}
activePeerGauge.Dec(1)
}
}

Expand Down Expand Up @@ -839,7 +848,8 @@ func (srv *Server) listenLoop() {
}
}

fd = newMeteredConn(fd, true)
fd = newMeteredConn(fd)
serveMeter.Mark(1)
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
go func() {
srv.SetupConn(fd, inboundConn, nil)
Expand All @@ -859,6 +869,9 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
err := srv.setupConn(c, flags, dialDest)
if err != nil {
if !c.is(inboundConn) {
markDialError(err)
}
c.close(err)
srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
}
Expand Down
9 changes: 4 additions & 5 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package p2p

import (
"crypto/ecdsa"
"errors"
"math/rand"
"net"
"reflect"
Expand Down Expand Up @@ -506,10 +505,10 @@ func TestServerSetupConn(t *testing.T) {
wantCloseErr: errServerStopped,
},
{
tt: &setupTransport{id: id, encHandshakeErr: errors.New("read error")},
tt: &setupTransport{id: id, encHandshakeErr: errEncHandshakeError},
flags: inboundConn,
wantCalls: "doEncHandshake,close,",
wantCloseErr: errors.New("read error"),
wantCloseErr: errEncHandshakeError,
},
{
tt: &setupTransport{id: id},
Expand All @@ -526,11 +525,11 @@ func TestServerSetupConn(t *testing.T) {
wantCloseErr: DiscUnexpectedIdentity,
},
{
tt: &setupTransport{id: id, protoHandshakeErr: errors.New("foo")},
tt: &setupTransport{id: id, protoHandshakeErr: errProtoHandshakeError},
dialDest: &discover.Node{ID: id},
flags: dynDialedConn,
wantCalls: "doEncHandshake,doProtoHandshake,close,",
wantCloseErr: errors.New("foo"),
wantCloseErr: errProtoHandshakeError,
},
{
tt: &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}},
Expand Down