Skip to content

Commit

Permalink
feat: use pub/sub for stress relief (#1221)
Browse files Browse the repository at this point in the history
<!--
Thank you for contributing to the project! 💜
Please make sure to:
- Chat with us first if this is a big change
  - Open a new issue (or comment on an existing one)
- We want to make sure you don't spend time implementing something we
might have to say No to
- Add unit tests
- Mention any relevant issues in the PR description (e.g. "Fixes #123")

Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?

Enable stress relief management for Refinery cluster as a whole rather
than only for individual instances.

## Short description of the changes

- broadcast individual stress level through pub/sub to other peers
- calculate cluster stress level when individual cluster level changes
or on a timer
- during stress relief, deterministic sampling for a portion of traces
based on current cluster stress level
- add tests
  • Loading branch information
VinozzZ authored Jul 23, 2024
1 parent aaa6d9b commit 8c96628
Show file tree
Hide file tree
Showing 14 changed files with 536 additions and 93 deletions.
64 changes: 40 additions & 24 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
)

Expand All @@ -34,7 +35,7 @@ type Collector interface {
AddSpanFromPeer(*types.Span) error
Stressed() bool
GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string)
ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string)
ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool)
}

func GetCollectorImplementation(c config.Config) Collector {
Expand All @@ -54,6 +55,7 @@ const (
type InMemCollector struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Clock clockwork.Clock `inject:""`
Tracer trace.Tracer `inject:"tracer"`
Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
Expand Down Expand Up @@ -98,8 +100,6 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register("collector_peer_queue_length", "gauge")
i.Metrics.Register("collector_incoming_queue_length", "gauge")
i.Metrics.Register("collector_peer_queue", "histogram")
i.Metrics.Register("stress_level", "gauge")
i.Metrics.Register("stress_relief_activated", "gauge")
i.Metrics.Register("collector_cache_size", "gauge")
i.Metrics.Register("memory_heap_allocation", "gauge")
i.Metrics.Register("span_received", "counter")
Expand Down Expand Up @@ -323,12 +323,6 @@ func (i *InMemCollector) collect() {
i.Metrics.Histogram("collector_peer_queue", float64(len(i.fromPeer)))
i.Metrics.Gauge("collector_incoming_queue_length", float64(len(i.incoming)))
i.Metrics.Gauge("collector_peer_queue_length", float64(len(i.fromPeer)))
i.Metrics.Gauge("stress_level", float64(i.StressRelief.StressLevel()))
if i.StressRelief.Stressed() {
i.Metrics.Gauge("stress_relief_activated", 1)
} else {
i.Metrics.Gauge("stress_relief_activated", 0)
}

// Always drain peer channel before doing anything else. By processing peer
// traffic preferentially we avoid the situation where the cluster essentially
Expand Down Expand Up @@ -469,23 +463,42 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// cache as "kept".
// It doesn't do any logging and barely touches metrics; this is about as
// minimal as we can make it.
func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
now := time.Now()
trace := &types.Trace{
APIHost: sp.APIHost,
APIKey: sp.APIKey,
Dataset: sp.Dataset,
TraceID: sp.TraceID,
ArrivalTime: now,
SendBy: now,
}
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
i.sampleTraceCache.Record(trace, keep, reason)
func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool) {
_, span := otelutil.StartSpanWith(context.Background(), i.Tracer, "collector.ProcessSpanImmediately", "trace_id", sp.TraceID)
defer span.End()

if !i.StressRelief.ShouldSampleDeterministically(sp.TraceID) {
otelutil.AddSpanField(span, "nondeterministic", 1)
return false, false
}

var rate uint
record, reason, found := i.sampleTraceCache.Check(sp)
if !found {
rate, keep, reason = i.StressRelief.GetSampleRate(sp.TraceID)
now := i.Clock.Now()
trace := &types.Trace{
APIHost: sp.APIHost,
APIKey: sp.APIKey,
Dataset: sp.Dataset,
TraceID: sp.TraceID,
ArrivalTime: now,
SendBy: now,
}
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
i.sampleTraceCache.Record(trace, keep, reason)
} else {
rate = record.Rate()
keep = record.Kept()
}

if !keep {
i.Metrics.Increment("dropped_from_stress")
return
return true, false
}

i.Metrics.Increment("kept_from_stress")
// ok, we're sending it, so decorate it first
sp.Event.Data["meta.stressed"] = true
if i.Config.GetAddRuleReasonToTrace() {
Expand All @@ -494,9 +507,12 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampl
if i.hostname != "" {
sp.Data["meta.refinery.local_hostname"] = i.hostname
}

i.addAdditionalAttributes(sp)
mergeTraceAndSpanSampleRates(sp, sampleRate, i.Config.GetIsDryRun())
mergeTraceAndSpanSampleRates(sp, rate, i.Config.GetIsDryRun())
i.Transmission.EnqueueSpan(sp)

return true, true
}

// dealWithSentTrace handles a span that has arrived after the sampling decision
Expand Down
2 changes: 2 additions & 0 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/facebookgo/inject"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
Expand Down Expand Up @@ -746,6 +747,7 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &config.MockConfig{}},
&inject.Object{Value: &logger.NullLogger{}},
&inject.Object{Value: noop.NewTracerProvider().Tracer("test"), Name: "tracer"},
&inject.Object{Value: clockwork.NewRealClock()},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"},
&inject.Object{Value: &sample.SamplerFactory{}},
Expand Down
4 changes: 3 additions & 1 deletion collect/mockCollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func (m *MockCollector) GetStressedSampleRate(traceID string) (rate uint, keep b
return 0, false, ""
}

func (m *MockCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
func (m *MockCollector) ProcessSpanImmediately(sp *types.Span) (bool, bool) {
m.Spans <- sp

return true, true
}

func (m *MockCollector) Stressed() bool {
Expand Down
Loading

0 comments on commit 8c96628

Please sign in to comment.