Skip to content

Commit

Permalink
maintain local quota buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
theganyo committed May 19, 2020
1 parent 1b7bf06 commit 4057296
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 27 deletions.
50 changes: 47 additions & 3 deletions quota/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@ import (
"fmt"
"net/http"
"path"
"strings"
"sync"
"time"

"github.com/apigee/apigee-remote-service-golib/log"
)

const (
quotaSecond = "second"
quotaMinute = "minute"
quotaHour = "hour"
quotaDay = "day"
quotaMonth = "month"
)

// bucket tracks a specific quota instance
type bucket struct {
manager *manager
Expand All @@ -38,23 +47,26 @@ type bucket struct {
checked time.Time // last apply time
refreshAfter time.Duration // duration after synced
deleteAfter time.Duration // duration after checked
invalidAfter time.Time // result window is no longer valid after this
}

func newBucket(req Request, m *manager) *bucket {
req.TimeUnit = strings.ToLower(req.TimeUnit)
quotaURL := *m.baseURL
quotaURL.Path = path.Join(quotaURL.Path, quotaPath)
return &bucket{
b := &bucket{
request: &req,
manager: m,
quotaURL: quotaURL.String(),
result: nil,
created: m.now(),
checked: m.now(),
lock: sync.RWMutex{},
deleteAfter: defaultDeleteAfter,
refreshAfter: defaultRefreshAfter,
}
b.result = &Result{
ExpiryTime: calcLocalExpiry(b.now(), req.Interval, req.TimeUnit).Unix(),
}
return b
}

func (b *bucket) now() time.Time {
Expand All @@ -76,6 +88,14 @@ func (b *bucket) apply(req *Request) (*Result, error) {
ExpiryTime: b.checked.Unix(),
Timestamp: b.checked.Unix(),
}

if b.windowExpired() {
b.result.Used = 0
b.result.Exceeded = 0
b.result.ExpiryTime = calcLocalExpiry(b.now(), req.Interval, req.TimeUnit).Unix()
b.request.Weight = 0
}

if b.result != nil {
res.Used = b.result.Used // start from last result
res.Used += b.result.Exceeded
Expand Down Expand Up @@ -191,3 +211,27 @@ func (b *bucket) windowExpired() bool {
}
return false
}

func calcLocalExpiry(now time.Time, interval int64, timeUnit string) time.Time {

var expiry time.Time
switch timeUnit {
case quotaSecond:
start := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), 0, now.Location())
expiry = start.Add(time.Duration(interval) * time.Second)
case quotaMinute:
start := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location())
expiry = start.Add(time.Duration(interval) * time.Minute)
case quotaHour:
start := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
expiry = start.Add(time.Duration(interval) * time.Hour)
case quotaDay:
start := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
expiry = start.AddDate(0, 0, int(interval))
case quotaMonth:
start := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())
expiry = start.AddDate(0, int(interval), 0)
}

