From 4e07a5c61a7dc9ad762dde0048011ca9c5b465c8 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Mon, 14 Nov 2022 09:39:31 -0500 Subject: [PATCH] Extract Sent Cache to an interface for future expansion (#561) ## Which problem is this PR solving? This is a prep PR for further work on the sent trace cache. For improved scalability, we want to be able to store trace decision records for a longer time. The best way to implement this in a backwards-compatible way is to pull the mechanisms for managing decision records into an interface, and then implement the interface with the legacy logic. That's what this does. There are no expected changes in behavior, and all tests still pass. ## Short description of the changes - Define interfaces for a TraceSentCache and a TraceSentRecord - Implement code for those that duplicates the existing legacy logic - Refactor the places the code is used to use the new interfaces - Tweak span count data type so that it's not an int64 any more - Rename SpanCount to DescendantCount because now that we have links and events, they're not all Spans anymore, and future PRs will add additional counting functions. I haven't renamed the corresponding Get and Add functions because that's pretty messy. --- collect/cache/legacySentCache.go | 73 +++++++++++++++++++++++++++++++ collect/cache/traceSentCache.go | 24 ++++++++++ collect/collect.go | 52 +++++++--------------- collect/collect_benchmark_test.go | 3 +- collect/collect_test.go | 21 +++++---- sample/dynamic_ema.go | 2 +- tools/loadtest/.gitignore | 4 ++ types/event.go | 8 ++-- 8 files changed, 133 insertions(+), 54 deletions(-) create mode 100644 collect/cache/legacySentCache.go create mode 100644 collect/cache/traceSentCache.go create mode 100644 tools/loadtest/.gitignore diff --git a/collect/cache/legacySentCache.go b/collect/cache/legacySentCache.go new file mode 100644 index 0000000000..b3a84834c0 --- /dev/null +++ b/collect/cache/legacySentCache.go @@ -0,0 +1,73 @@ +package cache + +import ( + lru "github.com/hashicorp/golang-lru" + "github.com/honeycombio/refinery/types" +) + +// legacySentRecord is Refinery's original traceSent cache. It keeps the same records +// for both kept and dropped traces and the size of the sent cache is set based on the size +// of the live trace cache. + +// legacySentRecord is an internal record we leave behind when sending a trace to remember +// our decision for the future, so any delinquent spans that show up later can +// be dropped or passed along. 
+type legacySentRecord struct { + keep bool // true if the trace was kept, false if it was dropped + rate uint // sample rate used when sending the trace + spanCount uint // number of spans in the trace (we decorate the root span with this) +} + +func (t *legacySentRecord) Kept() bool { + return t.keep +} + +func (t *legacySentRecord) Rate() uint { + return t.rate +} + +func (t *legacySentRecord) DescendantCount() uint { + return uint(t.spanCount) +} + +func (t *legacySentRecord) Count(*types.Span) { + t.spanCount++ +} + +// Make sure it implements TraceSentRecord +var _ TraceSentRecord = (*legacySentRecord)(nil) + +type legacySentCache struct { + sentTraceCache *lru.Cache +} + +// Make sure it implements TraceSentCache +var _ TraceSentCache = (*legacySentCache)(nil) + +func NewLegacySentCache(capacity int) (TraceSentCache, error) { + stc, err := lru.New(capacity) + if err != nil { + return nil, err + } + return &legacySentCache{sentTraceCache: stc}, nil +} + +func (c *legacySentCache) Record(trace *types.Trace, keep bool) { + // record this decision in the sent record LRU for future spans + sentRecord := legacySentRecord{ + keep: keep, + rate: trace.SampleRate, + spanCount: trace.DescendantCount(), + } + c.sentTraceCache.Add(trace.TraceID, &sentRecord) +} + +func (c *legacySentCache) Check(span *types.Span) (TraceSentRecord, bool) { + if sentRecord, found := c.sentTraceCache.Get(span.TraceID); found { + if sr, ok := sentRecord.(*legacySentRecord); ok { + sr.Count(span) + return sr, true + } + } + return nil, false +} diff --git a/collect/cache/traceSentCache.go b/collect/cache/traceSentCache.go new file mode 100644 index 0000000000..e55e3b0def --- /dev/null +++ b/collect/cache/traceSentCache.go @@ -0,0 +1,24 @@ +package cache + +import ( + "github.com/honeycombio/refinery/types" +) + +type TraceSentRecord interface { + // Kept returns whether the trace was kept (sampled and sent to honeycomb) or dropped. + Kept() bool + // Rate() returns the sample rate for the trace + Rate() uint + // DescendantCount returns the count of items associated with the trace, including all types of children like span links and span events. + DescendantCount() uint + // Count records additional spans in the totals + Count(*types.Span) +} + +type TraceSentCache interface { + // Record preserves the record of a trace being sent or not. + Record(trace *types.Trace, keep bool) + // Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true, + // else nil and false. + Check(span *types.Span) (TraceSentRecord, bool) +} diff --git a/collect/collect.go b/collect/collect.go index 24f136d81b..cf664bf9b9 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -9,7 +9,6 @@ import ( "sync" "time" - lru "github.com/hashicorp/golang-lru" "github.com/honeycombio/refinery/collect/cache" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/logger" @@ -73,7 +72,7 @@ type InMemCollector struct { cache cache.Cache datasetSamplers map[string]sample.Sampler - sentTraceCache *lru.Cache + sentTraceCache cache.TraceSentCache incoming chan *types.Span fromPeer chan *types.Span @@ -82,15 +81,6 @@ type InMemCollector struct { hostname string } -// traceSentRecord is the bit we leave behind when sending a trace to remember -// our decision for the future, so any delinquent spans that show up later can -// be dropped or passed along. 
-type traceSentRecord struct { - keep bool // true if the trace was kept, false if it was dropped - rate uint // sample rate used when sending the trace - spanCount int64 // number of spans in the trace (we decorate the root span with this) -} - func (i *InMemCollector) Start() error { i.Logger.Debug().Logf("Starting InMemCollector") defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }() @@ -121,11 +111,10 @@ func (i *InMemCollector) Start() error { i.Metrics.Register(TraceSendEjectedFull, "counter") i.Metrics.Register(TraceSendEjectedMemsize, "counter") - stc, err := lru.New(imcConfig.CacheCapacity * 5) // keep 5x ring buffer size + i.sentTraceCache, err = cache.NewLegacySentCache(imcConfig.CacheCapacity * 5) // (keep 5x ring buffer size) if err != nil { return err } - i.sentTraceCache = stc i.incoming = make(chan *types.Span, imcConfig.CacheCapacity*3) i.fromPeer = make(chan *types.Span, imcConfig.CacheCapacity*3) @@ -425,16 +414,13 @@ func (i *InMemCollector) processSpan(sp *types.Span) { trace := i.cache.Get(sp.TraceID) if trace == nil { // if the trace has already been sent, just pass along the span - if sentRecord, found := i.sentTraceCache.Get(sp.TraceID); found { - if sr, ok := sentRecord.(*traceSentRecord); ok { - i.Metrics.Increment("trace_sent_cache_hit") - // bump the count of records on this trace -- if the root span isn't - // the last late span, then it won't be perfect, but it will be better than - // having none at all - sentRecord.(*traceSentRecord).spanCount++ - i.dealWithSentTrace(sr.keep, sr.rate, sentRecord.(*traceSentRecord).spanCount, sp) - return - } + if sr, found := i.sentTraceCache.Check(sp); found { + i.Metrics.Increment("trace_sent_cache_hit") + // bump the count of records on this trace -- if the root span isn't + // the last late span, then it won't be perfect, but it will be better than + // having none at all + i.dealWithSentTrace(sr.Kept(), sr.Rate(), sr.DescendantCount(), sp) + return } // trace hasn't already been sent (or this span is really old); let's // create a new trace to hold it @@ -464,7 +450,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) { // if the trace we got back from the cache has already been sent, deal with the // span. if trace.Sent { - i.dealWithSentTrace(trace.KeepSample, trace.SampleRate, trace.SpanCount(), sp) + i.dealWithSentTrace(trace.KeepSample, trace.SampleRate, trace.DescendantCount(), sp) } // great! trace is live. add the span. @@ -486,7 +472,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) { // dealWithSentTrace handles a span that has arrived after the sampling decision // on the trace has already been made, and it obeys that decision by either // sending the span immediately or dropping it. 
-func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount int64, sp *types.Span) { +func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount uint, sp *types.Span) { if i.Config.GetIsDryRun() { field := i.Config.GetDryRunFieldName() // if dry run mode is enabled, we keep all traces and mark the spans with the sampling decision @@ -502,7 +488,7 @@ func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount mergeTraceAndSpanSampleRates(sp, sampleRate) // if this span is a late root span, possibly update it with our current span count if i.Config.GetAddSpanCountToRoot() && isRootSpan(sp) { - sp.Data["meta.span_count"] = spanCount + sp.Data["meta.span_count"] = int64(spanCount) } i.Transmission.EnqueueSpan(sp) return @@ -559,7 +545,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { traceDur := time.Since(trace.ArrivalTime) i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds())) - i.Metrics.Histogram("trace_span_count", float64(trace.SpanCount())) + i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount())) if trace.RootSpan != nil { i.Metrics.Increment("trace_send_has_root") } else { @@ -584,7 +570,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { // If we have a root span, update it with the count before determining the SampleRate. if i.Config.GetAddSpanCountToRoot() && trace.RootSpan != nil { - trace.RootSpan.Data["meta.span_count"] = trace.SpanCount() + trace.RootSpan.Data["meta.span_count"] = int64(trace.DescendantCount()) } // use sampler key to find sampler; create and cache if not found @@ -599,13 +585,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { trace.KeepSample = shouldSend logFields["reason"] = reason - // record this decision in the sent record LRU for future spans - sentRecord := traceSentRecord{ - keep: shouldSend, - rate: rate, - spanCount: trace.SpanCount(), - } - i.sentTraceCache.Add(trace.TraceID, &sentRecord) + i.sentTraceCache.Record(trace, shouldSend) // if we're supposed to drop this trace, and dry run mode is not enabled, then we're done. 
if !shouldSend && !i.Config.GetIsDryRun() { @@ -628,7 +608,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) { // update the root span (if we have one, which we might not if the trace timed out) // with the final total as of our send time if i.Config.GetAddSpanCountToRoot() && isRootSpan(sp) { - sp.Data["meta.span_count"] = sentRecord.spanCount + sp.Data["meta.span_count"] = int64(trace.DescendantCount()) } if i.Config.GetIsDryRun() { diff --git a/collect/collect_benchmark_test.go b/collect/collect_benchmark_test.go index fda9ea7014..6171dd583d 100644 --- a/collect/collect_benchmark_test.go +++ b/collect/collect_benchmark_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - lru "github.com/hashicorp/golang-lru" "github.com/stretchr/testify/assert" "github.com/honeycombio/refinery/collect/cache" @@ -35,7 +34,7 @@ func BenchmarkCollect(b *testing.B) { metric := &metrics.MockMetrics{} metric.Start() - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(b, err, "lru cache should start") coll := &InMemCollector{ diff --git a/collect/collect_test.go b/collect/collect_test.go index d0609ea4c6..0d444186ce 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -10,7 +10,6 @@ import ( "time" "github.com/facebookgo/inject" - lru "github.com/hashicorp/golang-lru" "github.com/stretchr/testify/assert" "github.com/honeycombio/refinery/collect/cache" @@ -48,7 +47,7 @@ func TestAddRootSpan(t *testing.T) { c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -126,7 +125,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -184,7 +183,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) { c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -245,7 +244,7 @@ func TestAddSpan(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -318,7 +317,7 @@ func TestDryRunMode(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -585,7 +584,7 @@ func TestOldMaxAlloc(t *testing.T) { } c := cache.NewInMemCache(1000, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -689,7 +688,7 @@ func TestStableMaxAlloc(t *testing.T) { c := cache.NewInMemCache(1000, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -777,7 +776,7 @@ func 
TestAddSpanNoBlock(t *testing.T) { } c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -849,7 +848,7 @@ func TestAddSpanCount(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc @@ -918,7 +917,7 @@ func TestLateRootGetsSpanCount(t *testing.T) { } c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c - stc, err := lru.New(15) + stc, err := cache.NewLegacySentCache(15) assert.NoError(t, err, "lru cache should start") coll.sentTraceCache = stc diff --git a/sample/dynamic_ema.go b/sample/dynamic_ema.go index 3483e1a94b..34a88b4820 100644 --- a/sample/dynamic_ema.go +++ b/sample/dynamic_ema.go @@ -54,7 +54,7 @@ func (d *EMADynamicSampler) Start() error { } d.dynsampler.Start() - // Register stastics this package will produce + // Register statistics this package will produce d.Metrics.Register("dynsampler_num_dropped", "counter") d.Metrics.Register("dynsampler_num_kept", "counter") d.Metrics.Register("dynsampler_sample_rate", "histogram") diff --git a/tools/loadtest/.gitignore b/tools/loadtest/.gitignore new file mode 100644 index 0000000000..af7a074c2b --- /dev/null +++ b/tools/loadtest/.gitignore @@ -0,0 +1,4 @@ +.direnv +.tool-versions +__* +.DS_Store \ No newline at end of file diff --git a/types/event.go b/types/event.go index 68eee8a97a..9e0b4384be 100644 --- a/types/event.go +++ b/types/event.go @@ -94,14 +94,14 @@ func (t *Trace) CacheImpact(traceTimeout time.Duration) int { return t.totalImpact } -// GetSpans returns the list of spans in this trace +// GetSpans returns the list of descendants in this trace func (t *Trace) GetSpans() []*Span { return t.spans } -// SpanCount gets the number of spans currently in this trace as int64 -func (t *Trace) SpanCount() int64 { - return int64(len(t.spans)) +// DescendantCount gets the number of descendants of all kinds currently in this trace +func (t *Trace) DescendantCount() uint { + return uint(len(t.spans)) } func (t *Trace) GetSamplerKey() (string, bool) {
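
A minimal usage sketch of the new `TraceSentCache`/`TraceSentRecord` interfaces, mirroring how `collect.go` drives them in the diff above (`Record` when a sampling decision is made, `Check` when a late span arrives). The cache capacity and the trace/span field literals below are illustrative assumptions for this sketch, not values taken from the patch:

```go
package main

import (
	"fmt"

	"github.com/honeycombio/refinery/collect/cache"
	"github.com/honeycombio/refinery/types"
)

func main() {
	// Build the sent cache through the new constructor; the capacity here is
	// arbitrary (collect.go uses 5x the live trace cache's capacity).
	stc, err := cache.NewLegacySentCache(1000)
	if err != nil {
		panic(err)
	}

	// At decision time, Record preserves keep/rate/descendant count for the trace.
	// Only the fields this sketch needs are set on the trace literal.
	trace := &types.Trace{TraceID: "abc123", SampleRate: 10}
	stc.Record(trace, true)

	// When a late span shows up after its trace has been sent, Check returns the
	// earlier decision (and bumps the record's descendant count for that span).
	late := &types.Span{TraceID: "abc123"}
	if sr, found := stc.Check(late); found {
		fmt.Printf("kept=%v rate=%d descendants=%d\n",
			sr.Kept(), sr.Rate(), sr.DescendantCount())
	}
}
```

Because the collector now depends only on these interfaces, a future sent cache that retains decision records for longer (the stated motivation of this PR) can be swapped in behind a different constructor without further changes to `collect.go`.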