Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cuckoo-based drop cache #567

Merged
merged 13 commits into from
Nov 22, 2022
97 changes: 97 additions & 0 deletions collect/cache/cuckoo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package cache

import (
"sync"

"github.com/honeycombio/refinery/metrics"
cuckoo "github.com/panmari/cuckoofilter"
)

// These are the names of metrics tracked for the cuckoo filter
const (
CurrentLoadFactor = "cuckoo_current_load_factor"
FutureLoadFactor = "cuckoo_future_load_factor"
CurrentCapacity = "cuckoo_current_capacity"
)

// This wraps a cuckoo filter implementation in a way that lets us keep it running forever
// without filling up.
// A cuckoo filter can't be emptied (you can delete individual items if you know what they are,
// but you can't get their names from the filter. Consequently, what we do is keep *two* filters,
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
// current and future. The current one is the one we use to check against, and when we add, we
// add to both. But the future one is started *after* the current one, so that when the current
// gets too full, we can discard it, replace it with future, and then start a new, empty future.
// This is why the future filter is nil until the current filter reaches .5.
// You must call Maintain() periodically, most likely from a goroutine. The call is cheap,
// and the timing isn't very critical. The effect of going above "capacity" is an increased
// false positive rate, but the filter continues to function.
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
type CuckooTraceChecker struct {
current *cuckoo.Filter
future *cuckoo.Filter
mut sync.RWMutex
capacity uint
met metrics.Metrics
}

func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker {
return &CuckooTraceChecker{
capacity: capacity,
current: cuckoo.NewFilter(capacity),
future: nil,
met: m,
}
}

// Add puts a traceID into the filter.
func (c *CuckooTraceChecker) Add(traceID string) {
c.mut.Lock()
defer c.mut.Unlock()
c.current.Insert([]byte(traceID))
// don't add anything to future if it doesn't exist yet
if c.future != nil {
c.future.Insert([]byte(traceID))
}
}

// Check tests if a traceID is (very probably) in the filter.
func (c *CuckooTraceChecker) Check(traceID string) bool {
b := []byte(traceID)
c.mut.RLock()
defer c.mut.RUnlock()
return c.current.Lookup(b)
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
}

// Maintain should be called periodically; if the current filter is full, it replaces
// it with the future filter and creates a new future filter.
func (c *CuckooTraceChecker) Maintain() {
c.mut.RLock()
currentLoadFactor := c.current.LoadFactor()
c.met.Gauge(CurrentLoadFactor, currentLoadFactor)
if c.future != nil {
c.met.Gauge(FutureLoadFactor, c.future.LoadFactor())
}
c.met.Gauge(CurrentCapacity, c.capacity)
c.mut.RUnlock()

// once the current one is half loaded, we can start using the future one too
if c.future == nil && currentLoadFactor > 0.5 {
c.mut.Lock()
c.future = cuckoo.NewFilter(c.capacity)
c.mut.Unlock()
}

// if the current one is full, cycle the filters
if currentLoadFactor > 0.99 {
c.mut.Lock()
defer c.mut.Unlock()
c.current = c.future
c.future = cuckoo.NewFilter(c.capacity)
}
}

// SetNextCapacity adjusts the capacity that will be set for the future filter on the next replacement.
func (c *CuckooTraceChecker) SetNextCapacity(capacity uint) {
c.mut.Lock()
defer c.mut.Unlock()
c.capacity = capacity
}
189 changes: 189 additions & 0 deletions collect/cache/cuckooSentCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package cache

import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/types"
)

// cuckooSentCache extends Refinery's legacy cache. It keeps the same records
// for kept traces but adds a pair of cuckoo filters to record dropped traces.
// This allows many more traces to be kept in the cache; now only kept records
// are retained in the cache of sentRecords.
// The size of the sent cache is still set based on the size of the live trace cache,
// and the size of the dropped cache is an independent value.

// cuckooKeptRecord is an internal record we leave behind when keeping a trace to remember
// our decision for the future. We only store them if the record was kept.
type cuckooKeptRecord struct {
rate uint // sample rate used when sending the trace
spanCount uint // number of spans in the trace (we decorate the root span with this)
}

func (t *cuckooKeptRecord) Kept() bool {
return true
}

func (t *cuckooKeptRecord) Rate() uint {
return t.rate
}

func (t *cuckooKeptRecord) DescendantCount() uint {
return uint(t.spanCount)
}

func (t *cuckooKeptRecord) Count(*types.Span) {
t.spanCount++
}

// Make sure it implements TraceSentRecord
var _ TraceSentRecord = (*cuckooKeptRecord)(nil)

// cuckooSentRecord is what we return when the trace was dropped.
// It's always the same one.
type cuckooDroppedRecord struct{}

func (t *cuckooDroppedRecord) Kept() bool {
return false
}

