feat(experiment): make drop decision queue size configurable #1472

Draft · wants to merge 4 commits into main

Changes from all commits
9 changes: 6 additions & 3 deletions collect/cache/cuckoo.go
@@ -39,7 +39,7 @@ type CuckooTraceChecker struct {

const (
// This is how many items can be in the Add Queue before we start blocking on Add.
-AddQueueDepth = 1000
+defaultAddQueueDepth = 1000
// This is how long we'll sleep between possible lock cycles.
AddQueueSleepTime = 100 * time.Microsecond
)
@@ -52,13 +52,16 @@ var cuckooTraceCheckerMetrics = []metrics.Metadata{
{Name: AddQueueLockTime, Type: metrics.Histogram, Unit: metrics.Microseconds, Description: "the time spent holding the add queue lock"},
}

-func NewCuckooTraceChecker(capacity uint, m metrics.Metrics) *CuckooTraceChecker {
+func NewCuckooTraceChecker(capacity uint, addQueueDepth uint, m metrics.Metrics) *CuckooTraceChecker {
Contributor: addQueueDepth param isn't used, should it be used on line 62?

Contributor (Author): Good catch 😮‍💨

+if addQueueDepth == 0 {
+	addQueueDepth = defaultAddQueueDepth
+}
c := &CuckooTraceChecker{
capacity: capacity,
current: cuckoo.NewFilter(capacity),
future: nil,
met: m,
-addch: make(chan string, AddQueueDepth),
+addch: make(chan string, addQueueDepth),
}
for _, metric := range cuckooTraceCheckerMetrics {
m.Register(metric)
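With the fix from the thread above in place, a zero (unset) queue depth falls back to the package default rather than producing an unbuffered channel. A minimal sketch of that fallback, with the struct cut down to the one field involved (the real CuckooTraceChecker also has filters, locks, and metrics):

```go
package cache

const defaultAddQueueDepth = 1000

// checker is an illustrative stand-in for CuckooTraceChecker.
type checker struct {
	addch chan string // buffered queue of trace IDs awaiting insertion
}

func newChecker(addQueueDepth uint) *checker {
	// Zero means "not configured": without this guard the channel would
	// be unbuffered, forcing every Add to synchronize with the goroutine
	// that drains the queue into the cuckoo filter.
	if addQueueDepth == 0 {
		addQueueDepth = defaultAddQueueDepth
	}
	return &checker{addch: make(chan string, addQueueDepth)}
}
```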
2 changes: 1 addition & 1 deletion collect/cache/cuckooSentCache.go
@@ -170,7 +170,7 @@ func NewCuckooSentCache(cfg config.SampleCacheConfig, met metrics.Metrics) (Trac
if err != nil {
return nil, err
}
-dropped := NewCuckooTraceChecker(cfg.DroppedSize, met)
+dropped := NewCuckooTraceChecker(cfg.DroppedSize, cfg.DroppedQueueSize, met)
// we want to keep track of the most recent dropped traces so we can avoid
// checking them in the dropped filter, which can have contention issues
// under high load. So we use a cache with TTL to keep track of the most
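The truncated comment above describes a small TTL cache of recently dropped trace IDs that is consulted before the dropped filter itself, so most hot-path lookups avoid the contended lock. A toy sketch of that check-recent-first idea (Refinery uses a real TTL cache; the expiring map here is illustrative and has no eviction loop):

```go
package cache

import (
	"sync"
	"time"
)

// recentDropped is an illustrative stand-in for the TTL cache the
// comment describes: entries expire ttl after insertion.
type recentDropped struct {
	mu  sync.RWMutex
	ttl time.Duration
	m   map[string]time.Time
}

func (r *recentDropped) check(traceID string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	exp, ok := r.m[traceID]
	return ok && time.Now().Before(exp)
}

func (r *recentDropped) add(traceID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[traceID] = time.Now().Add(r.ttl)
}
```

On a drop-status lookup, check() runs first and the cuckoo filter is consulted only on a miss.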
10 changes: 5 additions & 5 deletions collect/cache/cuckoo_test.go
@@ -31,7 +31,7 @@ func BenchmarkCuckooTraceChecker_Add(b *testing.B) {
traceIDs[i] = genID(32)
}

-c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{})
+c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
c.Add(traceIDs[i])
@@ -57,7 +57,7 @@ func BenchmarkCuckooTraceChecker_AddParallel(b *testing.B) {
}
})

-c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{})
+c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{})
ch := make(chan int, numGoroutines)
for i := 0; i < numGoroutines; i++ {
p.Go(func() {
@@ -89,7 +89,7 @@ func BenchmarkCuckooTraceChecker_Check(b *testing.B) {
traceIDs[i] = genID(32)
}

-c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{})
+c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{})
// add every other one to the filter
for i := 0; i < b.N; i += 2 {
if i%10000 == 0 {
@@ -111,7 +111,7 @@ func BenchmarkCuckooTraceChecker_CheckParallel(b *testing.B) {
traceIDs[i] = genID(32)
}

-c := NewCuckooTraceChecker(1000000, &metrics.NullMetrics{})
+c := NewCuckooTraceChecker(1000000, 10000, &metrics.NullMetrics{})
for i := 0; i < b.N; i += 2 {
if i%10000 == 0 {
c.Maintain()
@@ -165,7 +165,7 @@ func BenchmarkCuckooTraceChecker_CheckAddParallel(b *testing.B) {

met := &metrics.MockMetrics{}
met.Start()
-c := NewCuckooTraceChecker(1000000, met)
+c := NewCuckooTraceChecker(1000000, 10000, met)
const numCheckers = 30
const numAdders = 30

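The benchmarks above all pass an explicit depth of 10000 as the new second argument. To compare depths locally, something like the following should work from the repo root (the package path is taken from the file names in this diff):

```
go test -bench 'BenchmarkCuckooTraceChecker' -benchmem ./collect/cache/
```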
1 change: 1 addition & 0 deletions config/file_config.go
@@ -402,6 +402,7 @@ type GRPCServerParameters struct {
type SampleCacheConfig struct {
KeptSize uint `yaml:"KeptSize" default:"10_000"`
DroppedSize uint `yaml:"DroppedSize" default:"1_000_000"`
+DroppedQueueSize uint `yaml:"DroppedQueueSize" default:"10000"`
SizeCheckInterval Duration `yaml:"SizeCheckInterval" default:"10s"`
}

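For reference, a hypothetical config snippet showing where the new field would sit, based on the yaml tags above (the SampleCache group name and the surrounding values are assumptions, not taken from this diff):

```yaml
# Hypothetical example; field names come from the yaml tags above.
SampleCache:
  KeptSize: 10_000
  DroppedSize: 1_000_000
  DroppedQueueSize: 10_000 # new: buffer size for in-flight drop decisions
  SizeCheckInterval: 10s
```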
10 changes: 10 additions & 0 deletions config/metadata/configMeta.yaml
@@ -1774,6 +1774,16 @@ groups:
Changing its size with live reload sets a future limit, but does not
have an immediate effect.

+- name: DroppedQueueSize
+  type: int
+  valuetype: nondefault
+  default: 10000
+  reload: true
+  summary: is the maximum number of in-flight drop decisions allowed before adding to the dropped trace cache.
+  description: >
+    The dropped decision queue is used to buffer drop decisions before they are stored in the decision cache.
+    If this queue fills up, then subsequent drop decisions will be discarded.

- name: SizeCheckInterval
v1group: SampleCacheConfig/SampleCache
v1name: SizeCheckInterval
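The description above says drop decisions are discarded once the queue fills. In Go, a buffered channel that discards on overflow is typically written as a select with a default branch; a self-contained sketch of that pattern (names here are illustrative, not Refinery's API):

```go
package main

import "fmt"

// enqueueDrop records a drop decision without blocking the caller.
// It reports false when the buffer is full, so the caller can count
// the discarded decision in a metric.
func enqueueDrop(addch chan string, traceID string) bool {
	select {
	case addch <- traceID:
		return true
	default:
		return false // queue full: decision discarded
	}
}

func main() {
	addch := make(chan string, 2) // stands in for DroppedQueueSize
	for _, id := range []string{"t1", "t2", "t3"} {
		if !enqueueDrop(addch, id) {
			fmt.Println("discarded:", id) // prints: discarded: t3
		}
	}
}
```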
4 changes: 3 additions & 1 deletion route/route.go
@@ -128,6 +128,8 @@ func (r *Router) SetVersion(ver string) {
var routerMetrics = []metrics.Metadata{
{Name: "_router_proxied", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of events proxied to another refinery"},
{Name: "_router_event", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of events received"},
{Name: "_router_batch", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of events received"},
{Name: "_router_otlp", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of batches of otlp requests received"},
{Name: "_router_span", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of spans received"},
{Name: "_router_dropped", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of events dropped because the channel was full"},
{Name: "_router_nonspan", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "the number of non-span events received"},
@@ -537,6 +539,7 @@ func (router *Router) processOTLPRequest(
batches []huskyotlp.Batch,
apiKey string,
incomingUserAgent string) error {
+router.Metrics.Increment(router.incomingOrPeer + "_router_otlp")

@@ -612,7 +615,6 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
if processed {
if !kept {
return nil
-
}

// If the span was kept, we want to generate a probe that we'll forward
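The new `_router_otlp` counter follows the router's existing naming convention: each router instance prefixes metric names with the kind of traffic it serves, as the `router.incomingOrPeer + "_router_otlp"` expression in the diff shows. A small sketch of that convention (assuming the prefix values are "incoming" and "peer"; the rest is illustrative, not Refinery's metrics API):

```go
package main

import "fmt"

// router is an illustrative stand-in: one set of metric names yields
// distinct series per traffic direction, e.g. incoming_router_otlp
// and peer_router_otlp.
type router struct {
	incomingOrPeer string // assumed to be "incoming" or "peer"
	counts         map[string]int
}

func (r *router) increment(name string) {
	r.counts[r.incomingOrPeer+name]++
}

func main() {
	in := &router{incomingOrPeer: "incoming", counts: map[string]int{}}
	in.increment("_router_otlp")
	fmt.Println(in.counts) // map[incoming_router_otlp:1]
}
```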