Skip to content

Commit

Permalink
add test for limiter
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Jul 23, 2024
1 parent ca179e6 commit ab64f11
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package controller

import (
"context"
"fmt"
"math"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -212,3 +214,98 @@ func TestCancelErrorOfReservation(t *testing.T) {
re.Error(err)
re.Contains(err.Error(), "context canceled")
}

func TestQPS(t *testing.T) {
cases := []struct {
concurrency int
reserveN int64
RU_PER_SEC int64
}{
{10000, 1000, 4000000},
{5000, 1000, 4000000},
{3840, 1000, 4000000},
{1000, 1000, 4000000},
{1000, 200, 4000000},
{1000, 5000, 4000000},

{10000, 50, 400000},
{5000, 50, 400000},
{3840, 50, 400000},
{1000, 50, 400000},
{500, 50, 400000},
{200, 50, 400000},
{100, 50, 400000},
{1000, 10, 400000},
{1000, 250, 400000},

{10000, 500, 400000},
{5000, 500, 400000},
{3840, 500, 400000},
{1000, 500, 400000},
{1000, 100, 200000},
{1000, 100, 400000},
}

for _, tc := range cases {
t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.RU_PER_SEC), func(t *testing.T) {
qps, ruSec, waitTime := testQPSCase(t, tc.concurrency, tc.reserveN, tc.RU_PER_SEC)
t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
})
}
}

func testQPSCase(t *testing.T, concurrency int, reserveN int64, limit int64) (float64, float64, time.Duration) {
nc := make(chan notifyMsg, 1)
lim := NewLimiter(time.Now(), Limit(limit), limit, float64(limit), nc)
ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

var wg sync.WaitGroup
var totalRequests int64
start := time.Now()

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
r := lim.Reserve(context.Background(), 30*time.Second, time.Now(), float64(reserveN))
if r.OK() {
delay := r.DelayFrom(time.Now())
<-time.After(delay)
} else {
panic("r not ok")
}
atomic.AddInt64(&totalRequests, 1)
}
}()
}
qps := float64(0)
var wait time.Duration
ch := make(chan struct{})
go func() {
var windowRequests int64
for {
elapsed := time.Since(start)
if elapsed >= 20*time.Second {
close(ch)
break
}
windowRequests = atomic.SwapInt64(&totalRequests, 0)
qps = float64(windowRequests)
r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
wait = r.Delay()
// fmt.Printf("%s: QPS: %.2f, RU: %.2f, new request need wait %s\n", time.Now(), qps, qps*float64(reserveN), wait)
time.Sleep(1 * time.Second)
}
}()
<-ch
cancel()
wg.Wait()
return qps, qps * float64(reserveN), wait
}

0 comments on commit ab64f11

Please sign in to comment.