metrics, cmd/geth: change init-process of metrics (ethereum#30814)
This PR modifies how the metrics library handles `Enabled`: previously,
the package `init` function decided whether to serve real metrics or just
dummy types.

This has several drawbacks: 
- During package init, we need to determine whether metrics are enabled or
not. So we first hacked in a check for certain geth-specific
command-line flags. Then we added a similar check for geth environment
variables. Then we almost added a very elaborate check for the TOML
config file, plus TOML parsing.

- Using "real" types and dummy types interchangeably means that
everything is hidden behind interfaces. This carries a performance penalty
and also adds a lot of code.

This PR removes the interface indirection, uses concrete types, and allows
`Enabled` to be set later. It is still assumed that
`metrics.Enable()` is invoked early on.

The somewhat 'heavy' operations, such as ticking meters and exponential decay,
now check the enable flag to prevent resource leaks.
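
As a rough, minimal sketch of the resulting pattern (not the actual implementation; only `Enable`, `Enabled` and the concrete `Meter` type correspond to names visible in the diff below — the atomic flag and everything else are assumptions for illustration):

```go
package metrics

import "sync/atomic"

// enabled is the package-level switch; it replaces the init-time
// decision between "real" and dummy metric types.
var enabled atomic.Bool

// Enable turns metrics collection on. It is assumed to be called early
// during start-up (e.g. from SetupMetrics), not from package init.
func Enable() { enabled.Store(true) }

// Enabled reports whether metrics collection is currently active.
func Enabled() bool { return enabled.Load() }

// Meter is a concrete type rather than an interface, so call sites hold
// *Meter directly instead of going through interface dispatch.
type Meter struct{ count atomic.Int64 }

// Mark records n events; cheap updates like this can run unconditionally.
func (m *Meter) Mark(n int64) { m.count.Add(n) }

// tick stands in for the "heavy" periodic work (meter ticking, exp-decay);
// it bails out while metrics are disabled so idle nodes pay nothing.
func (m *Meter) tick() {
	if !Enabled() {
		return
	}
	// update exponentially-decaying rates here
}
```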

The change may be large, but it's mostly trivial, and from the last time I
gutted the metrics package, I ensured that we have fairly good test
coverage.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
holiman and fjl authored Dec 10, 2024
1 parent 4ecf085 commit 9045b79
Showing 58 changed files with 739 additions and 1,396 deletions.
11 changes: 4 additions & 7 deletions cmd/geth/chaincmd.go
@@ -38,7 +38,6 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/era"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/urfave/cli/v2"
)
@@ -282,14 +281,12 @@ func importChain(ctx *cli.Context) error {
if ctx.Args().Len() < 1 {
utils.Fatalf("This command requires an argument.")
}
// Start metrics export if enabled
utils.SetupMetrics(ctx)
// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)

stack, _ := makeConfigNode(ctx)
stack, cfg := makeConfigNode(ctx)
defer stack.Close()

// Start metrics export if enabled
utils.SetupMetrics(&cfg.Metrics)

chain, db := utils.MakeChain(ctx, stack, false)
defer db.Close()

24 changes: 24 additions & 0 deletions cmd/geth/config.go
@@ -192,6 +192,9 @@ func makeFullNode(ctx *cli.Context) *node.Node {
cfg.Eth.OverrideVerkle = &v
}

// Start metrics export if enabled
utils.SetupMetrics(&cfg.Metrics)

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Create gauge with geth system and build information
@@ -325,6 +328,27 @@ func applyMetricConfig(ctx *cli.Context, cfg *gethConfig) {
if ctx.IsSet(utils.MetricsInfluxDBOrganizationFlag.Name) {
cfg.Metrics.InfluxDBOrganization = ctx.String(utils.MetricsInfluxDBOrganizationFlag.Name)
}
// Sanity-check the commandline flags. It is fine if some unused fields is part
// of the toml-config, but we expect the commandline to only contain relevant
// arguments, otherwise it indicates an error.
var (
enableExport = ctx.Bool(utils.MetricsEnableInfluxDBFlag.Name)
enableExportV2 = ctx.Bool(utils.MetricsEnableInfluxDBV2Flag.Name)
)
if enableExport || enableExportV2 {
v1FlagIsSet := ctx.IsSet(utils.MetricsInfluxDBUsernameFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBPasswordFlag.Name)

v2FlagIsSet := ctx.IsSet(utils.MetricsInfluxDBTokenFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBOrganizationFlag.Name) ||
ctx.IsSet(utils.MetricsInfluxDBBucketFlag.Name)

if enableExport && v2FlagIsSet {
utils.Fatalf("Flags --influxdb.metrics.organization, --influxdb.metrics.token, --influxdb.metrics.bucket are only available for influxdb-v2")
} else if enableExportV2 && v1FlagIsSet {
utils.Fatalf("Flags --influxdb.metrics.username, --influxdb.metrics.password are only available for influxdb-v1")
}
}
}

func setAccountManagerBackends(conf *node.Config, am *accounts.Manager, keydir string) error {
7 changes: 0 additions & 7 deletions cmd/geth/main.go
@@ -34,7 +34,6 @@ import (
"github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"go.uber.org/automaxprocs/maxprocs"

@@ -325,12 +324,6 @@ func prepare(ctx *cli.Context) {
ctx.Set(utils.CacheFlag.Name, strconv.Itoa(4096))
}
}

// Start metrics export if enabled
utils.SetupMetrics(ctx)

// Start system runtime metrics collection
go metrics.CollectProcessMetrics(3 * time.Second)
}

// geth is the main entry point into the system if no special subcommand is run.
103 changes: 46 additions & 57 deletions cmd/utils/flags.go
@@ -1968,67 +1968,56 @@ func RegisterFullSyncTester(stack *node.Node, eth *eth.Ethereum, target common.H
log.Info("Registered full-sync tester", "hash", target)
}

func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")

var (
enableExport = ctx.Bool(MetricsEnableInfluxDBFlag.Name)
enableExportV2 = ctx.Bool(MetricsEnableInfluxDBV2Flag.Name)
)

if enableExport || enableExportV2 {
CheckExclusive(ctx, MetricsEnableInfluxDBFlag, MetricsEnableInfluxDBV2Flag)

v1FlagIsSet := ctx.IsSet(MetricsInfluxDBUsernameFlag.Name) ||
ctx.IsSet(MetricsInfluxDBPasswordFlag.Name)

v2FlagIsSet := ctx.IsSet(MetricsInfluxDBTokenFlag.Name) ||
ctx.IsSet(MetricsInfluxDBOrganizationFlag.Name) ||
ctx.IsSet(MetricsInfluxDBBucketFlag.Name)

if enableExport && v2FlagIsSet {
Fatalf("Flags --influxdb.metrics.organization, --influxdb.metrics.token, --influxdb.metrics.bucket are only available for influxdb-v2")
} else if enableExportV2 && v1FlagIsSet {
Fatalf("Flags --influxdb.metrics.username, --influxdb.metrics.password are only available for influxdb-v1")
}
}

var (
endpoint = ctx.String(MetricsInfluxDBEndpointFlag.Name)
database = ctx.String(MetricsInfluxDBDatabaseFlag.Name)
username = ctx.String(MetricsInfluxDBUsernameFlag.Name)
password = ctx.String(MetricsInfluxDBPasswordFlag.Name)

token = ctx.String(MetricsInfluxDBTokenFlag.Name)
bucket = ctx.String(MetricsInfluxDBBucketFlag.Name)
organization = ctx.String(MetricsInfluxDBOrganizationFlag.Name)
)

if enableExport {
tagsMap := SplitTagsFlag(ctx.String(MetricsInfluxDBTagsFlag.Name))

log.Info("Enabling metrics export to InfluxDB")

go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "geth.", tagsMap)
} else if enableExportV2 {
tagsMap := SplitTagsFlag(ctx.String(MetricsInfluxDBTagsFlag.Name))

log.Info("Enabling metrics export to InfluxDB (v2)")

go influxdb.InfluxDBV2WithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, token, bucket, organization, "geth.", tagsMap)
}
// SetupMetrics configures the metrics system.
func SetupMetrics(cfg *metrics.Config) {
if !cfg.Enabled {
return
}
log.Info("Enabling metrics collection")
metrics.Enable()

if ctx.IsSet(MetricsHTTPFlag.Name) {
address := net.JoinHostPort(ctx.String(MetricsHTTPFlag.Name), fmt.Sprintf("%d", ctx.Int(MetricsPortFlag.Name)))
log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address)
exp.Setup(address)
} else if ctx.IsSet(MetricsPortFlag.Name) {
log.Warn(fmt.Sprintf("--%s specified without --%s, metrics server will not start.", MetricsPortFlag.Name, MetricsHTTPFlag.Name))
}
// InfluxDB exporter.
var (
enableExport = cfg.EnableInfluxDB
enableExportV2 = cfg.EnableInfluxDBV2
)
if cfg.EnableInfluxDB && cfg.EnableInfluxDBV2 {
Fatalf("Flags %v can't be used at the same time", strings.Join([]string{MetricsEnableInfluxDBFlag.Name, MetricsEnableInfluxDBV2Flag.Name}, ", "))
}
var (
endpoint = cfg.InfluxDBEndpoint
database = cfg.InfluxDBDatabase
username = cfg.InfluxDBUsername
password = cfg.InfluxDBPassword

token = cfg.InfluxDBToken
bucket = cfg.InfluxDBBucket
organization = cfg.InfluxDBOrganization
tagsMap = SplitTagsFlag(cfg.InfluxDBTags)
)
if enableExport {
log.Info("Enabling metrics export to InfluxDB")
go influxdb.InfluxDBWithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "geth.", tagsMap)
} else if enableExportV2 {
tagsMap := SplitTagsFlag(cfg.InfluxDBTags)
log.Info("Enabling metrics export to InfluxDB (v2)")
go influxdb.InfluxDBV2WithTags(metrics.DefaultRegistry, 10*time.Second, endpoint, token, bucket, organization, "geth.", tagsMap)
}

// Expvar exporter.
if cfg.HTTP != "" {
address := net.JoinHostPort(cfg.HTTP, fmt.Sprintf("%d", cfg.Port))
log.Info("Enabling stand-alone metrics HTTP endpoint", "address", address)
exp.Setup(address)
} else if cfg.HTTP == "" && cfg.Port != 0 {
log.Warn(fmt.Sprintf("--%s specified without --%s, metrics server will not start.", MetricsPortFlag.Name, MetricsHTTPFlag.Name))
}

// Enable system metrics collection.
go metrics.CollectProcessMetrics(3 * time.Second)
}

// SplitTagsFlag parses a comma-separated list of k=v metrics tags.
func SplitTagsFlag(tagsFlag string) map[string]string {
tags := strings.Split(tagsFlag, ",")
tagsMap := map[string]string{}
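
After this change, callers hand `SetupMetrics` a resolved `metrics.Config` (flags, environment variables and TOML already merged) rather than the raw CLI context, as the `cmd/geth` hunks above show. A minimal, hedged sketch of the call shape, assuming only the `Enabled`/`HTTP`/`Port` field names visible in the hunk above and using made-up values:

```go
package main

import (
	"github.com/ethereum/go-ethereum/cmd/utils"
	"github.com/ethereum/go-ethereum/metrics"
)

func main() {
	// Build the metrics configuration up front (flag/env/TOML parsing would
	// normally populate this); SetupMetrics now owns metrics.Enable(), the
	// InfluxDB/expvar exporters and CollectProcessMetrics.
	cfg := metrics.Config{
		Enabled: true,
		HTTP:    "127.0.0.1", // host for the stand-alone metrics HTTP endpoint
		Port:    6060,
	}
	utils.SetupMetrics(&cfg)

	// ... start the node / rest of the application here ...
}
```

Whether importing `cmd/utils` from outside geth is advisable is a separate question; the snippet only illustrates the new configuration-driven entry point.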
12 changes: 6 additions & 6 deletions core/rawdb/freezer_table.go
@@ -113,24 +113,24 @@ type freezerTable struct {
headId uint32 // number of the currently active head file
tailId uint32 // number of the earliest file

headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
sizeGauge metrics.Gauge // Gauge for tracking the combined size of all freezer tables
headBytes int64 // Number of bytes written to the head file
readMeter *metrics.Meter // Meter for measuring the effective amount of data read
writeMeter *metrics.Meter // Meter for measuring the effective amount of data written
sizeGauge *metrics.Gauge // Gauge for tracking the combined size of all freezer tables

logger log.Logger // Logger with database path and table name embedded
lock sync.RWMutex // Mutex protecting the data file descriptors
}

// newFreezerTable opens the given path as a freezer table.
func newFreezerTable(path, name string, disableSnappy, readonly bool) (*freezerTable, error) {
return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy, readonly)
return newTable(path, name, metrics.NewInactiveMeter(), metrics.NewInactiveMeter(), metrics.NewGauge(), freezerTableSize, disableSnappy, readonly)
}

// newTable opens a freezer table, creating the data and index files if they are
// non-existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
func newTable(path string, name string, readMeter, writeMeter *metrics.Meter, sizeGauge *metrics.Gauge, maxFilesize uint32, noCompression, readonly bool) (*freezerTable, error) {
// Ensure the containing directory exists and open the indexEntry file
if err := os.MkdirAll(path, 0755); err != nil {
return nil, err
32 changes: 16 additions & 16 deletions core/state/trie_prefetcher.go
@@ -47,21 +47,21 @@ type triePrefetcher struct {
term chan struct{} // Channel to signal interruption
noreads bool // Whether to ignore state-read-only prefetch requests

deliveryMissMeter metrics.Meter

accountLoadReadMeter metrics.Meter
accountLoadWriteMeter metrics.Meter
accountDupReadMeter metrics.Meter
accountDupWriteMeter metrics.Meter
accountDupCrossMeter metrics.Meter
accountWasteMeter metrics.Meter

storageLoadReadMeter metrics.Meter
storageLoadWriteMeter metrics.Meter
storageDupReadMeter metrics.Meter
storageDupWriteMeter metrics.Meter
storageDupCrossMeter metrics.Meter
storageWasteMeter metrics.Meter
deliveryMissMeter *metrics.Meter

accountLoadReadMeter *metrics.Meter
accountLoadWriteMeter *metrics.Meter
accountDupReadMeter *metrics.Meter
accountDupWriteMeter *metrics.Meter
accountDupCrossMeter *metrics.Meter
accountWasteMeter *metrics.Meter

storageLoadReadMeter *metrics.Meter
storageLoadWriteMeter *metrics.Meter
storageDupReadMeter *metrics.Meter
storageDupWriteMeter *metrics.Meter
storageDupCrossMeter *metrics.Meter
storageWasteMeter *metrics.Meter
}

func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
@@ -111,7 +111,7 @@ func (p *triePrefetcher) terminate(async bool) {

// report aggregates the pre-fetching and usage metrics and reports them.
func (p *triePrefetcher) report() {
if !metrics.Enabled {
if !metrics.Enabled() {
return
}
for _, fetcher := range p.fetchers {
4 changes: 2 additions & 2 deletions core/txpool/txpool.go
@@ -126,7 +126,7 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
return ErrAlreadyReserved
}
p.reservations[addr] = subpool
if metrics.Enabled {
if metrics.Enabled() {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Inc(1)
}
@@ -143,7 +143,7 @@ func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
return errors.New("address not owned")
}
delete(p.reservations, addr)
if metrics.Enabled {
if metrics.Enabled() {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Dec(1)
}
2 changes: 1 addition & 1 deletion eth/downloader/queue.go
@@ -882,7 +882,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, recei
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest,
reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter,
reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter,
results int, validate func(index int, header *types.Header) error,
reconstruct func(index int, result *fetchResult)) (int, error) {
// Short circuit if the data was never requested
2 changes: 1 addition & 1 deletion eth/handler.go
@@ -366,7 +366,7 @@ func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error
defer h.decHandlers()

if err := h.peers.registerSnapExtension(peer); err != nil {
if metrics.Enabled {
if metrics.Enabled() {
if peer.Inbound() {
snap.IngressRegistrationErrorMeter.Mark(1)
} else {
2 changes: 1 addition & 1 deletion eth/protocols/eth/handler.go
@@ -190,7 +190,7 @@ func handleMessage(backend Backend, peer *Peer) error {
var handlers = eth68

// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
if metrics.Enabled() {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
defer func(start time.Time) {
sampler := func() metrics.Sample {
2 changes: 1 addition & 1 deletion eth/protocols/eth/handshake.go
@@ -112,7 +112,7 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H

// markError registers the error with the corresponding metric.
func markError(p *Peer, err error) {
if !metrics.Enabled {
if !metrics.Enabled() {
return
}
m := meters.get(p.Inbound())
12 changes: 6 additions & 6 deletions eth/protocols/eth/metrics.go
@@ -41,23 +41,23 @@ func (h *bidirectionalMeters) get(ingress bool) *hsMeters {
type hsMeters struct {
// peerError measures the number of errors related to incorrect peer
// behaviour, such as invalid message code, size, encoding, etc.
peerError metrics.Meter
peerError *metrics.Meter

// timeoutError measures the number of timeouts.
timeoutError metrics.Meter
timeoutError *metrics.Meter

// networkIDMismatch measures the number of network id mismatch errors.
networkIDMismatch metrics.Meter
networkIDMismatch *metrics.Meter

// protocolVersionMismatch measures the number of differing protocol
// versions.
protocolVersionMismatch metrics.Meter
protocolVersionMismatch *metrics.Meter

// genesisMismatch measures the number of differing genesises.
genesisMismatch metrics.Meter
genesisMismatch *metrics.Meter

// forkidRejected measures the number of differing forkids.
forkidRejected metrics.Meter
forkidRejected *metrics.Meter
}

// newHandshakeMeters registers and returns handshake meters for the given
2 changes: 1 addition & 1 deletion eth/protocols/snap/handler.go
@@ -132,7 +132,7 @@ func HandleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()
start := time.Now()
// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {
if metrics.Enabled() {
h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code)
defer func(start time.Time) {
sampler := func() metrics.Sample {