diff --git a/scaler_test.go b/scaler_test.go index b8e49f5..a0b9348 100644 --- a/scaler_test.go +++ b/scaler_test.go @@ -3,6 +3,7 @@ package stream import ( "context" "fmt" + "sync" "testing" "time" @@ -406,7 +407,6 @@ func FuzzTick(f *testing.F) { } func FuzzScaler(f *testing.F) { - // Define InterceptFunc interceptFunc := func(ctx context.Context, t int) (string, bool) { return fmt.Sprintf("%d", t), true } @@ -464,3 +464,129 @@ func FuzzScaler(f *testing.F) { } }) } + +func Test_Scaler_Max(t *testing.T) { + tests := map[string]struct { + max uint + send int + expected int + }{ + "max 0": { + max: 0, + send: 1000, + expected: 1000, + }, + "max 1": { + max: 1, + send: 10, + expected: 10, + }, + "max 2": { + max: 2, + send: 10, + expected: 10, + }, + "max 3": { + max: 3, + send: 10, + expected: 10, + }, + "max 4": { + max: 4, + send: 100, + expected: 100, + }, + "max 1000": { + max: 1000, + send: 10000, + expected: 10000, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + inited := 0 + initedMu := sync.Mutex{} + release := make(chan struct{}) + + interceptFunc := func(ctx context.Context, t int) (int, bool) { + defer func() { + initedMu.Lock() + defer initedMu.Unlock() + inited-- + }() + + initedMu.Lock() + inited++ + initedMu.Unlock() + + <-release + + return t, true + } + + // Initialize Scaler + scaler := Scaler[int, int]{ + Wait: time.Millisecond, + Life: time.Millisecond, + Fn: interceptFunc, + Max: test.max, + } + + // Create a simple input channel + input := make(chan int, test.send) + defer close(input) + + for i := 0; i < test.send; i++ { + input <- i + } + + // Execute the Scaler + out, err := scaler.Exec(ctx, input) + if err != nil { + t.Errorf("Scaler Exec failed: %v", err) + t.Fail() + } + + recv := 1 + + tloop: + for { + select { + case <-ctx.Done(): + t.Errorf("Scaler Exec timed out") + case _, ok := <-out: + if !ok { + break tloop + } + + recv++ + t.Logf("received %d", recv) + if recv >= test.expected { + break tloop + } + default: + time.Sleep(time.Millisecond) + + initedMu.Lock() + if test.max > 0 && inited > int(test.max) { + t.Errorf("Scaler Exec failed: expected %d, got %d", test.max, inited) + t.Fail() + } + initedMu.Unlock() + + // Release one goroutine + release <- struct{}{} + } + } + + if recv != test.expected { + t.Errorf("Scaler Exec failed: expected %d, got %d", test.expected, recv) + t.Fail() + } + }) + } +}