Skip to content

Commit

Permalink
feat: Add standard tracing points to portions of the mempool and cons…
Browse files Browse the repository at this point in the history
…ensus (#1055)

This PR updates the trace client with a slight refactor (renaming to be
more consistent) and then adds a few standard tables to store traced
information. We are also able to select which tables we wish to update
(and therefore which information to trace) in the config.

closes #978

---------

Co-authored-by: Rootul P <rootulp@gmail.com>
  • Loading branch information
staheri14 and rootulp committed Nov 21, 2023
1 parent ab18899 commit 5e010aa
Show file tree
Hide file tree
Showing 15 changed files with 489 additions and 107 deletions.
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ var (

minSubscriptionBufferSize = 100
defaultSubscriptionBufferSize = 200

// DefaultInfluxTables is a list of tables that are used for storing traces.
// This global var is filled by an init function in the schema package. This
// allows for the schema package to contain all the relevant logic while
// avoiding import cycles.
DefaultInfluxTables = []string{}
)

// Config defines the top level configuration for a CometBFT node
Expand Down Expand Up @@ -1194,6 +1200,10 @@ type InstrumentationConfig struct {
// InfluxBatchSize is the number of points to write in a single batch.
InfluxBatchSize int `mapstructure:"influx_batch_size"`

// InfluxTables is the list of tables that will be traced. See the
// pkg/trace/schema package for a complete list of tables.
InfluxTables []string `mapstructure:"influx_tables"`

// PyroscopeURL is the pyroscope url used to establish a connection with a
// pyroscope continuous profiling server.
PyroscopeURL string `mapstructure:"pyroscope_url"`
Expand All @@ -1220,6 +1230,7 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
InfluxOrg: "celestia",
InfluxBucket: "e2e",
InfluxBatchSize: 20,
InfluxTables: DefaultInfluxTables,
PyroscopeURL: "",
PyroscopeTrace: false,
PyroscopeProfileTypes: []string{
Expand Down
4 changes: 4 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ influx_org = "{{ .Instrumentation.InfluxOrg }}"
# The size of the batches that are sent to the database.
influx_batch_size = {{ .Instrumentation.InfluxBatchSize }}
# The list of tables that are updated when tracing. All available tables and
# their schema can be found in the pkg/trace/schema package.
influx_tables = [{{ range .Instrumentation.InfluxTables }}{{ printf "%q, " . }}{{end}}]
# The URL of the pyroscope instance to use for continuous profiling.
# If empty, continuous profiling is disabled.
pyroscope_url = "{{ .Instrumentation.PyroscopeURL }}"
Expand Down
45 changes: 27 additions & 18 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import (
"time"

"github.com/gogo/protobuf/proto"

cstypes "github.com/cometbft/cometbft/consensus/types"
"github.com/cometbft/cometbft/libs/bits"
cmtevents "github.com/cometbft/cometbft/libs/events"
cmtjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/p2p"
cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
sm "github.com/cometbft/cometbft/state"
"github.com/cometbft/cometbft/types"
cmttime "github.com/cometbft/cometbft/types/time"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
cmtevents "github.com/tendermint/tendermint/libs/events"
cmtjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
cmtsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
cmtcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
cmttime "github.com/tendermint/tendermint/types/time"
)

const (
Expand Down Expand Up @@ -48,7 +49,8 @@ type Reactor struct {
eventBus *types.EventBus
rs *cstypes.RoundState

Metrics *Metrics
Metrics *Metrics
traceClient *trace.Client
}

// ReactorOption sets an optional parameter on the Reactor. Options are
// passed to NewReactor through its variadic options argument.
type ReactorOption func(*Reactor)
Expand All @@ -57,10 +59,11 @@ type ReactorOption func(*Reactor)
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
traceClient: &trace.Client{},
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

Expand Down Expand Up @@ -335,6 +338,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand Down Expand Up @@ -591,6 +595,7 @@ OUTER_LOOP:
Part: *parts,
},
}, logger) {
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.TransferTypeUpload)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
Expand Down Expand Up @@ -1023,6 +1028,10 @@ func ReactorMetrics(metrics *Metrics) ReactorOption {
return func(conR *Reactor) { conR.Metrics = metrics }
}

// ReactorTracing returns a ReactorOption that assigns traceClient to the
// Reactor's traceClient field.
func ReactorTracing(traceClient *trace.Client) ReactorOption {
	return func(r *Reactor) {
		r.traceClient = traceClient
	}
}

//-----------------------------------------------------------------------------

var (
Expand Down
47 changes: 25 additions & 22 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@ import (

"github.com/gogo/protobuf/proto"

cfg "github.com/cometbft/cometbft/config"
cstypes "github.com/cometbft/cometbft/consensus/types"
"github.com/cometbft/cometbft/crypto"
cmtevents "github.com/cometbft/cometbft/libs/events"
"github.com/cometbft/cometbft/libs/fail"
cmtjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
cmtmath "github.com/cometbft/cometbft/libs/math"
cmtos "github.com/cometbft/cometbft/libs/os"
"github.com/cometbft/cometbft/libs/service"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/p2p"
"github.com/cometbft/cometbft/pkg/trace"
cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
sm "github.com/cometbft/cometbft/state"
"github.com/cometbft/cometbft/types"
cmttime "github.com/cometbft/cometbft/types/time"
cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto"
cmtevents "github.com/tendermint/tendermint/libs/events"
"github.com/tendermint/tendermint/libs/fail"
cmtjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
cmtmath "github.com/tendermint/tendermint/libs/math"
cmtos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/libs/service"
cmtsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
cmttime "github.com/tendermint/tendermint/types/time"
)

// Consensus sentinel errors
Expand Down Expand Up @@ -142,7 +143,7 @@ type State struct {
// for reporting metrics
metrics *Metrics

eventCollector *trace.Client
traceClient *trace.Client
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -173,7 +174,7 @@ func NewState(
evpool: evpool,
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
eventCollector: &trace.Client{},
traceClient: &trace.Client{},
}

// set function defaults (may be overwritten before calling Start)
Expand Down Expand Up @@ -215,9 +216,9 @@ func StateMetrics(metrics *Metrics) StateOption {
return func(cs *State) { cs.metrics = metrics }
}

// SetEventCollector sets the remote event collector.
func SetEventCollector(ec *trace.Client) StateOption {
return func(cs *State) { cs.eventCollector = ec }
// SetTraceClient returns a StateOption that sets the trace client stored on
// the State.
func SetTraceClient(traceClient *trace.Client) StateOption {
	return func(cs *State) { cs.traceClient = traceClient }
}

// String returns a string.
Expand Down Expand Up @@ -703,6 +704,8 @@ func (cs *State) newStep() {

cs.nSteps++

schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round, cs.Step)

// newStep is called by updateToState in NewState before the eventBus is set!
if cs.eventBus != nil {
if err := cs.eventBus.PublishEventNewRoundStep(rs); err != nil {
Expand Down
61 changes: 46 additions & 15 deletions mempool/cat/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (

"github.com/gogo/protobuf/proto"

cfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/crypto/tmhash"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/mempool"
"github.com/cometbft/cometbft/p2p"
protomem "github.com/cometbft/cometbft/proto/tendermint/mempool"
"github.com/cometbft/cometbft/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/types"
)

const (
Expand All @@ -35,10 +37,11 @@ const (
// spec under /.spec.md
type Reactor struct {
p2p.BaseReactor
opts *ReactorOptions
mempool *TxPool
ids *mempoolIDs
requests *requestScheduler
opts *ReactorOptions
mempool *TxPool
ids *mempoolIDs
requests *requestScheduler
traceClient *trace.Client
}

type ReactorOptions struct {
Expand All @@ -52,6 +55,9 @@ type ReactorOptions struct {
// MaxGossipDelay is the maximum allotted time that the reactor expects a transaction to
// arrive before issuing a new request to a different peer
MaxGossipDelay time.Duration

// TraceClient is the trace client used to collect trace-level events.
TraceClient *trace.Client
}

func (opts *ReactorOptions) VerifyAndComplete() error {
Expand Down Expand Up @@ -81,10 +87,11 @@ func NewReactor(mempool *TxPool, opts *ReactorOptions) (*Reactor, error) {
return nil, err
}
memR := &Reactor{
opts: opts,
mempool: mempool,
ids: newMempoolIDs(),
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
opts: opts,
mempool: mempool,
ids: newMempoolIDs(),
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
traceClient: &trace.Client{},
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR, nil
Expand Down Expand Up @@ -218,6 +225,9 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// NOTE: This setup also means that we can support older mempool implementations that simply
// flooded the network with transactions.
case *protomem.Txs:
for _, tx := range msg.Txs {
schema.WriteMempoolTx(memR.traceClient, e.Src.ID(), tx, schema.TransferTypeDownload, schema.CatVersionFieldValue)
}
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
Expand Down Expand Up @@ -260,6 +270,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// 3. If we recently evicted the tx and still don't have space for it, we do nothing.
// 4. Else, we request the transaction from that peer.
case *protomem.SeenTx:
schema.WriteMempoolPeerState(
memR.traceClient,
e.Src.ID(),
schema.SeenTxStateUpdateFieldValue,
schema.TransferTypeDownload,
schema.CatVersionFieldValue,
)
txKey, err := types.TxKeyFromBytes(msg.TxKey)
if err != nil {
memR.Logger.Error("peer sent SeenTx with incorrect tx key", "err", err)
Expand Down Expand Up @@ -287,6 +304,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
// A peer is requesting a transaction that we have claimed to have. Find the specified
// transaction and broadcast it to the peer. We may no longer have the transaction
case *protomem.WantTx:
schema.WriteMempoolPeerState(
memR.traceClient,
e.Src.ID(),
schema.WantTxStateUpdateFieldValue,
schema.TransferTypeDownload,
schema.CatVersionFieldValue,
)
txKey, err := types.TxKeyFromBytes(msg.TxKey)
if err != nil {
memR.Logger.Error("peer sent WantTx with incorrect tx key", "err", err)
Expand All @@ -296,6 +320,13 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
tx, has := memR.mempool.Get(txKey)
if has && !memR.opts.ListenOnly {
peerID := memR.ids.GetIDForPeer(e.Src.ID())
schema.WriteMempoolTx(
memR.traceClient,
e.Src.ID(),
msg.TxKey,
schema.TransferTypeUpload,
schema.CatVersionFieldValue,
)
memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID)
if p2p.SendEnvelopeShim(e.Src, p2p.Envelope{ //nolint:staticcheck
ChannelID: mempool.MempoolChannel,
Expand Down
Loading

0 comments on commit 5e010aa

Please sign in to comment.