Skip to content

Commit

Permalink
EVM-675 Implement additional metrics (#1564)
Browse files Browse the repository at this point in the history
* Consensus metrics

* P2P and Syncer metrics

* TxPool metrics

* JsonRpc metrics

* Small fix

* Lint fix

* Comments fix

* Comments fix

* Comments fix
  • Loading branch information
goran-ethernal authored Jun 7, 2023
1 parent 2c2c45d commit ef33b05
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 3 deletions.
3 changes: 3 additions & 0 deletions consensus/polybft/blockchain_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (p *blockchainWrapper) CommitBlock(block *types.FullBlock) error {
func (p *blockchainWrapper) ProcessBlock(parent *types.Header, block *types.Block,
callback func(*state.Transition) error) (*types.FullBlock, error) {
header := block.Header.Copy()
start := time.Now().UTC()

transition, err := p.executor.BeginTxn(parent.StateRoot, header, types.BytesToAddress(header.Miner))
if err != nil {
Expand All @@ -108,6 +109,8 @@ func (p *blockchainWrapper) ProcessBlock(parent *types.Header, block *types.Bloc

_, root := transition.Commit()

updateBlockExecutionMetric(start)

if root != block.Header.StateRoot {
return nil, fmt.Errorf("incorrect state root: (%s, %s)", root, block.Header.StateRoot)
}
Expand Down
7 changes: 7 additions & 0 deletions consensus/polybft/consensus_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func updateBlockMetrics(currentBlock *types.Block, parentHeader *types.Header) e
metrics.SetGauge([]string{consensusMetricsPrefix, "rounds"}, float32(extra.Checkpoint.BlockRound))
metrics.SetGauge([]string{consensusMetricsPrefix, "chain_head"}, float32(currentBlock.Number()))
metrics.IncrCounter([]string{consensusMetricsPrefix, "block_counter"}, float32(1))
metrics.SetGauge([]string{consensusMetricsPrefix, "block_space_used"}, float32(currentBlock.Header.GasUsed))

return nil
}
Expand All @@ -46,3 +47,9 @@ func updateEpochMetrics(epoch epochMetadata) {
// update number of validators metrics
metrics.SetGauge([]string{consensusMetricsPrefix, "validators"}, float32(epoch.Validators.Len()))
}

// updateBlockExecutionMetric updates the block execution metric
func updateBlockExecutionMetric(start time.Time) {
metrics.SetGauge([]string{consensusMetricsPrefix, "block_execution_time"},
float32(time.Now().UTC().Sub(start).Seconds()))
}
5 changes: 5 additions & 0 deletions consensus/polybft/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/0xPolygon/polygon-edge/contracts"
"github.com/0xPolygon/polygon-edge/state"
"github.com/0xPolygon/polygon-edge/types"
"github.com/armon/go-metrics"
hcf "github.com/hashicorp/go-hclog"
)

Expand Down Expand Up @@ -104,6 +105,10 @@ type fsm struct {

// BuildProposal builds a proposal for the current round (used if proposer)
func (f *fsm) BuildProposal(currentRound uint64) ([]byte, error) {
start := time.Now().UTC()
defer metrics.SetGauge([]string{consensusMetricsPrefix, "block_building_time"},
float32(time.Now().UTC().Sub(start).Seconds()))

parent := f.parent

extraParent, err := GetIbftExtra(parent.ExtraData)
Expand Down
10 changes: 9 additions & 1 deletion jsonrpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"math"
"reflect"
"strings"
"time"
"unicode"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
)

Expand Down Expand Up @@ -377,8 +379,14 @@ func (d *Dispatcher) handleReq(req Request) ([]byte, Error) {
ok bool
)

output := fd.fv.Call(inArgs)
start := time.Now().UTC()
output := fd.fv.Call(inArgs) // call rpc endpoint function
// measure execution time of rpc endpoint function
metrics.SetGauge([]string{jsonRPCMetric, req.Method + "_time"}, float32(time.Now().UTC().Sub(start).Seconds()))

if err := getError(output[1]); err != nil {
// measure error on the rpc endpoint function
metrics.IncrCounter([]string{jsonRPCMetric, req.Method + "_errors"}, 1)
d.logInternalError(req.Method, err)

if res := output[0].Interface(); res != nil {
Expand Down
2 changes: 2 additions & 0 deletions jsonrpc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/0xPolygon/polygon-edge/types"
)

const jsonRPCMetric = "json_rpc"

// For union type of transaction and types.Hash
type transactionOrHash interface {
getHash() types.Hash
Expand Down
7 changes: 7 additions & 0 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -61,6 +62,8 @@ func (t *Topic) Publish(obj proto.Message) error {
return err
}

metrics.SetGauge([]string{networkMetrics, "egress_bytes"}, float32(len(data)))

return t.topic.Publish(context.Background(), data)
}

Expand Down Expand Up @@ -98,6 +101,7 @@ func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{},
}

t.logger.Error("failed to get topic", "err", err)
metrics.IncrCounter([]string{networkMetrics, "bad_messages"}, float32(1))

continue
}
Expand All @@ -106,10 +110,13 @@ func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{},
obj := t.createObj()
if err := proto.Unmarshal(msg.Data, obj); err != nil {
t.logger.Error("failed to unmarshal topic", "err", err)
metrics.IncrCounter([]string{networkMetrics, "bad_messages"}, float32(1))

return
}

metrics.SetGauge([]string{networkMetrics, "ingress_bytes"}, float32(len(msg.Data)))

handler(obj, msg.GetFrom())
}()
}
Expand Down
5 changes: 5 additions & 0 deletions syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/0xPolygon/polygon-edge/network/event"
"github.com/0xPolygon/polygon-edge/syncer/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/libp2p/go-libp2p/core/peer"
"google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -380,18 +381,22 @@ func blockStreamToChannel(stream proto.SyncPeer_GetBlocksClient) (<-chan *types.
}

