Skip to content

Commit

Permalink
feat: wrapper sliding window with custom counter for QPS (#2359)
Browse files Browse the repository at this point in the history
  • Loading branch information
ev1lQuark authored Jul 19, 2023
1 parent 5f72883 commit 10336a5
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 0 deletions.
84 changes: 84 additions & 0 deletions metrics/util/aggregate/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

import (
"sync"
"time"
)

// TimeWindowCounter wrappers sliding window around a counter.
//
// It is concurrent-safe.
// When it works for calculating QPS, the counter's value means the number of requests in a pane.
type TimeWindowCounter struct {
window *slidingWindow
mux sync.RWMutex
}

func NewTimeWindowCounter(paneCount int, timeWindowSeconds int64) *TimeWindowCounter {
return &TimeWindowCounter{
window: newSlidingWindow(paneCount, timeWindowSeconds*1000),
}
}

// Count returns the sum of all panes' value.
func (t *TimeWindowCounter) Count() float64 {
t.mux.RLock()
defer t.mux.RUnlock()

total := float64(0)
for _, v := range t.window.values(time.Now().UnixMilli()) {
total += v.(*counter).value
}
return total
}

// LivedSeconds returns the lived seconds of the sliding window.
func (t *TimeWindowCounter) LivedSeconds() int64 {
t.mux.RLock()
defer t.mux.RUnlock()

windowLength := len(t.window.values(time.Now().UnixMilli()))
return int64(windowLength) * t.window.paneIntervalInMs / 1000
}

// Add adds a step to the counter.
func (t *TimeWindowCounter) Add(step float64) {
t.mux.Lock()
defer t.mux.Unlock()

t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*counter).add(step)
}

// Inc increments the counter by 1.
func (t *TimeWindowCounter) Inc() {
t.Add(1)
}

func (t *TimeWindowCounter) newEmptyValue() interface{} {
return &counter{0}
}

type counter struct {
value float64
}

func (c *counter) add(v float64) {
c.value += v
}
99 changes: 99 additions & 0 deletions metrics/util/aggregate/counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

import (
"testing"
"time"
)

func TestTimeWindowCounterCount(t1 *testing.T) {
tests := []struct {
name string
queryTimes int
want float64
}{
{
name: "Query Times: 0",
queryTimes: 0,
want: 0,
},
{
name: "Query Times: 3",
queryTimes: 3,
want: 3,
},
{
name: "Query Times: 10",
queryTimes: 10,
want: 10,
},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := NewTimeWindowCounter(10, 1)
for i := 0; i < tt.queryTimes; i++ {
if i%3 == 0 {
time.Sleep(time.Millisecond * 100)
}
t.Inc()
}
if got := t.Count(); got != tt.want {
t1.Errorf("Count() = %v, want %v", got, tt.want)
}
})
}
}

func TestTimeWindowCounterLivedSeconds(t1 *testing.T) {
tests := []struct {
name string
queryTimes int
want int64
}{
{
name: "Query Times: 0",
queryTimes: 0,
want: 0,
},
{
name: "Query Times: 3",
queryTimes: 3,
want: 1,
},
{
name: "Query Times: 9",
queryTimes: 9,
want: 3,
},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := NewTimeWindowCounter(10, 10)
for i := 0; i < tt.queryTimes; i++ {
if i%3 == 0 {
time.Sleep(time.Second * 1)
}
t.Inc()
}
if got := t.LivedSeconds(); got != tt.want {
t1.Errorf("Count() = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit 10336a5

Please sign in to comment.