Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cuckoo-based drop cache #567

Merged
merged 13 commits into from
Nov 22, 2022
76 changes: 76 additions & 0 deletions collect/cache/cuckoo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cache

import (
"sync"

cuckoo "github.com/panmari/cuckoofilter"
)

// This wraps a cuckoo filter implementation in a way that lets us keep it running forever
// without filling up.
// You must call Maintain() periodically, most likely from a goroutine.
// We maintain two filters that are out of sync; when current is full, future is half full
// and we then discard current and replace it with the future, and then create a new, empty future.
// This means that we need to use the firstTime flag to avoid putting anything into future
// until after current reaches .5.
type CuckooTraceChecker struct {
	current   *cuckoo.Filter // filter consulted by Check; also receives every Add
	future    *cuckoo.Filter // successor filter; promoted to current when current fills
	mut       sync.RWMutex   // guards the filters and all fields below
	capacity  uint           // capacity used when constructing the replacement future filter
	firstTime bool           // true until current first reaches 50% load; keeps future empty meanwhile
}

// NewCuckooTraceChecker returns a checker whose current and future filters
// are each sized to hold capacity entries.
func NewCuckooTraceChecker(capacity uint) *CuckooTraceChecker {
	c := &CuckooTraceChecker{capacity: capacity}
	c.current = cuckoo.NewFilter(capacity)
	c.future = cuckoo.NewFilter(capacity)
	return c
}

// Add puts a traceID into the filter.
func (c *CuckooTraceChecker) Add(traceID string) {
	id := []byte(traceID)
	c.mut.Lock()
	defer c.mut.Unlock()
	c.current.Insert(id)
	// While we're still in the initial 'firstTime' fill of current, the
	// future filter stays empty; afterwards every ID goes into both.
	if !c.firstTime {
		c.future.Insert(id)
	}
}

// Check tests if a traceID is (very probably) in the filter.
func (c *CuckooTraceChecker) Check(traceID string) bool {
b := []byte(traceID)
c.mut.RLock()
defer c.mut.RUnlock()
return c.current.Lookup(b)
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
}

// Maintain should be called periodically; if the current filter is full, it replaces
// it with the future filter and creates a new future filter.
func (c *CuckooTraceChecker) Maintain() {
c.mut.RLock()
dropFactor := c.current.LoadFactor()
c.mut.RUnlock()

// once the current one is half full, we can drop the firstTime check
if (c.firstTime && dropFactor > 0.5) || dropFactor > 0.99 {
c.mut.Lock()
defer c.mut.Unlock()
c.firstTime = false
// if the current one is full, cycle the filters
if dropFactor > 0.99 {
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
c.current = c.future
c.future = cuckoo.NewFilter(c.capacity)
}
}
}

// SetNextCapacity records the capacity that Maintain will use for the
// replacement future filter the next time it cycles the filters.
func (c *CuckooTraceChecker) SetNextCapacity(capacity uint) {
	c.mut.Lock()
	c.capacity = capacity
	c.mut.Unlock()
}
189 changes: 189 additions & 0 deletions collect/cache/cuckooSentCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package cache

import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/types"
)

// cuckooSentCache extends Refinery's legacy cache. It keeps the same records
// for kept traces but adds a pair of cuckoo filters to record dropped traces.
// This allows many more traces to be kept in the cache; now only kept records
// are retained in the cache of sentRecords.
// The size of the sent cache is still set based on the size of the live trace cache,
// and the size of the dropped cache is an independent value.

// cuckooKeptRecord is an internal record we leave behind when keeping a trace to remember
// our decision for the future. We only store them if the record was kept.
type cuckooKeptRecord struct {
	rate      uint // sample rate used when sending the trace
	spanCount uint // number of spans in the trace (we decorate the root span with this)
}

// Kept always reports true; this record type is only stored for kept traces.
func (r *cuckooKeptRecord) Kept() bool {
	return true
}

// Rate returns the sample rate that was used when the trace was sent.
func (r *cuckooKeptRecord) Rate() uint {
	return r.rate
}

// DescendantCount returns the number of spans recorded so far for this trace.
func (r *cuckooKeptRecord) DescendantCount() uint {
	return r.spanCount
}

// Count registers one more span as belonging to this trace.
func (r *cuckooKeptRecord) Count(*types.Span) {
	r.spanCount++
}

// Make sure it implements TraceSentRecord
var _ TraceSentRecord = (*cuckooKeptRecord)(nil)

// cuckooDroppedRecord is what we return when the trace was dropped.
// It carries no state, so every instance is interchangeable.
type cuckooDroppedRecord struct{}

// Kept always reports false.
func (*cuckooDroppedRecord) Kept() bool {
	return false
}

// Rate is zero for dropped traces.
func (*cuckooDroppedRecord) Rate() uint {
	return 0
}

