Skip to content

Commit

Permalink
feat(scaler.go): add support for a Wait modifier on scaler to increas…
Browse files Browse the repository at this point in the history
…e or decrease duration based on an interval

This change also corrects some linter errors and adds my new linter configuration
  • Loading branch information
benjivesterby committed Mar 25, 2023
1 parent 11f75df commit 44da329
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 146 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ jobs:
with:
go-version: ${{ matrix.go-version }}
stable: false
- name: Fuzz
run: go test -fuzz=./... -fuzztime=10s
- name: Test
run: go test -failfast ./... -race -coverprofile=coverage.txt -covermode=atomic
run: go test -fuzz=. -fuzztime=10s -failfast ./... -race -coverprofile=coverage.txt -covermode=atomic
- name: Push Coverage to codecov.io
uses: codecov/codecov-action@v3
with:
Expand Down
404 changes: 277 additions & 127 deletions .golangci.yml

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ func Floats[T float](size int) []T {
return out
}

func IntTests[T integer](tests, cap int) [][]T {
func IntTests[T integer](tests, max int) [][]T {
out := make([][]T, tests)

for i := range out {
out[i] = Ints[T](cap)
out[i] = Ints[T](max)
}

return out
}

func FloatTests[T float](tests, cap int) [][]T {
func FloatTests[T float](tests, max int) [][]T {
out := make([][]T, tests)

for i := range out {
out[i] = Floats[T](cap)
out[i] = Floats[T](max)
}

return out
Expand Down
2 changes: 2 additions & 0 deletions helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import "context"

// defaultCtx is the default context used by the stream package. This is
// hardcoded to context.Background() but can be overridden by the unit tests.
//
//nolint:gochecknoglobals // this is on purpose
var defaultCtx = context.Background()

// _ctx returns a valid Context with CancelFunc even if it the
Expand Down
90 changes: 87 additions & 3 deletions scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,38 @@ import (
// is blocking and the Wait time has been reached, then the Scaler will spawn
// a new layer2 which will increase throughput for the Scaler, and Scaler
// will attempt to send the data to the layer2 channel once more. This process
// will repeat until a successful send occurs. (This should only loop twice)
// will repeat until a successful send occurs. (This should only loop twice).
type Scaler[T, U any] struct {
Wait time.Duration
Life time.Duration
Fn InterceptFunc[T, U]

// WaitModifier is used to modify the Wait time based on the number of
// times the Scaler has scaled up. This is useful for systems
// that are CPU bound and need to scale up more quickly.
WaitModifier DurationScaler

wScale *DurationScaler
}

var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required")

// Exec starts the internal Scaler routine (the first layer of processing) and
// returns the output channel where the resulting data from the Fn function
// will be sent.
//
//nolint:funlen // This really can't be broken up any further
func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
ctx = _ctx(ctx)

// set the configured tick as a pointer for execution
s.wScale = &s.WaitModifier
// set the original wait time on the ticker
s.wScale.originalDuration = s.Wait

// Fn is REQUIRED!
if s.Fn == nil {
return nil, fmt.Errorf("invalid <nil> InterceptFunc")
return nil, ErrFnRequired
}

// Create outbound channel
Expand Down Expand Up @@ -81,6 +97,8 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
l2 := make(chan T)
ticker := time.NewTicker(s.Wait)
defer ticker.Stop()
step := 0
var stepMu sync.RWMutex

scaleLoop:
for {
Expand All @@ -102,8 +120,21 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
wg.Add(1)
wgMu.Unlock()

if !s.WaitModifier.inactive() {
stepMu.Lock()
step++
stepMu.Unlock()
}

go func() {
defer wg.Done()
if !s.WaitModifier.inactive() {
defer func() {
stepMu.Lock()
step--
stepMu.Unlock()
}()
}

Pipe(ctx, s.layer2(ctx, l2), out)
}()
Expand All @@ -112,9 +143,11 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
}
}

stepMu.RLock()
// Reset the ticker so that it does not immediately trip the
// case statement on loop.
ticker.Reset(s.Wait)
ticker.Reset(s.wScale.scaledDuration(s.Wait, step))
stepMu.RUnlock()
}
}
}()
Expand Down Expand Up @@ -181,3 +214,54 @@ func (s Scaler[T, U]) layer2(ctx context.Context, in <-chan T) <-chan U {

return out
}

