Skip to content

Commit

Permalink
feat(scaler.go): fuzz testing of new scaled wait and max scale
Browse files Browse the repository at this point in the history
  • Loading branch information
benjivesterby committed Mar 27, 2023
1 parent 408766c commit 8f47c84
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 12 deletions.
50 changes: 40 additions & 10 deletions scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ import (
"time"
)

// MinWait is the absolute minimum wait time for the ticker. This is used to
// prevent the ticker from firing too often and causing too small of a wait
// time.
const MinWait = time.Millisecond

// MinLife is the minimum life time for the scaler. This is used to prevent
// the scaler from exiting too quickly, and causing too small of a lifetime.
const MinLife = time.Millisecond

// Scaler implements generic auto-scaling logic which starts with a net-zero
// set of processing routines (with the exception of the channel listener) and
// then scales up and down based on the CPU contention of a system and the speed
Expand Down Expand Up @@ -44,7 +53,7 @@ type Scaler[T, U any] struct {

// Max is the maximum number of layer2 routines that will be spawned.
// If Max is set to 0, then there is no limit.
Max int
Max uint

wScale *DurationScaler
}
Expand Down Expand Up @@ -76,13 +85,13 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
// because the caller did not specify a wait time. This means Scaler will
// likely always scale up rather than waiting for an existing layer2 routine
// to pick up data.
if s.Wait <= 0 {
s.Wait = time.Nanosecond
if s.Wait <= MinWait {
s.Wait = MinWait
}

// Minimum life of a spawned layer2 should be 1ms
if s.Life < time.Microsecond {
s.Life = time.Microsecond
if s.Life < MinLife {
s.Life = MinLife
}

go func() {
Expand All @@ -103,13 +112,14 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
ticker := time.NewTicker(s.Wait)
defer ticker.Stop()
step := 0
stepMu := sync.RWMutex{}

var max chan struct{}

if s.Max > 0 {
max = make(chan struct{}, s.Max)
if s.Max == 0 {
close(max) // no limit
for i := uint(0); i < s.Max; i++ {
max <- struct{}{}
}
}

Expand All @@ -129,7 +139,7 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
case <-ctx.Done():
return
case <-ticker.C:
if s.Max != 0 {
if max != nil {
select {
case <-ctx.Done():
return
Expand All @@ -145,7 +155,9 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
wgMu.Unlock()

if !s.WaitModifier.inactive() {
stepMu.Lock()
step++
stepMu.Unlock()
}

go func() {
Expand All @@ -162,7 +174,9 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {

if !s.WaitModifier.inactive() {
defer func() {
stepMu.Lock()
step--
stepMu.Unlock()
}()
}

Expand All @@ -173,9 +187,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
}
}

stepN := 0
if !s.WaitModifier.inactive() {
stepMu.RLock()
stepN = step
stepMu.RUnlock()
}

// Reset the ticker so that it does not immediately trip the
// case statement on loop.
ticker.Reset(s.wScale.scaledDuration(s.Wait, step))
ticker.Reset(s.wScale.scaledDuration(s.Wait, stepN))
}
}
}()
Expand Down Expand Up @@ -288,6 +309,10 @@ func (t *DurationScaler) scaledDuration(
dur time.Duration,
currentInterval int,
) time.Duration {
if dur < MinWait {
dur = MinWait
}

if t.inactive() {
return dur
}
Expand All @@ -299,7 +324,12 @@ func (t *DurationScaler) scaledDuration(

if currentInterval%t.Interval == 0 {
t.lastInterval = currentInterval
return dur + time.Duration(float64(t.originalDuration)*mod)
out := dur + time.Duration(float64(t.originalDuration)*mod)
if out < MinWait {
return MinWait
}

return out
}

return dur
Expand Down
73 changes: 71 additions & 2 deletions scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -391,7 +392,75 @@ func TestTickDur(t *testing.T) {

func FuzzTick(f *testing.F) {
f.Fuzz(func(t *testing.T, step, cStep int, mod float64, orig, dur int64) {
tick := &DurationScaler{Interval: step, ScalingFactor: mod, originalDuration: time.Duration(orig)}
_ = tick.scaledDuration(time.Duration(dur), cStep)
tick := &DurationScaler{
Interval: step,
ScalingFactor: mod,
originalDuration: time.Duration(orig),
}

v := tick.scaledDuration(time.Duration(dur), cStep)
if v < 0 {
t.Fatalf("negative duration: %v", v)
}
})
}

func FuzzScaler(f *testing.F) {
// Define InterceptFunc
interceptFunc := func(ctx context.Context, t int) (string, bool) {
return fmt.Sprintf("%d", t), true
}

f.Fuzz(func(
t *testing.T,
wait, life int64,
step, cStep int,
mod float64,
max uint,
in int,
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tick := DurationScaler{
Interval: step,
ScalingFactor: mod,
}

// Initialize Scaler
scaler := Scaler[int, string]{
Wait: time.Millisecond * time.Duration(wait),
Life: time.Millisecond * time.Duration(life),
Fn: interceptFunc,
WaitModifier: tick,
Max: max,
}

// Create a simple input channel
input := make(chan int, 1)
defer close(input)

// Execute the Scaler
out, err := scaler.Exec(ctx, input)
if err != nil {
t.Errorf("Scaler Exec failed: %v", err)
t.Fail()
}

// Send input value and check output
input <- in

select {
case <-ctx.Done():
t.Errorf("Scaler Exec timed out")
t.Fail()
case res := <-out:
if res != fmt.Sprintf("%d", in) {
t.Errorf("Scaler Exec failed: expected %d, got %s", in, res)
t.Fail()
}

t.Logf("Scaler Exec succeeded: expected %d, got %s", in, res)
}
})
}
6 changes: 6 additions & 0 deletions testdata/fuzz/FuzzTick/db1d459b216861c8
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go test fuzz v1
int(-84)
int(0)
float64(-0.625)
int64(0)
int64(-94)
6 changes: 6 additions & 0 deletions testdata/fuzz/FuzzTick/eaae912ff48d75e8
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
go test fuzz v1
int(0)
int(0)
float64(0)
int64(0)
int64(-90)
8 changes: 8 additions & 0 deletions testdata/fuzz/Fuzz_Scaler/555659aba42d18b7
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
go test fuzz v1
int64(0)
int64(0)
int(0)
int(29)
float64(0)
uint(36)
int(0)
8 changes: 8 additions & 0 deletions testdata/fuzz/Fuzz_Scaler/62629b316805e69d
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
go test fuzz v1
int64(-62)
int64(-78)
int(54)
int(103)
float64(-0.16666666666666666)
uint(33)
int(76)
8 changes: 8 additions & 0 deletions testdata/fuzz/Fuzz_Scaler/b9bef74a2c85cbf3
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
go test fuzz v1
int64(-158)
int64(-95)
int(54)
int(19)
float64(-0.16666666666666666)
uint(33)
int(76)

0 comments on commit 8f47c84

Please sign in to comment.