perf: Improve performance of stress relief #604

Merged · 6 commits · Feb 13, 2023
87 changes: 55 additions & 32 deletions collect/stressRelief.go
@@ -53,7 +53,7 @@ type StressRelief struct
stressLevel uint
reason string
stressed bool
belowMin bool
stayOnUntil time.Time
minDuration time.Duration
RefineryMetrics metrics.Metrics `inject:"metrics"`
Logger logger.Logger `inject:""`
@@ -90,22 +90,27 @@ func (s *StressRelief) Start() error {
}

// start our monitor goroutine that periodically calls recalc
go func(d *StressRelief) {
go func(s *StressRelief) {
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-tick.C:
d.Recalc()
case <-d.Done:
d.Logger.Debug().Logf("Stopping StressRelief system")
s.Recalc()
case <-s.Done:
s.Logger.Debug().Logf("Stopping StressRelief system")
return
}
}
}(s)
return nil
}

func (s *StressRelief) Stop() error {
close(s.Done)
return nil
}

func (s *StressRelief) UpdateFromConfig(cfg config.StressReliefConfig) error {
s.lock.Lock()
defer s.lock.Unlock()
@@ -114,6 +119,16 @@ func (s *StressRelief) UpdateFromConfig(cfg config.StressReliefConfig) error {
case "never", "":
s.mode = Never
case "monitor":
// If we're switching into monitor mode from some other state (which
// happens on startup), we will start up in stressed mode for a
// configurable time to try to make sure that we can handle the load
// before we start processing it in earnest. This is to help address the
// problem of trying to bring a new node into an already-overloaded
// cluster. If the time is 0 we won't do this at all.
if s.mode != Monitor && cfg.StartStressedDuration != 0 {
s.stressed = true
s.stayOnUntil = time.Now().Add(cfg.StartStressedDuration)
}
s.mode = Monitor
case "always":
s.mode = Always
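
The practical effect of StartStressedDuration is that a node freshly switched into monitor mode (normally at startup) reports itself as stressed for a fixed window before the activate/deactivate logic takes over, and the s.mode != Monitor guard keeps a live reload that stays in monitor mode from reopening that window. A minimal sketch of the timeline, using an illustrative 3s window rather than Refinery's actual wiring:

package main

import (
    "fmt"
    "time"
)

// Illustrative only: with a 3s startup window, a node entering monitor mode
// at t=0 reports stressed until stayOnUntil regardless of measured load.
func main() {
    start := time.Now()
    stayOnUntil := start.Add(3 * time.Second) // the startup window
    for _, offset := range []time.Duration{0, 2 * time.Second, 4 * time.Second} {
        now := start.Add(offset)
        fmt.Printf("t=%v startupWindowActive=%v\n", offset, now.Before(stayOnUntil))
    }
}
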
@@ -130,16 +145,19 @@ func (s *StressRelief) UpdateFromConfig(cfg config.StressReliefConfig) error {
s.sampleRate = 1
}
s.minDuration = cfg.MinimumActivationDuration

s.Logger.Debug().
WithField("activation_level", s.activateLevel).
WithField("deactivation_level", s.deactivateLevel).
WithField("sampling_rate", s.sampleRate).
WithField("min_duration", s.minDuration).
WithField("startup_duration", cfg.MinimumActivationDuration).
Logf("StressRelief parameters")

// Get the actual upper bound - the largest possible value divided by
// the sample rate. In the case where the sample rate is 1, this should
// sample every value.
// Get the actual upper bound - the largest possible 64-bit value divided by
// the sample rate. This is used because the hash with which we sample is a
// uint64. In the case where the sample rate is 1, this should sample every
// value.
s.upperBound = math.MaxUint64 / s.sampleRate

return nil
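
The upperBound comparison is the usual trick for deterministic sampling with a 64-bit hash: a trace is kept when its hash falls at or below MaxUint64 / sampleRate, so a rate of 1 keeps everything and a rate of 100 keeps roughly one in a hundred. A minimal sketch of that check, assuming an FNV-64a hash of the trace ID (the hash Refinery actually uses here may differ):

package main

import (
    "fmt"
    "hash/fnv"
    "math"
)

// shouldKeep reports whether a trace would be sampled at the given rate.
// FNV-64a is an assumption for illustration; only the <= upperBound
// comparison mirrors the code above.
func shouldKeep(traceID string, sampleRate uint64) bool {
    upperBound := math.MaxUint64 / sampleRate
    h := fnv.New64a()
    h.Write([]byte(traceID))
    return h.Sum64() <= upperBound
}

func main() {
    fmt.Println(shouldKeep("example-trace-id", 1))   // always true at rate 1
    fmt.Println(shouldKeep("example-trace-id", 100)) // true for roughly 1% of IDs
}
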
@@ -224,9 +242,11 @@ func (s *StressRelief) square(num, denom string) float64 {
// under the presumption that if we're using less than half of RAM,
func (s *StressRelief) sigmoid(num, denom string) float64 {
r := s.ratio(num, denom)
// this is an S curve from 0 to 1, centered around 0.5 -- determined
// by messing around with a graphing calculator
stress := .395*math.Atan(6*(r-0.5)) + 0.5
// This is an S curve from 0 to 1, centered around 0.5 -- constants were
// empirically determined by messing around with a graphing calculator. The
// only reason you might change these is if you want to change the bendiness
// of the S curve.
stress := 0.400305589*math.Atan(6*(r-0.5)) + 0.5
s.Logger.Debug().
WithField("algorithm", "sigmoid").
WithField("result", stress).
@@ -261,44 +281,47 @@ func (s *StressRelief) Recalc() {
s.Logger.Debug().WithField("stress_level", level).WithField("reason", reason).Logf("calculated stress level")

s.lock.Lock()
defer s.lock.Unlock()

s.stressLevel = uint(level)
s.reason = reason
s.lock.Unlock()
}

func (s *StressRelief) StressLevel() uint {
s.lock.RLock()
defer s.lock.RUnlock()
return s.stressLevel
}

// Stressed() indicates whether the system should act as if it's stressed.
// Note that the stress_level metric is independent of mode.
func (s *StressRelief) Stressed() bool {
s.lock.Lock()
defer s.lock.Unlock()
switch s.mode {
case Never:
s.stressed = false
case Always:
s.stressed = true
case Monitor:
// If it's off, should we activate it?
if !s.stressed && s.stressLevel >= s.activateLevel {
s.stressed = true
// we want to make sure that stress relief is on for a minimum time
s.belowMin = true
time.AfterFunc(s.minDuration, func() {
s.lock.Lock()
s.belowMin = false
s.lock.Unlock()
})
s.Logger.Info().WithField("stress_level", s.stressLevel).WithField("reason", s.reason).Logf("StressRelief has been activated")
}
if s.stressed && !s.belowMin && s.stressLevel < s.deactivateLevel {
// We want to make sure that stress relief stays on until the stress level
// has been below the deactivation level for at least the minimum duration,
// so whenever the level is at or above it we push stayOnUntil further out.
if s.stressed && s.stressLevel >= s.deactivateLevel {
s.stayOnUntil = time.Now().Add(s.minDuration)
}
// If it's on, should we deactivate it?
if s.stressed && s.stressLevel < s.deactivateLevel && time.Now().After(s.stayOnUntil) {
s.stressed = false
s.Logger.Info().WithField("stress_level", s.stressLevel).Logf("StressRelief has been deactivated")
}
}
}

func (s *StressRelief) StressLevel() uint {
s.lock.RLock()
defer s.lock.RUnlock()
return s.stressLevel
}

// Stressed() indicates whether the system should act as if it's stressed.
// Note that the stress_level metric is independent of mode.
func (s *StressRelief) Stressed() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.stressed
}
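
The performance win, as far as the diff shows, is that Stressed() used to take the write lock and run the whole mode/activation state machine on every call, whereas now all of that happens inside Recalc() on the 100 ms ticker and the hot-path Stressed() is just an RLock around a bool. A minimal sketch of that pattern (names are illustrative, not Refinery's):

package main

import (
    "fmt"
    "sync"
    "time"
)

// indicator recomputes an expensive flag on a ticker and serves cheap reads.
type indicator struct {
    mu       sync.RWMutex
    stressed bool
}

func (i *indicator) recalc() { // one goroutine, every 100ms
    v := expensiveCheck()
    i.mu.Lock()
    i.stressed = v
    i.mu.Unlock()
}

func (i *indicator) Stressed() bool { // many goroutines, on the hot path
    i.mu.RLock()
    defer i.mu.RUnlock()
    return i.stressed
}

func expensiveCheck() bool { return false } // stand-in for the real calculation

func main() {
    ind := &indicator{}
    tick := time.NewTicker(100 * time.Millisecond)
    defer tick.Stop()
    done := make(chan struct{})
    go func() {
        for {
            select {
            case <-tick.C:
                ind.recalc()
            case <-done:
                return
            }
        }
    }()
    time.Sleep(250 * time.Millisecond)
    fmt.Println(ind.Stressed())
    close(done)
}
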

2 changes: 2 additions & 0 deletions config/file_config.go
@@ -133,6 +133,7 @@ type StressReliefConfig struct {
DeactivationLevel uint
StressSamplingRate uint64
MinimumActivationDuration time.Duration
StartStressedDuration time.Duration
}

// NewConfig creates a new config struct
@@ -192,6 +193,7 @@ func NewConfig(config, rules string, errorCallback func(error)) (Config, error)
c.SetDefault("StressRelief.DeactivationLevel", 25)
c.SetDefault("StressRelief.StressSamplingRate", 100)
c.SetDefault("StressRelief.MinimumActivationDuration", 10*time.Second)
c.SetDefault("StressRelief.StartStressedDuration", 3*time.Second)

c.SetConfigFile(config)
err := c.ReadInConfig()
9 changes: 9 additions & 0 deletions config_complete.toml
@@ -541,3 +541,12 @@ MetricsReportingInterval = 3
# Default = 10s
# Eligible for live reload.
# MinimumActivationDuration = 10s

# MinimumStartupDuration is used when switching into Monitor mode.
# When stress monitoring is enabled, it will start up in stressed mode for
# at least this amount of time to try to make sure that Refinery can handle the load
# before it begins processing it in earnest. This is to help address the
# problem of trying to bring a new node into an already-overloaded
# cluster. If this duration is 0, Refinery will not start in stressed mode.
# This can provide faster startup at the possible cost of startup instability.
# MinimumStartupDuration = 3s