// DurationScaler is used to modify the time.Duration of a ticker or timer based on
// a configured step value and modifier (between -1 and 1) value.
type DurationScaler struct {
// Interval is the number the current step must be divisible by in order
// to modify the time.Duration.
Interval int

// ScalingFactor is a value between -1 and 1 that is used to modify the
// time.Duration of a ticker or timer. The value is multiplied by
// the ScalingFactor is multiplied by the duration for scaling.
ScalingFactor float64

// originalDuration is the time.Duration that was passed to the
// Scaler. This is used to reset the time.Duration of the ticker
// or timer.
originalDuration time.Duration

// lastInterval is the lastInterval step that was used to modify
// the time.Duration.
lastInterval int
}

func (t *DurationScaler) inactive() bool {
return t.Interval == 0 ||
(t.ScalingFactor == 0 ||
t.ScalingFactor <= -1 ||
t.ScalingFactor >= 1)
}

// scaledDuration returns the modified time.Duration based on the current step (cStep).
func (t *DurationScaler) scaledDuration(
dur time.Duration,
currentInterval int,
) time.Duration {
if t.inactive() {
return dur
}

mod := t.ScalingFactor
if currentInterval <= t.lastInterval {
mod = -mod
}

if currentInterval%t.Interval == 0 {
t.lastInterval = currentInterval
return dur + time.Duration(float64(t.originalDuration)*mod)
}

return dur
}
117 changes: 113 additions & 4 deletions scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ScalerTest[U ~[]T, T comparable](
}

