diff --git a/collect/collect.go b/collect/collect.go index 2e8218618e..71fd1c663e 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" "github.com/honeycombio/refinery/collect/cache" "github.com/honeycombio/refinery/config" @@ -35,8 +36,8 @@ import ( const ( keptTraceDecisionTopic = "trace_decision_kept" droppedTraceDecisionTopic = "trace_decision_dropped" - traceDecisionsBufferSize = 10_000 decisionMessageBufferSize = 10_000 + defaultDropDecisionTicker = 1 * time.Second ) var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. Span will not be processed and will be lost.") @@ -113,10 +114,9 @@ type InMemCollector struct { dropDecisionMessages chan string keptDecisionMessages chan string - keptDecisions chan *TraceDecision - dropDecisions chan []string - hostname string + dropDecisionBatch chan string + hostname string } var inMemCollectorMetrics = []metrics.Metadata{ @@ -155,6 +155,7 @@ var inMemCollectorMetrics = []metrics.Metadata{ {Name: "collector_redistribute_traces_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of redistributing traces to peers"}, {Name: "collector_collect_loop_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of the collect loop, the primary event processing goroutine"}, {Name: "collector_outgoing_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of traces waiting to be send to upstream"}, + {Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"}, } func (i *InMemCollector) Start() error { @@ -202,18 +203,19 @@ func (i *InMemCollector) Start() error { } if !i.Config.GetCollectionConfig().EnableTraceLocality { - i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions) - i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions) i.keptDecisionMessages = make(chan string, decisionMessageBufferSize) i.dropDecisionMessages = make(chan string, decisionMessageBufferSize) - i.dropDecisions = make(chan []string, traceDecisionsBufferSize) - i.keptDecisions = make(chan *TraceDecision, traceDecisionsBufferSize) + i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions) + i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions) + + i.dropDecisionBatch = make(chan string, 1000) } // spin up one collector because this is a single threaded collector go i.collect() go i.sendTraces() - go i.processDecisionMessages() + // spin up a drop decision batch sender + go i.sendDropDecisions() return nil } @@ -390,13 +392,13 @@ func (i *InMemCollector) collect() { return case <-i.redistributeTimer.Notify(): i.redistributeTraces(ctx) - case td, ok := <-i.keptDecisions: + case msg, ok := <-i.keptDecisionMessages: if !ok { // channel's been closed; we should shut down. return } - i.processKeptDecision(td) + i.processKeptDecision(msg) case sp, ok := <-i.fromPeer: if !ok { // channel's been closed; we should shut down. @@ -409,20 +411,20 @@ func (i *InMemCollector) collect() { case <-i.done: span.End() return - case ids, ok := <-i.dropDecisions: + case msg, ok := <-i.dropDecisionMessages: if !ok { // channel's been closed; we should shut down. 
return } - i.processDropDecisions(ids) - case td, ok := <-i.keptDecisions: + i.processDropDecisions(msg) + case msg, ok := <-i.keptDecisionMessages: if !ok { // channel's been closed; we should shut down. return } - i.processKeptDecision(td) + i.processKeptDecision(msg) case <-ticker.C: select { case <-i.done: @@ -1031,6 +1033,10 @@ func (i *InMemCollector) Stop() error { close(i.fromPeer) close(i.outgoingTraces) + if !i.Config.GetCollectionConfig().EnableTraceLocality { + close(i.dropDecisionBatch) + } + return nil } @@ -1394,7 +1400,13 @@ func (r *redistributeNotifier) run() { } func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg string) { + if len(msg) == 0 { + return + } + select { + case <-i.done: + return case <-ctx.Done(): return case i.keptDecisionMessages <- msg: @@ -1403,7 +1415,13 @@ func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg strin } } func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg string) { + if len(msg) == 0 { + return + } + select { + case <-i.done: + return case <-ctx.Done(): return case i.dropDecisionMessages <- msg: @@ -1412,7 +1430,13 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st } } -func (i *InMemCollector) processDropDecisions(ids []string) { +func (i *InMemCollector) processDropDecisions(msg string) { + ids := newDroppedTraceDecision(msg) + + if len(ids) == 0 { + return + } + toDelete := generics.NewSet[string]() for _, id := range ids { @@ -1431,7 +1455,12 @@ func (i *InMemCollector) processDropDecisions(ids []string) { i.cache.RemoveTraces(toDelete) } -func (i *InMemCollector) processKeptDecision(td *TraceDecision) { +func (i *InMemCollector) processKeptDecision(msg string) { + td, err := newKeptTraceDecision(msg) + if err != nil { + i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err) + return + } toDelete := generics.NewSet[string]() trace := i.cache.Get(td.TraceID) // if we don't have the trace in the cache, we don't need to do anything @@ -1449,43 +1478,6 @@ func (i *InMemCollector) processKeptDecision(td *TraceDecision) { i.cache.RemoveTraces(toDelete) } -func (i *InMemCollector) processDecisionMessages() { - for { - select { - case <-i.done: - return - case msg := <-i.keptDecisionMessages: - td, err := newKeptTraceDecision(msg) - if err != nil { - i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err) - } - - select { - case <-i.done: - return - case i.keptDecisions <- td: - default: - i.Logger.Error().Logf("trace decision channel is full. Dropping decisions") - } - - case msg := <-i.dropDecisionMessages: - ids := newDroppedTraceDecision(msg) - - if len(ids) == 0 { - continue - } - - select { - case <-i.done: - return - case i.dropDecisions <- ids: - default: - i.Logger.Error().Logf("trace decision channel is full. 
Dropping decisions") - } - } - } -} - func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) { if !i.IsMyTrace(trace.ID()) { err := errors.New("cannot make a decision for partial traces") @@ -1584,7 +1576,6 @@ func (i *InMemCollector) IsMyTrace(traceID string) bool { } func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecision) { - topic := keptTraceDecisionTopic var ( decisionMsg string err error @@ -1593,8 +1584,9 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis if td.Kept { decisionMsg, err = newKeptDecisionMessage(td) } else { - topic = droppedTraceDecisionTopic - decisionMsg, err = newDroppedDecisionMessage(td.TraceID) + // if we're dropping the trace, we should add it to the batch so we can send it later + i.dropDecisionBatch <- td.TraceID + return } if err != nil { @@ -1608,7 +1600,7 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis }).Logf("Failed to create trace decision message") } - err = i.PubSub.Publish(ctx, topic, decisionMsg) + err = i.PubSub.Publish(ctx, keptTraceDecisionTopic, decisionMsg) if err != nil { i.Logger.Error().WithFields(map[string]interface{}{ "trace_id": td.TraceID, @@ -1620,3 +1612,82 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis }).Logf("Failed to publish trace decision") } } + +func (i *InMemCollector) sendDropDecisions() { + if i.Config.GetCollectionConfig().EnableTraceLocality { + return + } + + timerInterval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval) + if i.Config.GetCollectionConfig().DropDecisionSendInterval == 0 { + timerInterval = defaultDropDecisionTicker + } + + // use a timer here so that we don't send a batch immediately after + // reaching the max batch size + timer := i.Clock.NewTimer(timerInterval) + defer timer.Stop() + traceIDs := make([]string, 0, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize) + send := false + eg := &errgroup.Group{} + for { + select { + case <-i.done: + eg.Wait() + return + case id, ok := <-i.dropDecisionBatch: + if !ok { + eg.Wait() + return + } + // if we get a trace ID, add it to the list + traceIDs = append(traceIDs, id) + // if we exceeded the max count, we need to send + if len(traceIDs) >= i.Config.GetCollectionConfig().MaxDropDecisionBatchSize { + send = true + } + case <-timer.Chan(): + // timer fired, so send what we have + send = true + } + + // if we need to send, do so + if send && len(traceIDs) > 0 { + i.Metrics.Histogram("collector_drop_decision_batch_count", len(traceIDs)) + + // copy the traceIDs so we can clear the list + idsToProcess := make([]string, len(traceIDs)) + copy(idsToProcess, traceIDs) + // clear the list + traceIDs = traceIDs[:0] + + // now process the result in a goroutine so we can keep listening + eg.Go(func() error { + select { + case <-i.done: + return nil + default: + msg, err := newDroppedDecisionMessage(idsToProcess...) 
+ if err != nil { + i.Logger.Error().Logf("Failed to marshal dropped trace decision") + } + err = i.PubSub.Publish(context.Background(), droppedTraceDecisionTopic, msg) + if err != nil { + i.Logger.Error().Logf("Failed to publish dropped trace decision") + } + } + + return nil + }) + if !timer.Stop() { + select { + case <-timer.Chan(): + default: + } + } + + timer.Reset(timerInterval) + send = false + } + } +} diff --git a/collect/collect_test.go b/collect/collect_test.go index 6986e6edcb..acaae78428 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -76,6 +76,8 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe }, done: make(chan struct{}), keptDecisionMessages: make(chan string, 50), + dropDecisionMessages: make(chan string, 50), + dropDecisionBatch: make(chan string, 5), Peers: &peer.MockPeers{ Peers: []string{"api1", "api2"}, }, @@ -262,9 +264,11 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { coll.incoming = make(chan *types.Span, 5) coll.fromPeer = make(chan *types.Span, 5) + coll.dropDecisionBatch = make(chan string, 5) coll.outgoingTraces = make(chan sendableTrace, 5) coll.datasetSamplers = make(map[string]sample.Sampler) go coll.collect() + go coll.sendDropDecisions() go coll.sendTraces() defer coll.Stop() @@ -2107,3 +2111,72 @@ func TestCreateDecisionSpan(t *testing.T) { expected.Data["meta.refinery.root"] = true assert.EqualValues(t, expected, ds) } + +func TestSendDropDecisions(t *testing.T) { + conf := &config.MockConfig{ + GetTracesConfigVal: config.TracesConfig{ + SendTicker: config.Duration(2 * time.Millisecond), + SendDelay: config.Duration(1 * time.Millisecond), + TraceTimeout: config.Duration(60 * time.Second), + MaxBatchSize: 500, + }, + GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1}, + ParentIdFieldNames: []string{"trace.parent_id", "parentId"}, + GetCollectionConfigVal: config.CollectionConfig{ + ShutdownDelay: config.Duration(1 * time.Millisecond), + }, + } + transmission := &transmit.MockTransmission{} + transmission.Start() + defer transmission.Stop() + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + defer peerTransmission.Stop() + coll := newTestCollector(conf, transmission, peerTransmission) + coll.dropDecisionBatch = make(chan string, 5) + + messages := make(chan string, 5) + coll.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, func(ctx context.Context, msg string) { + messages <- msg + }) + + // drop decisions should be sent once the timer expires + collectionCfg := conf.GetCollectionConfig() + collectionCfg.DropDecisionSendInterval = config.Duration(2 * time.Millisecond) + conf.GetCollectionConfigVal = collectionCfg + + closed := make(chan struct{}) + go func() { + coll.sendDropDecisions() + close(closed) + }() + + coll.dropDecisionBatch <- "trace1" + close(coll.dropDecisionBatch) + droppedMessage := <-messages + assert.Equal(t, "trace1", droppedMessage) + + <-closed + + // drop decision should be sent once it reaches the batch size + collectionCfg = conf.GetCollectionConfig() + collectionCfg.DropDecisionSendInterval = config.Duration(60 * time.Second) + collectionCfg.MaxDropDecisionBatchSize = 5 + conf.GetCollectionConfigVal = collectionCfg + coll.dropDecisionBatch = make(chan string, 5) + + closed = make(chan struct{}) + go func() { + coll.sendDropDecisions() + close(closed) + }() + + for i := 0; i < 5; i++ { + coll.dropDecisionBatch <- fmt.Sprintf("trace%d", i) + } + close(coll.dropDecisionBatch) + droppedMessage = 
<-messages + assert.Equal(t, "trace0,trace1,trace2,trace3,trace4", droppedMessage) + + <-closed +} diff --git a/config/file_config.go b/config/file_config.go index b5cd00ae31..7fd3245d0b 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -314,6 +314,9 @@ type CollectionConfig struct { DisableRedistribution bool `yaml:"DisableRedistribution"` ShutdownDelay Duration `yaml:"ShutdownDelay" default:"15s"` EnableTraceLocality bool `yaml:"EnableTraceLocality"` + + MaxDropDecisionBatchSize int `yaml:"MaxDropDecisionBatchSize" default:"1000"` + DropDecisionSendInterval Duration `yaml:"DropDecisionSendInterval" default:"1s"` } // GetMaxAlloc returns the maximum amount of memory to use for the cache. diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml index cf659c1799..7d00099c3a 100644 --- a/config/metadata/configMeta.yaml +++ b/config/metadata/configMeta.yaml @@ -1342,6 +1342,26 @@ groups: description: > The `HealthCheckTimeout` setting specifies the maximum duration allowed for the health checks of the collection subsystems to complete. If a subsystem does not respond within this timeout period, it will be marked as unhealthy. This timeout value should be set carefully to ensure that transient delays do not lead to unnecessary failure detection while still allowing for timely identification of actual health issues. + - name: MaxDropDecisionBatchSize + type: int + valuetype: nondefault + firstversion: v2.9 + default: 1000 + reload: false + summary: Maximum size for batching drop decisions. + description: > + Specifies the maximum number of drop decisions that can be batched before triggering processing. This ensures that drop decisions do not accumulate indefinitely, allowing timely and efficient management of spans by processing them in bulk. If the number of drop decisions reaches this limit, the batch will be processed immediately, even if the timeout has not been reached. + + - name: DropDecisionSendInterval + type: duration + valuetype: nondefault + firstversion: v2.9 + default: 1s + reload: false + summary: Interval for sending drop decisions in batches. + description: > + Sets the time interval for sending accumulated drop decisions in batches. This ensures that drop decisions are processed at regular intervals, improving efficiency by reducing the frequency of network transmissions. If the maximum batch size (MaxDropDecisionBatchSize) is reached before this interval elapses, the batch will be sent immediately. + - name: BufferSizes title: "Buffer Sizes" description: >
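
Reviewer note (not part of the diff): the new `sendDropDecisions` loop above combines a size trigger (`MaxDropDecisionBatchSize`) with a time trigger (`DropDecisionSendInterval`). Below is a minimal, standalone sketch of that batch-and-flush pattern for readers unfamiliar with it. The `batchDropDecisions` and `publish` names are hypothetical; the sketch uses a plain `time.Timer` instead of the collector's injected `Clock` and the `errgroup`-managed publish, and it flushes leftovers when the input channel closes, which is a simplification rather than a claim about the production code path.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// batchDropDecisions accumulates IDs from `in` and flushes them as a single
// comma-joined message whenever the batch reaches maxBatchSize or the send
// interval elapses, whichever comes first.
func batchDropDecisions(in <-chan string, publish func(msg string), maxBatchSize int, interval time.Duration) {
	timer := time.NewTimer(interval)
	defer timer.Stop()

	batch := make([]string, 0, maxBatchSize)

	flush := func() {
		if len(batch) == 0 {
			return
		}
		publish(strings.Join(batch, ","))
		batch = batch[:0]
	}

	for {
		select {
		case id, ok := <-in:
			if !ok {
				// input closed: flush whatever is left and stop
				// (a simplification of this sketch, not the diff's behavior)
				flush()
				return
			}
			batch = append(batch, id)
			if len(batch) < maxBatchSize {
				continue
			}
			// batch is full: flush now, then restart the timer so the
			// next batch gets a full interval
			flush()
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(interval)
		case <-timer.C:
			// interval elapsed: send whatever has accumulated
			flush()
			timer.Reset(interval)
		}
	}
}

func main() {
	ids := make(chan string)
	go func() {
		for i := 0; i < 7; i++ {
			ids <- fmt.Sprintf("trace%d", i)
		}
		close(ids)
	}()

	batchDropDecisions(ids, func(msg string) {
		fmt.Println("published:", msg)
	}, 5, 100*time.Millisecond)
}
```

One detail worth keeping in mind while reviewing: the production loop resets the timer after a size-triggered send (with the Stop/drain/Reset idiom) so that a freshly started batch always gets a full `DropDecisionSendInterval`; the sketch follows the same idiom.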