// DescendantCount is zero; we don't track spans for dropped traces.
func (*cuckooDroppedRecord) DescendantCount() uint {
	return 0
}

// Count is a no-op; dropped traces don't accumulate span counts.
func (*cuckooDroppedRecord) Count(*types.Span) {
}

// Make sure it implements TraceSentRecord
var _ TraceSentRecord = (*cuckooDroppedRecord)(nil)

type cuckooSentCache struct {
	kept    *lru.Cache          // LRU of *cuckooKeptRecord, keyed by trace ID; kept traces only
	dropped *CuckooTraceChecker // cuckoo filters remembering the IDs of dropped traces
	cfg     config.SampleCacheConfig

	// The done channel is used to decide when to terminate the monitor
	// goroutine. When resizing the cache, we write to the channel, but
	// when terminating the system, call Stop() to close the channel.
	// Either one causes the goroutine to shut down, and in resizing
	// we then start a new monitor.
	done chan struct{}

	// This mutex is for managing kept traces
	keptMut sync.Mutex
}

// Make sure it implements TraceSentCache
var _ TraceSentCache = (*cuckooSentCache)(nil)

func NewCuckooSentCache(cfg config.SampleCacheConfig) (TraceSentCache, error) {
stc, err := lru.New(int(cfg.KeptSize))
if err != nil {
return nil, err
}
dropped := NewCuckooTraceChecker(cfg.DroppedSize)

cache := &cuckooSentCache{
kept: stc,
dropped: dropped,
cfg: cfg,
done: make(chan struct{}),
}
// TODO: metrics for when this puppy gets cycled
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
go cache.monitor()
return cache, nil
}

// monitor runs as a goroutine, calling Maintain on the dropped-trace filter
// at the configured interval until the done channel is closed (Stop) or
// written to (Resize).
func (c *cuckooSentCache) monitor() {
	ticker := time.NewTicker(c.cfg.SizeCheckInterval)
	// Stop the ticker when this goroutine exits. Resize replaces the monitor
	// with a fresh one, so without this each Resize would leave the old
	// ticker running forever.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			c.dropped.Maintain()
		case <-c.done:
			return
		}
	}
}

// Stop halts the monitor goroutine by closing the done channel.
// Unlike the send performed in Resize, closing is terminal: the cache
// must not be resized or restarted after Stop.
func (c *cuckooSentCache) Stop() {
	close(c.done)
}

// Record remembers the sampling decision for a trace: dropped traces go into
// the cuckoo filter (ID only), kept traces get a full record in the LRU so
// that late-arriving spans can be counted and sampled consistently.
func (c *cuckooSentCache) Record(trace *types.Trace, keep bool) {
	if !keep {
		// if we're not keeping it, save it in the dropped trace filter
		c.dropped.Add(trace.TraceID)
		return
	}
	// record this decision in the sent record LRU for future spans
	c.keptMut.Lock()
	defer c.keptMut.Unlock()
	c.kept.Add(trace.TraceID, &cuckooKeptRecord{
		rate:      trace.SampleRate,
		spanCount: trace.DescendantCount(),
	})
}

func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, bool) {
// was it dropped?
if c.dropped.Check(span.TraceID) {
// we recognize it as dropped, so just say so; there's nothing else to do
return &cuckooDroppedRecord{}, false
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
}
// was it kept?
c.keptMut.Lock()
defer c.keptMut.Unlock()
if sentRecord, found := c.kept.Get(span.TraceID); found {
if sr, ok := sentRecord.(*cuckooKeptRecord); ok {
// if we kept it, then this span being checked needs counting too
sr.Count(span)
return sr, true
}
}
// we have no memory of this place
return nil, false
}

func (c *cuckooSentCache) Resize(cfg config.SampleCacheConfig) error {
stc, err := lru.New(int(cfg.KeptSize))
if err != nil {
return err
}

// grab all the items in the current cache; if it's larger than
// what will fit in the new one, discard the oldest ones
// (we don't have to do anything with the ones we discard, this is
// the trace decisions cache).
c.keptMut.Lock()
defer c.keptMut.Unlock()
keys := c.kept.Keys()
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
if len(keys) > int(cfg.KeptSize) {
keys = keys[len(keys)-int(cfg.KeptSize):]
}
// copy all the keys to the new cache in order
for _, k := range keys {
if v, found := c.kept.Get(k); found {
stc.Add(k, v)
}
}
c.kept = stc

// also set up the drop cache size to change eventually
c.dropped.SetNextCapacity(cfg.DroppedSize)

// shut down the old monitor and create a new one
c.done <- struct{}{}
go c.monitor()
return nil
}
11 changes: 11 additions & 0 deletions collect/cache/legacySentCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
lru "github.com/hashicorp/golang-lru"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/types"
)

