From 5e010aa53f8b5be9c726e17191d11c9646d8bd5b Mon Sep 17 00:00:00 2001 From: sanaz Date: Tue, 21 Nov 2023 13:02:02 -0800 Subject: [PATCH] feat: Add standard tracing points to portions of the mempool and consensus (#1055) This PR updates the trace client with a slight refactor (renaming to be more consistent) and then adds a few standard tables to store traced information. We are also able to select which tables we wish to update (and therefore which information to trace) in the config. closes #978 --------- Co-authored-by: Rootul P --- config/config.go | 11 +++ config/toml.go | 4 ++ consensus/reactor.go | 45 ++++++++----- consensus/state.go | 47 +++++++------ mempool/cat/reactor.go | 61 ++++++++++++----- mempool/v1/reactor.go | 50 +++++++++----- mempool/v1/reactor_test.go | 7 +- node/node.go | 20 ++++-- pkg/trace/README.md | 61 ++++++++++++++--- pkg/trace/client.go | 39 ++++++----- pkg/trace/doc.go | 4 +- pkg/trace/schema/consensus.go | 83 +++++++++++++++++++++++ pkg/trace/schema/mempool.go | 122 ++++++++++++++++++++++++++++++++++ pkg/trace/schema/tables.go | 41 ++++++++++++ test/maverick/node/node.go | 1 + 15 files changed, 489 insertions(+), 107 deletions(-) create mode 100644 pkg/trace/schema/consensus.go create mode 100644 pkg/trace/schema/mempool.go create mode 100644 pkg/trace/schema/tables.go diff --git a/config/config.go b/config/config.go index 10fdde9b36..24c490a7e7 100644 --- a/config/config.go +++ b/config/config.go @@ -63,6 +63,12 @@ var ( minSubscriptionBufferSize = 100 defaultSubscriptionBufferSize = 200 + + // DefaultInfluxTables is a list of tables that are used for storing traces. + // This global var is filled by an init function in the schema package. This + // allows for the schema package to contain all the relevant logic while + // avoiding import cycles. 
+ DefaultInfluxTables = []string{} ) // Config defines the top level configuration for a CometBFT node @@ -1194,6 +1200,10 @@ type InstrumentationConfig struct { // InfluxBatchSize is the number of points to write in a single batch. InfluxBatchSize int `mapstructure:"influx_batch_size"` + // InfluxTables is the list of tables that will be traced. See the + // pkg/trace/schema for a complete list of tables. + InfluxTables []string `mapstructure:"influx_tables"` + // PyroscopeURL is the pyroscope url used to establish a connection with a // pyroscope continuous profiling server. PyroscopeURL string `mapstructure:"pyroscope_url"` @@ -1220,6 +1230,7 @@ func DefaultInstrumentationConfig() *InstrumentationConfig { InfluxOrg: "celestia", InfluxBucket: "e2e", InfluxBatchSize: 20, + InfluxTables: DefaultInfluxTables, PyroscopeURL: "", PyroscopeTrace: false, PyroscopeProfileTypes: []string{ diff --git a/config/toml.go b/config/toml.go index e75501672e..13220bd204 100644 --- a/config/toml.go +++ b/config/toml.go @@ -557,6 +557,10 @@ influx_org = "{{ .Instrumentation.InfluxOrg }}" # The size of the batches that are sent to the database. influx_batch_size = {{ .Instrumentation.InfluxBatchSize }} +# The list of tables that are updated when tracing. All available tables and +# their schema can be found in the pkg/trace/schema package. +influx_tables = [{{ range .Instrumentation.InfluxTables }}{{ printf "%q, " . }}{{end}}] + # The URL of the pyroscope instance to use for continuous profiling. # If empty, continuous profiling is disabled. 
pyroscope_url = "{{ .Instrumentation.PyroscopeURL }}" diff --git a/consensus/reactor.go b/consensus/reactor.go index dab8b06ec5..3fcf48627f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -8,19 +8,20 @@ import ( "time" "github.com/gogo/protobuf/proto" - - cstypes "github.com/cometbft/cometbft/consensus/types" - "github.com/cometbft/cometbft/libs/bits" - cmtevents "github.com/cometbft/cometbft/libs/events" - cmtjson "github.com/cometbft/cometbft/libs/json" - "github.com/cometbft/cometbft/libs/log" - cmtsync "github.com/cometbft/cometbft/libs/sync" - "github.com/cometbft/cometbft/p2p" - cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus" - cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" - sm "github.com/cometbft/cometbft/state" - "github.com/cometbft/cometbft/types" - cmttime "github.com/cometbft/cometbft/types/time" + cstypes "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/libs/bits" + cmtevents "github.com/tendermint/tendermint/libs/events" + cmtjson "github.com/tendermint/tendermint/libs/json" + "github.com/tendermint/tendermint/libs/log" + cmtsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/pkg/trace" + "github.com/tendermint/tendermint/pkg/trace/schema" + cmtcons "github.com/tendermint/tendermint/proto/tendermint/consensus" + cmtproto "github.com/tendermint/tendermint/proto/tendermint/types" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" + cmttime "github.com/tendermint/tendermint/types/time" ) const ( @@ -48,7 +49,8 @@ type Reactor struct { eventBus *types.EventBus rs *cstypes.RoundState - Metrics *Metrics + Metrics *Metrics + traceClient *trace.Client } type ReactorOption func(*Reactor) @@ -57,10 +59,11 @@ type ReactorOption func(*Reactor) // consensusState. 
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor { conR := &Reactor{ - conS: consensusState, - waitSync: waitSync, - rs: consensusState.GetRoundState(), - Metrics: NopMetrics(), + conS: consensusState, + waitSync: waitSync, + rs: consensusState.GetRoundState(), + Metrics: NopMetrics(), + traceClient: &trace.Client{}, } conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) @@ -335,6 +338,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) + schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload) conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()} default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) @@ -591,6 +595,7 @@ OUTER_LOOP: Part: *parts, }, }, logger) { + schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.TransferTypeUpload) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } continue OUTER_LOOP @@ -1023,6 +1028,10 @@ func ReactorMetrics(metrics *Metrics) ReactorOption { return func(conR *Reactor) { conR.Metrics = metrics } } +func ReactorTracing(traceClient *trace.Client) ReactorOption { + return func(conR *Reactor) { conR.traceClient = traceClient } +} + //----------------------------------------------------------------------------- var ( diff --git a/consensus/state.go b/consensus/state.go index 3e73876107..0d66e5e764 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -12,23 +12,24 @@ import ( "github.com/gogo/protobuf/proto" - cfg "github.com/cometbft/cometbft/config" - cstypes "github.com/cometbft/cometbft/consensus/types" - "github.com/cometbft/cometbft/crypto" - cmtevents "github.com/cometbft/cometbft/libs/events" - "github.com/cometbft/cometbft/libs/fail" - cmtjson 
"github.com/cometbft/cometbft/libs/json" - "github.com/cometbft/cometbft/libs/log" - cmtmath "github.com/cometbft/cometbft/libs/math" - cmtos "github.com/cometbft/cometbft/libs/os" - "github.com/cometbft/cometbft/libs/service" - cmtsync "github.com/cometbft/cometbft/libs/sync" - "github.com/cometbft/cometbft/p2p" - "github.com/cometbft/cometbft/pkg/trace" - cmtproto "github.com/cometbft/cometbft/proto/tendermint/types" - sm "github.com/cometbft/cometbft/state" - "github.com/cometbft/cometbft/types" - cmttime "github.com/cometbft/cometbft/types/time" + cfg "github.com/tendermint/tendermint/config" + cstypes "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/crypto" + cmtevents "github.com/tendermint/tendermint/libs/events" + "github.com/tendermint/tendermint/libs/fail" + cmtjson "github.com/tendermint/tendermint/libs/json" + "github.com/tendermint/tendermint/libs/log" + cmtmath "github.com/tendermint/tendermint/libs/math" + cmtos "github.com/tendermint/tendermint/libs/os" + "github.com/tendermint/tendermint/libs/service" + cmtsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/pkg/trace" + "github.com/tendermint/tendermint/pkg/trace/schema" + cmtproto "github.com/tendermint/tendermint/proto/tendermint/types" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" + cmttime "github.com/tendermint/tendermint/types/time" ) // Consensus sentinel errors @@ -142,7 +143,7 @@ type State struct { // for reporting metrics metrics *Metrics - eventCollector *trace.Client + traceClient *trace.Client } // StateOption sets an optional parameter on the State. 
@@ -173,7 +174,7 @@ func NewState( evpool: evpool, evsw: cmtevents.NewEventSwitch(), metrics: NopMetrics(), - eventCollector: &trace.Client{}, + traceClient: &trace.Client{}, } // set function defaults (may be overwritten before calling Start) @@ -215,9 +216,9 @@ func StateMetrics(metrics *Metrics) StateOption { return func(cs *State) { cs.metrics = metrics } } -// SetEventCollector sets the remote event collector. -func SetEventCollector(ec *trace.Client) StateOption { - return func(cs *State) { cs.eventCollector = ec } +// SetTraceClient sets the remote event collector. +func SetTraceClient(ec *trace.Client) StateOption { + return func(cs *State) { cs.traceClient = ec } } // String returns a string. @@ -703,6 +704,8 @@ func (cs *State) newStep() { cs.nSteps++ + schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round, cs.Step) + // newStep is called by updateToState in NewState before the eventBus is set! if cs.eventBus != nil { if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil { diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go index 152c029b29..205ab1b336 100644 --- a/mempool/cat/reactor.go +++ b/mempool/cat/reactor.go @@ -7,13 +7,15 @@ import ( "github.com/gogo/protobuf/proto" - cfg "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/crypto/tmhash" - "github.com/cometbft/cometbft/libs/log" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/p2p" - protomem "github.com/cometbft/cometbft/proto/tendermint/mempool" - "github.com/cometbft/cometbft/types" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/pkg/trace" + "github.com/tendermint/tendermint/pkg/trace/schema" + protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" + "github.com/tendermint/tendermint/types" ) const ( 
@@ -35,10 +37,11 @@ const ( // spec under /.spec.md type Reactor struct { p2p.BaseReactor - opts *ReactorOptions - mempool *TxPool - ids *mempoolIDs - requests *requestScheduler + opts *ReactorOptions + mempool *TxPool + ids *mempoolIDs + requests *requestScheduler + traceClient *trace.Client } type ReactorOptions struct { @@ -52,6 +55,9 @@ type ReactorOptions struct { // MaxGossipDelay is the maximum allotted time that the reactor expects a transaction to // arrive before issuing a new request to a different peer MaxGossipDelay time.Duration + + // TraceClient is the trace client for collecting trace level events + TraceClient *trace.Client } func (opts *ReactorOptions) VerifyAndComplete() error { @@ -81,10 +87,11 @@ func NewReactor(mempool *TxPool, opts *ReactorOptions) (*Reactor, error) { return nil, err } memR := &Reactor{ - opts: opts, - mempool: mempool, - ids: newMempoolIDs(), - requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout), + opts: opts, + mempool: mempool, + ids: newMempoolIDs(), + requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout), + traceClient: &trace.Client{}, } memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) return memR, nil @@ -218,6 +225,9 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { // NOTE: This setup also means that we can support older mempool implementations that simply // flooded the network with transactions. case *protomem.Txs: + for _, tx := range msg.Txs { + schema.WriteMempoolTx(memR.traceClient, e.Src.ID(), tx, schema.TransferTypeDownload, schema.CatVersionFieldValue) + } protoTxs := msg.GetTxs() if len(protoTxs) == 0 { memR.Logger.Error("received empty txs from peer", "src", e.Src) @@ -260,6 +270,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { // 3. If we recently evicted the tx and still don't have space for it, we do nothing. // 4. Else, we request the transaction from that peer. 
case *protomem.SeenTx: + schema.WriteMempoolPeerState( + memR.traceClient, + e.Src.ID(), + schema.SeenTxStateUpdateFieldValue, + schema.TransferTypeDownload, + schema.CatVersionFieldValue, + ) txKey, err := types.TxKeyFromBytes(msg.TxKey) if err != nil { memR.Logger.Error("peer sent SeenTx with incorrect tx key", "err", err) @@ -287,6 +304,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { // A peer is requesting a transaction that we have claimed to have. Find the specified // transaction and broadcast it to the peer. We may no longer have the transaction case *protomem.WantTx: + schema.WriteMempoolPeerState( + memR.traceClient, + e.Src.ID(), + schema.WantTxStateUpdateFieldValue, + schema.TransferTypeDownload, + schema.CatVersionFieldValue, + ) txKey, err := types.TxKeyFromBytes(msg.TxKey) if err != nil { memR.Logger.Error("peer sent WantTx with incorrect tx key", "err", err) @@ -296,6 +320,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { tx, has := memR.mempool.Get(txKey) if has && !memR.opts.ListenOnly { peerID := memR.ids.GetIDForPeer(e.Src.ID()) + schema.WriteMempoolTx( + memR.traceClient, + e.Src.ID(), + tx, + schema.TransferTypeUpload, + schema.CatVersionFieldValue, + ) memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID) if p2p.SendEnvelopeShim(e.Src, p2p.Envelope{ //nolint:staticcheck ChannelID: mempool.MempoolChannel, diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 8dfa3d5c76..b23e22f3b9 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -7,14 +7,16 @@ import ( "github.com/gogo/protobuf/proto" - cfg "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/libs/clist" - "github.com/cometbft/cometbft/libs/log" - cmtsync "github.com/cometbft/cometbft/libs/sync" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/p2p" - protomem "github.com/cometbft/cometbft/proto/tendermint/mempool" - "github.com/cometbft/cometbft/types" + cfg 
"github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/clist" + "github.com/tendermint/tendermint/libs/log" + cmtsync "github.com/tendermint/tendermint/libs/sync" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/pkg/trace" + "github.com/tendermint/tendermint/pkg/trace/schema" + protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" + "github.com/tendermint/tendermint/types" ) // Reactor handles mempool tx broadcasting amongst peers. @@ -22,9 +24,10 @@ import ( // peers you received it from. type Reactor struct { p2p.BaseReactor - config *cfg.MempoolConfig - mempool *TxMempool - ids *mempoolIDs + config *cfg.MempoolConfig + mempool *TxMempool + ids *mempoolIDs + traceClient *trace.Client } type mempoolIDs struct { @@ -91,11 +94,12 @@ func newMempoolIDs() *mempoolIDs { } // NewReactor returns a new Reactor with the given config and mempool. -func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool) *Reactor { +func NewReactor(config *cfg.MempoolConfig, mempool *TxMempool, traceClient *trace.Client) *Reactor { memR := &Reactor{ - config: config, - mempool: mempool, - ids: newMempoolIDs(), + config: config, + mempool: mempool, + ids: newMempoolIDs(), + traceClient: traceClient, } memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) return memR @@ -176,6 +180,15 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *protomem.Txs: + for _, tx := range msg.Txs { + schema.WriteMempoolTx( + memR.traceClient, + e.Src.ID(), + tx, + schema.TransferTypeDownload, + schema.V1VersionFieldValue, + ) + } protoTxs := msg.GetTxs() if len(protoTxs) == 0 { memR.Logger.Error("received tmpty txs from peer", "src", e.Src) @@ -290,6 +303,13 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // to avoid doing it a second time 
memTx.SetPeer(peerID) } + schema.WriteMempoolTx( + memR.traceClient, + peer.ID(), + memTx.tx, + schema.TransferTypeUpload, + schema.V1VersionFieldValue, + ) } select { diff --git a/mempool/v1/reactor_test.go b/mempool/v1/reactor_test.go index 1ae9fbddfa..d4b3c088ab 100644 --- a/mempool/v1/reactor_test.go +++ b/mempool/v1/reactor_test.go @@ -12,8 +12,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/cometbft/cometbft/abci/example/kvstore" - "github.com/cometbft/cometbft/p2p/mock" + "github.com/tendermint/tendermint/abci/example/kvstore" + "github.com/tendermint/tendermint/p2p/mock" + "github.com/tendermint/tendermint/pkg/trace" cfg "github.com/cometbft/cometbft/config" @@ -163,7 +164,7 @@ func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { mempool, cleanup := newMempoolWithAppAndConfig(cc, config) defer cleanup() - reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states + reactors[i] = NewReactor(config.Mempool, mempool, &trace.Client{}) // so we dont start the consensus states reactors[i].SetLogger(logger.With("validator", i)) } diff --git a/node/node.go b/node/node.go index c1694e8cb2..82de6c6c6d 100644 --- a/node/node.go +++ b/node/node.go @@ -377,6 +377,7 @@ func createMempoolAndMempoolReactor( state sm.State, memplMetrics *mempl.Metrics, logger log.Logger, + traceClient *trace.Client, ) (mempl.Mempool, p2p.Reactor) { switch config.Mempool.Version { case cfg.MempoolV2: @@ -393,8 +394,9 @@ func createMempoolAndMempoolReactor( reactor, err := mempoolv2.NewReactor( mp, &mempoolv2.ReactorOptions{ - ListenOnly: !config.Mempool.Broadcast, - MaxTxSize: config.Mempool.MaxTxBytes, + ListenOnly: !config.Mempool.Broadcast, + MaxTxSize: config.Mempool.MaxTxBytes, + TraceClient: traceClient, }, ) if err != nil { @@ -421,6 +423,7 @@ func createMempoolAndMempoolReactor( reactor := mempoolv1.NewReactor( config.Mempool, mp, + traceClient, ) if config.Consensus.WaitForTxs() { 
mp.EnableTxsAvailable() @@ -509,7 +512,7 @@ func createConsensusReactor(config *cfg.Config, waitSync bool, eventBus *types.EventBus, consensusLogger log.Logger, - evCollector *trace.Client, + traceClient *trace.Client, ) (*cs.Reactor, *cs.State) { consensusState := cs.NewState( config.Consensus, @@ -519,13 +522,18 @@ func createConsensusReactor(config *cfg.Config, mempool, evidencePool, cs.StateMetrics(csMetrics), - cs.SetEventCollector(evCollector), + cs.SetTraceClient(traceClient), ) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) + consensusReactor := cs.NewReactor( + consensusState, + waitSync, + cs.ReactorMetrics(csMetrics), + cs.ReactorTracing(traceClient), + ) consensusReactor.SetLogger(consensusLogger) // services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor @@ -859,7 +867,7 @@ func NewNode(config *cfg.Config, } // Make MempoolReactor - mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger) + mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger, influxdbClient) // Make Evidence Reactor evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger) diff --git a/pkg/trace/README.md b/pkg/trace/README.md index 208ba5e5ca..d883e1b413 100644 --- a/pkg/trace/README.md +++ b/pkg/trace/README.md @@ -16,17 +16,17 @@ example, we're pushing a point in the consensus reactor to measure exactly when each step of consensus is reached for each node. 
```go -if cs.eventCollector.IsCollecting() { - cs.eventCollector.WritePoint("consensus", map[string]interface{}{ - "roundData": []interface{}{rs.Height, rs.Round, rs.Step}, - }) -} +client.WritePoint(RoundStateTable, map[string]interface{}{ + HeightFieldKey: height, + RoundFieldKey: round, + StepFieldKey: step.String(), +}) ``` Using this method enforces the typical schema, where we are tagging (aka indexing) each point by the chain-id and the node-id, then adding the local time of the creation of the event. If you need to push a custom point, you can use -the underlying client directly. See influxdb2.WriteAPI for more details. +the underlying client directly. See `influxdb2.WriteAPI` for more details. ### Schema @@ -40,19 +40,54 @@ node. from(bucket: "e2e") |> range(start: -1h) |> filter( - fn: (r) => r["_measurement"] == "consensus" + fn: (r) => r["_measurement"] == "consensus_round_state" and r.chain_id == "ci-YREG8X" and r.node_id == "0b529c309608172a29c49979394734260b42acfb" ) ``` +We can easily retrieve all fields in a relatively standard table format by using +the pivot `fluxQL` command. + +```flux +from(bucket: "mocha") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "consensus_round_state") + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +``` + +### Querying Data Using Python + +Python can be used to quickly search for and isolate specific patterns. 
+ +```python +from influxdb_client import InfluxDBClient +from influxdb_client.client.write_api import SYNCHRONOUS + +client = InfluxDBClient(url="http://your-influx-url:8086/", token="your-influx-token", org="celestia") + +query_api = client.query_api() + +def create_flux_table_query(start, bucket, measurement, filter_clause): + flux_table_query = f''' + from(bucket: "{bucket}") + |> range(start: {start}) + |> filter(fn: (r) => r._measurement == "{measurement}") + {filter_clause} + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + ''' + return flux_table_query + +query = create_flux_table_query("-1h", "mocha", "consensus_round_state", "") +result = query_api.query(query=query) +``` ### Running a node with remote tracing on Tracing will only occur if an influxdb URL in specified either directly in the `config.toml` or as flags provided to the start sub command. -configure in the config.toml +#### Configure in the `config.toml` ```toml ####################################################### @@ -62,7 +97,7 @@ configure in the config.toml ... -# The URL of the influxdb instance to use for remote event +# The URL of the influxdb instance to use for remote event # collection. If empty, remote event collection is disabled. influx_url = "http://your-influx-ip:8086/" @@ -77,9 +112,15 @@ influx_org = "celestia" # The size of the batches that are sent to the database. influx_batch_size = 20 + +# The list of tables that are updated when tracing. All available tables and +# their schema can be found in the pkg/trace/schema package. 
+influx_tables = ["consensus_round_state", "mempool_tx", ] + ``` -or +or + ```sh celestia-appd start --influxdb-url=http://your-influx-ip:8086/ --influxdb-token="your-token" ``` diff --git a/pkg/trace/client.go b/pkg/trace/client.go index 7801f7cd1b..2d2c3bc09b 100644 --- a/pkg/trace/client.go +++ b/pkg/trace/client.go @@ -16,9 +16,9 @@ const ( ChainIDTag = "chain_id" ) -// EventCollectorConfig is the influxdb client configuration used for +// ClientConfigConfig is the influxdb client configuration used for // collecting events. -type EventCollectorConfig struct { +type ClientConfigConfig struct { // URL is the influxdb url. URL string `mapstructure:"influx_url"` // Token is the influxdb token. @@ -31,16 +31,6 @@ type EventCollectorConfig struct { BatchSize int `mapstructure:"influx_batch_size"` } -// DefaultEventCollectorConfig returns the default configuration. -func DefaultEventCollectorConfig() EventCollectorConfig { - return EventCollectorConfig{ - URL: "", - Org: "celestia", - Bucket: "e2e", - BatchSize: 10, - } -} - // Client is an influxdb client that can be used to push events to influxdb. It // is used to collect trace data from many different nodes in a network. If // there is no URL in the config.toml, then the underlying client is nil and no @@ -58,6 +48,10 @@ type Client struct { // nodeID is added as a tag all points nodeID string + // tables is a map from table name to the schema of that table that are + // configured to be collected. + tables map[string]struct{} + // Client is the influxdb client. This field is nil if no connection is // established. 
Client influxdb2.Client @@ -87,8 +81,9 @@ func NewClient(cfg *config.InstrumentationConfig, logger log.Logger, chainID, no cancel: cancel, chainID: chainID, nodeID: nodeID, + tables: sliceToMap(cfg.InfluxTables), } - if cfg == nil || cfg.InfluxURL == "" { + if cfg.InfluxURL == "" { return cli, nil } cli.Client = influxdb2.NewClientWithOptions( @@ -125,8 +120,12 @@ func (c *Client) logErrors(logger log.Logger) { } // IsCollecting returns true if the client is collecting events. -func (c *Client) IsCollecting() bool { - return c.Client != nil +func (c *Client) IsCollecting(table string) bool { + if c.Client == nil { + return false + } + _, has := c.tables[table] + return has } // WritePoint async writes a point to influxdb. To enforce the schema, it @@ -135,7 +134,7 @@ func (c *Client) IsCollecting() bool { // nothing. The "table" arg is used as the influxdb "measurement" for the point. // If other tags are needed, use WriteCustomPoint. func (c *Client) WritePoint(table string, fields map[string]interface{}) { - if !c.IsCollecting() { + if !c.IsCollecting(table) { return } writeAPI := c.Client.WriteAPI(c.cfg.InfluxOrg, c.cfg.InfluxBucket) @@ -146,3 +145,11 @@ func (c *Client) WritePoint(table string, fields map[string]interface{}) { p := write.NewPoint(table, tags, fields, time.Now()) writeAPI.WritePoint(p) } + +func sliceToMap(tables []string) map[string]struct{} { + m := make(map[string]struct{}, len(tables)) + for _, s := range tables { + m[s] = struct{}{} + } + return m +} diff --git a/pkg/trace/doc.go b/pkg/trace/doc.go index 9372af5839..3d6521464e 100644 --- a/pkg/trace/doc.go +++ b/pkg/trace/doc.go @@ -18,8 +18,8 @@ each step of consensus is reached for each node. 
```go - if cs.eventCollector.IsCollecting() { - cs.eventCollector.WritePoint("consensus", map[string]interface{}{ + if cs.traceClient.IsCollecting("consensus") { + cs.traceClient.WritePoint("consensus", map[string]interface{}{ "roundData": []interface{}{rs.Height, rs.Round, rs.Step}, }) } diff --git a/pkg/trace/schema/consensus.go b/pkg/trace/schema/consensus.go new file mode 100644 index 0000000000..726a53d262 --- /dev/null +++ b/pkg/trace/schema/consensus.go @@ -0,0 +1,83 @@ +package schema + +import ( + cstypes "github.com/tendermint/tendermint/consensus/types" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/pkg/trace" +) + +// ConsensusTables returns the list of tables that are used for consensus +// tracing. +func ConsensusTables() []string { + return []string{ + RoundStateTable, + BlockPartsTable, + } +} + +// Schema constants for the consensus round state tracing database. +const ( + // RoundStateTable is the name of the table that stores the consensus + // state traces. Follows this schema: + // + // | time | height | round | step | + RoundStateTable = "consensus_round_state" + + // StepFieldKey is the name of the field that stores the consensus step. The + // value is a string. + StepFieldKey = "step" +) + +// WriteRoundState writes a tracing point for a consensus round step using the predetermined +// schema for consensus state tracing. This is used to create a table in the following +// schema: +// +// | time | height | round | step | +func WriteRoundState(client *trace.Client, height int64, round int32, step cstypes.RoundStepType) { + client.WritePoint(RoundStateTable, map[string]interface{}{ + HeightFieldKey: height, + RoundFieldKey: round, + StepFieldKey: step.String(), + }) +} + +// Schema constants for the "consensus_block_parts" table. +const ( + // BlockPartsTable is the name of the table that stores the consensus block + // parts. It uses the 
+ // following schema: + // + // | time | height | round | index | peer | transfer type | + BlockPartsTable = "consensus_block_parts" + + // BlockPartIndexFieldKey is the name of the field that stores the block + // part + BlockPartIndexFieldKey = "index" +) + +// WriteBlockPart writes a tracing point for a BlockPart using the predetermined +// schema for consensus state tracing. This is used to create a table in the +// following schema: +// +// | time | height | round | index | peer | transfer type | +func WriteBlockPart( + client *trace.Client, + height int64, + round int32, + peer p2p.ID, + index uint32, + transferType string, +) { + // this check is redundant to what is checked during WritePoint, although it + // is an optimization to avoid allocations from the map of fields. + if !client.IsCollecting(BlockPartsTable) { + return + } + client.WritePoint(BlockPartsTable, map[string]interface{}{ + HeightFieldKey: height, + RoundFieldKey: round, + BlockPartIndexFieldKey: index, + PeerFieldKey: peer, + TransferTypeFieldKey: transferType, + }) +} diff --git a/pkg/trace/schema/mempool.go b/pkg/trace/schema/mempool.go new file mode 100644 index 0000000000..89c53f102d --- /dev/null +++ b/pkg/trace/schema/mempool.go @@ -0,0 +1,122 @@ +package schema + +import ( + "github.com/tendermint/tendermint/libs/bytes" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/pkg/trace" + "github.com/tendermint/tendermint/types" +) + +// MempoolTables returns the list of tables for mempool tracing. +func MempoolTables() []string { + return []string{ + MempoolTxTable, + MempoolPeerStateTable, + } +} + +// Schema constants for the mempool_tx table +const ( + // MempoolTxTable is the tracing "measurement" (aka table) for the mempool + // that stores tracing data related to gossiping transactions. 
+ // + // The schema for this table is: + // | time | peerID | tx size | tx hash | transfer type | mempool version | + MempoolTxTable = "mempool_tx" + + // TxFieldKey is the tracing field key for receiving or sending a + // tx. This should take the form of a tx hash as the value. + TxFieldKey = "tx" + + // SizeFieldKey is the tracing field key for the size of a tx. This + // should take the form of the size of the tx as the value. + SizeFieldKey = "size" + + // VersionFieldKey is the tracing field key for the version of the mempool. + // This is used to distinguish between versions of the mempool. + VersionFieldKey = "version" + + // V1VersionFieldValue is a tracing field value for the version of + // the mempool. This value is used by the "version" field key. + V1VersionFieldValue = "v1" + + // CatVersionFieldValue is a tracing field value for the version of + // the mempool. This value is used by the "version" field key. + CatVersionFieldValue = "cat" +) + +// WriteMempoolTx writes a tracing point for a tx using the predetermined +// schema for mempool tracing. This is used to create a table in the following +// schema: +// +// | time | peerID | tx size | tx hash | transfer type | mempool version | +func WriteMempoolTx(client *trace.Client, peer p2p.ID, tx []byte, transferType, version string) { + // this check is redundant to what is checked during WritePoint, although it + // is an optimization to avoid allocations from the map of fields. + if !client.IsCollecting(MempoolTxTable) { + return + } + client.WritePoint(MempoolTxTable, map[string]interface{}{ + TxFieldKey: bytes.HexBytes(types.Tx(tx).Hash()).String(), + PeerFieldKey: peer, + SizeFieldKey: len(tx), + TransferTypeFieldKey: transferType, + VersionFieldKey: version, + }) +} + +const ( + // MempoolPeerStateTable is the tracing "measurement" (aka table) for the mempool + // that stores tracing data related to mempool state, specifically + // the gossiping of "SeenTx" and "WantTx". 
+ // + // The schema for this table is: + // | time | peerID | update type | mempool version | + MempoolPeerStateTable = "mempool_peer_state" + + // StateUpdateFieldKey is the tracing field key for state updates of the mempool. + StateUpdateFieldKey = "update" + + // SeenTxStateUpdateFieldValue is a tracing field value for the state + // update of the mempool. This value is used by the "update" field key. + SeenTxStateUpdateFieldValue = "seen_tx" + + // WantTxStateUpdateFieldValue is a tracing field value for the state + // update of the mempool. This value is used by the "update" field key. + WantTxStateUpdateFieldValue = "want_tx" + + // RemovedTxStateUpdateFieldValue is a tracing field value for the local + // state update of the mempool. This value is used by the "update" field + // key. + RemovedTxStateUpdateFieldValue = "removed_tx" + + // AddedTxStateUpdateFieldValue is a tracing field value for the local state + // update of the mempool. This value is used by the "update" field key. + AddedTxStateUpdateFieldValue = "added_tx" +) + +// WriteMempoolPeerState writes a tracing point for the mempool state using +// the predetermined schema for mempool tracing. This is used to create a table +// in the following schema: +// +// | time | peerID | transfer type | state update | mempool version | +func WriteMempoolPeerState(client *trace.Client, peer p2p.ID, stateUpdate, transferType, version string) { + // this check is redundant to what is checked during WritePoint, although it + // is an optimization to avoid allocations from creating the map of fields. + if !client.IsCollecting(RoundStateTable) { + return + } + client.WritePoint(RoundStateTable, map[string]interface{}{ + PeerFieldKey: peer, + TransferTypeFieldKey: transferType, + StateUpdateFieldKey: stateUpdate, + VersionFieldKey: version, + }) +} + +const ( +// LocalTable is the tracing "measurement" (aka table) for the local mempool +// updates, such as when a tx is added or removed. 
+// TODO: actually implement the local mempool tracing +// LocalTable = "mempool_local" +) diff --git a/pkg/trace/schema/tables.go b/pkg/trace/schema/tables.go new file mode 100644 index 0000000000..2c8c9ef97d --- /dev/null +++ b/pkg/trace/schema/tables.go @@ -0,0 +1,41 @@ +package schema + +import "github.com/tendermint/tendermint/config" + +func init() { + config.DefaultInfluxTables = AllTables() +} + +func AllTables() []string { + tables := []string{} + tables = append(tables, MempoolTables()...) + tables = append(tables, ConsensusTables()...) + return tables +} + +// General purpose schema constants used across multiple tables +const ( + // PeerFieldKey is the tracing field key for the peer that sent or + // received a tx. This should take the form of the peer's address as the + // value. + PeerFieldKey = "peer" + + // TransferTypeFieldKey is the tracing field key for the class of a tx. + TransferTypeFieldKey = "transfer_type" + + // TransferTypeDownload is a tracing field value for receiving some + // data from a peer. This value is used by the "TransferType" field key. + TransferTypeDownload = "download" + + // TransferTypeUpload is a tracing field value for sending some data + // to a peer. This value is used by the "TransferType" field key. + TransferTypeUpload = "upload" + + // RoundFieldKey is the name of the field that stores the consensus round. + // The value is an int32. + RoundFieldKey = "round" + + // HeightFieldKey is the name of the field that stores the consensus height. + // The value is an int64. + HeightFieldKey = "height" +) diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 7432e1ea64..4a45b9dd30 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -425,6 +425,7 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, reactor := mempoolv1.NewReactor( config.Mempool, mp, + &trace.Client{}, ) if config.Consensus.WaitForTxs() { mp.EnableTxsAvailable()