Skip to content

Commit

Permalink
Add rolling window QPS tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Sep 19, 2024
1 parent 811ffe7 commit 218ec9c
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 5 deletions.
72 changes: 72 additions & 0 deletions common/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,75 @@ func (r *emaFixedWindowQPSTracker) ReportCounter(delta int64) {
func (r *emaFixedWindowQPSTracker) QPS() float64 {
return r.qps.Load()
}

type (
bucket struct {
counter int64
timeIndex int
}
rollingWindowQPSTracker struct {
sync.RWMutex
timeSource clock.TimeSource
bucketInterval time.Duration
buckets []bucket

counter int64
timeIndex int
}
)

func NewRollingWindowQPSTracker(timeSource clock.TimeSource, bucketInterval time.Duration, numBuckets int) QPSTracker {
return &rollingWindowQPSTracker{
timeSource: timeSource,
bucketInterval: bucketInterval,
buckets: make([]bucket, numBuckets),
}
}

func (r *rollingWindowQPSTracker) Start() {
}

func (r *rollingWindowQPSTracker) Stop() {
}

func (r *rollingWindowQPSTracker) getCurrentTimeIndex() int {
now := r.timeSource.Now()
return int(now.UnixNano() / int64(r.bucketInterval))
}

func (r *rollingWindowQPSTracker) ReportCounter(delta int64) {
r.Lock()
defer r.Unlock()
currentIndex := r.getCurrentTimeIndex()
if currentIndex == r.timeIndex {
r.counter += delta
return
}
r.buckets[r.timeIndex%len(r.buckets)] = bucket{
counter: r.counter,
timeIndex: r.timeIndex,
}
r.timeIndex = currentIndex
r.counter = delta
}

func (r *rollingWindowQPSTracker) QPS() float64 {
r.RLock()
defer r.RUnlock()
currentIndex := r.getCurrentTimeIndex()
if currentIndex != r.timeIndex {
r.buckets[r.timeIndex%len(r.buckets)] = bucket{
counter: r.counter,
timeIndex: r.timeIndex,
}
r.timeIndex = currentIndex
r.counter = 0
}
totalCounter := int64(0)
for _, b := range r.buckets {
if currentIndex-b.timeIndex <= len(r.buckets) {
totalCounter += b.counter
}
}
return float64(totalCounter) / float64(r.bucketInterval) / float64(len(r.buckets)) * float64(time.Second)
}
78 changes: 78 additions & 0 deletions common/stats/stats_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,81 @@ func BenchmarkFullReport(b *testing.B) {
b.StopTimer()
reporter.Stop()
}

func BenchmarkReportCounterRollingWindow(b *testing.B) {
timeSource := clock.NewRealTimeSource()
// Initialize the QPS reporter with a smoothing factor and a 1 second bucket interval
reporter := NewRollingWindowQPSTracker(timeSource, time.Second, 10)
reporter.Start()

// Run the benchmark for b.N iterations
b.ResetTimer()
for i := 0; i < b.N; i++ {
reporter.ReportCounter(1)
}

// Stop the reporter after the benchmark
b.StopTimer()
reporter.Stop()
}

func BenchmarkQPSRollingWindow(b *testing.B) {
timeSource := clock.NewRealTimeSource()
// Initialize the QPS reporter
reporter := NewRollingWindowQPSTracker(timeSource, time.Second, 10)
reporter.Start()

// Simulate a number of report updates before calling QPS
for i := 0; i < 1000; i++ {
reporter.ReportCounter(1)
}

// Benchmark QPS retrieval
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = reporter.QPS()
}

// Stop the reporter
b.StopTimer()
reporter.Stop()
}