Expand Down Expand Up @@ -71,3 +72,13 @@ func (c *legacySentCache) Check(span *types.Span) (TraceSentRecord, bool) {
}
return nil, false
}

// Stop is a no-op; the legacy cache has no monitor goroutine to halt.
// It exists only to satisfy the TraceSentCache interface.
func (c *legacySentCache) Stop() {
}

// Resize is a no-op; the legacy cache keeps its original size. It exists
// only to satisfy the TraceSentCache interface and always returns nil.
func (c *legacySentCache) Resize(cfg config.SampleCacheConfig) error {
	return nil
}
5 changes: 5 additions & 0 deletions collect/cache/traceSentCache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/types"
)

Expand All @@ -21,4 +22,8 @@ type TraceSentCache interface {
// Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
// else nil and false.
Check(span *types.Span) (TraceSentRecord, bool)
// Stop halts the cache in preparation for shutdown
Stop()
// Resize adjusts the size of the cache according to the Config passed in
Resize(cfg config.SampleCacheConfig) error
}
30 changes: 23 additions & 7 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type InMemCollector struct {
cache cache.Cache
datasetSamplers map[string]sample.Sampler

sentTraceCache cache.TraceSentCache
sampleTraceCache cache.TraceSentCache

incoming chan *types.Span
fromPeer chan *types.Span
Expand Down Expand Up @@ -111,9 +111,20 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register(TraceSendEjectedFull, "counter")
i.Metrics.Register(TraceSendEjectedMemsize, "counter")

i.sentTraceCache, err = cache.NewLegacySentCache(imcConfig.CacheCapacity * 5) // (keep 5x ring buffer size)
if err != nil {
return err
sampleCacheConfig := i.Config.GetSampleCacheConfig()
switch sampleCacheConfig.Type {
case "legacy", "":
i.sampleTraceCache, err = cache.NewLegacySentCache(imcConfig.CacheCapacity * 5) // (keep 5x ring buffer size)
if err != nil {
return err
}
case "cuckoo":
i.sampleTraceCache, err = cache.NewCuckooSentCache(sampleCacheConfig)
if err != nil {
return err
}
default:
return fmt.Errorf("validation failure - sampleTraceCache had invalid config type '%s'", sampleCacheConfig.Type)
}

i.incoming = make(chan *types.Span, imcConfig.CacheCapacity*3)
Expand Down Expand Up @@ -165,8 +176,10 @@ func (i *InMemCollector) reloadConfigs() {
}
i.cache = c
} else {
i.Logger.Debug().Logf("skipping reloading the cache on config reload because it hasn't changed capacity")
i.Logger.Debug().Logf("skipping reloading the in-memory cache on config reload because it hasn't changed capacity")
}

i.sampleTraceCache.Resize(i.Config.GetSampleCacheConfig())
} else {
i.Logger.Error().WithField("cache", i.cache.(*cache.DefaultInMemCache)).Logf("skipping reloading the cache on config reload because it's not an in-memory cache")
}
Expand Down Expand Up @@ -414,7 +427,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
trace := i.cache.Get(sp.TraceID)
if trace == nil {
// if the trace has already been sent, just pass along the span
if sr, found := i.sentTraceCache.Check(sp); found {
if sr, found := i.sampleTraceCache.Check(sp); found {
i.Metrics.Increment("trace_sent_cache_hit")
// bump the count of records on this trace -- if the root span isn't
// the last late span, then it won't be perfect, but it will be better than
Expand Down Expand Up @@ -585,7 +598,7 @@ func (i *InMemCollector) send(trace *types.Trace, reason string) {
trace.KeepSample = shouldSend
logFields["reason"] = reason

i.sentTraceCache.Record(trace, shouldSend)
i.sampleTraceCache.Record(trace, shouldSend)

// if we're supposed to drop this trace, and dry run mode is not enabled, then we're done.
if !shouldSend && !i.Config.GetIsDryRun() {
Expand Down Expand Up @@ -642,6 +655,9 @@ func (i *InMemCollector) Stop() error {
if i.Transmission != nil {
i.Transmission.Flush()
}

i.sampleTraceCache.Stop()

return nil
}

Expand Down
12 changes: 6 additions & 6 deletions collect/collect_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func BenchmarkCollect(b *testing.B) {
Config: conf,
Logger: log,
},
BlockOnAddSpan: true,
cache: cache.NewInMemCache(3, metric, log),
incoming: make(chan *types.Span, 500),
fromPeer: make(chan *types.Span, 500),
datasetSamplers: make(map[string]sample.Sampler),
sentTraceCache: stc,
BlockOnAddSpan: true,
cache: cache.NewInMemCache(3, metric, log),
incoming: make(chan *types.Span, 500),
fromPeer: make(chan *types.Span, 500),
datasetSamplers: make(map[string]sample.Sampler),
sampleTraceCache: stc,
}
go coll.collect()

Expand Down
Loading