Skip to content

Commit

Permalink
Improvements to Log Poller metrics (#9769)
Browse files Browse the repository at this point in the history
* Adding evmChainID to the logpoller, removing address, because it doesn't match all the queries and creates big cardinality. Reducing number of latency buckets as we don't need to values like 7 - 10 seconds, everything above 1 second is worrying enough

* Minor fix

* Minor fix
  • Loading branch information
mateusz-sekara authored Jul 14, 2023
1 parent 9443bed commit 6f8ad63
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 23 deletions.
36 changes: 18 additions & 18 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,20 @@ var (
float64(1 * time.Second),
float64(2 * time.Second),
float64(5 * time.Second),
float64(7 * time.Second),
float64(10 * time.Second),
}
lpQueryHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "log_poller_query_duration",
Help: "Measures duration of Log Poller's queries fetching logs",
Buckets: sqlLatencyBuckets,
}, []string{"query", "address"})
}, []string{"evmChainID", "query"})
)

// ObservedLogPoller is a decorator layer for LogPoller, responsible for adding pushing histogram metrics for some of the queries.
// It doesn't change internal logic, because all calls are delegated to the origin LogPoller
type ObservedLogPoller struct {
LogPoller
histogram *prometheus.HistogramVec
chainId string
}

// NewObservedLogPoller creates an observed version of log poller created by NewLogPoller
Expand All @@ -52,86 +51,87 @@ func NewObservedLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod ti
return &ObservedLogPoller{
LogPoller: NewLogPoller(orm, ec, lggr, pollPeriod, finalityDepth, backfillBatchSize, rpcBatchSize, keepBlocksDepth),
histogram: lpQueryHistogram,
chainId: orm.chainID.String(),
}
}

