feat: Add SpanLimit (includes some config changes) #1266

Merged 10 commits on Aug 12, 2024
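For orientation: the diff below replaces the old flat config getters (GetSendDelay, GetTraceTimeout, GetMaxBatchSize, GetSendTickerValue, GetBatchTimeout) with a single trace-settings group reached through GetTracesConfig(). The sketch below reconstructs that group from the fields and getters used in this PR; the exact types, getter bodies, and any fields not touched here are assumptions, not the actual refinery source.

// Sketch only: the trace settings group implied by this diff. Field types are
// inferred from how the call sites use them; getter bodies are illustrative.
package config

import "time"

// Duration is assumed to be a thin wrapper over time.Duration (callers in
// cmd/refinery/main.go convert it with time.Duration(...)).
type Duration time.Duration

type TracesConfig struct {
	SendTicker   Duration
	SendDelay    Duration
	BatchTimeout Duration
	TraceTimeout Duration
	MaxBatchSize uint
	SpanLimit    uint // new in this PR: span count at which a trace becomes eligible for sending
}

// Getters referenced by the call sites in this diff (bodies assumed).
func (t TracesConfig) GetSendTickerValue() time.Duration { return time.Duration(t.SendTicker) }
func (t TracesConfig) GetSendDelay() time.Duration       { return time.Duration(t.SendDelay) }
func (t TracesConfig) GetTraceTimeout() time.Duration    { return time.Duration(t.TraceTimeout) }
func (t TracesConfig) GetBatchTimeout() Duration         { return t.BatchTimeout }
func (t TracesConfig) GetMaxBatchSize() uint             { return t.MaxBatchSize }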
app/app_test.go: 16 changes (9 additions, 7 deletions)

@@ -96,11 +96,13 @@ func newStartedApp(
 	enableHostMetadata bool,
 ) (*App, inject.Graph) {
 	c := &config.MockConfig{
-		GetSendDelayVal: 1 * time.Millisecond,
-		GetTraceTimeoutVal: 10 * time.Millisecond,
-		GetMaxBatchSizeVal: 500,
+		GetTracesConfigVal: config.TracesConfig{
+			SendTicker: config.Duration(2 * time.Millisecond),
+			SendDelay: config.Duration(1 * time.Millisecond),
+			TraceTimeout: config.Duration(10 * time.Millisecond),
+			MaxBatchSize: 500,
+		},
 		GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
-		SendTickerVal: 2 * time.Millisecond,
 		PeerManagementType: "file",
 		GetUpstreamBufferSizeVal: 10000,
 		GetPeerBufferSizeVal: 10000,
@@ -159,7 +161,7 @@ func newStartedApp(
 	sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
 	peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
 		Transmission: &transmission.Honeycomb{
-			MaxBatchSize: c.GetMaxBatchSize(),
+			MaxBatchSize: c.GetTracesConfigVal.MaxBatchSize,
 			BatchTimeout: libhoney.DefaultBatchTimeout,
 			MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
 			PendingWorkCapacity: uint(c.GetPeerBufferSize()),
@@ -241,7 +243,7 @@ func TestAppIntegration(t *testing.T) {
 	assert.Equal(t, http.StatusOK, resp.StatusCode)
 	resp.Body.Close()
 
-	time.Sleep(5 * app.Config.GetSendTickerValue())
+	time.Sleep(5 * app.Config.GetTracesConfig().GetSendTickerValue())
 
 	require.EventuallyWithT(t, func(collect *assert.CollectT) {
 		events := sender.Events()
@@ -441,7 +443,7 @@ func TestHostMetadataSpanAdditions(t *testing.T) {
 	assert.Equal(t, http.StatusOK, resp.StatusCode)
 	resp.Body.Close()
 
-	time.Sleep(5 * app.Config.GetSendTickerValue())
+	time.Sleep(5 * app.Config.GetTracesConfig().GetSendTickerValue())
 
 	events := sender.Events()
 	require.Len(t, events, 1)
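In the test above, one call site reads the struct field directly (c.GetTracesConfigVal.MaxBatchSize) while production code goes through c.GetTracesConfig(); presumably the mock just returns the stored struct. Continuing the TracesConfig sketch above, the mock plumbing and a hypothetical test value for the new SpanLimit might look like this:

// Sketch, not the real MockConfig: it presumably stores the whole group and
// returns it from the interface getter.
type MockConfig struct {
	GetTracesConfigVal TracesConfig
	// ...other mock fields omitted...
}

func (m *MockConfig) GetTracesConfig() TracesConfig { return m.GetTracesConfigVal }

// A test exercising the new limit could set it next to the other values
// (SpanLimit: 100 is a made-up number, not taken from this PR):
//
//	c := &MockConfig{
//		GetTracesConfigVal: TracesConfig{
//			SendTicker:   Duration(2 * time.Millisecond),
//			SendDelay:    Duration(1 * time.Millisecond),
//			TraceTimeout: Duration(10 * time.Millisecond),
//			MaxBatchSize: 500,
//			SpanLimit:    100,
//		},
//	}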
cmd/refinery/main.go: 8 changes (4 additions, 4 deletions)

@@ -165,8 +165,8 @@ func main() {
 	userAgentAddition := "refinery/" + version
 	upstreamClient, err := libhoney.NewClient(libhoney.ClientConfig{
 		Transmission: &transmission.Honeycomb{
-			MaxBatchSize: c.GetMaxBatchSize(),
-			BatchTimeout: c.GetBatchTimeout(),
+			MaxBatchSize: c.GetTracesConfig().GetMaxBatchSize(),
+			BatchTimeout: time.Duration(c.GetTracesConfig().GetBatchTimeout()),
 			MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
 			PendingWorkCapacity: uint(c.GetUpstreamBufferSize()),
 			UserAgentAddition: userAgentAddition,
@@ -183,8 +183,8 @@ func main() {
 
 	peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
 		Transmission: &transmission.Honeycomb{
-			MaxBatchSize: c.GetMaxBatchSize(),
-			BatchTimeout: c.GetBatchTimeout(),
+			MaxBatchSize: c.GetTracesConfig().GetMaxBatchSize(),
+			BatchTimeout: time.Duration(c.GetTracesConfig().GetBatchTimeout()),
 			MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
 			PendingWorkCapacity: uint(c.GetPeerBufferSize()),
 			UserAgentAddition: userAgentAddition,
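Both client constructions above now read MaxBatchSize and BatchTimeout through GetTracesConfig(), and BatchTimeout is wrapped in time.Duration(...) at the call site. Assuming config.Duration is a defined type over time.Duration, that explicit conversion is required because Go never converts defined types implicitly; a minimal standalone illustration:

// Why the explicit conversion in main.go is needed (Duration here is a
// stand-in for refinery's config.Duration, assumed to be defined this way).
package main

import (
	"fmt"
	"time"
)

type Duration time.Duration

func main() {
	batchTimeout := Duration(100 * time.Millisecond)
	// var t time.Duration = batchTimeout          // compile error: mismatched types
	var t time.Duration = time.Duration(batchTimeout) // explicit conversion, as in the diff
	fmt.Println(t) // 100ms
}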
collect/collect.go: 33 changes (26 additions, 7 deletions)

@@ -49,6 +49,7 @@ func GetCollectorImplementation(c config.Config) Collector {
 const (
 	TraceSendGotRoot = "trace_send_got_root"
 	TraceSendExpired = "trace_send_expired"
+	TraceSendSpanLimit = "trace_send_span_limit"
 	TraceSendEjectedFull = "trace_send_ejected_full"
 	TraceSendEjectedMemsize = "trace_send_ejected_memsize"
 	TraceSendLateSpan = "trace_send_late_span"
@@ -120,6 +121,7 @@ func (i *InMemCollector) Start() error {
 
 	i.Metrics.Register(TraceSendGotRoot, "counter")
 	i.Metrics.Register(TraceSendExpired, "counter")
+	i.Metrics.Register(TraceSendSpanLimit, "counter")
 	i.Metrics.Register(TraceSendEjectedFull, "counter")
 	i.Metrics.Register(TraceSendEjectedMemsize, "counter")
 	i.Metrics.Register(TraceSendLateSpan, "counter")
@@ -229,7 +231,7 @@ func (i *InMemCollector) checkAlloc() {
 		return
 	}
 	allTraces := existingCache.GetAll()
-	timeout := i.Config.GetTraceTimeout()
+	timeout := i.Config.GetTracesConfig().GetTraceTimeout()
 	if timeout == 0 {
 		timeout = 60 * time.Second
 	} // Sort traces by CacheImpact, heaviest first
@@ -314,7 +316,7 @@ func (i *InMemCollector) add(sp *types.Span, ch chan<- *types.Span) error {
 // block is the only place we are allowed to modify any running data
 // structures.
 func (i *InMemCollector) collect() {
-	tickerDuration := i.Config.GetSendTickerValue()
+	tickerDuration := i.Config.GetTracesConfig().GetSendTickerValue()
 	ticker := time.NewTicker(tickerDuration)
 	defer ticker.Stop()
 
@@ -379,11 +381,16 @@ func (i *InMemCollector) collect() {
 
 func (i *InMemCollector) sendExpiredTracesInCache(now time.Time) {
 	traces := i.cache.TakeExpiredTraces(now)
+	spanLimit := uint32(i.Config.GetTracesConfig().SpanLimit)
 	for _, t := range traces {
 		if t.RootSpan != nil {
 			i.sendTrace(t, TraceSendGotRoot)
 		} else {
-			i.sendTrace(t, TraceSendExpired)
+			if t.SpanCount() > spanLimit {
+				i.sendTrace(t, TraceSendSpanLimit)
+			} else {
+				i.sendTrace(t, TraceSendExpired)
+			}
 		}
 	}
 }
@@ -397,6 +404,8 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
 		i.Metrics.Down("spans_waiting")
 	}()
 
+	tcfg := i.Config.GetTracesConfig()
+
 	trace := i.cache.Get(sp.TraceID)
 	if trace == nil {
 		// if the trace has already been sent, just pass along the span
@@ -412,7 +421,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
 		// create a new trace to hold it
 		i.Metrics.Increment("trace_accepted")
 
-		timeout := i.Config.GetTraceTimeout()
+		timeout := tcfg.GetTraceTimeout()
 		if timeout == 0 {
 			timeout = 60 * time.Second
 		}
@@ -450,16 +459,26 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
 	// great! trace is live. add the span.
 	trace.AddSpan(sp)
 
-	// if this is a root span, send the trace
+	markTraceForSending := false
+	// if the span count has exceeded our SpanLimit, mark the trace for sending
+	if tcfg.SpanLimit > 0 && uint(trace.SpanCount()) > tcfg.SpanLimit {
+		markTraceForSending = true
+	}
+
+	// if this is a root span, say so and send the trace
 	if i.isRootSpan(sp) {
-		timeout := i.Config.GetSendDelay()
+		markTraceForSending = true
+		trace.RootSpan = sp
+	}
+
+	if markTraceForSending {
+		timeout := tcfg.GetSendDelay()
 
 		if timeout == 0 {
 			timeout = 2 * time.Second
 		}
 
 		trace.SendBy = time.Now().Add(timeout)
-		trace.RootSpan = sp
 	}
 }
 
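Taken together, the collect.go changes make the sending decision depend on span count as well as the root span: exceeding a non-zero SpanLimit marks the trace for sending with the same SendDelay (2 seconds by default in the diff) that a root-span arrival uses, and a trace flushed from the cache without a root is attributed to trace_send_span_limit rather than trace_send_expired when it is over the limit. A condensed, standalone restatement of that decision (names simplified, not the refinery source):

// shouldMarkForSending mirrors the logic added to processSpan above.
func shouldMarkForSending(spanCount, spanLimit uint, isRoot bool) bool {
	// SpanLimit == 0 means the limit is disabled; otherwise exceeding it
	// marks the trace for sending even before a root span arrives.
	if spanLimit > 0 && spanCount > spanLimit {
		return true
	}
	return isRoot
}

Once marked, the trace's SendBy is set to now plus SendDelay, so it still waits briefly for straggling spans before the collector's ticker picks it up and makes the sampling decision.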