Skip to content

Commit

Permalink
Add block production and consensus cycle metrics (#1516)
Browse files Browse the repository at this point in the history
### Description

Adds better metrics for load testing.
Also adds a command line option `--metrics.loadtestcsvfile` to specify a file to write out information about the block production and consensus cycle in a CSV format. This was previously on a separate branch that was cherry-picked onto the branch that we wanted to loadtest.

### Tested

Local load test.

### Related issues

- Fixes #1511
  • Loading branch information
Joshua Gutow authored Apr 21, 2021
1 parent 32f9d89 commit ede6fa2
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 38 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ var (
utils.MetricsInfluxDBUsernameFlag,
utils.MetricsInfluxDBPasswordFlag,
utils.MetricsInfluxDBTagsFlag,
utils.MetricsLoadTestCSVFlag,
}
)

Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,11 @@ var (
Usage: "Comma-separated InfluxDB tags (key/values) attached to all measurements",
Value: "host=localhost",
}
// MetricsLoadTestCSVFlag names the file that per-block production cycle
// metrics are written to in CSV format. It defaults to the empty string.
MetricsLoadTestCSVFlag = cli.StringFlag{
Name: "metrics.loadtestcsvfile",
Usage: "Write a csv with information about the block production cycle to the given file name. If passed an empty string or non-existent, do not output csv metrics.",
Value: "",
}

