feat: support ThroughputLimit in samplers #1300

Draft: wants to merge 6 commits into main
3 changes: 3 additions & 0 deletions app/app_test.go
@@ -33,6 +33,7 @@ import (
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
@@ -198,6 +199,8 @@ func newStartedApp(
&inject.Object{Value: samplerFactory},
&inject.Object{Value: &health.Health{}},
&inject.Object{Value: clockwork.NewRealClock()},
&inject.Object{Value: &pubsub.LocalPubSub{}},
&inject.Object{Value: &collect.EMAThroughputCalculator{}, Name: "throughputCalculator"},
&inject.Object{Value: &collect.MockStressReliever{}, Name: "stressRelief"},
&inject.Object{Value: &a},
)
1 change: 1 addition & 0 deletions cmd/refinery/main.go
@@ -260,6 +260,7 @@ func main() {
{Value: version, Name: "version"},
{Value: samplerFactory},
{Value: stressRelief, Name: "stressRelief"},
{Value: &collect.EMAThroughputCalculator{}, Name: "throughputCalculator"},
{Value: &health.Health{}},
{Value: &configwatcher.ConfigWatcher{}},
{Value: &a},
37 changes: 31 additions & 6 deletions collect/collect.go
@@ -67,11 +67,12 @@ type InMemCollector struct {
Health health.Recorder `inject:""`
Sharder sharder.Sharder `inject:""`

- Transmission transmit.Transmission `inject:"upstreamTransmission"`
- Metrics metrics.Metrics `inject:"genericMetrics"`
- SamplerFactory *sample.SamplerFactory `inject:""`
- StressRelief StressReliever `inject:"stressRelief"`
- Peers peer.Peers `inject:""`
+ Transmission transmit.Transmission `inject:"upstreamTransmission"`
+ Metrics metrics.Metrics `inject:"genericMetrics"`
+ SamplerFactory *sample.SamplerFactory `inject:""`
+ StressRelief StressReliever `inject:"stressRelief"`
+ ThroughputCalculator *EMAThroughputCalculator `inject:"throughputCalculator"`
+ Peers peer.Peers `inject:""`

// For test use only
BlockOnAddSpan bool
@@ -128,6 +129,10 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register("trace_send_on_shutdown", "counter")
i.Metrics.Register("trace_forwarded_on_shutdown", "counter")

i.Metrics.Register("original_sample_rate_before_multi", "histogram")
i.Metrics.Register("sample_rate_multi", "histogram")
i.Metrics.Register("trace_aggregate_sample_rate", "histogram")

i.Metrics.Register(TraceSendGotRoot, "counter")
i.Metrics.Register(TraceSendExpired, "counter")
i.Metrics.Register(TraceSendSpanLimit, "counter")
@@ -660,6 +665,7 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
}
}
if keep {
i.ThroughputCalculator.IncrementEventCount(1)
i.Logger.Debug().WithField("trace_id", sp.TraceID).Logf("Sending span because of previous decision to send trace")
mergeTraceAndSpanSampleRates(sp, tr.Rate(), isDryRun)
// if this span is a late root span, possibly update it with our current span count
@@ -781,7 +787,25 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
}

// make sampling decision and update the trace
- rate, shouldSend, reason, key := sampler.GetSampleRate(trace)
+ originalRate, reason, key := sampler.GetSampleRate(trace)
+ sampleRateMultiplier := i.ThroughputCalculator.GetSamplingRateMultiplier()
+ i.Metrics.Histogram("original_sample_rate_before_multi", originalRate)
+ i.Metrics.Histogram("sample_rate_multi", sampleRateMultiplier)
+
+ // count the expected number of spans based on the original sample rate;
+ // this tells us the throughput we would have sent without the multiplier's adjustment
+ i.ThroughputCalculator.IncrementEventCount(float64(trace.DescendantCount()) / float64(originalRate))
+
+ // a sample rate of 1 means the sampler wants to keep everything,
+ // so we never scale it with the multiplier
+ var rate uint
+ if originalRate == 1 {
+ rate = originalRate
+ } else {
+ rate = uint(float64(originalRate) * sampleRateMultiplier)
+ }
+ shouldSend := sampler.MakeSamplingDecision(rate, trace)

trace.SetSampleRate(rate)
trace.KeepSample = shouldSend
logFields["reason"] = reason
@@ -799,6 +823,7 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
i.Logger.Info().WithFields(logFields).Logf("Dropping trace because of sampling")
return
}

i.Metrics.Increment("trace_send_kept")
// This will observe sample rate decisions only if the trace is kept
i.Metrics.Histogram("trace_kept_sample_rate", float64(rate))
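To make the multiplier math in send() concrete, here is a minimal, self-contained sketch of the adjustment; the limit (1,000 events/sec), cluster EMA (2,500 events/sec), and original rate (10) are hypothetical values chosen for illustration, not taken from this diff.

package main

import "fmt"

func main() {
	// Hypothetical inputs, for illustration only.
	throughputLimit := 1000.0 // configured ThroughputLimit, events/sec
	clusterEMA := 2500.0      // current cluster-wide EMA throughput
	originalRate := uint(10)  // rate returned by the sampler

	// Mirrors GetSamplingRateMultiplier: over the limit, sample more aggressively.
	multiplier := 1.0
	if clusterEMA > throughputLimit {
		multiplier = clusterEMA / throughputLimit // 2.5
	}

	// Mirrors send() above: a rate of 1 (keep everything) is never scaled.
	rate := originalRate
	if originalRate != 1 {
		rate = uint(float64(originalRate) * multiplier)
	}
	fmt.Println(rate) // 25: roughly 1 in 25 traces kept instead of 1 in 10
}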
33 changes: 22 additions & 11 deletions collect/collect_test.go
@@ -22,6 +22,7 @@ import (
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
@@ -50,14 +51,15 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *InMemCollector {
healthReporter.Start()

return &InMemCollector{
- Config: conf,
- Clock: clock,
- Logger: &logger.NullLogger{},
- Tracer: noop.NewTracerProvider().Tracer("test"),
- Health: healthReporter,
- Transmission: transmission,
- Metrics: &metrics.NullMetrics{},
- StressRelief: &MockStressReliever{},
+ Config: conf,
+ Clock: clock,
+ Logger: &logger.NullLogger{},
+ Tracer: noop.NewTracerProvider().Tracer("test"),
+ Health: healthReporter,
+ Transmission: transmission,
+ Metrics: &metrics.NullMetrics{},
+ StressRelief: &MockStressReliever{},
+ ThroughputCalculator: &EMAThroughputCalculator{},
SamplerFactory: &sample.SamplerFactory{
Config: conf,
Metrics: s,
@@ -423,14 +425,20 @@ func TestDryRunMode(t *testing.T) {
var traceID2 = "def456"
var traceID3 = "ghi789"
// sampling decisions based on trace ID
- sampleRate1, keepTraceID1, _, _ := sampler.GetSampleRate(&types.Trace{TraceID: traceID1})
+ trace1 := &types.Trace{TraceID: traceID1}
+ sampleRate1, _, _ := sampler.GetSampleRate(trace1)
+ keepTraceID1 := sampler.MakeSamplingDecision(sampleRate1, trace1)
// would be dropped if dry run mode was not enabled
assert.False(t, keepTraceID1)
assert.Equal(t, uint(10), sampleRate1)
- sampleRate2, keepTraceID2, _, _ := sampler.GetSampleRate(&types.Trace{TraceID: traceID2})
+ trace2 := &types.Trace{TraceID: traceID2}
+ sampleRate2, _, _ := sampler.GetSampleRate(trace2)
+ keepTraceID2 := sampler.MakeSamplingDecision(sampleRate2, trace2)
assert.True(t, keepTraceID2)
assert.Equal(t, uint(10), sampleRate2)
- sampleRate3, keepTraceID3, _, _ := sampler.GetSampleRate(&types.Trace{TraceID: traceID3})
+ trace3 := &types.Trace{TraceID: traceID3}
+ sampleRate3, _, _ := sampler.GetSampleRate(trace3)
+ keepTraceID3 := sampler.MakeSamplingDecision(sampleRate3, trace3)
// would be dropped if dry run mode was not enabled
assert.False(t, keepTraceID3)
assert.Equal(t, uint(10), sampleRate3)
Expand Down Expand Up @@ -827,8 +835,11 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &sharder.SingleServerSharder{}},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "metrics"},
&inject.Object{Value: &sample.SamplerFactory{}},
&inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"},
&inject.Object{Value: &pubsub.LocalPubSub{}},
&inject.Object{Value: &EMAThroughputCalculator{}, Name: "throughputCalculator"},
&inject.Object{Value: &peer.MockPeers{}},
)
if err != nil {
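For context, the test changes above reflect the sampler API split this PR builds on: GetSampleRate no longer returns the keep decision, and MakeSamplingDecision applies the (possibly multiplier-adjusted) rate separately. A rough sketch of the assumed shape follows; the authoritative definitions live in the sample package and may differ.

// Sketch of the assumed sampler API after this change, not the actual interface.
type Sampler interface {
	// GetSampleRate computes a rate for the trace, plus the reason and key
	// used to compute it, without deciding whether to keep the trace.
	GetSampleRate(trace *types.Trace) (rate uint, reason string, key string)
	// MakeSamplingDecision applies a (possibly adjusted) rate and reports
	// whether the trace should be kept.
	MakeSamplingDecision(rate uint, trace *types.Trace) bool
}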
205 changes: 205 additions & 0 deletions collect/throughput_calculator.go
@@ -0,0 +1,205 @@
package collect

import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/jonboulle/clockwork"
)

const emaThroughputTopic = "ema_throughput"

// EMAThroughputCalculator encapsulates the logic to calculate a throughput value using an Exponential Moving Average (EMA).
type EMAThroughputCalculator struct {
Config config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
Clock clockwork.Clock `inject:""`
Pubsub pubsub.PubSub `inject:""`
Peer peer.Peers `inject:""`

throughputLimit uint
weight float64 // Smoothing factor for EMA
intervalLength time.Duration // Length of the interval
hostID string

mut sync.RWMutex
throughputs map[string]throughputReport
clusterEMA uint
weightedEventTotal float64 // Internal count of events in the current interval
done chan struct{}
}

// Start initializes the EMAThroughputCalculator from configuration and starts
// the background loop that publishes this peer's throughput to the cluster.
func (c *EMAThroughputCalculator) Start() error {
cfg := c.Config.GetThroughputCalculatorConfig()
c.throughputLimit = uint(cfg.Limit)
c.done = make(chan struct{})

// if throughput limit is not set, disable the calculator
if c.throughputLimit == 0 {
return nil
}

c.intervalLength = time.Duration(cfg.AdjustmentInterval)
if c.intervalLength == 0 {
c.intervalLength = 15 * time.Second
}

c.weight = cfg.Weight
if c.weight == 0 {
c.weight = 0.5
}

peerID, err := c.Peer.GetInstanceID()
if err != nil {
return err
}
c.hostID = peerID
c.throughputs = make(map[string]throughputReport)

c.Metrics.Register("cluster_throughput", "gauge")
c.Metrics.Register("cluster_ema_throughput", "gauge")
c.Metrics.Register("individual_throughput", "gauge")
c.Metrics.Register("ema_throughput_publish_error", "counter")
// Subscribe to the throughput topic so we can react to throughput
// changes in the cluster.
c.Pubsub.Subscribe(context.Background(), emaThroughputTopic, c.onThroughputUpdate)

// Have a centralized peer metric service that's responsible for publishing and
// receiving peer metrics. It could have a channel receiving metrics from
// different sources, and it would only send a message if the value has changed
// and the configured interval for the metric has passed. There is a third case:
// send now because the configured interval has passed and we haven't sent a
// message about this metric since the last interval.
go func() {
ticker := c.Clock.NewTicker(c.intervalLength)
[Review thread]
Contributor Author: We should only publish if the throughput is different from the previous calculation.
Contributor: Agreed.

defer ticker.Stop()

for {
select {
case <-c.done:
return
case <-ticker.Chan():
currentThroughput := c.updateEMA()
err := c.Pubsub.Publish(context.Background(), emaThroughputTopic, newThroughputMessage(currentThroughput, peerID).String())
if err != nil {
c.Metrics.Count("ema_throughput_publish_error", 1)
}
}
}

}()

return nil
}
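Following the review thread above, a minimal sketch of publish-on-change; lastPublished is a hypothetical new field on EMAThroughputCalculator and is not part of this diff.

// publishIfChanged sketches the review suggestion: only publish when the
// computed throughput differs from the previously published value. It assumes
// a new `lastPublished uint` field on EMAThroughputCalculator.
func (c *EMAThroughputCalculator) publishIfChanged(ctx context.Context, peerID string) {
	currentThroughput := c.updateEMA()
	if currentThroughput == c.lastPublished {
		return // unchanged since the last tick; skip the pubsub message
	}
	if err := c.Pubsub.Publish(ctx, emaThroughputTopic, newThroughputMessage(currentThroughput, peerID).String()); err != nil {
		c.Metrics.Count("ema_throughput_publish_error", 1)
		return // leave lastPublished unchanged so we publish again next tick
	}
	c.lastPublished = currentThroughput
}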

func (c *EMAThroughputCalculator) onThroughputUpdate(ctx context.Context, msg string) {
throughputMsg, err := unmarshalThroughputMessage(msg)
if err != nil {
return
}
c.mut.Lock()
c.throughputs[throughputMsg.peerID] = throughputReport{
key: throughputMsg.peerID,
throughput: throughputMsg.throughput,
timestamp: c.Clock.Now(),
}
c.mut.Unlock()
}

func (c *EMAThroughputCalculator) Stop() {
close(c.done)
}

// IncrementEventCount increments the internal event count by a specified amount.
func (c *EMAThroughputCalculator) IncrementEventCount(count float64) {
c.mut.Lock()
c.weightedEventTotal += count
c.mut.Unlock()
}

// updateEMA calculates the current throughput and updates the EMA.
func (c *EMAThroughputCalculator) updateEMA() uint {
c.mut.Lock()
defer c.mut.Unlock()

var totalThroughput float64

for _, report := range c.throughputs {
if c.Clock.Since(report.timestamp) > c.intervalLength*2 {
delete(c.throughputs, report.key)
continue
}

totalThroughput += float64(report.throughput)
}
c.Metrics.Gauge("cluster_throughput", totalThroughput)
c.clusterEMA = uint(math.Ceil(c.weight*totalThroughput + (1-c.weight)*float64(c.clusterEMA)))
c.Metrics.Gauge("cluster_ema_throughput", c.clusterEMA)

// calculate this peer's throughput over the interval that just ended
currentThroughput := c.weightedEventTotal / c.intervalLength.Seconds()
c.Metrics.Gauge("individual_throughput", currentThroughput)
c.weightedEventTotal = 0 // Reset the event count for the new interval

return uint(currentThroughput)
}
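As a worked example of the update above: with a weight of 0.5, a previous clusterEMA of 800 events/sec, and a reported cluster total of 1,200 events/sec, the new EMA is ceil(0.5*1200 + 0.5*800) = 1,000 events/sec. A larger weight makes the EMA track the most recent interval more closely; a smaller one smooths out spikes.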

// GetSamplingRateMultiplier calculates and returns a sampling rate multiplier
// based on the difference between the configured throughput limit and the current throughput.
func (c *EMAThroughputCalculator) GetSamplingRateMultiplier() float64 {
if c.throughputLimit == 0 {
return 1.0 // No limit set, so no adjustment needed
}

c.mut.RLock()
currentEMA := c.clusterEMA
c.mut.RUnlock()

if currentEMA <= c.throughputLimit {
return 1.0 // Throughput is within the limit, no adjustment needed
}

return float64(currentEMA) / float64(c.throughputLimit)
}

type throughputReport struct {
[Review thread]
Contributor Author: This code can be refactored into shared logic used by both stress relief and the throughput calculator.
Contributor: Yes, they're quite similar. Would it make sense to go even further and bundle the updates into the same messages? The system would maintain a map of named values that each peer updates internally, and the peers would send the map through pubsub.


key string
throughput uint
timestamp time.Time
}
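Sketching the bundled-updates idea from the review thread above; peerMetricsMessage and its wire format are hypothetical and not part of this diff.

// Hypothetical message carrying a map of named values per peer, so stress
// relief and the throughput calculator could share one pubsub channel.
type peerMetricsMessage struct {
	peerID string
	values map[string]float64 // e.g. {"throughput": 950, "stress_level": 12}
}

func (m *peerMetricsMessage) String() string {
	parts := []string{m.peerID}
	for k, v := range m.values { // note: map iteration order is not deterministic
		parts = append(parts, fmt.Sprintf("%s=%g", k, v))
	}
	return strings.Join(parts, "|")
}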

type throughputMessage struct {
peerID string
throughput uint
}

func newThroughputMessage(throughput uint, peerID string) *throughputMessage {
return &throughputMessage{throughput: throughput, peerID: peerID}
}

func (msg *throughputMessage) String() string {
return msg.peerID + "|" + fmt.Sprint(msg.throughput)
}

func unmarshalThroughputMessage(msg string) (*throughputMessage, error) {
[Review thread]
Contributor: This gives me an idea. Instead of taking a string for a message type, Pubsub could take a PubsubMessage, which might just embed encoding.TextMarshaler and encoding.TextUnmarshaler. That would normalize the way we pack and unpack these messages for pubsub. Or we could build a general-purpose PubsubMessage class that can carry named fields.


if len(msg) < 2 {
return nil, fmt.Errorf("empty message")
}

parts := strings.SplitN(msg, "|", 2)
// guard against messages with no separator so parts[1] cannot panic
if len(parts) != 2 {
return nil, fmt.Errorf("malformed throughput message: %q", msg)
}
throughput, err := strconv.Atoi(parts[1])
if err != nil {
return nil, err
}

return newThroughputMessage(uint(throughput), parts[0]), nil
}
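A sketch of the typed-message idea from the review thread above; PubsubMessage is hypothetical and not part of the current pubsub package.

// Hypothetical typed payload for pubsub (requires the standard library
// "encoding" import): implementations pack and unpack themselves, replacing
// ad hoc String()/unmarshal pairs like the one above.
type PubsubMessage interface {
	encoding.TextMarshaler   // MarshalText() (text []byte, err error)
	encoding.TextUnmarshaler // UnmarshalText(text []byte) error
}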