if err != nil {
metrics.IncrCounter([]string{syncerMetrics, "bad_message"}, 1)
errorCh <- err

break
}

block, err := fromProto(protoBlock)
if err != nil {
metrics.IncrCounter([]string{syncerMetrics, "bad_block"}, 1)
errorCh <- err

break
}

metrics.SetGauge([]string{syncerMetrics, "ingress_bytes"}, float32(len(protoBlock.Block)))

blockCh <- block
}
}()
Expand Down
2 changes: 2 additions & 0 deletions syncer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/0xPolygon/polygon-edge/network/grpc"
"github.com/0xPolygon/polygon-edge/syncer/proto"
"github.com/0xPolygon/polygon-edge/types"
"github.com/armon/go-metrics"
"github.com/golang/protobuf/ptypes/empty"
)

Expand Down Expand Up @@ -64,6 +65,7 @@ func (s *syncPeerService) GetBlocks(
}

resp := toProtoBlock(block)
metrics.SetGauge([]string{syncerMetrics, "egress_bytes"}, float32(len(resp.Block)))

// if client closes stream, context.Canceled is given
if err := stream.Send(resp); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/0xPolygon/polygon-edge/helper/progress"
"github.com/0xPolygon/polygon-edge/network/event"
"github.com/0xPolygon/polygon-edge/types"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/libp2p/go-libp2p/core/peer"
)
Expand Down Expand Up @@ -240,13 +241,18 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, newBlockCallback func(*types.F

fullBlock, err := s.blockchain.VerifyFinalizedBlock(block)
if err != nil {
metrics.IncrCounter([]string{syncerMetrics, "bad_block"}, 1)

return lastReceivedNumber, false, fmt.Errorf("unable to verify block, %w", err)
}

if err := s.blockchain.WriteFullBlock(fullBlock, syncerName); err != nil {
metrics.IncrCounter([]string{syncerMetrics, "bad_block"}, 1)

return lastReceivedNumber, false, fmt.Errorf("failed to write block while bulk syncing: %w", err)
}

updateMetrics(fullBlock)
shouldTerminate = newBlockCallback(fullBlock)

lastReceivedNumber = block.Number()
Expand All @@ -255,3 +261,9 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, newBlockCallback func(*types.F
}
}
}

func updateMetrics(fullBlock *types.FullBlock) {
metrics.SetGauge([]string{syncerMetrics, "tx_num"}, float32(len(fullBlock.Block.Transactions)))
metrics.SetGauge([]string{syncerMetrics, "receipts_num"}, float32(len(fullBlock.Receipts)))
metrics.SetGauge([]string{syncerMetrics, "blocks_num"}, 1)
}
2 changes: 2 additions & 0 deletions syncer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"google.golang.org/protobuf/proto"
)

const syncerMetrics = "syncer"

type Blockchain interface {
// SubscribeEvents subscribes new blockchain event
SubscribeEvents() blockchain.Subscription
Expand Down
7 changes: 5 additions & 2 deletions txpool/slot_gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync/atomic"

"github.com/0xPolygon/polygon-edge/types"
"github.com/armon/go-metrics"
)

const (
Expand All @@ -23,12 +24,14 @@ func (g *slotGauge) read() uint64 {

// increase increases the height of the gauge by the specified slots amount.
func (g *slotGauge) increase(slots uint64) {
atomic.AddUint64(&g.height, slots)
newHeight := atomic.AddUint64(&g.height, slots)
metrics.SetGauge([]string{txPoolMetrics, "slots_used"}, float32(newHeight))
}

// decrease decreases the height of the gauge by the specified slots amount.
func (g *slotGauge) decrease(slots uint64) {
atomic.AddUint64(&g.height, ^(slots - 1))
newHeight := atomic.AddUint64(&g.height, ^(slots - 1))
metrics.SetGauge([]string{txPoolMetrics, "slots_used"}, float32(newHeight))
}

// highPressure checks if the gauge level
Expand Down
Loading

0 comments on commit ef33b05

Please sign in to comment.