EWASMInterpreterFlag = cli.StringFlag{
Name: "vm.ewasm",
Expand Down Expand Up @@ -1453,6 +1458,9 @@ func setIstanbul(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.Istanbul.RoundStateDBPath = stack.ResolvePath(cfg.Istanbul.RoundStateDBPath)
cfg.Istanbul.Validator = ctx.GlobalIsSet(MiningEnabledFlag.Name) || ctx.GlobalIsSet(DeveloperFlag.Name)
cfg.Istanbul.Replica = ctx.GlobalIsSet(IstanbulReplicaFlag.Name)
if ctx.GlobalIsSet(MetricsLoadTestCSVFlag.Name) {
cfg.Istanbul.LoadTestCSVFile = ctx.GlobalString(MetricsLoadTestCSVFlag.Name)
}
}

func setProxyP2PConfig(ctx *cli.Context, proxyCfg *p2p.Config) {
Expand Down
42 changes: 42 additions & 0 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math/big"
"os"
"sync"
"time"

Expand Down Expand Up @@ -111,6 +112,15 @@ func New(config *istanbul.Config, db ethdb.Database) consensus.Istanbul {
blocksDowntimeEventMeter: metrics.NewRegisteredMeter("consensus/istanbul/blocks/downtimeevent", nil),
blocksFinalizedTransactionsGauge: metrics.NewRegisteredGauge("consensus/istanbul/blocks/transactions", nil),
blocksFinalizedGasUsedGauge: metrics.NewRegisteredGauge("consensus/istanbul/blocks/gasused", nil),
sleepGauge: metrics.NewRegisteredGauge("consensus/istanbul/backend/sleep", nil),
}
if config.LoadTestCSVFile != "" {
if f, err := os.Create(config.LoadTestCSVFile); err == nil {
backend.csvRecorder = metrics.NewCSVRecorder(f, "blockNumber", "txCount", "gasUsed", "round",
"cycle", "sleep", "consensus", "block_verify", "block_construct",
"sysload", "syswait", "procload")
}

}

backend.core = istanbulCore.New(backend, backend.config)
Expand Down Expand Up @@ -276,6 +286,14 @@ type Backend struct {
// Gauge counting the gas used in the last block
blocksFinalizedGasUsedGauge metrics.Gauge

// Gauge reporting how many nanoseconds were spent sleeping
sleepGauge metrics.Gauge
// Start of the previous block cycle.
cycleStart time.Time

// Consensus csv recorder for load testing
csvRecorder *metrics.CSVRecorder

// Cache for the return values of the method RetrieveValidatorConnSet
cachedValidatorConnSet map[common.Address]bool
cachedValidatorConnSetBlockNum uint64
Expand Down Expand Up @@ -404,6 +422,9 @@ func (sb *Backend) Close() error {
errs = append(errs, err)
}
}
if err := sb.csvRecorder.Close(); err != nil {
errs = append(errs, err)
}
var concatenatedErrs error
for i, err := range errs {
if i == 0 {
Expand Down Expand Up @@ -483,6 +504,9 @@ func (sb *Backend) Commit(proposal istanbul.Proposal, aggregatedSeal types.Istan
Bitmap: aggregatedEpochValidatorSetSeal.Bitmap,
Signature: aggregatedEpochValidatorSetSeal.Signature,
})
if sb.csvRecorder != nil {
sb.recordBlockProductionTimes(block.Header().Number.Uint64(), len(block.Transactions()), block.GasUsed(), aggregatedSeal.Round.Uint64())
}

sb.logger.Info("Committed", "address", sb.Address(), "round", aggregatedSeal.Round.Uint64(), "hash", proposal.Hash(), "number", proposal.Number().Uint64())
// - if the proposed and committed blocks are the same, send the proposed hash
Expand Down Expand Up @@ -1013,3 +1037,21 @@ func (sb *Backend) UpdateReplicaState(seq *big.Int) {
sb.replicaState.NewChainHead(seq)
}
}

// recordBlockProductionTimes writes one CSV row describing the block production
// cycle that just finished and starts timing the next cycle.
//
// The row holds, in order: blockNumber, txCount, gasUsed, round, the time since
// the previous call in nanoseconds, and the current values of the sleep,
// consensus commit, block verify, block construct and system CPU gauges.
// Writing is a no-op when no CSV recorder is configured.
func (sb *Backend) recordBlockProductionTimes(blockNumber uint64, txCount int, gasUsed, round uint64) {
	// Use a single timestamp so that no time falls between the end of this
	// cycle and the start of the next one.
	now := time.Now()
	cycle := now.Sub(sb.cycleStart)
	sb.cycleStart = now

	// gaugeValue reads a gauge from the default registry by name. It reports 0
	// when nothing (or something that is not a gauge) is registered under that
	// name, instead of panicking on a failed type assertion in the commit path.
	gaugeValue := func(name string) int64 {
		if g, ok := metrics.Get(name).(metrics.Gauge); ok {
			return g.Value()
		}
		return 0
	}

	sb.csvRecorder.Write(blockNumber, txCount, gasUsed, round,
		cycle.Nanoseconds(),
		sb.sleepGauge.Value(),
		gaugeValue("consensus/istanbul/core/consensus_commit"),
		gaugeValue("consensus/istanbul/core/verify"),
		gaugeValue("miner/worker/block_construct"),
		gaugeValue("system/cpu/sysload"),
		gaugeValue("system/cpu/syswait"),
		gaugeValue("system/cpu/procload"))
}
5 changes: 5 additions & 0 deletions consensus/istanbul/backend/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ func (sb *Backend) Prepare(chain consensus.ChainReader, header *types.Header) er
// wait for the timestamp of header, use this to adjust the block period
delay := time.Unix(int64(header.Time), 0).Sub(now())
time.Sleep(delay)
if delay < 0 {
sb.sleepGauge.Update(0)
} else {
sb.sleepGauge.Update(delay.Nanoseconds())
}

return sb.addParentSeal(chain, header)
}
Expand Down
4 changes: 4 additions & 0 deletions consensus/istanbul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Config struct {
AnnounceQueryEnodeGossipPeriod uint64 `toml:",omitempty"` // Time duration (in seconds) between gossiped query enode messages
AnnounceAggressiveQueryEnodeGossipOnEnablement bool `toml:",omitempty"` // Specifies if this node should aggressively query enodes on announce enablement
AnnounceAdditionalValidatorsToGossip int64 `toml:",omitempty"` // Specifies the number of additional non-elected validators to gossip an announce

// Load test config
LoadTestCSVFile string `toml:",omitempty"` // If non-empty, specifies the file to write out csv metrics about the block production cycle to.
}

// ProxyConfig represents the configuration for validator's proxies
Expand Down Expand Up @@ -96,6 +99,7 @@ var DefaultConfig = &Config{
AnnounceQueryEnodeGossipPeriod: 300, // 5 minutes
AnnounceAggressiveQueryEnodeGossipOnEnablement: true,
AnnounceAdditionalValidatorsToGossip: 10,
LoadTestCSVFile: "", // disable by default
}

//ApplyParamsChainConfigToConfig applies the istanbul config values from params.chainConfig to the istanbul.Config config
Expand Down
2 changes: 2 additions & 0 deletions consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"math/big"
"reflect"
"time"

"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/consensus/istanbul"
Expand Down Expand Up @@ -137,6 +138,7 @@ func (c *core) broadcastCommit(sub *istanbul.Subject) {
}

func (c *core) handleCommit(msg *istanbul.Message) error {
defer c.handleCommitTimer.UpdateSince(time.Now())
// Decode COMMIT message
var commit *istanbul.CommittedSubject
err := msg.Decode(&commit)
Expand Down
50 changes: 32 additions & 18 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,15 @@ type core struct {

consensusTimestamp time.Time

// the timer to record consensus duration (from accepting a preprepare to final committed stage)
consensusTimer metrics.Timer
// Time from accepting a pre-prepare (after block verification) to preparing or committing
consensusPrepareTimeGauge metrics.Gauge
consensusCommitTimeGauge metrics.Gauge
// Time to verify blocks. Only records cache misses.
verifyGauge metrics.Gauge
// Histogram of the time to handle each message type
handlePrePrepareTimer metrics.Timer
handlePrepareTimer metrics.Timer
handleCommitTimer metrics.Timer
}

// New creates an Istanbul consensus core
Expand All @@ -146,17 +153,22 @@ func New(backend CoreBackend, config *istanbul.Config) Engine {
}

c := &core{
config: config,
address: backend.Address(),
logger: log.New(),
selectProposer: validator.GetProposerSelector(config.ProposerPolicy),
handlerWg: new(sync.WaitGroup),
backend: backend,
pendingRequests: prque.New(nil),
pendingRequestsMu: new(sync.Mutex),
consensusTimestamp: time.Time{},
rsdb: rsdb,
consensusTimer: metrics.NewRegisteredTimer("consensus/istanbul/core/consensus", nil),
config: config,
address: backend.Address(),
logger: log.New(),
selectProposer: validator.GetProposerSelector(config.ProposerPolicy),
handlerWg: new(sync.WaitGroup),
backend: backend,
pendingRequests: prque.New(nil),
pendingRequestsMu: new(sync.Mutex),
consensusTimestamp: time.Time{},
rsdb: rsdb,
consensusPrepareTimeGauge: metrics.NewRegisteredGauge("consensus/istanbul/core/consensus_prepare", nil),
consensusCommitTimeGauge: metrics.NewRegisteredGauge("consensus/istanbul/core/consensus_commit", nil),
verifyGauge: metrics.NewRegisteredGauge("consensus/istanbul/core/verify", nil),
handlePrePrepareTimer: metrics.NewRegisteredTimer("consensus/istanbul/core/handle_preprepare", nil),
handlePrepareTimer: metrics.NewRegisteredTimer("consensus/istanbul/core/handle_prepare", nil),
handleCommitTimer: metrics.NewRegisteredTimer("consensus/istanbul/core/handle_commit", nil),
}
msgBacklog := newMsgBacklog(
func(msg *istanbul.Message) {
Expand Down Expand Up @@ -358,6 +370,12 @@ func (c *core) commit() error {
return err
}

// Update metrics.
if !c.consensusTimestamp.IsZero() {
c.consensusCommitTimeGauge.Update(time.Since(c.consensusTimestamp).Nanoseconds())
c.consensusTimestamp = time.Time{}
}

// Process Backlog Messages
c.backlog.updateState(c.current.View(), c.current.State())

Expand Down Expand Up @@ -529,11 +547,6 @@ func (c *core) startNewSequence() error {
// This function is called on a final committed event which should occur once the block is inserted into the chain.
return nil
}
// Update metrics.
if !c.consensusTimestamp.IsZero() {
c.consensusTimer.UpdateSince(c.consensusTimestamp)
c.consensusTimestamp = time.Time{}
}

// Generate next view and preprepare
newView := &istanbul.View{
Expand Down Expand Up @@ -781,6 +794,7 @@ func (c *core) verifyProposal(proposal istanbul.Proposal) (time.Duration, error)
return 0, verificationStatus
} else {
logger.Trace("verification status cache miss")
defer func(start time.Time) { c.verifyGauge.Update(time.Since(start).Nanoseconds()) }(time.Now())

duration, err := c.backend.Verify(proposal)
logger.Trace("proposal verify return values", "duration", duration, "err", err)
Expand Down
6 changes: 6 additions & 0 deletions consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package core

import (
"reflect"
"time"

"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/consensus/istanbul"
Expand Down Expand Up @@ -173,6 +174,7 @@ func (c *core) getViewFromVerifiedPreparedCertificate(preparedCertificate istanb
}

func (c *core) handlePrepare(msg *istanbul.Message) error {
defer c.handlePrepareTimer.UpdateSince(time.Now())
// Decode PREPARE message
var prepare *istanbul.Subject
err := msg.Decode(&prepare)
Expand Down Expand Up @@ -211,6 +213,10 @@ func (c *core) handlePrepare(msg *istanbul.Message) error {
return err
}
logger.Trace("Got quorum prepares or commits", "tag", "stateTransition")
// Update metrics.
if !c.consensusTimestamp.IsZero() {
c.consensusPrepareTimeGauge.Update(time.Since(c.consensusTimestamp).Nanoseconds())
}

// Process Backlog Messages
c.backlog.updateState(c.current.View(), c.current.State())
Expand Down
2 changes: 2 additions & 0 deletions consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (c *core) sendPreprepare(request *istanbul.Request, roundChangeCertificate
}

func (c *core) handlePreprepare(msg *istanbul.Message) error {
defer c.handlePrePrepareTimer.UpdateSince(time.Now())

logger := c.newLogger("func", "handlePreprepare", "tag", "handleMsg", "from", msg.Address)
logger.Trace("Got preprepare message", "m", msg)

Expand Down
73 changes: 73 additions & 0 deletions metrics/csv_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021 The Celo Authors
// This file is part of the celo library.
//
// The celo library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The celo library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the celo library. If not, see <http://www.gnu.org/licenses/>.

package metrics

import (
"encoding/csv"
"fmt"
"io"
"sync"
)

// WriterCloser is the sink a CSVRecorder writes to and closes when it is done.
type WriterCloser interface {
	io.Writer
	io.Closer
}

// A CSVRecorder enables easy writing of CSV data to a specified writer.
// The header is written on creation. Writing is thread safe.
type CSVRecorder struct {
	writer    *csv.Writer  // buffered CSV encoder on top of backingWC
	backingWC WriterCloser // retained so that Close can close it
	writeMu   sync.Mutex   // serializes every use of writer
}

// NewCSVRecorder creates a CSV recorder that writes to the supplied writer.
// The writer is retained and can be closed by calling CSVRecorder.Close().
// The header row made of fields is written and flushed immediately; a failure
// to do so is reported later by Close.
func NewCSVRecorder(wc WriterCloser, fields ...string) *CSVRecorder {
	c := &CSVRecorder{
		writer:    csv.NewWriter(wc),
		backingWC: wc,
	}
	// The error is not lost: Close reports it through csv.Writer.Error.
	_ = c.writer.Write(fields)
	c.writer.Flush()
	return c
}

// Write writes out one CSV row, converting each value to a string using "%v".
// The row is flushed to the underlying writer right away so that it is not
// lost if the process exits without calling Close. Write errors are reported
// by Close. This is a no-op for a nil receiver.
func (c *CSVRecorder) Write(values ...interface{}) {
	if c == nil {
		return
	}
	strs := make([]string, 0, len(values))
	for _, v := range values {
		strs = append(strs, fmt.Sprintf("%v", v))
	}

	c.writeMu.Lock()
	defer c.writeMu.Unlock()
	// The error is not lost: Close reports it through csv.Writer.Error.
	_ = c.writer.Write(strs)
	c.writer.Flush()
}

// Close flushes any buffered data and closes the underlying writer.
// It returns the first error hit while writing or flushing, if any, and
// otherwise the error from closing the underlying writer. The underlying
// writer is closed in both cases. This is a no-op for a nil receiver.
func (c *CSVRecorder) Close() error {
	if c == nil {
		return nil
	}
	// Hold the lock so that Close cannot interleave with a concurrent Write.
	c.writeMu.Lock()
	defer c.writeMu.Unlock()

	c.writer.Flush()
	writeErr := c.writer.Error()
	closeErr := c.backingWC.Close()
	if writeErr != nil {
		return writeErr
	}
	return closeErr
}
Loading

0 comments on commit ede6fa2

Please sign in to comment.