diff --git a/collect/cache/legacySentCache.go b/collect/cache/legacySentCache.go new file mode 100644 index 0000000000..b3a84834c0 --- /dev/null +++ b/collect/cache/legacySentCache.go @@ -0,0 +1,73 @@ +package cache + +import ( + lru "github.com/hashicorp/golang-lru" + "github.com/honeycombio/refinery/types" +) + +// legacySentRecord is Refinery's original traceSent cache. It keeps the same records +// for both kept and dropped traces and the size of the sent cache is set based on the size +// of the live trace cache. + +// legacySentRecord is an internal record we leave behind when sending a trace to remember +// our decision for the future, so any delinquent spans that show up later can +// be dropped or passed along. +type legacySentRecord struct { + keep bool // true if the trace was kept, false if it was dropped + rate uint // sample rate used when sending the trace + spanCount uint // number of spans in the trace (we decorate the root span with this) +} + +func (t *legacySentRecord) Kept() bool { + return t.keep +} + +func (t *legacySentRecord) Rate() uint { + return t.rate +} + +func (t *legacySentRecord) DescendantCount() uint { + return uint(t.spanCount) +} + +func (t *legacySentRecord) Count(*types.Span) { + t.spanCount++ +} + +// Make sure it implements TraceSentRecord +var _ TraceSentRecord = (*legacySentRecord)(nil) + +type legacySentCache struct { + sentTraceCache *lru.Cache +} + +// Make sure it implements TraceSentCache +var _ TraceSentCache = (*legacySentCache)(nil) + +func NewLegacySentCache(capacity int) (TraceSentCache, error) { + stc, err := lru.New(capacity) + if err != nil { + return nil, err + } + return &legacySentCache{sentTraceCache: stc}, nil +} + +func (c *legacySentCache) Record(trace *types.Trace, keep bool) { + // record this decision in the sent record LRU for future spans + sentRecord := legacySentRecord{ + keep: keep, + rate: trace.SampleRate, + spanCount: trace.DescendantCount(), + } + c.sentTraceCache.Add(trace.TraceID, &sentRecord) +} + +func (c *legacySentCache) Check(span *types.Span) (TraceSentRecord, bool) { + if sentRecord, found := c.sentTraceCache.Get(span.TraceID); found { + if sr, ok := sentRecord.(*legacySentRecord); ok { + sr.Count(span) + return sr, true + } + } + return nil, false +} diff --git a/collect/cache/traceSentCache.go b/collect/cache/traceSentCache.go new file mode 100644 index 0000000000..e55e3b0def --- /dev/null +++ b/collect/cache/traceSentCache.go @@ -0,0 +1,24 @@ +package cache + +import ( + "github.com/honeycombio/refinery/types" +) + +type TraceSentRecord interface { + // Kept returns whether the trace was kept (sampled and sent to honeycomb) or dropped. + Kept() bool + // Rate() returns the sample rate for the trace + Rate() uint + // DescendantCount returns the count of items associated with the trace, including all types of children like span links and span events. + DescendantCount() uint + // Count records additional spans in the totals + Count(*types.Span) +} + +type TraceSentCache interface { + // Record preserves the record of a trace being sent or not. + Record(trace *types.Trace, keep bool) + // Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true, + // else nil and false. + Check(span *types.Span) (TraceSentRecord, bool) +} diff --git a/collect/collect.go b/collect/collect.go index 24f136d81b..cf664bf9b9 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -9,7 +9,6 @@ import ( "sync" "time" - lru "github.com/hashicorp/golang-lru" "github.com/honeycombio/refinery/collect/cache" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/logger" @@ -73,7 +72,7 @@ type InMemCollector struct { cache cache.Cache datasetSamplers map[string]sample.Sampler - sentTraceCache *lru.Cache + sentTraceCache cache.TraceSentCache incoming chan *types.Span fromPeer chan *types.Span @@ -82,15 +81,6 @@ type InMemCollector struct { hostname string } -// traceSentRecord is the bit we leave behind when sending a trace to remember -// our decision for the future, so any delinquent spans that show up later can -// be dropped or passed along. -type traceSentRecord struct { - keep bool // true if the trace was kept, false if it was dropped - rate uint // sample rate used when sending the trace - spanCount int64 // number of spans in the trace (we decorate the root span with this) -} - func (i *InMemCollector) Start() error { i.Logger.Debug().Logf("Starting InMemCollector") defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }() @@ -121,11 +111,10 @@ func (i *InMemCollector) Start() error { i.Metrics.Register(TraceSendEjectedFull, "counter") i.Metrics.Register(TraceSendEjectedMemsize, "counter") - stc, err := lru.New(imcConfig.CacheCapacity * 5) // keep 5x ring buffer size + i.sentTraceCache, err = cache.NewLegacySentCache(imcConfig.CacheCapacity * 5) // (keep 5x ring buffer size) if err != nil { return err } - i.sentTraceCache = stc i.incoming = make(chan *types.Span, imcConfig.CacheCapacity*3) i.fromPeer = make(chan *types.Span, imcConfig.CacheCapacity*3) @@ -425,16 +414,13 @@ func (i *InMemCollector) processSpan(sp *types.Span) { trace := i.cache.Get(sp.TraceID) if trace == nil { // if the trace has already been sent, just pass along the span - if sentRecord, found := i.sentTraceCache.Get(sp.TraceID); found { - if sr, ok := sentRecord.(*traceSentRecord); ok { - i.Metrics.Increment("trace_sent_cache_hit") - // bump the count of records on this trace -- if the root span isn't - // the last late span, then it won't be perfect, but it will be better than - // having none at all - sentRecord.(*traceSentRecord).spanCount++ - i.dealWithSentTrace(sr.keep, sr.rate, sentRecord.(*traceSentRecord).spanCount, sp) - return - } + if sr, found := i.sentTraceCache.Check(sp); found { + i.Metrics.Increment("trace_sent_cache_hit") + // bump the count of records on this trace -- if the root span isn't + // the last late span, then it won't be perfect, but it will be better than + // having none at all + i.dealWithSentTrace(sr.Kept(), sr.Rate(), sr.DescendantCount(), sp) + return } // trace hasn't already been sent (or this span is really old); let's // create a new trace to hold it @@ -464,7 +450,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) { // if the trace we got back from the cache has already been sent, deal with the // span. if trace.Sent { - i.dealWithSentTrace(trace.KeepSample, trace.SampleRate, trace.SpanCount(), sp) + i.dealWithSentTrace(trace.KeepSample, trace.SampleRate, trace.DescendantCount(), sp) } // great! trace is live. add the span. @@ -486,7 +472,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) { // dealWithSentTrace handles a span that has arrived after the sampling decision // on the trace has already been made, and it obeys that decision by either // sending the span immediately or dropping it. -func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount int64, sp *types.Span) { +func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount uint, sp *types.Span) { if i.Config.GetIsDryRun() { field := i.Config.GetDryRunFieldName() // if dry run mode is enabled, we keep all traces and mark the spans with the sampling decision @@ -502,7 +488,7 @@ func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount mergeTraceAndSpanSampleRates(sp, sampleRate) // if this span is a late root span, possibly update it with our current span count if i.Config.GetAddSpanCountToRoot() && isRootSpan(sp) { - sp.Data["meta.span_count"] = spanCount + sp.Data["meta.span_count"] = int64(spanCount) } i.Transmission.EnqueueSpan(sp) return @@ -559,7 +545,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { traceDur := time.Since(trace.ArrivalTime) i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds())) - i.Metrics.Histogram("trace_span_count", float64(trace.SpanCount())) + i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount())) if trace.RootSpan != nil { i.Metrics.Increment("trace_send_has_root") } else { @@ -584,7 +570,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { // If we have a root span, update it with the count before determining the SampleRate. if i.Config.GetAddSpanCountToRoot() && trace.RootSpan != nil { - trace.RootSpan.Data["meta.span_count"] = trace.SpanCount() + trace.RootSpan.Data["meta.span_count"] = int64(trace.DescendantCount()) } // use sampler key to find sampler; create and cache if not found @@ -599,13 +585,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { trace.KeepSample = shouldSend logFields["reason"] = reason - // record this decision in the sent record LRU for future spans - sentRecord := traceSentRecord{ - keep: shouldSend, - rate: rate, - spanCount: trace.SpanCount(), - } - i.sentTraceCache.Add(trace.TraceID, &sentRecord) + i.sentTraceCache.Record(trace, shouldSend) // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. if !shouldSend && !i.Config.GetIsDryRun() { @@ -628,7 +608,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { // update the root span (if we have one, which we might not if the trace timed out) // with the final total as of our send time if i.Config.GetAddSpanCountToRoot() && isRootSpan(sp) { - sp.Data["meta.span_count"] = sentRecord.spanCount + sp.Data["meta.span_count"] = int64(trace.DescendantCount()) } if i.Config.GetIsDryRun() { diff --git a/collect/collect_benchmark_test.go b/collect/collect_benchmark_test.go index fda9ea7014..6171dd583d 100644 --- a/collect/collect_benchmark_test.go +++ b/collect/collect_benchmark_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - lru "github.com/hashicorp/golang-lru" "github.com/stretchr/testify/assert" "github.com/honeycombio/refinery/collect/cache" @@ -35,7 +34,7 @@ func BenchmarkCollect(b *testing.B) { metric := &metrics.MockMetrics{} metric.Start() - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(b, err, "lru cache should start") coll := &InMemCollector{ diff --git a/collect/collect_test.go b/collect/collect_test.go index d0609ea4c6..0d444186ce 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/facebookgo/inject" - lru "github.com/hashicorp/golang-lru" "github.com/stretchr/testify/assert" "github.com/honeycombio/refinery/collect/cache" @@ -48,7 +47,7 @@ func TestAddRootSpan(t *testing.T) { c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -126,7 +125,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -184,7 +183,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) { c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -245,7 +244,7 @@ func TestAddSpan(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -318,7 +317,7 @@ func TestDryRunMode(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -585,7 +584,7 @@ func TestOldMaxAlloc(t *testing.T) { } c := cache.NewInMemCache(1000, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -689,7 +688,7 @@ func TestStableMaxAlloc(t *testing.T) { c := cache.NewInMemCache(1000, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -777,7 +776,7 @@ func TestAddSpanNoBlock(t *testing.T) { } c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -849,7 +848,7 @@ func TestAddSpanCount(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -918,7 +917,7 @@ func TestLateRootGetsSpanCount(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc diff --git a/sample/dynamic_ema.go b/sample/dynamic_ema.go index 3483e1a94b..34a88b4820 100644 --- a/sample/dynamic_ema.go +++ b/sample/dynamic_ema.go @@ -54,7 +54,7 @@ func (d *EMADynamicSampler) Start() error { } d.dynsampler.Start() - // Register stastics this package will produce + // Register statistics this package will produce d.Metrics.Register("dynsampler_num_dropped", "counter") d.Metrics.Register("dynsampler_num_kept", "counter") d.Metrics.Register("dynsampler_sample_rate", "histogram") diff --git a/tools/loadtest/.gitignore b/tools/loadtest/.gitignore new file mode 100644 index 0000000000..af7a074c2b --- /dev/null +++ b/tools/loadtest/.gitignore @@ -0,0 +1,4 @@ +.direnv +.tool-versions +__* +.DS_Store \ No newline at end of file diff --git a/types/event.go b/types/event.go index 68eee8a97a..9e0b4384be 100644 --- a/types/event.go +++ b/types/event.go @@ -94,14 +94,14 @@ func (t *Trace) CacheImpact(traceTimeout time.Duration) int { return t.totalImpact } -// GetSpans returns the list of spans in this trace +// GetSpans returns the list of descendants in this trace func (t *Trace) GetSpans() []*Span { return t.spans } -// SpanCount gets the number of spans currently in this trace as int64 -func (t *Trace) SpanCount() int64 { - return int64(len(t.spans)) +// DescendantCount gets the number of descendants of all kinds currently in this trace +func (t *Trace) DescendantCount() uint { + return uint(len(t.spans)) } func (t *Trace) GetSamplerKey() (string, bool) {