Skip to content

Commit

Permalink
Extract Sent Cache to an interface for future expansion (#561)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

This is a prep PR for further work on the sent trace cache. For improved
scalability, we want to be able to store trace decision records for a
longer time. The best way to implement this in a backwards-compatible
way is to pull the mechanisms for managing decision records into an
interface, and then implement the interface with the legacy logic.
That's what this does. There are no expected changes in behavior, and
all tests still pass.

## Short description of the changes

- Define interfaces for a TraceSentCache and a TraceSentRecord
- Implement code for those that duplicates the existing legacy logic
- Refactor the places the code is used to use the new interfaces
- Tweak span count data type so that it's not an int64 any more
- Rename SpanCount to DescendantCount because now that we have links and
events, they're not all Spans anymore, and future PRs will add
additional counting functions. I haven't renamed the corresponding Get
and Add functions because that's pretty messy.
  • Loading branch information
kentquirk authored Nov 14, 2022
1 parent 4520d04 commit 4e07a5c
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 54 deletions.
73 changes: 73 additions & 0 deletions collect/cache/legacySentCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package cache

import (
lru "github.com/hashicorp/golang-lru"
"github.com/honeycombio/refinery/types"
)

// legacySentCache is Refinery's original traceSent cache. It keeps the same records
// for both kept and dropped traces and the size of the sent cache is set based on the size
// of the live trace cache.

// legacySentRecord is an internal record we leave behind when sending a trace to remember
// our decision for the future, so any delinquent spans that show up later can
// be dropped or passed along.
type legacySentRecord struct {
	keep      bool // true if the trace was kept, false if it was dropped
	rate      uint // sample rate used when sending the trace
	spanCount uint // number of spans in the trace (we decorate the root span with this)
}

// Kept reports whether the trace was kept (sampled and sent) or dropped.
func (t *legacySentRecord) Kept() bool {
	return t.keep
}

// Rate returns the sample rate that was used when the trace was sent.
func (t *legacySentRecord) Rate() uint {
	return t.rate
}

// DescendantCount returns the count of items recorded for the trace so far,
// including all types of children like span links and span events.
func (t *legacySentRecord) DescendantCount() uint {
	// spanCount is already a uint, so no conversion is needed.
	return t.spanCount
}

// Count records an additional span in the trace's totals. The span itself is
// not inspected; only the running count is incremented.
func (t *legacySentRecord) Count(*types.Span) {
	t.spanCount++
}

// Make sure it implements TraceSentRecord
var _ TraceSentRecord = (*legacySentRecord)(nil)

// legacySentCache implements TraceSentCache on top of an LRU cache, storing
// one legacySentRecord per sent trace.
type legacySentCache struct {
	sentTraceCache *lru.Cache
}

// Make sure it implements TraceSentCache
var _ TraceSentCache = (*legacySentCache)(nil)

// NewLegacySentCache returns a TraceSentCache backed by an LRU cache holding
// up to capacity entries, using Refinery's original sent-trace logic.
// It returns an error if the underlying LRU cache cannot be created.
func NewLegacySentCache(capacity int) (TraceSentCache, error) {
	lruCache, err := lru.New(capacity)
	if err != nil {
		return nil, err
	}
	return &legacySentCache{sentTraceCache: lruCache}, nil
}

// Record preserves the sampling decision for trace so that any spans arriving
// after the trace was sent can be handled the same way.
func (c *legacySentCache) Record(trace *types.Trace, keep bool) {
	// leave a record in the sent-trace LRU for future spans on this trace
	rec := &legacySentRecord{
		keep:      keep,
		rate:      trace.SampleRate,
		spanCount: trace.DescendantCount(),
	}
	c.sentTraceCache.Add(trace.TraceID, rec)
}

// Check looks up the sent-trace decision for span's trace ID. On a hit it
// bumps the record's descendant count and returns the record along with true;
// otherwise it returns nil and false.
func (c *legacySentCache) Check(span *types.Span) (TraceSentRecord, bool) {
	cached, found := c.sentTraceCache.Get(span.TraceID)
	if !found {
		return nil, false
	}
	record, ok := cached.(*legacySentRecord)
	if !ok {
		return nil, false
	}
	record.Count(span)
	return record, true
}
24 changes: 24 additions & 0 deletions collect/cache/traceSentCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cache

import (
"github.com/honeycombio/refinery/types"
)

// TraceSentRecord is the decision record kept for a trace that has already
// been sent, so that late-arriving spans can be kept or dropped consistently.
type TraceSentRecord interface {
	// Kept returns whether the trace was kept (sampled and sent to honeycomb) or dropped.
	Kept() bool
	// Rate returns the sample rate for the trace.
	Rate() uint
	// DescendantCount returns the count of items associated with the trace, including all types of children like span links and span events.
	DescendantCount() uint
	// Count records additional spans in the totals.
	Count(*types.Span)
}

// TraceSentCache stores trace-level sampling decisions so that spans arriving
// after their trace has been sent can be handled according to that decision.
type TraceSentCache interface {
	// Record preserves the record of a trace being sent or not.
	Record(trace *types.Trace, keep bool)
	// Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
	// else nil and false.
	Check(span *types.Span) (TraceSentRecord, bool)
}
52 changes: 16 additions & 36 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/honeycombio/refinery/collect/cache"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
Expand Down Expand Up @@ -73,7 +72,7 @@ type InMemCollector struct {
cache cache.Cache
datasetSamplers map[string]sample.Sampler

sentTraceCache *lru.Cache
sentTraceCache cache.TraceSentCache

incoming chan *types.Span
fromPeer chan *types.Span
Expand All @@ -82,15 +81,6 @@ type InMemCollector struct {
hostname string
}

// traceSentRecord is the bit we leave behind when sending a trace to remember
// our decision for the future, so any delinquent spans that show up later can
// be dropped or passed along.
type traceSentRecord struct {
keep bool // true if the trace was kept, false if it was dropped
rate uint // sample rate used when sending the trace
spanCount int64 // number of spans in the trace (we decorate the root span with this)
}

func (i *InMemCollector) Start() error {
i.Logger.Debug().Logf("Starting InMemCollector")
defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }()
Expand Down Expand Up @@ -121,11 +111,10 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register(TraceSendEjectedFull, "counter")
i.Metrics.Register(TraceSendEjectedMemsize, "counter")

stc, err := lru.New(imcConfig.CacheCapacity * 5) // keep 5x ring buffer size
i.sentTraceCache, err = cache.NewLegacySentCache(imcConfig.CacheCapacity * 5) // (keep 5x ring buffer size)
if err != nil {
return err
}
i.sentTraceCache = stc

i.incoming = make(chan *types.Span, imcConfig.CacheCapacity*3)
i.fromPeer = make(chan *types.Span, imcConfig.CacheCapacity*3)
Expand Down Expand Up @@ -425,16 +414,13 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
trace := i.cache.Get(sp.TraceID)
if trace == nil {
// if the trace has already been sent, just pass along the span
if sentRecord, found := i.sentTraceCache.Get(sp.TraceID); found {
if sr, ok := sentRecord.(*traceSentRecord); ok {
i.Metrics.Increment("trace_sent_cache_hit")
// bump the count of records on this trace -- if the root span isn't
// the last late span, then it won't be perfect, but it will be better than
// having none at all
sentRecord.(*traceSentRecord).spanCount++
i.dealWithSentTrace(sr.keep, sr.rate, sentRecord.(*traceSentRecord).spanCount, sp)
return
}
if sr, found := i.sentTraceCache.Check(sp); found {
i.Metrics.Increment("trace_sent_cache_hit")
// bump the count of records on this trace -- if the root span isn't
// the last late span, then it won't be perfect, but it will be better than
// having none at all
i.dealWithSentTrace(sr.Kept(), sr.Rate(), sr.DescendantCount(), sp)
return
}
// trace hasn't already been sent (or this span is really old); let's
// create a new trace to hold it
Expand Down Expand Up @@ -464,7 +450,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// if the trace we got back from the cache has already been sent, deal with the
// span.
if trace.Sent {
i.dealWithSentTrace(trace.KeepSample, trace.SampleRate, trace.SpanCount(), sp)
i.dealWithSentTrace(trace.KeepSample, trace.SampleRate, trace.DescendantCount(), sp)
}

// great! trace is live. add the span.
Expand All @@ -486,7 +472,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
// dealWithSentTrace handles a span that has arrived after the sampling decision
// on the trace has already been made, and it obeys that decision by either
// sending the span immediately or dropping it.
func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount int64, sp *types.Span) {
func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount uint, sp *types.Span) {
if i.Config.GetIsDryRun() {
field := i.Config.GetDryRunFieldName()
// if dry run mode is enabled, we keep all traces and mark the spans with the sampling decision
Expand All @@ -502,7 +488,7 @@ func (i *InMemCollector) dealWithSentTrace(keep bool, sampleRate uint, spanCount
mergeTraceAndSpanSampleRates(sp, sampleRate)
// if this span is a late root span, possibly update it with our current span count
if i.Config.GetAddSpanCountToRoot() && isRootSpan(sp) {
sp.Data["meta.span_count"] = spanCount
sp.Data["meta.span_count"] = int64(spanCount)
}
i.Transmission.EnqueueSpan(sp)
return
Expand Down Expand Up @@ -559,7 +545,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) {

traceDur := time.Since(trace.ArrivalTime)
i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds()))
i.Metrics.Histogram("trace_span_count", float64(trace.SpanCount()))
i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount()))
if trace.RootSpan != nil {
i.Metrics.Increment("trace_send_has_root")
} else {
Expand All @@ -584,7 +570,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) {

// If we have a root span, update it with the count before determining the SampleRate.
if i.Config.GetAddSpanCountToRoot() && trace.RootSpan != nil {
trace.RootSpan.Data["meta.span_count"] = trace.SpanCount()
trace.RootSpan.Data["meta.span_count"] = int64(trace.DescendantCount())
}

// use sampler key to find sampler; create and cache if not found
Expand All @@ -599,13 +585,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) {
trace.KeepSample = shouldSend
logFields["reason"] = reason

// record this decision in the sent record LRU for future spans
sentRecord := traceSentRecord{
keep: shouldSend,
rate: rate,
spanCount: trace.SpanCount(),
}
i.sentTraceCache.Add(trace.TraceID, &sentRecord)
i.sentTraceCache.Record(trace, shouldSend)

// if we're supposed to drop this trace, and dry run mode is not enabled, then we're done.
if !shouldSend && !i.Config.GetIsDryRun() {
Expand All @@ -628,7 +608,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) {
// update the root span (if we have one, which we might not if the trace timed out)
// with the final total as of our send time
if i.Config.GetAddSpanCountToRoot() && isRootSpan(sp) {
sp.Data["meta.span_count"] = sentRecord.spanCount
sp.Data["meta.span_count"] = int64(trace.DescendantCount())
}

if i.Config.GetIsDryRun() {
Expand Down
3 changes: 1 addition & 2 deletions collect/collect_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/stretchr/testify/assert"

"github.com/honeycombio/refinery/collect/cache"
Expand Down Expand Up @@ -35,7 +34,7 @@ func BenchmarkCollect(b *testing.B) {
metric := &metrics.MockMetrics{}
metric.Start()

stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(b, err, "lru cache should start")

coll := &InMemCollector{
Expand Down
21 changes: 10 additions & 11 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/facebookgo/inject"
lru "github.com/hashicorp/golang-lru"
"github.com/stretchr/testify/assert"

"github.com/honeycombio/refinery/collect/cache"
Expand Down Expand Up @@ -48,7 +47,7 @@ func TestAddRootSpan(t *testing.T) {

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -126,7 +125,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -184,7 +183,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -245,7 +244,7 @@ func TestAddSpan(t *testing.T) {
}
c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -318,7 +317,7 @@ func TestDryRunMode(t *testing.T) {
}
c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -585,7 +584,7 @@ func TestOldMaxAlloc(t *testing.T) {
}
c := cache.NewInMemCache(1000, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -689,7 +688,7 @@ func TestStableMaxAlloc(t *testing.T) {

c := cache.NewInMemCache(1000, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -777,7 +776,7 @@ func TestAddSpanNoBlock(t *testing.T) {
}
c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -849,7 +848,7 @@ func TestAddSpanCount(t *testing.T) {
}
c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down Expand Up @@ -918,7 +917,7 @@ func TestLateRootGetsSpanCount(t *testing.T) {
}
c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := lru.New(15)
stc, err := cache.NewLegacySentCache(15)
assert.NoError(t, err, "lru cache should start")
coll.sentTraceCache = stc

Expand Down
2 changes: 1 addition & 1 deletion sample/dynamic_ema.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (d *EMADynamicSampler) Start() error {
}
d.dynsampler.Start()

// Register stastics this package will produce
// Register statistics this package will produce
d.Metrics.Register("dynsampler_num_dropped", "counter")
d.Metrics.Register("dynsampler_num_kept", "counter")
d.Metrics.Register("dynsampler_sample_rate", "histogram")
Expand Down
4 changes: 4 additions & 0 deletions tools/loadtest/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.direnv
.tool-versions
__*
.DS_Store
8 changes: 4 additions & 4 deletions types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ func (t *Trace) CacheImpact(traceTimeout time.Duration) int {
return t.totalImpact
}

// GetSpans returns the list of spans in this trace
// GetSpans returns the list of descendants in this trace. Despite the name,
// the slice may include span events and links as well as spans — TODO confirm
// against the callers that add to t.spans.
func (t *Trace) GetSpans() []*Span {
	return t.spans
}

// SpanCount gets the number of spans currently in this trace as int64
func (t *Trace) SpanCount() int64 {
return int64(len(t.spans))
// DescendantCount gets the number of descendants of all kinds currently in this trace.
func (t *Trace) DescendantCount() uint {
	// t.spans holds every descendant item collected so far, so its length is the count.
	return uint(len(t.spans))
}

func (t *Trace) GetSamplerKey() (string, bool) {
Expand Down

0 comments on commit 4e07a5c

Please sign in to comment.