func (o *ObservedLogPoller) LogsCreatedAfter(eventSig common.Hash, address common.Address, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "LogsCreatedAfter", address, func() ([]Log, error) {
return withObservedQuery(o, "LogsCreatedAfter", func() ([]Log, error) {
return o.LogPoller.LogsCreatedAfter(eventSig, address, after, confs, qopts...)
})
}

func (o *ObservedLogPoller) LatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error) {
return withObservedQuery(o.histogram, "LatestLogByEventSigWithConfs", common.Address{}, func() (*Log, error) {
return withObservedQuery(o, "LatestLogByEventSigWithConfs", func() (*Log, error) {
return o.LogPoller.LatestLogByEventSigWithConfs(eventSig, address, confs, qopts...)
})
}

func (o *ObservedLogPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "LatestLogEventSigsAddrsWithConfs", common.Address{}, func() ([]Log, error) {
return withObservedQuery(o, "LatestLogEventSigsAddrsWithConfs", func() ([]Log, error) {
return o.LogPoller.LatestLogEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
})
}

func (o *ObservedLogPoller) LatestBlockByEventSigsAddrsWithConfs(eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) {
return withObservedQuery(o.histogram, "LatestBlockByEventSigsAddrsWithConfs", common.Address{}, func() (int64, error) {
return withObservedQuery(o, "LatestBlockByEventSigsAddrsWithConfs", func() (int64, error) {
return o.LogPoller.LatestBlockByEventSigsAddrsWithConfs(eventSigs, addresses, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "IndexedLogs", address, func() ([]Log, error) {
return withObservedQuery(o, "IndexedLogs", func() ([]Log, error) {
return o.LogPoller.IndexedLogs(eventSig, address, topicIndex, topicValues, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "IndexedLogsByBlockRange", address, func() ([]Log, error) {
return withObservedQuery(o, "IndexedLogsByBlockRange", func() ([]Log, error) {
return o.LogPoller.IndexedLogsByBlockRange(start, end, eventSig, address, topicIndex, topicValues, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "IndexedLogsCreatedAfter", address, func() ([]Log, error) {
return withObservedQuery(o, "IndexedLogsCreatedAfter", func() ([]Log, error) {
return o.LogPoller.IndexedLogsCreatedAfter(eventSig, address, topicIndex, topicValues, after, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "IndexedLogsTopicGreaterThan", address, func() ([]Log, error) {
return withObservedQuery(o, "IndexedLogsTopicGreaterThan", func() ([]Log, error) {
return o.LogPoller.IndexedLogsTopicGreaterThan(eventSig, address, topicIndex, topicValueMin, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "IndexedLogsTopicRange", address, func() ([]Log, error) {
return withObservedQuery(o, "IndexedLogsTopicRange", func() ([]Log, error) {
return o.LogPoller.IndexedLogsTopicRange(eventSig, address, topicIndex, topicValueMin, topicValueMax, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "IndexedLogsWithSigsExcluding", address, func() ([]Log, error) {
return withObservedQuery(o, "IndexedLogsWithSigsExcluding", func() ([]Log, error) {
return o.LogPoller.IndexedLogsWithSigsExcluding(address, eventSigA, eventSigB, topicIndex, fromBlock, toBlock, confs, qopts...)
})
}

func (o *ObservedLogPoller) LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "LogsDataWordRange", address, func() ([]Log, error) {
return withObservedQuery(o, "LogsDataWordRange", func() ([]Log, error) {
return o.LogPoller.LogsDataWordRange(eventSig, address, wordIndex, wordValueMin, wordValueMax, confs, qopts...)
})
}

func (o *ObservedLogPoller) LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "LogsDataWordGreaterThan", address, func() ([]Log, error) {
return withObservedQuery(o, "LogsDataWordGreaterThan", func() ([]Log, error) {
return o.LogPoller.LogsDataWordGreaterThan(eventSig, address, wordIndex, wordValueMin, confs, qopts...)
})
}

func withObservedQuery[T any](histogram *prometheus.HistogramVec, queryName string, address common.Address, query func() (T, error)) (T, error) {
func withObservedQuery[T any](o *ObservedLogPoller, queryName string, query func() (T, error)) (T, error) {
queryStarted := time.Now()
defer func() {
histogram.
WithLabelValues(queryName, address.String()).
o.histogram.
WithLabelValues(o.chainId, queryName).
Observe(float64(time.Since(queryStarted)))
}()
return query()
Expand Down
46 changes: 41 additions & 5 deletions core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package logpoller

import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

Expand All @@ -18,7 +21,7 @@ import (

func TestMultipleMetricsArePublished(t *testing.T) {
ctx := testutils.Context(t)
lp := createObservedPollLogger(t)
lp := createObservedPollLogger(t, 100)
require.Equal(t, 0, testutil.CollectAndCount(lp.histogram))

_, _ = lp.IndexedLogs(common.Hash{}, common.Address{}, 1, []common.Hash{}, 1, pg.WithParentCtx(ctx))
Expand All @@ -39,19 +42,21 @@ func TestMultipleMetricsArePublished(t *testing.T) {

func TestShouldPublishMetricInCaseOfError(t *testing.T) {
ctx := testutils.Context(t)
lp := createObservedPollLogger(t)
lp := createObservedPollLogger(t, 200)
require.Equal(t, 0, testutil.CollectAndCount(lp.histogram))

_, err := lp.LatestLogByEventSigWithConfs(common.Hash{}, common.Address{}, 0, pg.WithParentCtx(ctx))
require.Error(t, err)

require.Equal(t, 1, testutil.CollectAndCount(lp.histogram))
require.Equal(t, 1, counterFromHistogramByLabels(t, lp.histogram, "200", "LatestLogByEventSigWithConfs"))

resetMetrics(*lp)
}

func TestNotObservedFunctions(t *testing.T) {
ctx := testutils.Context(t)
lp := createObservedPollLogger(t)
lp := createObservedPollLogger(t, 300)
require.Equal(t, 0, testutil.CollectAndCount(lp.histogram))

_, err := lp.Logs(0, 1, common.Hash{}, common.Address{}, pg.WithParentCtx(ctx))
Expand All @@ -64,10 +69,25 @@ func TestNotObservedFunctions(t *testing.T) {
resetMetrics(*lp)
}

func createObservedPollLogger(t *testing.T) *ObservedLogPoller {
func TestMetricsAreProperlyPopulatedWithLabels(t *testing.T) {
lp := createObservedPollLogger(t, 420)
expectedCount := 9

for i := 0; i < expectedCount; i++ {
_, err := withObservedQuery(lp, "query", func() (string, error) { return "value", nil })
require.NoError(t, err)
}

require.Equal(t, expectedCount, counterFromHistogramByLabels(t, lp.histogram, "420", "query"))
require.Equal(t, 0, counterFromHistogramByLabels(t, lp.histogram, "420", "other_query"))
require.Equal(t, 0, counterFromHistogramByLabels(t, lp.histogram, "5", "query"))
resetMetrics(*lp)
}

func createObservedPollLogger(t *testing.T, chainId int64) *ObservedLogPoller {
lggr, _ := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
db := pgtest.NewSqlxDB(t)
orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr, pgtest.NewQConfig(true))
orm := NewORM(big.NewInt(chainId), db, lggr, pgtest.NewQConfig(true))
return NewObservedLogPoller(
orm, nil, lggr, 1, 1, 1, 1, 1000,
).(*ObservedLogPoller)
Expand All @@ -76,3 +96,19 @@ func createObservedPollLogger(t *testing.T) *ObservedLogPoller {
func resetMetrics(lp ObservedLogPoller) {
lp.histogram.Reset()
}

func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int {
observer, err := histogramVec.GetMetricWithLabelValues(labels...)
require.NoError(t, err)

metricCh := make(chan prometheus.Metric, 1)
observer.(prometheus.Histogram).Collect(metricCh)
close(metricCh)

metric := <-metricCh
pb := &io_prometheus_client.Metric{}
err = metric.Write(pb)
require.NoError(t, err)

return int(pb.GetHistogram().GetSampleCount())
}

0 comments on commit 6f8ad63

Please sign in to comment.