// Test that the scaler can be used with a nil context.
//nolint:staticcheck
//nolint:staticcheck // nil context on purpose
out, err := s.Exec(nil, testdata.Chan(ctx))
if err != nil {
t.Errorf("expected no error, got %v", err)
Expand Down Expand Up @@ -84,7 +84,7 @@ func Test_Scaler_Exec(t *testing.T) {
func Test_Scaler_NilFn(t *testing.T) {
s := Scaler[any, any]{}

//nolint:staticcheck
//nolint:staticcheck // nil context on purpose
_, err := s.Exec(nil, nil)
if err == nil {
t.Error("Expected error, got nil")
Expand All @@ -108,7 +108,7 @@ func Test_Scaler_NilCtx(t *testing.T) {
cancel()

// Test that the scaler can be used with a nil context.
//nolint:staticcheck
//nolint:staticcheck // nil context on purpose
out, err := s.Exec(nil, nil)
if err != nil {
t.Errorf("expected no error, got %v", err)
Expand All @@ -133,7 +133,7 @@ func Test_Scaler_CloseIn(t *testing.T) {
close(in)

// Test that the scaler can be used with a nil context.
//nolint:staticcheck
//nolint:staticcheck // nil context on purpose
out, err := s.Exec(nil, in)
if err != nil {
t.Errorf("expected no error, got %v", err)
Expand Down Expand Up @@ -286,3 +286,112 @@ func Test_Scaler_layer2_nosend(t *testing.T) {
t.Fatalf("expected 0 data to be sent, got 1")
}
}

func TestTickDur(t *testing.T) {
testCases := []struct {
name string
tick DurationScaler
duration time.Duration
currentStep int
expected time.Duration
}{
{
name: "Test case 1",
tick: DurationScaler{Interval: 3, ScalingFactor: 0.1, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 3,
expected: 11 * time.Second,
},
{
name: "Test case 2",
tick: DurationScaler{Interval: 5, ScalingFactor: -0.1, originalDuration: 20 * time.Second},
duration: 20 * time.Second,
currentStep: 10,
expected: 18 * time.Second,
},
{
name: "Test case 3",
tick: DurationScaler{Interval: 2, ScalingFactor: 0.5, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 4,
expected: 15 * time.Second,
},
{
name: "Test case 4",
tick: DurationScaler{Interval: 4, ScalingFactor: -0.5, originalDuration: 30 * time.Second},
duration: 30 * time.Second,
currentStep: 8,
expected: 15 * time.Second,
},
{
name: "Test case 5",
tick: DurationScaler{Interval: 3, ScalingFactor: 0.1, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 2,
expected: 10 * time.Second,
},
{
name: "Test case 6: Step is divisible, modifier in range",
tick: DurationScaler{Interval: 3, ScalingFactor: 0.1, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 3,
expected: 11 * time.Second,
},
{
name: "Test case 7: Step is not divisible, modifier in range",
tick: DurationScaler{Interval: 3, ScalingFactor: 0.1, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 2,
expected: 10 * time.Second,
},
{
name: "Test case 8: Step is divisible, modifier is zero",
tick: DurationScaler{Interval: 3, ScalingFactor: 0, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 3,
expected: 10 * time.Second,
},
{
name: "Test case 9: Step is divisible, modifier is out of range",
tick: DurationScaler{Interval: 3, ScalingFactor: 1, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 3,
expected: 10 * time.Second,
},
{
name: "Test case 10: Step is zero, modifier in range",
tick: DurationScaler{Interval: 0, ScalingFactor: 0.1, originalDuration: 10 * time.Second},
duration: 10 * time.Second,
currentStep: 3,
expected: 10 * time.Second,
},
{
name: "Test case 6: Step number decreases",
tick: DurationScaler{
Interval: 2,
ScalingFactor: 0.5,
originalDuration: 10 * time.Second,
lastInterval: 4,
},
duration: 15 * time.Second,
currentStep: 2,
expected: 10 * time.Second,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := (&tc.tick).scaledDuration(tc.duration, tc.currentStep)
if result != tc.expected {
t.Errorf("Expected: %v, got: %v", tc.expected, result)
}
})
}
}

func FuzzTick(f *testing.F) {
f.Fuzz(func(t *testing.T, step, cStep int, mod float64, orig, dur int64) {
tick := &DurationScaler{Interval: step, ScalingFactor: mod, originalDuration: time.Duration(orig)}
_ = tick.scaledDuration(time.Duration(dur), cStep)
})
}
2 changes: 1 addition & 1 deletion stream_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func Benchmark_Scaler(b *testing.B) {

for n := 0; n < b.N; n++ {
// Test that the scaler can be used with a nil context.
//nolint:staticcheck
//nolint:staticcheck // nil context on purpose
out, err := s.Exec(nil, testdata.Chan(ctx))
if err != nil {
b.Errorf("expected no error, got %v", err)
Expand Down
12 changes: 6 additions & 6 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func Test_Intercept_NotOk(t *testing.T) {
}
}

func Test_Intercept_ClosedChan(t *testing.T) {
func Test_Intercept_ClosedChan(_ *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -322,7 +322,7 @@ func Test_Intercept_Canceled_On_Wait(t *testing.T) {
}
}

func Test_FanOut_Canceled_On_Wait(t *testing.T) {
func Test_FanOut_Canceled_On_Wait(_ *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -415,7 +415,7 @@ func Test_Distribute(t *testing.T) {
DistributeTest(t, "float64", FloatTests[float64](100, 1000))
}

func Test_Distribute_Canceled_On_Wait(t *testing.T) {
func Test_Distribute_Canceled_On_Wait(_ *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -431,7 +431,7 @@ func Test_Distribute_Canceled_On_Wait(t *testing.T) {
Distribute(ctx, in, out)
}

func Test_Distribute_ZeroOut(t *testing.T) {
func Test_Distribute_ZeroOut(_ *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -501,7 +501,7 @@ func Test_FanOut(t *testing.T) {
}
}

func Test_FanOut_ZeroOut(t *testing.T) {
func Test_FanOut_ZeroOut(_ *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -511,7 +511,7 @@ func Test_FanOut_ZeroOut(t *testing.T) {
FanOut(ctx, in)
}

func Test_FanIn_ZeroIn(t *testing.T) {
func Test_FanIn_ZeroIn(_ *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down

1 comment on commit 44da329

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Benchmark Results'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 44da329 Previous: 11f75df Ratio
Benchmark_Scaler 3868952 ns/op 829605 ns/op 4.66

This comment was automatically generated by workflow using github-action-benchmark.

CC: @benjivesterby

Please sign in to comment.