maint: Refactor metrics registration to streamline declaration and enable easier documentation generation #1350

Merged · 4 commits · Sep 26, 2024
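This PR replaces the scattered string-based Metrics.Register(name, type) calls with package-level []metrics.Metadata slices that each component registers in a loop, so every metric's name, type, unit, and description live in one declared place. Below is a minimal sketch of the new pattern, based only on the Metadata fields and Register signature visible in the diffs in this PR; the package name, import path, and metric names are illustrative, not taken from this change.

package example

import "github.com/honeycombio/refinery/metrics"

// exampleMetrics is declared once at package level, so a documentation
// generator can enumerate every metric without executing the component.
var exampleMetrics = []metrics.Metadata{
	{Name: "example_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of items waiting in the example queue"},
	{Name: "example_items_processed", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of items processed"},
}

// registerExampleMetrics registers the whole slice in one loop instead of
// one Register(name, type) call per metric.
func registerExampleMetrics(met metrics.Metrics) {
	for _, m := range exampleMetrics {
		met.Register(m)
	}
}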
20 changes: 18 additions & 2 deletions app/app.go
@@ -42,8 +42,9 @@ func (a *App) Start() error {
}

a.Logger.Debug().Logf("Starting up App...")
a.Metrics.Register("config_hash", "gauge")
a.Metrics.Register("rule_config_hash", "gauge")
for _, metric := range configHashMetrics {
a.Metrics.Register(metric)
}
a.IncomingRouter.SetVersion(a.Version)
a.PeerRouter.SetVersion(a.Version)

@@ -64,3 +65,18 @@ func (a *App) Stop() error {
a.Logger.Debug().Logf("Shutting down App...")
return nil
}

var configHashMetrics = []metrics.Metadata{
metrics.Metadata{
Name: "config_hash",
Type: metrics.Gauge,
Unit: metrics.Dimensionless,
Description: "The hash of the current configuration",
},
metrics.Metadata{
Name: "rule_config_hash",
Type: metrics.Gauge,
Unit: metrics.Dimensionless,
Description: "The hash of the current rules configuration",
},
}
61 changes: 48 additions & 13 deletions cmd/refinery/main.go
@@ -307,19 +307,9 @@ func main() {

// these have to be done after the injection (of metrics)
// these are the metrics that libhoney will emit; we preregister them so that they always appear
libhoneyMetricsName := map[string]string{
"queue_length": "gauge",
"queue_overflow": "counter",
"send_errors": "counter",
"send_retries": "counter",
"batches_sent": "counter",
"messages_sent": "counter",
"response_decode_errors": "counter",
}

for name, typ := range libhoneyMetricsName {
upstreamMetricsRecorder.Register(name, typ)
peerMetricsRecorder.Register(name, typ)
for _, metric := range libhoneyMetrics {
upstreamMetricsRecorder.Register(metric)
peerMetricsRecorder.Register(metric)
}

// Register metrics after the metrics object has been created
@@ -381,3 +371,48 @@ func main() {
close(monitorDone)
close(sigsToExit)
}

var libhoneyMetrics = []metrics.Metadata{
metrics.Metadata{
Name: "queue_length",
Type: metrics.Gauge,
Unit: metrics.Dimensionless,
Description: "number of events waiting to be sent to destination",
},
metrics.Metadata{
Name: "queue_overflow",
Type: metrics.Counter,
Unit: metrics.Dimensionless,
Description: "number of events dropped due to queue overflow",
},
metrics.Metadata{
Name: "send_errors",
Type: metrics.Counter,
Unit: metrics.Dimensionless,
Description: "number of errors encountered while sending events to destination",
},
metrics.Metadata{
Name: "send_retries",
Type: metrics.Counter,
Unit: metrics.Dimensionless,
Description: "number of times a batch of events was retried",
},
metrics.Metadata{
Name: "batches_sent",
Type: metrics.Counter,
Unit: metrics.Dimensionless,
Description: "number of batches of events sent to destination",
},
metrics.Metadata{
Name: "messages_sent",
Type: metrics.Counter,
Unit: metrics.Dimensionless,
Description: "number of messages sent to destination",
},
metrics.Metadata{
Name: "response_decode_errors",
Type: metrics.Counter,
Unit: metrics.Dimensionless,
Description: "number of errors encountered while decoding responses from destination",
},
}
18 changes: 11 additions & 7 deletions collect/cache/cache.go
@@ -49,26 +49,30 @@ type DefaultInMemCache struct {

const DefaultInMemCacheCapacity = 10000

var collectCacheMetrics = []metrics.Metadata{
{Name: "collect_cache_buffer_overrun", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "The number of times the trace overwritten in the circular buffer has not yet been sent"},
{Name: "collect_cache_capacity", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "The number of traces that can be stored in the cache"},
{Name: "collect_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "The number of traces currently stored in the cache"},
}

func NewInMemCache(
capacity int,
metrics metrics.Metrics,
met metrics.Metrics,
logger logger.Logger,
) *DefaultInMemCache {
logger.Debug().Logf("Starting DefaultInMemCache")
defer func() { logger.Debug().Logf("Finished starting DefaultInMemCache") }()

// buffer_overrun increments when the trace overwritten in the circular
// buffer has not yet been sent
metrics.Register("collect_cache_buffer_overrun", "counter")
metrics.Register("collect_cache_capacity", "gauge")
metrics.Register("collect_cache_entries", "histogram")
for _, metadata := range collectCacheMetrics {
met.Register(metadata)
}

if capacity == 0 {
capacity = DefaultInMemCacheCapacity
}

return &DefaultInMemCache{
Metrics: metrics,
Metrics: met,
Logger: logger,
cache: make(map[string]*types.Trace, capacity),
traceBuffer: make([]*types.Trace, capacity),
9 changes: 9 additions & 0 deletions collect/cache/cuckoo.go
@@ -44,6 +44,12 @@ const (
AddQueueSleepTime = 100 * time.Microsecond
)

var cuckooTraceCheckerMetrics = []metrics.Metadata{
{Name: CurrentCapacity, Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "current capacity of the cuckoo filter"},
{Name: FutureLoadFactor, Type: metrics.Gauge, Unit: metrics.Percent, Description: "the fraction of slots occupied in the future cuckoo filter"},
{Name: CurrentLoadFactor, Type: metrics.Gauge, Unit: metrics.Percent, Description: "the fraction of slots occupied in the current cuckoo filter"},
}

func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker {
c := &CuckooTraceChecker{
capacity: capacity,
@@ -52,6 +58,9 @@ func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker
met: m,
addch: make(chan string, AddQueueDepth),
}
for _, metric := range cuckooTraceCheckerMetrics {
m.Register(metric)
}

// To try to avoid blocking on Add, we have a goroutine that pulls from a
// channel and adds to the filter.
8 changes: 7 additions & 1 deletion collect/cache/cuckooSentCache.go
@@ -161,6 +161,10 @@ type cuckooSentCache struct {
// Make sure it implements TraceSentCache
var _ TraceSentCache = (*cuckooSentCache)(nil)

var cuckooSentCacheMetrics = []metrics.Metadata{
{Name: "cache_recent_dropped_traces", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "the current size of the most recent dropped trace cache"},
}

func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (TraceSentCache, error) {
stc, err := lru.New[string, *keptTraceCacheEntry](int(cfg.KeptSize))
if err != nil {
@@ -180,7 +184,9 @@ func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (Trac
// request.
recentDroppedIDs := generics.NewSetWithTTL[string](3 * time.Second)

met.Register("cache_recent_dropped_traces", "gauge")
for _, metric := range cuckooSentCacheMetrics {
met.Register(metric)
}

cache := &cuckooSentCache{
met: met,
12 changes: 9 additions & 3 deletions collect/cache/kept_reasons_cache.go
@@ -22,12 +22,18 @@ type KeptReasonsCache struct {
hashSeed uint64
}

var keptReasonCacheMetrics = []metrics.Metadata{
{Name: "collect_sent_reasons_cache_entries", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "Number of entries in the sent reasons cache"},
}

// NewKeptReasonsCache returns a new SentReasonsCache.
func NewKeptReasonsCache(metrics metrics.Metrics) *KeptReasonsCache {
metrics.Register("collect_sent_reasons_cache_entries", "histogram")
func NewKeptReasonsCache(met metrics.Metrics) *KeptReasonsCache {
for _, metric := range keptReasonCacheMetrics {
met.Register(metric)
}

return &KeptReasonsCache{
Metrics: metrics,
Metrics: met,
keys: make(map[uint64]uint32),
hashSeed: rand.Uint64(),
}
72 changes: 38 additions & 34 deletions collect/collect.go
@@ -95,6 +95,39 @@ type InMemCollector struct {
hostname string
}

var inMemCollectorMetrics = []metrics.Metadata{
{Name: "trace_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "time taken to process a trace from arrival to send"},
{Name: "trace_span_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans in a trace"},
{Name: "collector_incoming_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the incoming queue"},
{Name: "collector_peer_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the peer queue"},
{Name: "collector_incoming_queue_length", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans in the incoming queue"},
{Name: "collector_peer_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of spans currently in the peer queue"},
{Name: "collector_cache_size", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces currently stored in the trace cache"},
{Name: "memory_heap_allocation", Type: metrics.Gauge, Unit: metrics.Bytes, Description: "current heap allocation"},
{Name: "span_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans received by the collector"},
{Name: "span_processed", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans processed by the collector"},
{Name: "spans_waiting", Type: metrics.UpDown, Unit: metrics.Dimensionless, Description: "number of spans waiting to be processed by the collector"},
{Name: "trace_sent_cache_hit", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of late spans received for traces that have already been sent"},
{Name: "trace_accepted", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of new traces received by the collector"},
{Name: "trace_send_kept", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that has been kept"},
{Name: "trace_send_dropped", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that has been dropped"},
{Name: "trace_send_has_root", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept traces that have a root span"},
{Name: "trace_send_no_root", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept traces that do not have a root span"},
{Name: "trace_forwarded_on_peer_change", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces forwarded due to peer membership change"},
{Name: "trace_redistribution_count", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of traces redistributed due to peer membership change"},
{Name: "trace_send_on_shutdown", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces sent during shutdown"},
{Name: "trace_forwarded_on_shutdown", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces forwarded during shutdown"},

{Name: TraceSendGotRoot, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to root span arrival"},
{Name: TraceSendExpired, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to TraceTimeout or SendDelay"},
{Name: TraceSendSpanLimit, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to span limit"},
{Name: TraceSendEjectedFull, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to cache capacity overrun"},
{Name: TraceSendEjectedMemsize, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces that are ready for decision due to memory overrun"},
{Name: TraceSendLateSpan, Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of spans that are sent due to late span arrival"},

{Name: "dropped_from_stress", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of traces dropped due to stress relief"},
}

func (i *InMemCollector) Start() error {
i.Logger.Debug().Logf("Starting InMemCollector")
defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }()
@@ -107,39 +140,11 @@ func (i *InMemCollector) Start() error {

i.Health.Register(CollectorHealthKey, 3*time.Second)

i.Metrics.Register("trace_duration_ms", "histogram")
i.Metrics.Register("trace_span_count", "histogram")
i.Metrics.Register("collector_incoming_queue", "histogram")
i.Metrics.Register("collector_peer_queue_length", "gauge")
i.Metrics.Register("collector_incoming_queue_length", "gauge")
i.Metrics.Register("collector_peer_queue", "histogram")
i.Metrics.Register("collector_cache_size", "gauge")
i.Metrics.Register("memory_heap_allocation", "gauge")
i.Metrics.Register("span_received", "counter")
i.Metrics.Register("span_processed", "counter")
i.Metrics.Register("spans_waiting", "updown")
i.Metrics.Register("trace_sent_cache_hit", "counter")
i.Metrics.Register("trace_accepted", "counter")
i.Metrics.Register("trace_send_kept", "counter")
i.Metrics.Register("trace_send_dropped", "counter")
i.Metrics.Register("trace_send_has_root", "counter")
i.Metrics.Register("trace_send_no_root", "counter")
i.Metrics.Register("trace_forwarded_on_peer_change", "gauge")
i.Metrics.Register("trace_redistribution_count", "gauge")
i.Metrics.Register("trace_send_on_shutdown", "counter")
i.Metrics.Register("trace_forwarded_on_shutdown", "counter")

i.Metrics.Register(TraceSendGotRoot, "counter")
i.Metrics.Register(TraceSendExpired, "counter")
i.Metrics.Register(TraceSendSpanLimit, "counter")
i.Metrics.Register(TraceSendEjectedFull, "counter")
i.Metrics.Register(TraceSendEjectedMemsize, "counter")
i.Metrics.Register(TraceSendLateSpan, "counter")
for _, metric := range inMemCollectorMetrics {
i.Metrics.Register(metric)
}

sampleCacheConfig := i.Config.GetSampleCacheConfig()
i.Metrics.Register(cache.CurrentCapacity, "gauge")
i.Metrics.Register(cache.FutureLoadFactor, "gauge")
i.Metrics.Register(cache.CurrentLoadFactor, "gauge")
var err error
i.sampleTraceCache, err = cache.NewCuckooSentCache(sampleCacheConfig, i.Metrics)
if err != nil {
@@ -1061,19 +1066,18 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) {
}
}

func newRedistributeNotifier(logger logger.Logger, metrics metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
r := &redistributeNotifier{
initialDelay: 3 * time.Second,
maxDelay: 30 * time.Second,
maxAttempts: 5,
done: make(chan struct{}),
clock: clock,
logger: logger,
metrics: metrics,
metrics: met,
triggered: make(chan struct{}),
reset: make(chan struct{}),
}
r.metrics.Register("trace_redistribution_count", "gauge")

return r
}
14 changes: 10 additions & 4 deletions collect/stressRelief.go
@@ -107,6 +107,13 @@ type StressRelief struct {

const StressReliefHealthKey = "stress_relief"

var stressReliefMetrics = []metrics.Metadata{
{Name: "cluster_stress_level", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "The overall stress level of the cluster"},
{Name: "individual_stress_level", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "The stress level of the individual node"},
{Name: "stress_level", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "The stress level that's being used to determine whether to activate stress relief"},
{Name: "stress_relief_activated", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "Whether stress relief is currently activated"},
}

func (s *StressRelief) Start() error {
s.Logger.Debug().Logf("Starting StressRelief system")
defer func() { s.Logger.Debug().Logf("Finished starting StressRelief system") }()
@@ -115,10 +122,9 @@ func (s *StressRelief) Start() error {
s.Health.Register(StressReliefHealthKey, 3*time.Second)

// register stress level metrics
s.RefineryMetrics.Register("cluster_stress_level", "gauge")
s.RefineryMetrics.Register("individual_stress_level", "gauge")
s.RefineryMetrics.Register("stress_level", "gauge")
s.RefineryMetrics.Register("stress_relief_activated", "gauge")
for _, m := range stressReliefMetrics {
s.RefineryMetrics.Register(m)
}

// We use an algorithms map so that we can name these algorithms, which makes it easier for several things:
// - change our mind about which algorithm to use
15 changes: 12 additions & 3 deletions collect/stress_relief_test.go
@@ -27,7 +27,10 @@ func TestStressRelief_Monitor(t *testing.T) {
defer stop()
require.NoError(t, sr.Start())

sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
sr.RefineryMetrics.Register(metrics.Metadata{
Name: "collector_incoming_queue_length",
Type: metrics.Gauge,
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

@@ -81,7 +84,10 @@ func TestStressRelief_Peer(t *testing.T) {
defer stop()
require.NoError(t, sr.Start())

sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
sr.RefineryMetrics.Register(metrics.Metadata{
Name: "collector_incoming_queue_length",
Type: metrics.Gauge,
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

@@ -139,7 +145,10 @@ func TestStressRelief_OverallStressLevel(t *testing.T) {
sr.disableStressLevelReport = true
sr.Start()

sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
sr.RefineryMetrics.Register(metrics.Metadata{
Name: "collector_incoming_queue_length",
Type: metrics.Gauge,
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

8 changes: 8 additions & 0 deletions internal/health/health.go
@@ -68,6 +68,11 @@ type Health struct {
Reporter
}

var healthMetrics = []metrics.Metadata{
{Name: "is_ready", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "Whether the system is ready to receive traffic"},
{Name: "is_alive", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "Whether the system is alive and reporting in"},
}

func (h *Health) Start() error {
// if we don't have a logger or metrics object, we'll use the null ones (makes testing easier)
if h.Logger == nil {
@@ -76,6 +81,9 @@ func (h *Health) Start() error {
if h.Metrics == nil {
h.Metrics = &metrics.NullMetrics{}
}
for _, metric := range healthMetrics {
h.Metrics.Register(metric)
}
h.timeouts = make(map[string]time.Duration)
h.timeLeft = make(map[string]time.Duration)
h.readies = make(map[string]bool)
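Because each package now declares its metrics as data, the documentation-generation goal named in the PR title follows naturally: a generator only needs to collect the Metadata slices and render them. The sketch below is hypothetical tooling, not part of this change; it assumes only the Metadata fields (Name, Type, Unit, Description) shown in the diffs above, and formats Type and Unit with %v since their string representations are not shown here.

package main

import (
	"fmt"
	"sort"

	"github.com/honeycombio/refinery/metrics"
)

// emitMarkdownTable prints one table row per metric from a combined slice of
// metadata gathered from the packages in this PR.
func emitMarkdownTable(all []metrics.Metadata) {
	sort.Slice(all, func(i, j int) bool { return all[i].Name < all[j].Name })
	fmt.Println("| Name | Type | Unit | Description |")
	fmt.Println("|------|------|------|-------------|")
	for _, m := range all {
		fmt.Printf("| %s | %v | %v | %s |\n", m.Name, m.Type, m.Unit, m.Description)
	}
}

func main() {
	// Pass the concatenated per-package slices (e.g. configHashMetrics,
	// libhoneyMetrics, inMemCollectorMetrics, ...) here.
	emitMarkdownTable(nil)
}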