func (t *cuckooDroppedRecord) Rate() uint {
return 0
}

func (t *cuckooDroppedRecord) DescendantCount() uint {
return 0
}

func (t *cuckooDroppedRecord) Count(*types.Span) {
}

// Make sure it implements TraceSentRecord
var _ TraceSentRecord = (*cuckooDroppedRecord)(nil)

type cuckooSentCache struct {
kept *lru.Cache
dropped *CuckooTraceChecker
cfg config.SampleCacheConfig

// The done channel is used to decide when to terminate the monitor
// goroutine. When resizing the cache, we write to the channel, but
// when terminating the system, call Stop() to close the channel.
// Either one causes the goroutine to shut down, and in resizing
// we then start a new monitor.
done chan struct{}

// This mutex is for managing kept traces
keptMut sync.Mutex
}

// Make sure it implements TraceSentCache
var _ TraceSentCache = (*cuckooSentCache)(nil)

func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (TraceSentCache, error) {
stc, err := lru.New(int(cfg.KeptSize))
if err != nil {
return nil, err
}
dropped := NewCuckooTraceChecker(cfg.DroppedSize, met)

cache := &cuckooSentCache{
kept: stc,
dropped: dropped,
cfg: cfg,
done: make(chan struct{}),
}
go cache.monitor()
return cache, nil
}

// goroutine to monitor the cache and cycle the size check periodically
func (c *cuckooSentCache) monitor() {
ticker := time.NewTicker(c.cfg.SizeCheckInterval)
for {
select {
case <-ticker.C:
c.dropped.Maintain()
case <-c.done:
return
}
}
}

// Stop halts the monitor goroutine
func (c *cuckooSentCache) Stop() {
close(c.done)
}

func (c *cuckooSentCache) Record(trace *types.Trace, keep bool) {
if keep {
// record this decision in the sent record LRU for future spans
sentRecord := cuckooKeptRecord{
rate: trace.SampleRate,
spanCount: trace.DescendantCount(),
}
c.keptMut.Lock()
defer c.keptMut.Unlock()
c.kept.Add(trace.TraceID, &sentRecord)
return
}
// if we're not keeping it, save it in the dropped trace filter
c.dropped.Add(trace.TraceID)
}

func (c *cuckooSentCache) Check(span *types.Span) (TraceSentRecord, bool) {
// was it dropped?
if c.dropped.Check(span.TraceID) {
// we recognize it as dropped, so just say so; there's nothing else to do
return &cuckooDroppedRecord{}, false
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
}
// was it kept?
c.keptMut.Lock()
defer c.keptMut.Unlock()
if sentRecord, found := c.kept.Get(span.TraceID); found {
if sr, ok := sentRecord.(*cuckooKeptRecord); ok {
// if we kept it, then this span being checked needs counting too
sr.Count(span)
return sr, true
}
}
// we have no memory of this place
return nil, false
}

func (c *cuckooSentCache) Resize(cfg config.SampleCacheConfig) error {
stc, err := lru.New(int(cfg.KeptSize))
if err != nil {
return err
}

// grab all the items in the current cache; if it's larger than
// what will fit in the new one, discard the oldest ones
// (we don't have to do anything with the ones we discard, this is
// the trace decisions cache).
c.keptMut.Lock()
defer c.keptMut.Unlock()
keys := c.kept.Keys()
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
if len(keys) > int(cfg.KeptSize) {
keys = keys[len(keys)-int(cfg.KeptSize):]
}
// copy all the keys to the new cache in order
for _, k := range keys {
if v, found := c.kept.Get(k); found {
stc.Add(k, v)
}
}
c.kept = stc

// also set up the drop cache size to change eventually
c.dropped.SetNextCapacity(cfg.DroppedSize)

// shut down the old monitor and create a new one
c.done <- struct{}{}
go c.monitor()
return nil
}
11 changes: 11 additions & 0 deletions collect/cache/legacySentCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
lru "github.com/hashicorp/golang-lru"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/types"
)

Expand Down Expand Up @@ -71,3 +72,13 @@ func (c *legacySentCache) Check(span *types.Span) (TraceSentRecord, bool) {
}
return nil, false
}

// legacy Stop does nothing
// Stop halts the monitor goroutine
func (c *legacySentCache) Stop() {
}

// legacy Resize does nothing
func (c *legacySentCache) Resize(cfg config.SampleCacheConfig) error {
return nil
}
5 changes: 5 additions & 0 deletions collect/cache/traceSentCache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/types"
)

Expand All @@ -21,4 +22,8 @@ type TraceSentCache interface {
// Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
// else nil and false.
Check(span *types.Span) (TraceSentRecord, bool)
// Stop halts the cache in preparation for shutdown
Stop()
// Resize adjusts the size of the cache according to the Config passed in
Resize(cfg config.SampleCacheConfig) error
}
Loading