// Benchmark the full reporting loop, simulating a real-time system.
func BenchmarkFullReportRollingWindow(b *testing.B) {
timeSource := clock.NewRealTimeSource()
// Initialize the QPS reporter
reporter := NewRollingWindowQPSTracker(timeSource, time.Millisecond*100, 10) // 100ms bucket interval
reporter.Start()

var wg sync.WaitGroup
// Number of goroutines for each task
numReporters := 10
numQPSQueries := 10
b.ResetTimer()

for i := 0; i < numReporters; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
// Report random counter value (simulate workload)
reporter.ReportCounter(1)
}
}()
}
for i := 0; i < numQPSQueries; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < b.N; j++ {
// Query QPS value (simulate workload)
_ = reporter.QPS()
}
}()
}
wg.Wait()
// Stop the reporter after the benchmark
b.StopTimer()
reporter.Stop()
}
73 changes: 68 additions & 5 deletions common/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,31 @@
package stats

import (
"math"
"testing"
"time"

"github.com/uber/cadence/common/clock"
)

const floatResolution = 1e-6

func TestEmaFixedWindowQPSTracker(t *testing.T) {
timeSource := clock.NewMockedTimeSourceAt(time.Now())
exp := 0.4
bucketInterval := time.Second

r := NewEmaFixedWindowQPSTracker(timeSource, exp, bucketInterval)
r.Start()
defer r.Stop()

// Test ReportCounter
r.ReportCounter(10)
r.ReportCounter(20)

qps := r.QPS()
if qps != 0 {
t.Errorf("QPS mismatch, expected: 0, got: %f", qps)
t.Errorf("QPS mismatch, expected: 0, got: %v", qps)
}

timeSource.BlockUntil(1)
Expand All @@ -52,8 +56,8 @@ func TestEmaFixedWindowQPSTracker(t *testing.T) {
// Test QPS
qps = r.QPS()
expectedQPS := float64(30) / (float64(bucketInterval) / float64(time.Second))
if qps != expectedQPS {
t.Errorf("QPS mismatch, expected: %f, got: %f", expectedQPS, qps)
if math.Abs(qps-expectedQPS) > floatResolution {
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}

r.ReportCounter(10)
Expand All @@ -63,9 +67,68 @@ func TestEmaFixedWindowQPSTracker(t *testing.T) {
// Test QPS
qps = r.QPS()
expectedQPS = float64(22) / (float64(bucketInterval) / float64(time.Second))
if math.Abs(qps-expectedQPS) > floatResolution {
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}
}

func TestRollingWindowQPSTracker(t *testing.T) {
timeSource := clock.NewMockedTimeSourceAt(time.Now())
bucketInterval := time.Second

r := NewRollingWindowQPSTracker(timeSource, bucketInterval, 10)
r.Start()
defer r.Stop()

r.ReportCounter(10)
qps := r.QPS()
if qps != 0 {
t.Errorf("QPS mismatch, expected: 0, got: %v", qps)
}

timeSource.Advance(bucketInterval)

qps = r.QPS()
expectedQPS := float64(10) / (float64(bucketInterval) * 10 / float64(time.Second))
if math.Abs(qps-expectedQPS) > floatResolution {
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}

r.ReportCounter(20)
qps = r.QPS()
if qps != expectedQPS {
t.Errorf("QPS mismatch, expected: %f, got: %f", expectedQPS, qps)
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}

timeSource.Advance(bucketInterval)

qps = r.QPS()
expectedQPS = float64(30) / (float64(bucketInterval) * 10 / float64(time.Second))
if math.Abs(qps-expectedQPS) > floatResolution {
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}

r.Stop()
r.ReportCounter(100)

timeSource.Advance(8 * bucketInterval)

qps = r.QPS()
expectedQPS = float64(130) / (float64(bucketInterval) * 10 / float64(time.Second))
if math.Abs(qps-expectedQPS) > floatResolution {
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}

timeSource.Advance(bucketInterval)
qps = r.QPS()
expectedQPS = float64(120) / (float64(bucketInterval) * 10 / float64(time.Second))
if math.Abs(qps-expectedQPS) > floatResolution {
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}

timeSource.Advance(bucketInterval)
qps = r.QPS()
expectedQPS = float64(100) / (float64(bucketInterval) * 10 / float64(time.Second))
if math.Abs(qps-expectedQPS) > floatResolution {
t.Errorf("QPS mismatch, expected: %v, got: %v", expectedQPS, qps)
}
}

0 comments on commit 218ec9c

Please sign in to comment.