Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send drop decisions in batch #1402

Merged
merged 4 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 129 additions & 58 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/honeycombio/refinery/collect/cache"
"github.com/honeycombio/refinery/config"
Expand All @@ -35,8 +36,8 @@ import (
const (
keptTraceDecisionTopic = "trace_decision_kept"
droppedTraceDecisionTopic = "trace_decision_dropped"
traceDecisionsBufferSize = 10_000
decisionMessageBufferSize = 10_000
defaultDropDecisionTicker = 1 * time.Second
)

var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. Span will not be processed and will be lost.")
Expand Down Expand Up @@ -113,10 +114,9 @@ type InMemCollector struct {

dropDecisionMessages chan string
keptDecisionMessages chan string
keptDecisions chan *TraceDecision
dropDecisions chan []string

hostname string
dropDecisionBatch chan string
hostname string
}

var inMemCollectorMetrics = []metrics.Metadata{
Expand Down Expand Up @@ -155,6 +155,7 @@ var inMemCollectorMetrics = []metrics.Metadata{
{Name: "collector_redistribute_traces_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of redistributing traces to peers"},
{Name: "collector_collect_loop_duration_ms", Type: metrics.Histogram, Unit: metrics.Milliseconds, Description: "duration of the collect loop, the primary event processing goroutine"},
{Name: "collector_outgoing_queue", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of traces waiting to be send to upstream"},
{Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"},
}

func (i *InMemCollector) Start() error {
Expand Down Expand Up @@ -202,18 +203,19 @@ func (i *InMemCollector) Start() error {
}

if !i.Config.GetCollectionConfig().EnableTraceLocality {
i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions)
i.keptDecisionMessages = make(chan string, decisionMessageBufferSize)
i.dropDecisionMessages = make(chan string, decisionMessageBufferSize)
i.dropDecisions = make(chan []string, traceDecisionsBufferSize)
i.keptDecisions = make(chan *TraceDecision, traceDecisionsBufferSize)
i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions)

i.dropDecisionBatch = make(chan string, 1000)
}

// spin up one collector because this is a single threaded collector
go i.collect()
go i.sendTraces()
go i.processDecisionMessages()
// spin up a drop decision batch sender
go i.sendDropDecisions()

return nil
}
Expand Down Expand Up @@ -390,13 +392,13 @@ func (i *InMemCollector) collect() {
return
case <-i.redistributeTimer.Notify():
i.redistributeTraces(ctx)
case td, ok := <-i.keptDecisions:
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
return
}

i.processKeptDecision(td)
i.processKeptDecision(msg)
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
Expand All @@ -409,20 +411,20 @@ func (i *InMemCollector) collect() {
case <-i.done:
span.End()
return
case ids, ok := <-i.dropDecisions:
case msg, ok := <-i.dropDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
return
}

i.processDropDecisions(ids)
case td, ok := <-i.keptDecisions:
i.processDropDecisions(msg)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
return
}

i.processKeptDecision(td)
i.processKeptDecision(msg)
case <-ticker.C:
select {
case <-i.done:
Expand Down Expand Up @@ -1031,6 +1033,10 @@ func (i *InMemCollector) Stop() error {
close(i.fromPeer)
close(i.outgoingTraces)

if !i.Config.GetCollectionConfig().EnableTraceLocality {
close(i.dropDecisionBatch)
}

return nil
}

Expand Down Expand Up @@ -1394,7 +1400,13 @@ func (r *redistributeNotifier) run() {
}

func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg string) {
if len(msg) == 0 {
return
}

select {
case <-i.done:
return
case <-ctx.Done():
return
case i.keptDecisionMessages <- msg:
Expand All @@ -1403,7 +1415,13 @@ func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg strin
}
}
func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg string) {
if len(msg) == 0 {
return
}

select {
case <-i.done:
return
case <-ctx.Done():
return
case i.dropDecisionMessages <- msg:
Expand All @@ -1412,7 +1430,13 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st
}
}

