Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔥 Feature: Add max size to cache #1892

Merged
merged 5 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions middleware/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func New(config ...Config) fiber.Handler {
)
// Create manager to simplify storage operations ( see manager.go )
manager := newManager(cfg.Storage)
// Create indexed heap for tracking expirations ( see heap.go )
heap := &indexedHeap{}
// count stored bytes (sizes of response bodies)
var storedBytes uint = 0

// Update timestamp in the configured interval
go func() {
Expand All @@ -68,6 +72,15 @@ func New(config ...Config) fiber.Handler {
}
}()

// Delete key from both manager and storage
deleteKey := func(dkey string) {
manager.delete(dkey)
// External storage saves body data with different key
if cfg.Storage != nil {
manager.delete(dkey + "_body")
}
}

// Return new handler
return func(c *fiber.Ctx) error {
// Only cache GET and HEAD methods
Expand All @@ -89,12 +102,12 @@ func New(config ...Config) fiber.Handler {
// Get timestamp
ts := atomic.LoadUint64(&timestamp)

// Check if entry is expired
if e.exp != 0 && ts >= e.exp {
// Check if entry is expired
manager.delete(key)
// External storage saves body data with different key
if cfg.Storage != nil {
manager.delete(key + "_body")
deleteKey(key)
if cfg.MaxBytes > 0 {
_, size := heap.remove(e.heapidx)
storedBytes -= size
}
} else if e.exp != 0 {
// Separate body value to avoid msgp serialization
Expand Down Expand Up @@ -146,6 +159,22 @@ func New(config ...Config) fiber.Handler {
return nil
}

// Don't try to cache if body won't fit into cache
bodySize := uint(len(c.Response().Body()))
if cfg.MaxBytes > 0 && bodySize > cfg.MaxBytes {
c.Set(cfg.CacheHeader, cacheUnreachable)
return nil
}

// Remove oldest to make room for new
if cfg.MaxBytes > 0 {
for storedBytes+bodySize > cfg.MaxBytes {
key, size := heap.removeFirst()
deleteKey(key)
storedBytes -= size
}
}

// Cache response
e.body = utils.CopyBytes(c.Response().Body())
e.status = c.Response().StatusCode()
Expand Down Expand Up @@ -175,6 +204,12 @@ func New(config ...Config) fiber.Handler {
}
e.exp = ts + uint64(expiration.Seconds())

// Store entry in heap
if cfg.MaxBytes > 0 {
e.heapidx = heap.put(key, e.exp, bodySize)
storedBytes += bodySize
}

// For external Storage we store raw body separated
if cfg.Storage != nil {
manager.setRaw(key+"_body", e.body, expiration)
Expand Down
116 changes: 116 additions & 0 deletions middleware/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"strconv"
Expand Down Expand Up @@ -493,6 +494,88 @@ func Test_CustomCacheHeader(t *testing.T) {
utils.AssertEqual(t, cacheMiss, resp.Header.Get("Cache-Status"))
}

// Because time points are updated once every X milliseconds, entries in tests can often have
// equal expiration times and thus be in an random order. This closure hands out increasing
// time intervals to maintain strong ascending order of expiration
func stableAscendingExpiration() func(c1 *fiber.Ctx, c2 *Config) time.Duration {
i := 0
return func(c1 *fiber.Ctx, c2 *Config) time.Duration {
i += 1
return time.Hour * time.Duration(i)
}
}

func Test_Cache_MaxBytesOrder(t *testing.T) {
t.Parallel()

app := fiber.New()
app.Use(New(Config{
MaxBytes: 2,
ExpirationGenerator: stableAscendingExpiration(),
}))

app.Get("/*", func(c *fiber.Ctx) error {
return c.SendString("1")
})

cases := [][]string{
// Insert a, b into cache of size 2 bytes (responses are 1 byte)
{"/a", cacheMiss},
{"/b", cacheMiss},
{"/a", cacheHit},
{"/b", cacheHit},
// Add c -> a evicted
{"/c", cacheMiss},
{"/b", cacheHit},
// Add a again -> b evicted
{"/a", cacheMiss},
{"/c", cacheHit},
// Add b -> c evicted
{"/b", cacheMiss},
{"/c", cacheMiss},
}

for idx, tcase := range cases {
rsp, err := app.Test(httptest.NewRequest("GET", tcase[0], nil))
utils.AssertEqual(t, nil, err)
utils.AssertEqual(t, tcase[1], rsp.Header.Get("X-Cache"), fmt.Sprintf("Case %v", idx))
}
}

func Test_Cache_MaxBytesSizes(t *testing.T) {
t.Parallel()

app := fiber.New()

app.Use(New(Config{
MaxBytes: 7,
ExpirationGenerator: stableAscendingExpiration(),
}))

app.Get("/*", func(c *fiber.Ctx) error {
path := c.Context().URI().LastPathSegment()
size, _ := strconv.Atoi(string(path))
return c.Send(make([]byte, size))
})

cases := [][]string{
{"/1", cacheMiss},
{"/2", cacheMiss},
{"/3", cacheMiss},
{"/4", cacheMiss}, // 1+2+3+4 > 7 => 1,2 are evicted now
{"/3", cacheHit},
{"/1", cacheMiss},
{"/2", cacheMiss},
{"/8", cacheUnreachable}, // too big to cache -> unreachable
}

for idx, tcase := range cases {
rsp, err := app.Test(httptest.NewRequest("GET", tcase[0], nil))
utils.AssertEqual(t, nil, err)
utils.AssertEqual(t, tcase[1], rsp.Header.Get("X-Cache"), fmt.Sprintf("Case %v", idx))
}
}

// go test -v -run=^$ -bench=Benchmark_Cache -benchmem -count=4
func Benchmark_Cache(b *testing.B) {
app := fiber.New()
Expand Down Expand Up @@ -578,3 +661,36 @@ func Benchmark_Cache_AdditionalHeaders(b *testing.B) {
utils.AssertEqual(b, fiber.StatusTeapot, fctx.Response.Header.StatusCode())
utils.AssertEqual(b, []byte("foobar"), fctx.Response.Header.Peek("X-Foobar"))
}

func Benchmark_Cache_MaxSize(b *testing.B) {
// The benchmark is run with three different MaxSize parameters
// 1) 0: Tracking is disabled = no overhead
// 2) MaxInt32: Enough to store all entries = no removals
// 3) 100: Small size = constant insertions and removals
cases := []uint{0, math.MaxUint32, 100}
names := []string{"Disabled", "Unlim", "LowBounded"}
for i, size := range cases {
b.Run(names[i], func(b *testing.B) {
app := fiber.New()
app.Use(New(Config{MaxBytes: size}))

app.Get("/*", func(c *fiber.Ctx) error {
return c.Status(fiber.StatusTeapot).SendString("1")
})

h := app.Handler()
fctx := &fasthttp.RequestCtx{}
fctx.Request.Header.SetMethod("GET")

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
fctx.Request.SetRequestURI(fmt.Sprintf("/%v", n))
h(fctx)
}

utils.AssertEqual(b, fiber.StatusTeapot, fctx.Response.Header.StatusCode())
})
}
}
8 changes: 8 additions & 0 deletions middleware/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ type Config struct {
//
// Default: false
StoreResponseHeaders bool

// Max number of bytes of response bodies simultaneously stored in cache. When limit is reached,
// entries with the nearest expiration are deleted to make room for new.
// 0 means no limit
//
// Default: 0
MaxBytes uint
}

// ConfigDefault is the default config
Expand All @@ -73,6 +80,7 @@ var ConfigDefault = Config{
ExpirationGenerator: nil,
StoreResponseHeaders: false,
Storage: nil,
MaxBytes: 0,
}

// Helper function to set default values
Expand Down
92 changes: 92 additions & 0 deletions middleware/cache/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package cache

import (
"container/heap"
)

type heapEntry struct {
key string
exp uint64
bytes uint
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entry size is stored in the heap to update the total size without reading entries on delete

idx int
}

// indexedHeap is a regular min-heap that allows finding
// elements in constant time. It does so by handing out special indices
// and tracking entry movement.
//
// indexdedHeap is used for quickly finding entries with the lowest
// expiration timestamp and deleting arbitrary entries.
type indexedHeap struct {
// Slice the heap is built on
entries []heapEntry
// Mapping "index" to position in heap slice
indices []int
// Max index handed out
maxidx int
}

func (h indexedHeap) Len() int {
return len(h.entries)
}

func (h indexedHeap) Less(i, j int) bool {
return h.entries[i].exp < h.entries[j].exp
}

func (h indexedHeap) Swap(i, j int) {
h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
h.indices[h.entries[i].idx] = i
h.indices[h.entries[j].idx] = j
}

func (h *indexedHeap) Push(x interface{}) {
h.pushInternal(x.(heapEntry))
}

func (h *indexedHeap) Pop() interface{} {
n := len(h.entries)
h.entries = h.entries[0 : n-1]
return h.entries[0:n][n-1]
}

func (h *indexedHeap) pushInternal(entry heapEntry) {
h.indices[entry.idx] = len(h.entries)
h.entries = append(h.entries, entry)
}

// Returns index to track entry
func (h *indexedHeap) put(key string, exp uint64, bytes uint) int {
idx := 0
if len(h.entries) < h.maxidx {
// Steal index from previously removed entry
// capacity > size is guaranteed
n := len(h.entries)
idx = h.entries[:n+1][n].idx
} else {
idx = h.maxidx
h.maxidx += 1
h.indices = append(h.indices, idx)
}
// Push manually to avoid allocation
h.pushInternal(heapEntry{
key: key, exp: exp, idx: idx, bytes: bytes,
})
heap.Fix(h, h.Len()-1)
return idx
}

func (h *indexedHeap) removeInternal(realIdx int) (string, uint) {
x := heap.Remove(h, realIdx).(heapEntry)
return x.key, x.bytes
}

// Remove entry by index
func (h *indexedHeap) remove(idx int) (string, uint) {
return h.removeInternal(h.indices[idx])
}

// Remove entry with lowest expiration time
func (h *indexedHeap) removeFirst() (string, uint) {
return h.removeInternal(0)
}
2 changes: 2 additions & 0 deletions middleware/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type item struct {
status int
exp uint64
headers map[string][]byte
// used for finding the item in an indexed heap
heapidx int
}

//msgp:ignore manager
Expand Down
Loading