Skip to content

Commit

Permalink
add more metrics for compact blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
cmwaters committed Jul 30, 2024
1 parent 1fe8522 commit 1846523
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 31 deletions.
26 changes: 26 additions & 0 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type Metrics struct {

// The number of proposals that failed to be received in time
TimedOutProposals metrics.Counter

CompactBlocksReceived metrics.Counter
CompactBlocksSent metrics.Counter
CompactBlocksFailed metrics.Counter
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down Expand Up @@ -279,6 +283,24 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "timed_out_proposals",
Help: "Number of proposals that failed to be received in time",
}, labels).With(labelsAndValues...),
CompactBlocksReceived: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "compact_blocks_received",
Help: "Number of compact blocks received by the node",
}, labels).With(labelsAndValues...),
CompactBlocksSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "compact_blocks_sent",
Help: "Number of compact blocks sent by the node",
}, labels).With(labelsAndValues...),
CompactBlocksFailed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "compact_blocks_failed",
Help: "Number of compact blocks failed to be received by the node",
}, labels).With(labelsAndValues...),
}
}

Expand Down Expand Up @@ -317,6 +339,10 @@ func NopMetrics() *Metrics {
FullPrevoteMessageDelay: discard.NewGauge(),
ApplicationRejectedProposals: discard.NewCounter(),
TimedOutProposals: discard.NewCounter(),

CompactBlocksReceived: discard.NewCounter(),
CompactBlocksSent: discard.NewCounter(),
CompactBlocksFailed: discard.NewCounter(),
}
}

Expand Down
3 changes: 1 addition & 2 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ OUTER_LOOP:
},
}, logger) {
ps.SetHasBlock(prs.Height, prs.Round)
conR.conS.jsonMetrics.SentCompactBlocks++
conR.conS.metrics.CompactBlocksSent.Add(1)
}
continue OUTER_LOOP
}
Expand Down Expand Up @@ -841,7 +841,6 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
string(peer.ID()),
schema.Upload,
)
conR.conS.jsonMetrics.SentBlockParts++
} else {
logger.Debug("Sending block part for catchup failed")
// sleep to avoid retrying too fast
Expand Down
13 changes: 2 additions & 11 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ type State struct {

// for reporting metrics
metrics *Metrics
jsonMetrics *JSONMetrics
traceClient trace.Tracer
}

Expand Down Expand Up @@ -206,7 +205,6 @@ func NewState(
evpool: evpool,
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
jsonMetrics: NewJSONMetrics(path),
traceClient: trace.NoOpTracer(),
}

Expand Down Expand Up @@ -1801,12 +1799,6 @@ func (cs *State) finalizeCommit(height int64) {
// NewHeightStep!
cs.updateToState(stateCopy)

cs.jsonMetrics.Blocks++
// Save every 20 blocks
if cs.Height%20 == 0 {
cs.jsonMetrics.Save()
}

fail.Fail() // XXX

// Private validator might have changed it's key pair => refetch pubkey.
Expand Down Expand Up @@ -1979,7 +1971,7 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
func (cs *State) addCompactBlock(msg *CompactBlockMessage, peerID p2p.ID) error {
compactBlock := msg.Block
height := compactBlock.Height
cs.jsonMetrics.ReceivedCompactBlocks++
cs.metrics.CompactBlocksReceived.Add(1)

if cs.ProposalBlock != nil {
// We already have the proposal block.
Expand Down Expand Up @@ -2020,7 +2012,7 @@ func (cs *State) addCompactBlock(msg *CompactBlockMessage, peerID p2p.ID) error

cs.mtx.Lock()
if err != nil {
cs.jsonMetrics.CompactBlockFailures++
cs.metrics.CompactBlocksFailed.Add(1)
if ctx.Err() != nil {
cs.Logger.Info("failed to fetch transactions within the timeout", "timeout", timeout)
return nil
Expand Down Expand Up @@ -2101,7 +2093,6 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
}

cs.metrics.BlockGossipPartsReceived.With("matches_current", "true").Add(1)
cs.jsonMetrics.ReceivedBlockParts++

if cs.ProposalBlockParts.ByteSize() > cs.state.ConsensusParams.Block.MaxBytes {
return added, fmt.Errorf("total size of proposal block parts exceeds maximum block bytes (%d > %d)",
Expand Down
15 changes: 3 additions & 12 deletions mempool/cat/block_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,13 @@ func (memR *Reactor) FetchTxsFromKeys(ctx context.Context, blockID []byte, compa
}
}
memR.Logger.Info("fetching transactions from peers", "blockID", blockID, "numTxs", len(txs), "numMissing", len(missingKeys))
memR.mempool.metrics.MissingTxs.Add(float64(len(missingKeys)))

memR.mempool.jsonMetrics.Lock()
memR.mempool.jsonMetrics.TransactionsMissing = append(memR.mempool.jsonMetrics.TransactionsMissing, uint64(len(missingKeys)))
memR.mempool.jsonMetrics.Transactions = append(memR.mempool.jsonMetrics.Transactions, uint64(len(compactData)))
// Check if we got lucky and already had all the transactions.
if len(missingKeys) == 0 {
memR.mempool.jsonMetrics.TimeTakenFetchingTxs = append(memR.mempool.jsonMetrics.TimeTakenFetchingTxs, 0)
memR.mempool.jsonMetrics.Unlock()
memR.Logger.Info("fetched all txs, none missing", "blockID", blockID)
return txs, nil
}
memR.mempool.jsonMetrics.Unlock()

// setup a request for this block and begin to track and retrieve all missing transactions
request := memR.blockFetcher.newRequest(
Expand All @@ -60,12 +56,7 @@ func (memR *Reactor) FetchTxsFromKeys(ctx context.Context, blockID []byte, compa
)
defer func() {
timeTaken := request.TimeTaken()
if timeTaken == 0 {
return
}
memR.mempool.jsonMetrics.Lock()
memR.mempool.jsonMetrics.TimeTakenFetchingTxs = append(memR.mempool.jsonMetrics.TimeTakenFetchingTxs, timeTaken)
memR.mempool.jsonMetrics.Unlock()
memR.Logger.Info("fetched txs", "timeTaken", timeTaken, "numMissing", len(missingKeys))
}()

// request the missing transactions if we haven't already
Expand Down
6 changes: 0 additions & 6 deletions mempool/cat/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type TxPool struct {
config *config.MempoolConfig
proxyAppConn proxy.AppConnMempool
metrics *mempool.Metrics
jsonMetrics *mempool.JSONMetrics

// these values are modified once per height
updateMtx sync.Mutex
Expand Down Expand Up @@ -111,7 +110,6 @@ func NewTxPool(
store: newStore(),
broadcastCh: make(chan *wrappedTx),
txsToBeBroadcast: make([]types.TxKey, 0),
jsonMetrics: mempool.NewJSONMetrics(path),
}

for _, opt := range options {
Expand Down Expand Up @@ -559,10 +557,6 @@ func (txmp *TxPool) Update(
txmp.notifyTxsAvailable()
}
}
// save every 20 blocks
if blockHeight%20 == 0 {
txmp.jsonMetrics.Save()
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type Metrics struct {
// RerequestedTxs defines the number of times that a requested tx
// never received a response in time and a new request was made.
RerequestedTxs metrics.Counter

// MissingTxs defines the number of transactions in the current proposal
// that were not found in the mempool.
MissingTxs metrics.Counter
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
Expand Down

0 comments on commit 1846523

Please sign in to comment.