Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/protocols, prp/tracker: add support for req/rep rtt tracking #22608

Merged
merged 4 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func handleMessage(backend Backend, peer *Peer) error {
if peer.Version() >= ETH66 {
handlers = eth66
}
// Track the emount of time it takes to serve the request and run the handler
// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
defer func(start time.Time) {
Expand Down
10 changes: 10 additions & 0 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
requestTracker.Fulfil(peer.id, peer.version, BlockHeadersMsg, res.RequestId)

return backend.Handle(peer, &res.BlockHeadersPacket)
}

Expand All @@ -345,6 +347,8 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
requestTracker.Fulfil(peer.id, peer.version, BlockBodiesMsg, res.RequestId)

return backend.Handle(peer, &res.BlockBodiesPacket)
}

Expand All @@ -363,6 +367,8 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
requestTracker.Fulfil(peer.id, peer.version, NodeDataMsg, res.RequestId)

return backend.Handle(peer, &res.NodeDataPacket)
}

Expand All @@ -381,6 +387,8 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
requestTracker.Fulfil(peer.id, peer.version, ReceiptsMsg, res.RequestId)

return backend.Handle(peer, &res.ReceiptsPacket)
}

Expand Down Expand Up @@ -506,5 +514,7 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
}
peer.markTransaction(tx.Hash())
}
requestTracker.Fulfil(peer.id, peer.version, PooledTransactionsMsg, txs.RequestId)

return backend.Handle(peer, &txs.PooledTransactionsPacket)
}
35 changes: 28 additions & 7 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,11 @@ func (p *Peer) RequestOneHeader(hash common.Hash) error {
Reverse: false,
}
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
RequestId: id,
GetBlockHeadersPacket: &query,
})
}
Expand All @@ -432,8 +435,11 @@ func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, re
Reverse: reverse,
}
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
RequestId: id,
GetBlockHeadersPacket: &query,
})
}
Expand All @@ -451,8 +457,11 @@ func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, rever
Reverse: reverse,
}
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id)
return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{
RequestId: rand.Uint64(),
RequestId: id,
GetBlockHeadersPacket: &query,
})
}
Expand All @@ -476,8 +485,11 @@ func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int,
func (p *Peer) RequestBodies(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id)
return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{
RequestId: rand.Uint64(),
RequestId: id,
GetBlockBodiesPacket: hashes,
})
}
Expand All @@ -489,8 +501,11 @@ func (p *Peer) RequestBodies(hashes []common.Hash) error {
func (p *Peer) RequestNodeData(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of state data", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id)
return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{
RequestId: rand.Uint64(),
RequestId: id,
GetNodeDataPacket: hashes,
})
}
Expand All @@ -501,8 +516,11 @@ func (p *Peer) RequestNodeData(hashes []common.Hash) error {
func (p *Peer) RequestReceipts(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id)
return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{
RequestId: rand.Uint64(),
RequestId: id,
GetReceiptsPacket: hashes,
})
}
Expand All @@ -513,8 +531,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash) error {
func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
RequestId: rand.Uint64(),
RequestId: id,
GetPooledTransactionsPacket: hashes,
})
}
Expand Down
26 changes: 26 additions & 0 deletions eth/protocols/eth/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package eth

import (
"time"

"github.com/ethereum/go-ethereum/p2p/tracker"
)

// requestTracker is a singleton tracker for eth/66 and newer request times.
var requestTracker = tracker.New(ProtocolName, 5*time.Minute)
8 changes: 8 additions & 0 deletions eth/protocols/snap/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ func handleMessage(backend Backend, peer *Peer) error {
return fmt.Errorf("accounts not monotonically increasing: #%d [%x] vs #%d [%x]", i-1, res.Accounts[i-1].Hash[:], i, res.Accounts[i].Hash[:])
}
}
requestTracker.Fulfil(peer.id, peer.version, AccountRangeMsg, res.ID)

