feat: redistribute remaining traces during shutdown #1261

Merged 23 commits on Aug 9, 2024
Changes from 13 commits
15 changes: 8 additions & 7 deletions app/app_test.go
@@ -240,13 +240,14 @@ func TestAppIntegration(t *testing.T) {

time.Sleep(5 * app.Config.GetSendTickerValue())

events := sender.Events()
require.Len(t, events, 1)

assert.Equal(t, "dataset", events[0].Dataset)
assert.Equal(t, "bar", events[0].Data["foo"])
assert.Equal(t, "1", events[0].Data["trace.trace_id"])
assert.Equal(t, uint(1), events[0].Data["meta.refinery.original_sample_rate"])
require.EventuallyWithT(t, func(collect *assert.CollectT) {
events := sender.Events()
require.Len(collect, events, 1)
assert.Equal(collect, "dataset", events[0].Dataset)
assert.Equal(collect, "bar", events[0].Data["foo"])
assert.Equal(collect, "1", events[0].Data["trace.trace_id"])
assert.Equal(collect, uint(1), events[0].Data["meta.refinery.original_sample_rate"])
}, 2*time.Second, 10*time.Millisecond)

err = startstop.Stop(graph.Objects(), nil)
assert.NoError(t, err)
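
The hunk above replaces a fixed `time.Sleep` with `require.EventuallyWithT`, so the assertions are retried until the mock sender has received the event or the 2-second deadline passes. A minimal, self-contained sketch of that polling pattern, assuming a testify version that provides `EventuallyWithT` (the `eventSource` interface is illustrative, not a Refinery type):

```go
package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// eventSource is a stand-in for the test's mock sender.
type eventSource interface {
	Events() []string
}

// waitForOneEvent retries the callback until every assertion recorded on the
// *assert.CollectT passes, failing the test after 2s of polling every 10ms.
func waitForOneEvent(t *testing.T, src eventSource) {
	require.EventuallyWithT(t, func(collect *assert.CollectT) {
		assert.Len(collect, src.Events(), 1)
	}, 2*time.Second, 10*time.Millisecond)
}
```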
19 changes: 19 additions & 0 deletions collect/cache/cuckooSentCache.go
@@ -261,3 +261,22 @@ func (c *cuckooSentCache) Resize(cfg config.SampleCacheConfig) error {
go c.monitor()
return nil
}

// Test checks if a trace was kept or dropped, and returns the reason if it was kept.
// The bool return value is true if the trace was found in the cache.
func (c *cuckooSentCache) Test(traceID string) (TraceSentRecord, string, bool) {
// was it dropped?
if c.dropped.Check(traceID) {
// we recognize it as dropped, so just say so; there's nothing else to do
return &cuckooDroppedRecord{}, "", true
}
// was it kept?
c.keptMut.Lock()
defer c.keptMut.Unlock()
if sentRecord, found := c.kept.Get(traceID); found {
reason, _ := c.sentReasons.Get(uint(sentRecord.reason))
return sentRecord, reason, true
}
// we have no memory of this place
return nil, "", false
}
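
Unlike `Check`, `Test` leaves the per-trace span counts untouched, which is what the shutdown path needs when it only wants to know whether a decision already exists. A hedged sketch of how a caller might use it (the function and its return strings are illustrative, not part of this PR; it assumes `TraceSentRecord` exposes `Kept()`, per the existing interface):

```go
package example

import "github.com/honeycombio/refinery/collect/cache"

// describeDecision consults the sent-trace cache without mutating its span
// counts, which is the property the shutdown path relies on.
func describeDecision(c cache.TraceSentCache, traceID string) string {
	record, reason, found := c.Test(traceID)
	if !found {
		return "no decision yet; the trace would be forwarded"
	}
	if record.Kept() {
		return "already kept: " + reason
	}
	return "already dropped"
}
```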
3 changes: 3 additions & 0 deletions collect/cache/traceSentCache.go
@@ -25,6 +25,9 @@ type TraceSentRecord interface {
type TraceSentCache interface {
// Record preserves the record of a trace being sent or not.
Record(trace KeptTrace, keep bool, reason string)
// Test checks if a trace is in the cache; if found, it returns the appropriate TraceSentRecord, the kept reason, and true, else nil, an empty string, and false.
// It does not modify the count information.
Test(traceID string) (TraceSentRecord, string, bool)
// Check tests if a trace corresponding to the span is in the cache; if found, it returns the appropriate TraceSentRecord and true,
// else nil and false.
Check(span *types.Span) (TraceSentRecord, string, bool)
203 changes: 181 additions & 22 deletions collect/collect.go
@@ -15,17 +15,20 @@ import (
"github.com/honeycombio/refinery/collect/cache"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/generics"
"github.com/honeycombio/refinery/internal/health"
"github.com/honeycombio/refinery/internal/otelutil"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
)

var ErrWouldBlock = errors.New("not adding span, channel buffer is full")
var CollectorHealthKey = "collector"

type Collector interface {
// AddSpan adds a span to be collected, buffered, and merged into a trace.
@@ -53,10 +56,13 @@ const (

// InMemCollector is a single threaded collector.
type InMemCollector struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Clock clockwork.Clock `inject:""`
Tracer trace.Tracer `inject:"tracer"`
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Clock clockwork.Clock `inject:""`
Tracer trace.Tracer `inject:"tracer"`
Health health.Recorder `inject:""`
Sharder sharder.Sharder `inject:""`

Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
@@ -77,6 +83,7 @@ type InMemCollector struct {
incoming chan *types.Span
fromPeer chan *types.Span
reload chan struct{}
done chan struct{}

hostname string
}
@@ -91,6 +98,8 @@ func (i *InMemCollector) Start() error {
// listen for config reloads
i.Config.RegisterReloadCallback(i.sendReloadSignal)

i.Health.Register(CollectorHealthKey, 3*time.Second)

i.Metrics.Register("trace_duration_ms", "histogram")
i.Metrics.Register("trace_span_count", "histogram")
i.Metrics.Register("collector_incoming_queue", "histogram")
@@ -108,6 +117,13 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register("trace_send_dropped", "counter")
i.Metrics.Register("trace_send_has_root", "counter")
i.Metrics.Register("trace_send_no_root", "counter")

i.Metrics.Register("trace_send_shutdown_total", "counter")
i.Metrics.Register("trace_send_shutdown_late", "counter")
i.Metrics.Register("trace_send_shutdown_expired", "counter")
i.Metrics.Register("trace_send_shutdown_root", "counter")
i.Metrics.Register("trace_send_shutdown_forwarded", "counter")

i.Metrics.Register(TraceSendGotRoot, "counter")
i.Metrics.Register(TraceSendExpired, "counter")
i.Metrics.Register(TraceSendEjectedFull, "counter")
@@ -129,6 +145,7 @@ func (i *InMemCollector) Start() error {
i.Metrics.Store("INCOMING_CAP", float64(cap(i.incoming)))
i.Metrics.Store("PEER_CAP", float64(cap(i.fromPeer)))
i.reload = make(chan struct{}, 1)
i.done = make(chan struct{})
i.datasetSamplers = make(map[string]sample.Sampler)

if i.Config.GetAddHostMetadataToTrace() {
@@ -313,6 +330,7 @@ func (i *InMemCollector) collect() {
defer i.mutex.Unlock()

for {
i.Health.Ready(CollectorHealthKey, true)
// record channel lengths as histogram but also as gauges
i.Metrics.Histogram("collector_incoming_queue", float64(len(i.incoming)))
i.Metrics.Histogram("collector_peer_queue", float64(len(i.fromPeer)))
@@ -324,6 +342,8 @@
// deadlocks because peers are waiting to get their events handed off to each
// other.
select {
case <-i.done:
return
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
Expand All @@ -332,8 +352,10 @@ func (i *InMemCollector) collect() {
i.processSpan(sp)
default:
select {
case <-i.done:
return
case <-ticker.C:
i.sendTracesInCache(time.Now())
i.sendExpiredTracesInCache(i.Clock.Now())
i.checkAlloc()

// Briefly unlock the cache, to allow test access.
@@ -361,7 +383,7 @@ func (i *InMemCollector) collect() {
}
}

func (i *InMemCollector) sendTracesInCache(now time.Time) {
func (i *InMemCollector) sendExpiredTracesInCache(now time.Time) {
traces := i.cache.TakeExpiredTraces(now)
for _, t := range traces {
if t.RootSpan != nil {
@@ -375,9 +397,8 @@ func (i *InMemCollector) sendTracesInCache(now time.Time) {
// processSpan does all the stuff necessary to take an incoming span and add it
// to (or create a new placeholder for) a trace.
func (i *InMemCollector) processSpan(sp *types.Span) {
ctx, span := otelutil.StartSpan(context.Background(), i.Tracer, "processSpan")
ctx := context.Background()
defer func() {
span.End()
i.Metrics.Increment("span_processed")
i.Metrics.Down("spans_waiting")
}()
@@ -495,9 +516,9 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool,

i.Metrics.Increment("kept_from_stress")
// ok, we're sending it, so decorate it first
sp.Event.Data["meta.stressed"] = true
sp.Data["meta.stressed"] = true
if i.Config.GetAddRuleReasonToTrace() {
sp.Event.Data["meta.refinery.reason"] = reason
sp.Data["meta.refinery.reason"] = reason
}
if i.hostname != "" {
sp.Data["meta.refinery.local_hostname"] = i.hostname
@@ -514,7 +535,7 @@
// on the trace has already been made, and it obeys that decision by either
// sending the span immediately or dropping it.
func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSentRecord, sentReason string, sp *types.Span) {
ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "dealWithSentTrace", map[string]interface{}{
_, span := otelutil.StartSpanMulti(ctx, i.Tracer, "dealWithSentTrace", map[string]interface{}{
"trace_id": sp.TraceID,
"sent_reason": sentReason,
"hostname": i.hostname,
@@ -606,7 +627,7 @@ func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMo

func (i *InMemCollector) isRootSpan(sp *types.Span) bool {
// log event should never be considered a root span, check for that first
if signalType, _ := sp.Data["meta.signal_type"]; signalType == "log" {
if signalType := sp.Data["meta.signal_type"]; signalType == "log" {
return false
}
// check if the event has a parent id using the configured parent id field names
Expand Down Expand Up @@ -738,21 +759,19 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
}

func (i *InMemCollector) Stop() error {
// close the incoming channel and (TODO) wait for all collectors to finish
close(i.done)
// signal the health system to not be ready
// so that no new traces are accepted
i.Health.Ready(CollectorHealthKey, false)

close(i.incoming)
close(i.fromPeer)

i.mutex.Lock()
defer i.mutex.Unlock()

// purge the collector of any in-flight traces
if i.cache != nil {
traces := i.cache.GetAll()
for _, trace := range traces {
if trace != nil {
i.send(trace, TraceSendEjectedFull)
}
}
}
i.sendTracesInCache()

if i.Transmission != nil {
i.Transmission.Flush()
}
@@ -762,6 +781,146 @@ func (i *InMemCollector) Stop() error {
return nil
}
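
`Stop` now follows a conventional Go shutdown ordering: signal the collect loop through the closed `done` channel, mark the collector unhealthy so no new traces are accepted, then drain the cache and flush the transmission. A toy sketch of that ordering under illustrative names (not Refinery APIs; the real `Stop` also closes its input channels and holds the collector mutex):

```go
package example

import "sync"

// worker illustrates the shutdown ordering used above: close(done) first,
// wait for the loop to exit, then drain leftover work, then flush buffers.
type worker struct {
	done chan struct{}
	work chan string
	wg   sync.WaitGroup
}

func (w *worker) start() {
	w.done = make(chan struct{})
	w.work = make(chan string, 16)
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-w.done: // shutdown signal takes priority over new work
				return
			case item := <-w.work:
				_ = item // process the item
			}
		}
	}()
}

func (w *worker) stop(drain, flush func()) {
	close(w.done) // 1. tell the loop to exit
	w.wg.Wait()   // 2. wait so shared state is no longer being mutated
	drain()       // 3. send or forward whatever is still cached
	flush()       // 4. flush buffered transmissions
}
```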

// sentTrace is a struct that holds a trace and the record of the decision made.
type sentTrace struct {
trace *types.Trace
record cache.TraceSentRecord
reason string
}

// sendTracesInCache sends all traces in the cache to their final destination.
// This is done on shutdown to ensure that all traces are sent before the collector
// is stopped.
func (i *InMemCollector) sendTracesInCache() {
wg := &sync.WaitGroup{}
if i.cache != nil {

// TODO: make this a config option
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

traces := i.cache.GetAll()
sentTraceChan := make(chan sentTrace, len(traces)/3)
forwardTraceChan := make(chan *types.Trace, len(traces)/3)
expiredTraceChan := make(chan *types.Trace, len(traces)/3)

wg.Add(1)
go func() {
defer wg.Done()
for {
if done := i.sendTracesOnShutdown(ctx, sentTraceChan, forwardTraceChan, expiredTraceChan); done {
return
}
}
}()

i.distributeTracesInCache(traces, sentTraceChan, forwardTraceChan, expiredTraceChan)

close(sentTraceChan)
close(expiredTraceChan)
close(forwardTraceChan)
}

wg.Wait()
}

// distributeTracesInCache takes a list of traces and sends them to the appropriate channel
// based on the state of the trace.
func (i *InMemCollector) distributeTracesInCache(traces []*types.Trace, sentTraceChan chan sentTrace, forwardTraceChan chan *types.Trace, expiredTraceChan chan *types.Trace) {
now := i.Clock.Now()
for _, trace := range traces {
if trace != nil {

// first check if there's a trace decision
record, reason, found := i.sampleTraceCache.Test(trace.ID())
if found {
sentTraceChan <- sentTrace{trace, record, reason}
continue
}

// if the trace has hit TraceTimeout or already has its root span, no need to forward it
if now.After(trace.SendBy) || trace.RootSpan != nil {
expiredTraceChan <- trace
continue
}

// if there's no trace decision, then we need to forward
// the trace to its new home
forwardTraceChan <- trace
}
}
}

// sendTracesOnShutdown is a helper function that sends traces to their final destination
// on shutdown.
// It will return true if it times out waiting for traces to send or if one of the trace channels has been closed.
func (i *InMemCollector) sendTracesOnShutdown(ctx context.Context, sentTraceChan <-chan sentTrace, forwardTraceChan <-chan *types.Trace, expiredTraceChan <-chan *types.Trace) (done bool) {
select {
case <-ctx.Done():
i.Logger.Error().Logf("Timed out waiting for traces to send")
return true

case tr, ok := <-sentTraceChan:
if !ok {
return true
}

i.Metrics.Count("trace_send_shutdown_total", 1)
i.Metrics.Count("trace_send_shutdown_late", 1)

for _, sp := range tr.trace.GetSpans() {
ctx, span := otelutil.StartSpanMulti(ctx, i.Tracer, "shutdown_sent_trace", map[string]interface{}{"trace_id": tr.trace.TraceID, "hostname": i.hostname})

i.dealWithSentTrace(ctx, tr.record, tr.reason, sp)

span.End()
}

case tr, ok := <-expiredTraceChan:
if !ok {
return true
}

i.Metrics.Count("trace_send_shutdown_total", 1)

_, span := otelutil.StartSpanMulti(ctx, i.Tracer, "shutdown_expired_trace", map[string]interface{}{"trace_id": tr.TraceID, "hostname": i.hostname})

if tr.RootSpan != nil {
i.Metrics.Count("trace_send_shutdown_root", 1)

i.send(tr, TraceSendGotRoot)

} else {
i.Metrics.Count("trace_send_shutdown_expired", 1)

i.send(tr, TraceSendExpired)
}
span.End()

case tr, ok := <-forwardTraceChan:
if !ok {
return true
}

i.Metrics.Count("trace_send_shutdown_total", 1)
i.Metrics.Count("trace_send_shutdown_forwarded", 1)

_, span := otelutil.StartSpanMulti(ctx, i.Tracer, "shutdown_forwarded_trace", map[string]interface{}{"trace_id": tr.TraceID, "hostname": i.hostname})

targetShard := i.Sharder.WhichShard(tr.ID())
url := targetShard.GetAddress()

otelutil.AddSpanField(span, "target_shard", url)

for _, sp := range tr.GetSpans() {
sp.APIHost = url
i.Transmission.EnqueueSpan(sp)
}
span.End()
}

return false
}
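
The helper above handles one message per call and reports whether the caller should stop looping, so the goroutine in `sendTracesInCache` keeps calling it until it returns true or the 15-second context deadline fires. One equivalent way to write that consumer, collapsing the helper and its caller into a single loop under illustrative channel and item names:

```go
package example

import "context"

// drainUntilDone runs a single consumer over two work channels until the
// context deadline fires or either channel is closed.
func drainUntilDone(ctx context.Context, decided, undecided <-chan string) {
	for {
		select {
		case <-ctx.Done():
			return // timed out; abandon whatever remains
		case id, ok := <-decided:
			if !ok {
				return
			}
			_ = id // replay the recorded keep/drop decision for this trace
		case id, ok := <-undecided:
			if !ok {
				return
			}
			_ = id // decide now (send) or forward to the trace's new owner shard
		}
	}
}
```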

// Convenience method for tests.
func (i *InMemCollector) getFromCache(traceID string) *types.Trace {
i.mutex.RLock()