Skip to content

Commit

Permalink
feat: Deprecate integer seconds and replace with time.Duration (#59)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Some samplers were using integer seconds rather than time.Duration for
specifying intervals. Durations are the right answer, especially for
configurations since they can be specified with fairly natural strings
like `1m30s` or `300ms`.

## Short description of the changes

- Deprecate things like ClearFrequencySec
- Add ClearFrequencyDuration, with logic that rejects configurations
specifying both fields and that converts a *Sec value, when one is given,
to a Duration in the proper units
- Fix the math to use durations
- Write tests to make sure it all works

Fixes #56.
  • Loading branch information
kentquirk authored Mar 21, 2023
1 parent 141159f commit 2f3e10b
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 68 deletions.
34 changes: 24 additions & 10 deletions avgsamplerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dynsampler
import (
"encoding/json"
"errors"
"fmt"
"math"
"sync"
"time"
Expand All @@ -13,21 +14,27 @@ import (
// the correct average. This method breaks down when total traffic is low
// because it will be excessively sampled.
//
// Keys that occur only once within ClearFrequencySec will always have a sample
// rate of 1. Keys that occur more frequently will be sampled on a logarithmic
// curve. In other words, every key will be represented at least once per
// ClearFrequencySec and more frequent keys will have their sample rate
// increased proportionally to wind up with the goal sample rate.
// Keys that occur only once within ClearFrequencyDuration will always have a
// sample rate of 1. Keys that occur more frequently will be sampled on a
// logarithmic curve. In other words, every key will be represented at least
// once per ClearFrequencyDuration and more frequent keys will have their sample
// rate increased proportionally to wind up with the goal sample rate.
type AvgSampleRate struct {
// ClearFrequencySec is how often the counters reset in seconds; default 30
// ClearFrequencySec is how often the counters reset in seconds.
//
// Deprecated: use ClearFrequencyDuration instead.
ClearFrequencySec int

// ClearFrequencyDuration is how often the counters reset as a Duration.
// Note that either this or ClearFrequencySec can be specified, but not both.
// If neither one is set, the default is 30s.
ClearFrequencyDuration time.Duration

// GoalSampleRate is the average sample rate we're aiming for, across all
// events. Default 10
GoalSampleRate int

// MaxKeys, if greater than 0, limits the number of distinct keys used to build
// the sample rate map within the interval defined by `ClearFrequencySec`. Once
// the sample rate map within the interval defined by `ClearFrequencyDuration`. Once
// MaxKeys is reached, new keys will not be included in the sample rate map, but
// existing keys will continue to be counted.
MaxKeys int
Expand All @@ -49,9 +56,16 @@ var _ Sampler = (*AvgSampleRate)(nil)

func (a *AvgSampleRate) Start() error {
// apply defaults
if a.ClearFrequencySec == 0 {
a.ClearFrequencySec = 30
if a.ClearFrequencyDuration != 0 && a.ClearFrequencySec != 0 {
return fmt.Errorf("the ClearFrequencySec configuration value is deprecated; use only ClearFrequencyDuration")
}

if a.ClearFrequencyDuration == 0 && a.ClearFrequencySec == 0 {
a.ClearFrequencyDuration = 30 * time.Second
} else if a.ClearFrequencySec != 0 {
a.ClearFrequencyDuration = time.Duration(a.ClearFrequencySec) * time.Second
}

if a.GoalSampleRate == 0 {
a.GoalSampleRate = 10
}
Expand All @@ -66,7 +80,7 @@ func (a *AvgSampleRate) Start() error {

// spin up calculator
go func() {
ticker := time.NewTicker(time.Second * time.Duration(a.ClearFrequencySec))
ticker := time.NewTicker(a.ClearFrequencyDuration)
defer ticker.Stop()
for {
select {
Expand Down
33 changes: 33 additions & 0 deletions avgsamplerate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,36 @@ func randomString(length int) string {
rand.Read(b)
return fmt.Sprintf("%x", b)
}

// TestAvgSampleRate_Start exercises the interval configuration accepted by
// AvgSampleRate.Start. It covers the deprecated seconds field alone, the
// duration field alone, neither field (expecting 30s), and both fields
// together (expecting an error).
func TestAvgSampleRate_Start(t *testing.T) {
	type startCase struct {
		name                   string
		ClearFrequencySec      int
		ClearFrequencyDuration time.Duration
		wantDuration           time.Duration
		wantErr                bool
	}
	cases := []startCase{
		{
			name:              "sec only",
			ClearFrequencySec: 2,
			wantDuration:      2 * time.Second,
		},
		{
			name:                   "dur only",
			ClearFrequencyDuration: 1003 * time.Millisecond,
			wantDuration:           1003 * time.Millisecond,
		},
		{
			name:         "default",
			wantDuration: 30 * time.Second,
		},
		{
			name:                   "both",
			ClearFrequencySec:      2,
			ClearFrequencyDuration: 2 * time.Second,
			wantErr:                true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			sampler := &AvgSampleRate{
				ClearFrequencySec:      tc.ClearFrequencySec,
				ClearFrequencyDuration: tc.ClearFrequencyDuration,
			}

			startErr := sampler.Start()
			if failed := startErr != nil; failed != tc.wantErr {
				t.Errorf("AvgSampleRate error = %v, wantErr %v", startErr, tc.wantErr)
			}
			if startErr != nil {
				// Nothing was started, so there is nothing to stop or inspect.
				return
			}

			// Stop the sampler when the subtest finishes.
			defer sampler.Stop()
			if got := sampler.ClearFrequencyDuration; got != tc.wantDuration {
				t.Errorf("AvgSampleRate duration mismatch = want %v, got %v", tc.wantDuration, got)
			}
		})
	}
}
30 changes: 22 additions & 8 deletions avgsamplewithmin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dynsampler

import (
"fmt"
"math"
"sync"
"time"
Expand All @@ -13,21 +14,27 @@ import (
// method, without the failings it shows on the low end of total traffic
// throughput
//
// Keys that occur only once within ClearFrequencySec will always have a sample
// Keys that occur only once within ClearFrequencyDuration will always have a sample
// rate of 1. Keys that occur more frequently will be sampled on a logarithmic
// curve. In other words, every key will be represented at least once per
// ClearFrequencySec and more frequent keys will have their sample rate
// ClearFrequencyDuration and more frequent keys will have their sample rate
// increased proportionally to wind up with the goal sample rate.
type AvgSampleWithMin struct {
// ClearFrequencySec is how often the counters reset in seconds; default 30
// ClearFrequencySec is how often the counters reset in seconds.
//
// Deprecated: use ClearFrequencyDuration instead.
ClearFrequencySec int

// ClearFrequencyDuration is how often the counters reset as a Duration.
// Note that either this or ClearFrequencySec can be specified, but not both.
// If neither one is set, the default is 30s.
ClearFrequencyDuration time.Duration

// GoalSampleRate is the average sample rate we're aiming for, across all
// events. Default 10
GoalSampleRate int

// MaxKeys, if greater than 0, limits the number of distinct keys used to build
// the sample rate map within the interval defined by `ClearFrequencySec`. Once
// the sample rate map within the interval defined by `ClearFrequencyDuration`. Once
// MaxKeys is reached, new keys will not be included in the sample rate map, but
// existing keys will continue to be counted.
MaxKeys int
Expand All @@ -53,9 +60,16 @@ var _ Sampler = (*AvgSampleWithMin)(nil)

func (a *AvgSampleWithMin) Start() error {
// apply defaults
if a.ClearFrequencySec == 0 {
a.ClearFrequencySec = 30
if a.ClearFrequencyDuration != 0 && a.ClearFrequencySec != 0 {
return fmt.Errorf("the ClearFrequencySec configuration value is deprecated; use only ClearFrequencyDuration")
}

if a.ClearFrequencyDuration == 0 && a.ClearFrequencySec == 0 {
a.ClearFrequencyDuration = 30 * time.Second
} else if a.ClearFrequencySec != 0 {
a.ClearFrequencyDuration = time.Duration(a.ClearFrequencySec) * time.Second
}

if a.GoalSampleRate == 0 {
a.GoalSampleRate = 10
}
Expand All @@ -70,7 +84,7 @@ func (a *AvgSampleWithMin) Start() error {

// spin up calculator
go func() {
ticker := time.NewTicker(time.Second * time.Duration(a.ClearFrequencySec))
ticker := time.NewTicker(a.ClearFrequencyDuration)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -116,7 +130,7 @@ func (a *AvgSampleWithMin) updateMaps() {
}
goalCount := float64(sumEvents) / float64(a.GoalSampleRate)
// check to see if we fall below the minimum
if sumEvents < float64(a.MinEventsPerSec*a.ClearFrequencySec) {
if sumEvents < float64(a.MinEventsPerSec)*a.ClearFrequencyDuration.Seconds() {
// we still need to go through each key to set sample rates individually
for k := range tmpCounts {
newSavedSampleRates[k] = 1
Expand Down
40 changes: 37 additions & 3 deletions avgsamplewithmin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestAvgSampleWithMinUpdateMaps(t *testing.T) {
a := &AvgSampleWithMin{
GoalSampleRate: 20,
MinEventsPerSec: 50,
ClearFrequencySec: 30,
GoalSampleRate: 20,
MinEventsPerSec: 50,
ClearFrequencyDuration: 30 * time.Second,
}
tsts := []struct {
inputSampleCount map[string]float64
Expand Down Expand Up @@ -255,3 +256,36 @@ func TestAvgSampleWithMinMaxKeys(t *testing.T) {
assert.Equal(t, 3, len(a.currentCounts))
assert.Equal(t, 2., a.currentCounts["one"])
}

// TestAvgSampleWithMin_Start exercises the interval configuration accepted by
// AvgSampleWithMin.Start. It covers the deprecated seconds field alone, the
// duration field alone, neither field (expecting 30s), and both fields
// together (expecting an error).
func TestAvgSampleWithMin_Start(t *testing.T) {
	type startCase struct {
		name                   string
		ClearFrequencySec      int
		ClearFrequencyDuration time.Duration
		wantDuration           time.Duration
		wantErr                bool
	}
	cases := []startCase{
		{
			name:              "sec only",
			ClearFrequencySec: 2,
			wantDuration:      2 * time.Second,
		},
		{
			name:                   "dur only",
			ClearFrequencyDuration: 1003 * time.Millisecond,
			wantDuration:           1003 * time.Millisecond,
		},
		{
			name:         "default",
			wantDuration: 30 * time.Second,
		},
		{
			name:                   "both",
			ClearFrequencySec:      2,
			ClearFrequencyDuration: 2 * time.Second,
			wantErr:                true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			sampler := &AvgSampleWithMin{
				ClearFrequencySec:      tc.ClearFrequencySec,
				ClearFrequencyDuration: tc.ClearFrequencyDuration,
			}

			startErr := sampler.Start()
			if failed := startErr != nil; failed != tc.wantErr {
				t.Errorf("AvgSampleWithMin error = %v, wantErr %v", startErr, tc.wantErr)
			}
			if startErr != nil {
				// Nothing was started, so there is nothing to stop or inspect.
				return
			}

			// Stop the sampler when the subtest finishes.
			defer sampler.Stop()
			if got := sampler.ClearFrequencyDuration; got != tc.wantDuration {
				t.Errorf("AvgSampleWithMin duration mismatch = want %v, got %v", tc.wantDuration, got)
			}
		})
	}
}
27 changes: 21 additions & 6 deletions emasamplerate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dynsampler
import (
"encoding/json"
"errors"
"fmt"
"math"
"sync"
"time"
Expand All @@ -28,10 +29,17 @@ import (
// given window and more frequent keys will have their sample rate
// increased proportionally to wind up with the goal sample rate.
type EMASampleRate struct {
// DEPRECATED -- use AdjustmentIntervalDuration
// AdjustmentInterval defines how often (in seconds) we adjust the moving average from
// recent observations. Default 15s
// recent observations.
AdjustmentInterval int

// AdjustmentIntervalDuration is how often we adjust the moving average from
// recent observations.
// Note that either this or AdjustmentInterval can be specified, but not both.
// If neither one is set, the default is 15s.
AdjustmentIntervalDuration time.Duration

// Weight is a value between (0, 1) indicating the weighting factor used to adjust
// the EMA. With larger values, newer data will influence the average more, and older
// values will be factored out more quickly. In mathematical literature concerning EMA,
Expand All @@ -57,7 +65,7 @@ type EMASampleRate struct {

// BurstMultiple, if set, is multiplied by the sum of the running average of counts to define
// the burst detection threshold. If total counts observed for a given interval exceed the threshold
// EMA is updated immediately, rather than waiting on the AdjustmentInterval.
// EMA is updated immediately, rather than waiting on the AdjustmentIntervalDuration.
// Defaults to 2; negative value disables. With a default of 2, if your traffic suddenly doubles,
// burst detection will kick in.
BurstMultiple float64
Expand Down Expand Up @@ -92,9 +100,16 @@ var _ Sampler = (*EMASampleRate)(nil)

func (e *EMASampleRate) Start() error {
// apply defaults
if e.AdjustmentInterval == 0 {
e.AdjustmentInterval = 15
if e.AdjustmentIntervalDuration != 0 && e.AdjustmentInterval != 0 {
return fmt.Errorf("the AdjustmentInterval configuration value is deprecated; use only AdjustmentIntervalDuration")
}

if e.AdjustmentIntervalDuration == 0 && e.AdjustmentInterval == 0 {
e.AdjustmentIntervalDuration = 15 * time.Second
} else if e.AdjustmentInterval != 0 {
e.AdjustmentIntervalDuration = time.Duration(e.AdjustmentInterval) * time.Second
}

if e.GoalSampleRate == 0 {
e.GoalSampleRate = 10
}
Expand Down Expand Up @@ -123,14 +138,14 @@ func (e *EMASampleRate) Start() error {
e.done = make(chan struct{})

go func() {
ticker := time.NewTicker(time.Second * time.Duration(e.AdjustmentInterval))
ticker := time.NewTicker(e.AdjustmentIntervalDuration)
defer ticker.Stop()
for {
select {
case <-e.burstSignal:
// reset ticker when we get a burst
ticker.Stop()
ticker = time.NewTicker(time.Second * time.Duration(e.AdjustmentInterval))
ticker = time.NewTicker(e.AdjustmentIntervalDuration)
e.updateMaps()
case <-ticker.C:
e.updateMaps()
Expand Down
37 changes: 35 additions & 2 deletions emasamplerate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestEMAAgesOutSmallValues(t *testing.T) {

func TestEMABurstDetection(t *testing.T) {
// Set the adjustment interval very high so that we never run the regular interval
e := &EMASampleRate{AdjustmentInterval: 3600}
e := &EMASampleRate{AdjustmentIntervalDuration: 1 * time.Hour}
err := e.Start()
assert.Nil(t, err)

Expand Down Expand Up @@ -326,7 +326,7 @@ func TestEMABurstDetection(t *testing.T) {
}

func TestEMAUpdateMapsRace(t *testing.T) {
e := &EMASampleRate{AdjustmentInterval: 3600}
e := &EMASampleRate{AdjustmentIntervalDuration: 1 * time.Hour}
e.testSignalMapsDone = make(chan struct{}, 1000)
err := e.Start()
assert.Nil(t, err)
Expand Down Expand Up @@ -500,3 +500,36 @@ func TestEMASampleRateMultiHitsTargetRate(t *testing.T) {
}
}
}

// TestEMASampleRate_Start exercises the interval configuration accepted by
// EMASampleRate.Start. It covers the deprecated seconds field alone, the
// duration field alone, neither field (expecting 15s), and both fields
// together (expecting an error).
func TestEMASampleRate_Start(t *testing.T) {
	type startCase struct {
		name                       string
		AdjustmentInterval         int
		AdjustmentIntervalDuration time.Duration
		wantDuration               time.Duration
		wantErr                    bool
	}
	cases := []startCase{
		{
			name:               "sec only",
			AdjustmentInterval: 2,
			wantDuration:       2 * time.Second,
		},
		{
			name:                       "dur only",
			AdjustmentIntervalDuration: 1003 * time.Millisecond,
			wantDuration:               1003 * time.Millisecond,
		},
		{
			name:         "default",
			wantDuration: 15 * time.Second,
		},
		{
			name:                       "both",
			AdjustmentInterval:         2,
			AdjustmentIntervalDuration: 2 * time.Second,
			wantErr:                    true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			sampler := &EMASampleRate{
				AdjustmentInterval:         tc.AdjustmentInterval,
				AdjustmentIntervalDuration: tc.AdjustmentIntervalDuration,
			}

			startErr := sampler.Start()
			if failed := startErr != nil; failed != tc.wantErr {
				t.Errorf("EMASampleRate error = %v, wantErr %v", startErr, tc.wantErr)
			}
			if startErr != nil {
				// Nothing was started, so there is nothing to stop or inspect.
				return
			}

			// Stop the sampler when the subtest finishes.
			defer sampler.Stop()
			if got := sampler.AdjustmentIntervalDuration; got != tc.wantDuration {
				t.Errorf("EMASampleRate duration mismatch = want %v, got %v", tc.wantDuration, got)
			}
		})
	}
}
Loading

0 comments on commit 2f3e10b

Please sign in to comment.