return expiry.Add(-time.Second)
}
51 changes: 47 additions & 4 deletions quota/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func TestBucket(t *testing.T) {
Allow: 4,
Weight: 1,
},
&Result{Used: 2},
&Result{
Used: 2,
ExpiryTime: now().Unix(),
},
&Request{
Allow: 4,
Weight: 1,
Expand All @@ -71,7 +74,10 @@ func TestBucket(t *testing.T) {
Allow: 7,
Weight: 3,
},
&Result{Used: 3},
&Result{
Used: 3,
ExpiryTime: now().Unix(),
},
&Request{
Allow: 7,
Weight: 2,
Expand All @@ -89,8 +95,9 @@ func TestBucket(t *testing.T) {
Allow: 3,
},
&Result{
Used: 3,
Exceeded: 1,
Used: 3,
Exceeded: 1,
ExpiryTime: now().Unix(),
},
&Request{
Allow: 3,
Expand Down Expand Up @@ -216,3 +223,39 @@ func TestNeedToSync(t *testing.T) {
}
}
}

func TestCalcLocalExpiry(t *testing.T) {

now, _ := time.Parse(time.RFC1123, "Mon, 31 Mar 2006 23:59:59 PST")
nowStartMinute, _ := time.Parse(time.RFC1123, "Mon, 31 Mar 2006 23:59:00 PST")
nowStartHour, _ := time.Parse(time.RFC1123, "Mon, 31 Mar 2006 23:00:00 PST")
nowStartDay, _ := time.Parse(time.RFC1123, "Mon, 31 Mar 2006 00:00:00 PST")
nowStartMonth, err := time.Parse(time.RFC1123, "Mon, 01 Mar 2006 00:00:00 PST")
if err != nil {
t.Fatal(err)
}

tests := []struct {
interval int64
quotaLength string
want time.Time
}{
{1, quotaSecond, now},
{2, quotaSecond, now.Add(time.Second)},
{1, quotaMinute, nowStartMinute.Add(time.Minute).Add(-time.Second)},
{2, quotaMinute, nowStartMinute.Add(2 * time.Minute).Add(-time.Second)},
{1, quotaHour, nowStartHour.Add(time.Hour).Add(-time.Second)},
{2, quotaHour, nowStartHour.Add(2 * time.Hour).Add(-time.Second)},
{1, quotaDay, nowStartDay.AddDate(0, 0, 1).Add(-time.Second)},
{2, quotaDay, nowStartDay.AddDate(0, 0, 2).Add(-time.Second)},
{1, quotaMonth, nowStartMonth.AddDate(0, 1, 0).Add(-time.Second)},
{2, quotaMonth, nowStartMonth.AddDate(0, 2, 0).Add(-time.Second)},
}

for _, tst := range tests {
got := calcLocalExpiry(now, tst.interval, tst.quotaLength)
if got != tst.want {
t.Errorf("%d %s got: %v, want: %v", tst.interval, tst.quotaLength, got, tst.want)
}
}
}
1 change: 1 addition & 0 deletions quota/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (m *manager) syncBucketWorker() {
if _, ok := m.syncingBuckets[bucket]; !ok {
m.syncingBuckets[bucket] = struct{}{}
m.syncingBucketsLock.Unlock()
// TODO: ideally, this should have a backoff on it
bucket.sync()
m.syncingBucketsLock.Lock()
delete(m.syncingBuckets, bucket)
Expand Down
96 changes: 76 additions & 20 deletions quota/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestQuota(t *testing.T) {
p := &product.APIProduct{
QuotaLimitInt: 1,
QuotaIntervalInt: 1,
QuotaTimeUnit: "second",
QuotaTimeUnit: quotaMonth,
}

args := Args{
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestQuota(t *testing.T) {
p2 := &product.APIProduct{
QuotaLimitInt: 1,
QuotaIntervalInt: 2,
QuotaTimeUnit: "second",
QuotaTimeUnit: quotaSecond,
}
c := testcase{
name: "incompatible",
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestSync(t *testing.T) {
request := &Request{
Identifier: quotaID,
Interval: 1,
TimeUnit: "seconds",
TimeUnit: quotaSecond,
Allow: 1,
Weight: 3,
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestSync(t *testing.T) {
req := &Request{
Identifier: quotaID,
Interval: 1,
TimeUnit: "seconds",
TimeUnit: quotaSecond,
Allow: 1,
Weight: 2,
}
Expand Down Expand Up @@ -243,7 +243,8 @@ func TestSync(t *testing.T) {
}

func TestDisconnected(t *testing.T) {
now := func() time.Time { return time.Unix(1521221450, 0) }
fakeTime := int64(1521221450)
now := func() time.Time { return time.Unix(fakeTime, 0) }

errC := &errControl{
send: 404,
Expand All @@ -268,7 +269,6 @@ func TestDisconnected(t *testing.T) {
closed: make(chan bool),
client: http.DefaultClient,
now: now,
syncRate: 2 * time.Millisecond,
syncQueue: make(chan *bucket, 10),
baseURL: context.InternalAPI(),
numSyncWorkers: 1,
Expand All @@ -281,7 +281,7 @@ func TestDisconnected(t *testing.T) {
p := &product.APIProduct{
QuotaLimitInt: 1,
QuotaIntervalInt: 1,
QuotaTimeUnit: "second",
QuotaTimeUnit: quotaMinute,
}

args := Args{
Expand Down Expand Up @@ -322,6 +322,23 @@ func TestDisconnected(t *testing.T) {
if !reflect.DeepEqual(*res, wantResult) {
t.Errorf("result got: %#v, want: %#v", *res, wantResult)
}

// next window
fakeTime = fakeTime + 60
res, err = m.Apply(authContext, p, args)
if err != nil {
t.Fatalf("got error: %s", err)
}
wantResult = Result{
Allowed: 1,
Used: 1,
Exceeded: 0,
ExpiryTime: now().Unix(),
Timestamp: now().Unix(),
}
if !reflect.DeepEqual(*res, wantResult) {
t.Errorf("result got: %#v, want: %#v", *res, wantResult)
}
}

func TestWindowExpired(t *testing.T) {
Expand Down Expand Up @@ -351,7 +368,7 @@ func TestWindowExpired(t *testing.T) {
closed: make(chan bool),
client: http.DefaultClient,
now: now,
syncRate: 2 * time.Millisecond,
syncRate: time.Minute,
syncQueue: make(chan *bucket, 10),
baseURL: context.InternalAPI(),
numSyncWorkers: 1,
Expand All @@ -364,27 +381,31 @@ func TestWindowExpired(t *testing.T) {
p := &product.APIProduct{
QuotaLimitInt: 1,
QuotaIntervalInt: 1,
QuotaTimeUnit: "second",
QuotaTimeUnit: quotaSecond,
}

args := Args{
QuotaAmount: 1,
BestEffort: true,
}

// apply and force a sync
res, err := m.Apply(authContext, p, args)
m.forceSync(getQuotaID(authContext, p))

quotaID := fmt.Sprintf("%s-%s", authContext.Application, p.Name)
bucket := m.buckets[quotaID]

if bucket.request.Weight != 0 {
t.Errorf("got: %d, want: %d", bucket.request.Weight, 0)
}
if res.Used != 1 {
t.Errorf("got: %d, want: %d", res.Used, 1)
}
if res.Exceeded != 0 {
t.Errorf("got: %d, want: %d", res.Exceeded, 0)
}

// move to the next window
if bucket.windowExpired() {
t.Errorf("should not be expired")
}
fakeTime++
if !bucket.windowExpired() {
t.Errorf("should be expired")
Expand All @@ -394,20 +415,53 @@ func TestWindowExpired(t *testing.T) {
if err != nil {
t.Errorf("got error: %v", err)
}
if bucket.request.Weight != 1 {
t.Errorf("got: %d, want: %d", bucket.request.Weight, 1)
if res.Used != 1 {
t.Errorf("got: %d, want: %d", res.Used, 1)
}
if res.Exceeded != 0 {
t.Errorf("got: %d, want: %d", res.Exceeded, 0)
}

err = bucket.sync() // after window expiration, should reset
res, err = m.Apply(authContext, p, args)
if err != nil {
t.Errorf("got error: %v", err)
}
if bucket.result.Used != 1 {
t.Errorf("got: %d, want: %d", bucket.result.Used, 1)
if res.Used != 1 {
t.Errorf("got: %d, want: %d", res.Used, 1)
}
if res.Exceeded != 1 {
t.Errorf("got: %d, want: %d", res.Exceeded, 1)
}

// move to the next window
fakeTime++
if !bucket.windowExpired() {
t.Errorf("should be expired")
}

res, err = m.Apply(authContext, p, args)
if err != nil {
t.Errorf("got error: %v", err)
}
if bucket.result.Exceeded != 0 {
t.Errorf("got: %d, want: %d", bucket.result.Exceeded, 0)
if res.Used != 1 {
t.Errorf("got: %d, want: %d", res.Used, 1)
}
if res.Exceeded != 0 {
t.Errorf("got: %d, want: %d", res.Exceeded, 0)
}

// res, err = m.Apply(authContext, p, args)
// bucket = m.buckets[quotaID]
// err = bucket.sync() // after window expiration, should reset
// if err != nil {
// t.Errorf("got error: %v", err)
// }
// if bucket.result.Used != 1 {
// t.Errorf("got: %d, want: %d", bucket.result.Used, 1)
// }
// if bucket.result.Exceeded != 0 {
// t.Errorf("got: %d, want: %d", bucket.result.Exceeded, 0)
// }
}

type errControl struct {
Expand Down Expand Up @@ -450,8 +504,10 @@ func (m *manager) forceSync(quotaID string) error {
m.bucketsLock.RLock()
b, ok := m.buckets[quotaID]
if !ok {
m.bucketsLock.RUnlock()
return nil
}
m.bucketsLock.RUnlock()
m.syncingBucketsLock.Lock()
m.syncingBuckets[b] = struct{}{}
m.syncingBucketsLock.Unlock()
Expand Down

0 comments on commit 4057296

Please sign in to comment.