Skip to content

Commit

Permalink
feat: use pub/sub for stress relief
Browse files: browse the repository at this point in the history
  • Loading branch information
VinozzZ committed Jul 22, 2024
1 parent 023fe0f commit d55be52
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 78 deletions.
1 change: 1 addition & 0 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func newStartedApp(
&inject.Object{Value: transmit.NewDefaultTransmission(peerClient, metricsr, "peer"), Name: "peerTransmission"},
&inject.Object{Value: shrdr},
&inject.Object{Value: noop.NewTracerProvider().Tracer("test"), Name: "tracer"},
&inject.Object{Value: clockwork.NewRealClock()},
&inject.Object{Value: collector},
&inject.Object{Value: metricsr, Name: "metrics"},
&inject.Object{Value: metricsr, Name: "genericMetrics"},
Expand Down
64 changes: 40 additions & 24 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
)

Expand All @@ -34,7 +35,7 @@ type Collector interface {
AddSpanFromPeer(*types.Span) error
Stressed() bool
GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string)
ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string)
ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool)
}

func GetCollectorImplementation(c config.Config) Collector {
Expand All @@ -54,6 +55,7 @@ const (
type InMemCollector struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Clock clockwork.Clock `inject:""`
Tracer trace.Tracer `inject:"tracer"`
Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
Expand Down Expand Up @@ -98,8 +100,6 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register("collector_peer_queue_length", "gauge")
i.Metrics.Register("collector_incoming_queue_length", "gauge")
i.Metrics.Register("collector_peer_queue", "histogram")
i.Metrics.Register("stress_level", "gauge")
i.Metrics.Register("stress_relief_activated", "gauge")
i.Metrics.Register("collector_cache_size", "gauge")
i.Metrics.Register("memory_heap_allocation", "gauge")
i.Metrics.Register("span_received", "counter")
Expand Down Expand Up @@ -323,12 +323,6 @@ func (i *InMemCollector) collect() {
i.Metrics.Histogram("collector_peer_queue", float64(len(i.fromPeer)))
i.Metrics.Gauge("collector_incoming_queue_length", float64(len(i.incoming)))
i.Metrics.Gauge("collector_peer_queue_length", float64(len(i.fromPeer)))
i.Metrics.Gauge("stress_level", float64(i.StressRelief.StressLevel()))
if i.StressRelief.Stressed() {
i.Metrics.Gauge("stress_relief_activated", 1)
} else {
i.Metrics.Gauge("stress_relief_activated", 0)
}

// Always drain peer channel before doing anything else. By processing peer
// traffic preferentially we avoid the situation where the cluster essentially
Expand Down Expand Up @@ -469,23 +463,42 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// cache as "kept".
// It doesn't do any logging and barely touches metrics; this is about as
// minimal as we can make it.
func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
now := time.Now()
trace := &types.Trace{
APIHost: sp.APIHost,
APIKey: sp.APIKey,
Dataset: sp.Dataset,
TraceID: sp.TraceID,
ArrivalTime: now,
SendBy: now,
}
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
i.sampleTraceCache.Record(trace, keep, reason)
func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool, keep bool) {
_, span := otelutil.StartSpanWith(context.Background(), i.Tracer, "collector.ProcessSpanImmediately", "trace_id", sp.TraceID)
defer span.End()

if !i.StressRelief.ShouldSampleDeterministically(sp.TraceID) {
otelutil.AddSpanField(span, "nondeterministic", 1)
return false, false
}

var rate uint
record, reason, found := i.sampleTraceCache.Check(sp)
if !found {
rate, keep, reason = i.StressRelief.GetSampleRate(sp.TraceID)
now := i.Clock.Now()
trace := &types.Trace{
APIHost: sp.APIHost,
APIKey: sp.APIKey,
Dataset: sp.Dataset,
TraceID: sp.TraceID,
ArrivalTime: now,
SendBy: now,
}
// we do want a record of how we disposed of traces in case more come in after we've
// turned off stress relief (if stress relief is on we'll keep making the same decisions)
i.sampleTraceCache.Record(trace, keep, reason)
} else {
rate = record.Rate()
keep = record.Kept()
}

if !keep {
i.Metrics.Increment("dropped_from_stress")
return
return true, false
}

i.Metrics.Increment("kept_from_stress")
// ok, we're sending it, so decorate it first
sp.Event.Data["meta.stressed"] = true
if i.Config.GetAddRuleReasonToTrace() {
Expand All @@ -494,9 +507,12 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampl
if i.hostname != "" {
sp.Data["meta.refinery.local_hostname"] = i.hostname
}

i.addAdditionalAttributes(sp)
mergeTraceAndSpanSampleRates(sp, sampleRate, i.Config.GetIsDryRun())
mergeTraceAndSpanSampleRates(sp, rate, i.Config.GetIsDryRun())
i.Transmission.EnqueueSpan(sp)

return true, true
}

// dealWithSentTrace handles a span that has arrived after the sampling decision
Expand Down
2 changes: 2 additions & 0 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/facebookgo/inject"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
Expand Down Expand Up @@ -746,6 +747,7 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &config.MockConfig{}},
&inject.Object{Value: &logger.NullLogger{}},
&inject.Object{Value: noop.NewTracerProvider().Tracer("test"), Name: "tracer"},
&inject.Object{Value: clockwork.NewRealClock()},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"},
&inject.Object{Value: &sample.SamplerFactory{}},
Expand Down
4 changes: 3 additions & 1 deletion collect/mockCollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func (m *MockCollector) GetStressedSampleRate(traceID string) (rate uint, keep b
return 0, false, ""
}

func (m *MockCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
func (m *MockCollector) ProcessSpanImmediately(sp *types.Span) (bool, bool) {
m.Spans <- sp

return true, true
}

func (m *MockCollector) Stressed() bool {
Expand Down
Loading

0 comments on commit d55be52

Please sign in to comment.