return backend.Handle(peer, res)

case msg.Code == GetStorageRangesMsg:
Expand Down Expand Up @@ -360,6 +362,8 @@ func handleMessage(backend Backend, peer *Peer) error {
}
}
}
requestTracker.Fulfil(peer.id, peer.version, StorageRangesMsg, res.ID)

return backend.Handle(peer, res)

case msg.Code == GetByteCodesMsg:
Expand Down Expand Up @@ -404,6 +408,8 @@ func handleMessage(backend Backend, peer *Peer) error {
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
requestTracker.Fulfil(peer.id, peer.version, ByteCodesMsg, res.ID)

return backend.Handle(peer, res)

case msg.Code == GetTrieNodesMsg:
Expand Down Expand Up @@ -497,6 +503,8 @@ func handleMessage(backend Backend, peer *Peer) error {
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
requestTracker.Fulfil(peer.id, peer.version, TrieNodesMsg, res.ID)

return backend.Handle(peer, res)

default:
Expand Down
7 changes: 7 additions & 0 deletions eth/protocols/snap/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (p *Peer) Log() log.Logger {
// trie, starting with the origin.
func (p *Peer) RequestAccountRange(id uint64, root common.Hash, origin, limit common.Hash, bytes uint64) error {
p.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes))

requestTracker.Track(p.id, p.version, GetAccountRangeMsg, AccountRangeMsg, id)
return p2p.Send(p.rw, GetAccountRangeMsg, &GetAccountRangePacket{
ID: id,
Root: root,
Expand All @@ -83,6 +85,7 @@ func (p *Peer) RequestStorageRanges(id uint64, root common.Hash, accounts []comm
} else {
p.logger.Trace("Fetching ranges of small storage slots", "reqid", id, "root", root, "accounts", len(accounts), "first", accounts[0], "bytes", common.StorageSize(bytes))
}
requestTracker.Track(p.id, p.version, GetStorageRangesMsg, StorageRangesMsg, id)
return p2p.Send(p.rw, GetStorageRangesMsg, &GetStorageRangesPacket{
ID: id,
Root: root,
Expand All @@ -96,6 +99,8 @@ func (p *Peer) RequestStorageRanges(id uint64, root common.Hash, accounts []comm
// RequestByteCodes fetches a batch of bytecodes by hash.
func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) error {
p.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes))

requestTracker.Track(p.id, p.version, GetByteCodesMsg, ByteCodesMsg, id)
return p2p.Send(p.rw, GetByteCodesMsg, &GetByteCodesPacket{
ID: id,
Hashes: hashes,
Expand All @@ -107,6 +112,8 @@ func (p *Peer) RequestByteCodes(id uint64, hashes []common.Hash, bytes uint64) e
// a specificstate trie.
func (p *Peer) RequestTrieNodes(id uint64, root common.Hash, paths []TrieNodePathSet, bytes uint64) error {
p.logger.Trace("Fetching set of trie nodes", "reqid", id, "root", root, "pathsets", len(paths), "bytes", common.StorageSize(bytes))

requestTracker.Track(p.id, p.version, GetTrieNodesMsg, TrieNodesMsg, id)
return p2p.Send(p.rw, GetTrieNodesMsg, &GetTrieNodesPacket{
ID: id,
Root: root,
Expand Down
26 changes: 26 additions & 0 deletions eth/protocols/snap/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package snap

import (
"time"

"github.com/ethereum/go-ethereum/p2p/tracker"
)

// requestTracker is a singleton tracker for request times.
var requestTracker = tracker.New(ProtocolName, time.Minute)
3 changes: 0 additions & 3 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ const (

// HandleHistName is the prefix of the per-packet serving time histograms.
HandleHistName = "p2p/handle"

// WaitHistName is the prefix of the per-packet (req only) waiting time histograms.
WaitHistName = "p2p/wait"
)

var (
Expand Down
Loading