Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: only enable stress relief for the entire cluster together #1413

Merged
merged 2 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (i *InMemCollector) Start() error {
defer func() { i.Logger.Debug().Logf("Finished starting InMemCollector") }()
imcConfig := i.Config.GetCollectionConfig()
i.cache = cache.NewInMemCache(imcConfig.CacheCapacity, i.Metrics, i.Logger)
i.StressRelief.UpdateFromConfig(i.Config.GetStressReliefConfig())
i.StressRelief.UpdateFromConfig()

// listen for config reloads
i.Config.RegisterReloadCallback(i.sendReloadSignal)
Expand Down Expand Up @@ -243,7 +243,7 @@ func (i *InMemCollector) reloadConfigs() {

i.sampleTraceCache.Resize(i.Config.GetSampleCacheConfig())

i.StressRelief.UpdateFromConfig(i.Config.GetStressReliefConfig())
i.StressRelief.UpdateFromConfig()

// clear out any samplers that we have previously created
// so that the new configuration will be propagated
Expand Down
30 changes: 19 additions & 11 deletions collect/stressRelief.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
const stressReliefTopic = "refinery-stress-relief"

type StressReliever interface {
UpdateFromConfig(cfg config.StressReliefConfig)
UpdateFromConfig()
Recalc() uint
Stressed() bool
GetSampleRate(traceID string) (rate uint, keep bool, reason string)
Expand All @@ -41,10 +41,10 @@ type MockStressReliever struct {
SampleRate uint
}

func (m *MockStressReliever) Start() error { return nil }
func (m *MockStressReliever) UpdateFromConfig(cfg config.StressReliefConfig) {}
func (m *MockStressReliever) Recalc() uint { return 0 }
func (m *MockStressReliever) Stressed() bool { return m.IsStressed }
func (m *MockStressReliever) Start() error { return nil }
func (m *MockStressReliever) UpdateFromConfig() {}
func (m *MockStressReliever) Recalc() uint { return 0 }
func (m *MockStressReliever) Stressed() bool { return m.IsStressed }
// GetSampleRate hands back the SampleRate and ShouldKeep values stored on the
// mock, always paired with the reason "mock". The traceID argument is not
// consulted.
func (m *MockStressReliever) GetSampleRate(traceID string) (rate uint, keep bool, reason string) {
	rate, keep, reason = m.SampleRate, m.ShouldKeep, "mock"
	return rate, keep, reason
}
Expand Down Expand Up @@ -76,6 +76,7 @@ var _ StressReliever = &StressRelief{}

type StressRelief struct {
RefineryMetrics metrics.Metrics `inject:"metrics"`
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Health health.Recorder `inject:""`
PubSub pubsub.PubSub `inject:""`
Expand Down Expand Up @@ -141,8 +142,8 @@ func (s *StressRelief) Start() error {

// All of the numerator metrics are gauges. The denominator metrics are constants.
s.calcs = []StressReliefCalculation{
{Numerator: "collector_peer_queue_length", Denominator: "PEER_CAP", Algorithm: "sqrt", Reason: "CacheCapacity (peer)"},
{Numerator: "collector_incoming_queue_length", Denominator: "INCOMING_CAP", Algorithm: "sqrt", Reason: "CacheCapacity (incoming)"},
{Numerator: "collector_peer_queue_length", Denominator: "PEER_CAP", Algorithm: "sqrt", Reason: "PeerQueueSize"},
{Numerator: "collector_incoming_queue_length", Denominator: "INCOMING_CAP", Algorithm: "sqrt", Reason: "IncomingQueueSize"},
{Numerator: "libhoney_peer_queue_length", Denominator: "PEER_BUFFER_SIZE", Algorithm: "sqrt", Reason: "PeerBufferSize"},
{Numerator: "libhoney_upstream_queue_length", Denominator: "UPSTREAM_BUFFER_SIZE", Algorithm: "sqrt", Reason: "UpstreamBufferSize"},
{Numerator: "memory_heap_allocation", Denominator: "MEMORY_MAX_ALLOC", Algorithm: "sigmoid", Reason: "MaxAlloc"},
Expand Down Expand Up @@ -248,10 +249,12 @@ func (s *StressRelief) onStressLevelUpdate(ctx context.Context, msg string) {
}
}

func (s *StressRelief) UpdateFromConfig(cfg config.StressReliefConfig) {
func (s *StressRelief) UpdateFromConfig() {
s.lock.Lock()
defer s.lock.Unlock()

cfg := s.Config.GetStressReliefConfig()

switch cfg.Mode {
case "never", "":
s.mode = Never
Expand Down Expand Up @@ -416,9 +419,14 @@ func (s *StressRelief) Recalc() uint {
s.lock.Lock()
defer s.lock.Unlock()

// The overall stress level is the max of the individual and cluster stress levels
// If a single node is under significant stress, it can activate stress relief mode
s.overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel)))
overallStressLevel := clusterStressLevel

if s.Config.GetCollectionConfig().EnableTraceLocality {
// The overall stress level is the max of the individual and cluster stress levels
// If a single node is under significant stress, it can activate stress relief mode
overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel)))
}
s.overallStressLevel = overallStressLevel
s.RefineryMetrics.Gauge("stress_level", s.overallStressLevel)

s.reason = reason
Expand Down
132 changes: 110 additions & 22 deletions collect/stress_relief_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ func TestStressRelief_Monitor(t *testing.T) {
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

cfg := config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 50,
SamplingRate: 2,
MinimumActivationDuration: config.Duration(5 * time.Second),
sr.Config = &config.MockConfig{
StressRelief: config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 50,
SamplingRate: 2,
MinimumActivationDuration: config.Duration(5 * time.Second),
},
}

// On startup, the stress relief should not be active
sr.UpdateFromConfig(cfg)
sr.UpdateFromConfig()
require.False(t, sr.Stressed())

// Test 1
Expand Down Expand Up @@ -91,16 +92,17 @@ func TestStressRelief_Peer(t *testing.T) {

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

cfg := config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 65,
SamplingRate: 2,
MinimumActivationDuration: config.Duration(5 * time.Second),
sr.Config = &config.MockConfig{
StressRelief: config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 65,
SamplingRate: 2,
MinimumActivationDuration: config.Duration(5 * time.Second),
},
}

// On startup, the stress relief should not be active
sr.UpdateFromConfig(cfg)
sr.UpdateFromConfig()
require.False(t, sr.Stressed())

// activate stress relief in one refinery
Expand Down Expand Up @@ -152,15 +154,101 @@ func TestStressRelief_OverallStressLevel(t *testing.T) {

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

cfg := config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 65,
MinimumActivationDuration: config.Duration(5 * time.Second),
sr.Config = &config.MockConfig{
StressRelief: config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 65,
MinimumActivationDuration: config.Duration(5 * time.Second),
},
}
// On startup, the stress relief should not be active
sr.UpdateFromConfig()
require.False(t, sr.Stressed())

// Test 1
// when a single peer's individual stress level is above the activation level
// the overall stress level should be based on the cluster's stress level
sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 965)
clock.Advance(time.Second * 1)
sr.stressLevels = make(map[string]stressReport, 100)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("peer%d", i)
sr.stressLevels[key] = stressReport{
key: key,
level: 10,
timestamp: sr.Clock.Now(),
}
}

localLevel := sr.Recalc()
require.NotEqual(t, localLevel, sr.overallStressLevel)
require.False(t, sr.stressed)

// Test 2
// when a single peer's individual stress level is below the activation level
// and the rest of the cluster is above the activation level
// the single peer should remain in stress relief mode
sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("peer%d", i)
sr.stressLevels[key] = stressReport{
key: key,
level: 85,
timestamp: sr.Clock.Now(),
}
}
localLevel = sr.Recalc()
require.Greater(t, sr.overallStressLevel, localLevel)
require.True(t, sr.stressed)