func (i *InMemCollector) processDropDecisions(ids []string) {
func (i *InMemCollector) processDropDecisions(msg string) {
ids := newDroppedTraceDecision(msg)

if len(ids) == 0 {
return
}

toDelete := generics.NewSet[string]()
for _, id := range ids {

Expand All @@ -1431,7 +1455,12 @@ func (i *InMemCollector) processDropDecisions(ids []string) {
i.cache.RemoveTraces(toDelete)
}

func (i *InMemCollector) processKeptDecision(td *TraceDecision) {
func (i *InMemCollector) processKeptDecision(msg string) {
td, err := newKeptTraceDecision(msg)
if err != nil {
i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err)
return
}
toDelete := generics.NewSet[string]()
trace := i.cache.Get(td.TraceID)
// if we don't have the trace in the cache, we don't need to do anything
Expand All @@ -1449,43 +1478,6 @@ func (i *InMemCollector) processKeptDecision(td *TraceDecision) {

i.cache.RemoveTraces(toDelete)
}
// processDecisionMessages pumps raw pub/sub trace-decision messages into the
// typed decision channels consumed by the collect loop. Each message is
// deserialized off the wire and forwarded without blocking: if the
// destination channel is full, the decision is logged and dropped rather
// than stalling the pub/sub subscriber.
func (i *InMemCollector) processDecisionMessages() {
	for {
		select {
		case <-i.done:
			return
		case msg := <-i.keptDecisionMessages:
			td, err := newKeptTraceDecision(msg)
			if err != nil {
				i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err)
				// don't forward a nil decision downstream; skip this message
				continue
			}

			select {
			case <-i.done:
				return
			case i.keptDecisions <- td:
			default:
				i.Logger.Error().Logf("trace decision channel is full. Dropping decisions")
			}

		case msg := <-i.dropDecisionMessages:
			ids := newDroppedTraceDecision(msg)

			// an empty or malformed message yields no trace IDs; nothing to do
			if len(ids) == 0 {
				continue
			}

			select {
			case <-i.done:
				return
			case i.dropDecisions <- ids:
			default:
				i.Logger.Error().Logf("trace decision channel is full. Dropping decisions")
			}
		}
	}
}

func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) {
if !i.IsMyTrace(trace.ID()) {
err := errors.New("cannot make a decision for partial traces")
Expand Down Expand Up @@ -1584,7 +1576,6 @@ func (i *InMemCollector) IsMyTrace(traceID string) bool {
}

func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecision) {
topic := keptTraceDecisionTopic
var (
decisionMsg string
err error
Expand All @@ -1593,8 +1584,9 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
if td.Kept {
decisionMsg, err = newKeptDecisionMessage(td)
} else {
topic = droppedTraceDecisionTopic
decisionMsg, err = newDroppedDecisionMessage(td.TraceID)
// if we're dropping the trace, we should add it to the batch so we can send it later
i.dropDecisionBatch <- td.TraceID
return
}

if err != nil {
Expand All @@ -1608,7 +1600,7 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
}).Logf("Failed to create trace decision message")
}

err = i.PubSub.Publish(ctx, topic, decisionMsg)
err = i.PubSub.Publish(ctx, keptTraceDecisionTopic, decisionMsg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": td.TraceID,
Expand All @@ -1620,3 +1612,82 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
}).Logf("Failed to publish trace decision")
}
}

// sendDropDecisions batches dropped-trace IDs received on i.dropDecisionBatch
// and publishes each batch as a single pub/sub message on
// droppedTraceDecisionTopic. A batch is flushed when it reaches
// MaxDropDecisionBatchSize or when DropDecisionSendInterval elapses,
// whichever comes first. It returns immediately when trace locality is
// enabled (no decisions are shared between peers in that mode), and exits
// when the collector shuts down or the batch channel is closed.
func (i *InMemCollector) sendDropDecisions() {
	if i.Config.GetCollectionConfig().EnableTraceLocality {
		return
	}

	timerInterval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval)
	if timerInterval == 0 {
		timerInterval = defaultDropDecisionTicker
	}

	// use a timer here so that we don't send a batch immediately after
	// reaching the max batch size
	timer := i.Clock.NewTimer(timerInterval)
	defer timer.Stop()
	traceIDs := make([]string, 0, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize)
	send := false
	eg := &errgroup.Group{}
	for {
		select {
		case <-i.done:
			// wait for any in-flight publishes before exiting
			eg.Wait()
			return
		case id, ok := <-i.dropDecisionBatch:
			if !ok {
				// channel closed during shutdown; wait for in-flight publishes
				eg.Wait()
				return
			}
			// if we get a trace ID, add it to the list
			traceIDs = append(traceIDs, id)
			// if we exceeded the max count, we need to send
			if len(traceIDs) >= i.Config.GetCollectionConfig().MaxDropDecisionBatchSize {
				send = true
			}
		case <-timer.Chan():
			// timer fired, so send what we have
			send = true
		}

		// if we need to send, do so
		if send && len(traceIDs) > 0 {
			i.Metrics.Histogram("collector_drop_decision_batch_count", len(traceIDs))

			// copy the traceIDs so we can clear the list while the publish
			// proceeds concurrently
			idsToProcess := make([]string, len(traceIDs))
			copy(idsToProcess, traceIDs)
			// clear the list, keeping the backing array for reuse
			traceIDs = traceIDs[:0]

			// now process the result in a goroutine so we can keep listening
			eg.Go(func() error {
				select {
				case <-i.done:
					return nil
				default:
					msg, err := newDroppedDecisionMessage(idsToProcess...)
					if err != nil {
						i.Logger.Error().Logf("Failed to marshal dropped trace decision")
						// don't publish a malformed/empty message
						return nil
					}
					if err := i.PubSub.Publish(context.Background(), droppedTraceDecisionTopic, msg); err != nil {
						i.Logger.Error().Logf("Failed to publish dropped trace decision")
					}
				}

				return nil
			})

			// drain a stale fire before resetting, per the time.Timer
			// Stop/Reset contract
			if !timer.Stop() {
				select {
				case <-timer.Chan():
				default:
				}
			}

			timer.Reset(timerInterval)
			send = false
		}
	}
}
Loading
Loading