feat: batch kept decisions #1419

Merged: 6 commits, Nov 8, 2024
Changes from 3 commits
217 changes: 108 additions & 109 deletions collect/collect.go
@@ -32,10 +32,11 @@ import (
 )

 const (
-	keptTraceDecisionTopic    = "trace_decision_kept"
-	droppedTraceDecisionTopic = "trace_decision_dropped"
-	decisionMessageBufferSize = 10_000
-	defaultDropDecisionTicker = 1 * time.Second
+	keptTraceDecisionTopic            = "trace_decision_kept"
+	dropTraceDecisionTopic            = "trace_decision_dropped"
+	decisionMessageBufferSize         = 10_000
+	defaultDropDecisionTickerInterval = 1 * time.Second
+	defaultKeptDecisionTickerInterval = 100 * time.Millisecond
 )

 var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. Span will not be processed and will be lost.")
@@ -113,8 +114,8 @@ type InMemCollector struct {
 	dropDecisionMessages chan string
 	keptDecisionMessages chan string

-	dropDecisionBatch  chan string
-	keptDecisionBuffer chan string
+	dropDecisionBuffer chan TraceDecision
+	keptDecisionBuffer chan TraceDecision
 	hostname string
 }

@@ -211,10 +212,10 @@ func (i *InMemCollector) Start() error {
 	i.keptDecisionMessages = make(chan string, decisionMessageBufferSize)
 	i.dropDecisionMessages = make(chan string, decisionMessageBufferSize)
 	i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
-	i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions)
+	i.PubSub.Subscribe(context.Background(), dropTraceDecisionTopic, i.signalDroppedTraceDecisions)

-	i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5)
-	i.keptDecisionBuffer = make(chan string, 100_000)
+	i.dropDecisionBuffer = make(chan TraceDecision, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5)
+	i.keptDecisionBuffer = make(chan TraceDecision, i.Config.GetCollectionConfig().MaxKeptDecisionBatchSize*5)
 }

// spin up one collector because this is a single threaded collector
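The CollectionConfig fields this change relies on (batch sizes, send intervals, EnableTraceLocality) are defined in the config package rather than in this diff. A minimal sketch of the shape implied by the call sites, with assumed types, might be:

```go
// Sketch only: inferred from the call sites in this diff, not Refinery's
// actual config definition. The diff wraps the interval fields in
// time.Duration(...), which suggests the real fields use a custom duration
// type; plain time.Duration stands in for it here.
package config

import "time"

type CollectionConfig struct {
	MaxDropDecisionBatchSize int           // flush a drop batch once it reaches this size
	MaxKeptDecisionBatchSize int           // flush a kept batch once it reaches this size
	DropDecisionSendInterval time.Duration // flush partial drop batches this often
	KeptDecisionSendInterval time.Duration // flush partial kept batches this often
	EnableTraceLocality      bool          // when true, decisions stay local and batching is disabled
}
```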
@@ -423,14 +424,14 @@ func (i *InMemCollector) collect() {
 			span.End()
 			return
 		}
-		i.processDropDecisions(msg)
+		i.processTraceDecisions(msg, dropDecision)
 	case msg, ok := <-i.keptDecisionMessages:
 		if !ok {
 			// channel's been closed; we should shut down.
 			span.End()
 			return
 		}
-		i.processKeptDecision(msg)
+		i.processTraceDecisions(msg, keptDecision)
 	case <-ticker.C:
 		i.sendExpiredTracesInCache(ctx, i.Clock.Now())
 		i.checkAlloc(ctx)
@@ -1063,7 +1064,7 @@ func (i *InMemCollector) Stop() error {
 	close(i.outgoingTraces)

 	if !i.Config.GetCollectionConfig().EnableTraceLocality {
-		close(i.dropDecisionBatch)
+		close(i.dropDecisionBuffer)
 		close(i.keptDecisionBuffer)
 	}

@@ -1354,57 +1355,60 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st
 		i.Logger.Warn().Logf("dropped trace decision channel is full. Dropping message")
 	}
 }

-func (i *InMemCollector) processDropDecisions(msg string) {
-	i.Metrics.Increment("drop_decisions_received")
+func (i *InMemCollector) processTraceDecisions(msg string, decisionType decisionType) {
+	i.Metrics.Increment(fmt.Sprintf("%s_decisions_received", decisionType.String()))
 	if len(msg) == 0 {
 		return
 	}

-	ids := newDroppedTraceDecision(msg)
+	// Deserialize the message into trace decisions
+	decisions := make([]TraceDecision, 0)
+	var err error
+	switch decisionType {
+	case keptDecision:
+		decisions, err = newKeptTraceDecision(msg)
+		if err != nil {
+			i.Logger.Error().Logf("Failed to unmarshal kept trace decision message. %s", err)
+			return
+		}
+	case dropDecision:
+		decisions, err = newDroppedTraceDecision(msg)
+		if err != nil {
+			i.Logger.Error().Logf("Failed to unmarshal drop trace decision message. %s", err)
+			return
+		}
+	default:
+		i.Logger.Error().Logf("unknown decision type %s while processing trace decisions", decisionType)
+		return
+	}

-	if len(ids) == 0 {
+	if len(decisions) == 0 {
 		return
 	}

 	toDelete := generics.NewSet[string]()
-	for _, id := range ids {
-		trace := i.cache.Get(id)
-		// if we don't have the trace in the cache, we don't need to do anything
+	for _, decision := range decisions {
+		trace := i.cache.Get(decision.TraceID)
 		if trace == nil {
-			i.Logger.Debug().Logf("trace not found in cache for trace decision")
+			i.Logger.Debug().Logf("trace not found in cache for %s decision", decisionType.String())
 			continue
 		}
-		toDelete.Add(id)
-
-		i.sampleTraceCache.Record(trace, false, "")
-	}
-
-	i.cache.RemoveTraces(toDelete)
-}
-
-func (i *InMemCollector) processKeptDecision(msg string) {
-	i.Metrics.Increment("kept_decisions_received")
-
-	td, err := newKeptTraceDecision(msg)
-	if err != nil {
-		i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err)
-		return
-	}
-
-	toDelete := generics.NewSet[string]()
-	trace := i.cache.Get(td.TraceID)
-	// if we don't have the trace in the cache, we don't need to do anything
-	if trace == nil {
-		i.Logger.Debug().Logf("trace not found in cache for trace decision")
-		return
-	}
-	toDelete.Add(td.TraceID)
-	trace.SetSampleRate(td.SampleRate)
-	trace.KeepSample = td.Kept
-
-	i.sampleTraceCache.Record(trace, td.Kept, td.KeptReason)
-
-	i.send(context.Background(), trace, td)
+		toDelete.Add(decision.TraceID)
+
+		if decisionType == keptDecision {
+			trace.SetSampleRate(decision.SampleRate)
+			trace.KeepSample = decision.Kept
+		}
+
+		i.sampleTraceCache.Record(trace, decision.Kept, decision.KeptReason)
+
+		// Send the processed trace (only in Kept case)
+		if decisionType == keptDecision {
+			i.send(context.Background(), trace, &decision)
+		}
+	}

 	i.cache.RemoveTraces(toDelete)
 }

Review thread on the send call in the kept branch:

Contributor: I think we can consolidate these two if blocks together.

Contributor (author): I realized that we should still call send for dropped traces, since the trace_send_dropped metric is emitted in there, and send checks against trace.Kept internally anyway.
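Not shown in this file: the decisionType enum and its String() method, which the new code dispatches on. A minimal sketch consistent with the usage above (the underlying values and package layout are assumptions; the real definitions live elsewhere in the PR) might be:

```go
package collect

// Sketch only: the real definitions ship elsewhere in this PR. The names
// come from usage in this diff; the underlying values are assumptions.
type decisionType int

const (
	keptDecision decisionType = iota
	dropDecision
)

// String feeds metric names such as "kept_decisions_received" and
// "drop_decisions_received" via the fmt.Sprintf call above.
func (d decisionType) String() string {
	switch d {
	case keptDecision:
		return "kept"
	case dropDecision:
		return "drop"
	default:
		return "unknown"
	}
}
```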
@@ -1506,35 +1510,17 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
 	_, span := otelutil.StartSpanWith(ctx, i.Tracer, "publishTraceDecision", "decision", td.Kept)
 	defer span.End()

-	var (
-		decisionMsg string
-		err         error
-	)
-
 	if td.Kept {
-		decisionMsg, err = newKeptDecisionMessage(td)
-		if err != nil {
-			i.Logger.Error().WithFields(map[string]interface{}{
-				"trace_id": td.TraceID,
-				"kept":     td.Kept,
-				"reason":   td.KeptReason,
-				"sampler":  td.SamplerKey,
-				"selector": td.SamplerSelector,
-				"error":    err.Error(),
-			}).Logf("Failed to create trace decision message")
-			return
-		}
-
 		select {
-		case i.keptDecisionBuffer <- decisionMsg:
+		case i.keptDecisionBuffer <- td:
 		default:
 			i.Metrics.Increment("collector_kept_decisions_queue_full")
 			i.Logger.Warn().Logf("kept trace decision buffer is full. Dropping message")
 		}
 		return
 	} else {
 		select {
-		case i.dropDecisionBatch <- td.TraceID:
+		case i.dropDecisionBuffer <- td:
 		default:
 			i.Metrics.Increment("collector_drop_decisions_queue_full")
 			i.Logger.Warn().Logf("drop trace decision buffer is full. Dropping message")
@@ -1547,93 +1533,106 @@ func (i *InMemCollector) sendKeptDecisions() {
 	if i.Config.GetCollectionConfig().EnableTraceLocality {
 		return
 	}

-	ctx := context.Background()
-	for msg := range i.keptDecisionBuffer {
-		err := i.PubSub.Publish(ctx, keptTraceDecisionTopic, msg)
-		if err != nil {
-			i.Logger.Error().WithFields(map[string]interface{}{
-				"error": err.Error(),
-			}).Logf("Failed to publish trace decision")
-		}
-	}
+	interval := time.Duration(i.Config.GetCollectionConfig().KeptDecisionSendInterval)
+	if interval == 0 {
+		interval = defaultKeptDecisionTickerInterval
+	}
+	i.sendDecisions(i.keptDecisionBuffer, interval, i.Config.GetCollectionConfig().MaxKeptDecisionBatchSize, keptDecision)
 }

 func (i *InMemCollector) sendDropDecisions() {
 	if i.Config.GetCollectionConfig().EnableTraceLocality {
 		return
 	}

-	timerInterval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval)
-	if i.Config.GetCollectionConfig().DropDecisionSendInterval == 0 {
-		timerInterval = defaultDropDecisionTicker
+	interval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval)
+	if interval == 0 {
+		interval = defaultDropDecisionTickerInterval
 	}
+	i.sendDecisions(i.dropDecisionBuffer, interval, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize, dropDecision)
+}

-	// use a timer here so that we don't send a batch immediately after
-	// reaching the max batch size
-	timer := i.Clock.NewTimer(timerInterval)
+// Unified sendDecisions function for batching and processing TraceDecisions
+func (i *InMemCollector) sendDecisions(decisionChan <-chan TraceDecision, interval time.Duration, maxBatchSize int, decisionType decisionType) {
+	timer := i.Clock.NewTimer(interval)
 	defer timer.Stop()
-	traceIDs := make([]string, 0, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize)
+	decisions := make([]TraceDecision, 0, maxBatchSize)
 	send := false
 	eg := &errgroup.Group{}
+	ctx := context.Background()
+	var createDecisionMessage newDecisionMessage
+	var metricName, topic string
+	switch decisionType {
+	case keptDecision:
+		metricName = "collector_kept_decisions_batch_size"
+		topic = keptTraceDecisionTopic
+		createDecisionMessage = newKeptDecisionMessage
+	case dropDecision:
+		metricName = "collector_drop_decisions_batch_size"
+		topic = dropTraceDecisionTopic
+		createDecisionMessage = newDroppedDecisionMessage
+	default:
+		i.Logger.Error().Logf("Invalid decision type")
+		return // invalid decision type
+	}
+
 	for {
 		select {
 		case <-i.done:
 			eg.Wait()
 			return
-		case id, ok := <-i.dropDecisionBatch:
+		case td, ok := <-decisionChan:
 			if !ok {
 				eg.Wait()
 				return
 			}
-			// if we get a trace ID, add it to the list
-			traceIDs = append(traceIDs, id)
-			// if we exceeded the max count, we need to send
-			if len(traceIDs) >= i.Config.GetCollectionConfig().MaxDropDecisionBatchSize {
+			// Add TraceDecision to the batch
+			decisions = append(decisions, td)
+			if len(decisions) >= maxBatchSize {
 				send = true
 			}
 		case <-timer.Chan():
 			// timer fired, so send what we have
 			send = true
 		}

-		// if we need to send, do so
-		if send && len(traceIDs) > 0 {
-			i.Metrics.Histogram("collector_drop_decision_batch_count", len(traceIDs))
+		// Send the batch if ready
+		if send && len(decisions) > 0 {
+			i.Metrics.Histogram(metricName, len(decisions))

-			// copy the traceIDs so we can clear the list
-			idsToProcess := make([]string, len(traceIDs))
-			copy(idsToProcess, traceIDs)
-			// clear the list
-			traceIDs = traceIDs[:0]
+			// Copy current batch to process
+			decisionsToProcess := make([]TraceDecision, len(decisions))
+			copy(decisionsToProcess, decisions)
+			decisions = decisions[:0] // Reset the batch

 			// now process the result in a goroutine so we can keep listening
 			eg.Go(func() error {
 				select {
 				case <-i.done:
 					return nil
 				default:
-					msg, err := newDroppedDecisionMessage(idsToProcess...)
+					msg, err := createDecisionMessage(decisionsToProcess)
 					if err != nil {
-						i.Logger.Error().Logf("Failed to marshal dropped trace decision")
+						i.Logger.Error().WithFields(map[string]interface{}{
+							"error": err.Error(),
+						}).Logf("Failed to create trace decision message")
+						return nil
 					}
-					err = i.PubSub.Publish(context.Background(), droppedTraceDecisionTopic, msg)
+					err = i.PubSub.Publish(ctx, topic, msg)
 					if err != nil {
-						i.Logger.Error().Logf("Failed to publish dropped trace decision")
+						i.Logger.Error().WithFields(map[string]interface{}{
+							"error": err.Error(),
+						}).Logf("Failed to publish trace decision")
 					}
 				}
 				return nil
 			})

 			// Reset timer after send
 			if !timer.Stop() {
 				select {
 				case <-timer.Chan():
 				default:
 				}
 			}

-			timer.Reset(timerInterval)
+			timer.Reset(interval)
 			send = false
 		}
 	}
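sendDecisions declares `var createDecisionMessage newDecisionMessage` and assigns either newKeptDecisionMessage or newDroppedDecisionMessage to it. The newDecisionMessage type is not part of this diff; judging from the single call site and the string payload handed to PubSub.Publish, a plausible shape is:

```go
package collect

// Sketch only: inferred from the call site above, not the actual
// definition in this PR. Both newKeptDecisionMessage and
// newDroppedDecisionMessage would need to satisfy this signature.
type newDecisionMessage func(tds []TraceDecision) (string, error)
```

The Stop-drain-Reset sequence guarding `timer.Reset` is the standard Go idiom for reusing a timer: if the timer has already fired, its channel must be drained before Reset, otherwise a stale tick would flush the next batch prematurely.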