// Test 3
// Only when both the single peer's individual stress level and the cluster stress
// level are below the activation level, the stress relief should be deactivated.
sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("peer%d", i)
sr.stressLevels[key] = stressReport{
key: key,
level: 1,
timestamp: sr.Clock.Now(),
}
}
clock.Advance(sr.minDuration * 2)
localLevel = sr.Recalc()
assert.NotEqual(t, sr.overallStressLevel, localLevel)
assert.False(t, sr.stressed)
}

func TestStressRelief_OverallStressLevel_EnableTraceLocality(t *testing.T) {
clock := clockwork.NewFakeClock()
sr, stop := newStressRelief(t, clock, nil)
defer stop()

// disable the automatic stress level recalculation
sr.disableStressLevelReport = true
sr.Start()

sr.RefineryMetrics.Register(metrics.Metadata{
Name: "collector_incoming_queue_length",
Type: metrics.Gauge,
})

sr.RefineryMetrics.Store("INCOMING_CAP", 1200)

sr.Config = &config.MockConfig{
StressRelief: config.StressReliefConfig{
Mode: "monitor",
ActivationLevel: 80,
DeactivationLevel: 65,
MinimumActivationDuration: config.Duration(5 * time.Second),
},
GetCollectionConfigVal: config.CollectionConfig{
EnableTraceLocality: true,
},
}
// On startup, the stress relief should not be active
sr.UpdateFromConfig(cfg)
sr.UpdateFromConfig()
require.False(t, sr.Stressed())

// Test 1
Expand Down