Skip to content

Commit

Permalink
Merge pull request grafana#994 from grafana/batch-memcache
Browse files Browse the repository at this point in the history
Batch memcache requests
  • Loading branch information
tomwilkie authored Sep 14, 2018
2 parents 0694a69 + a9941d3 commit 675e1cc
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 29 deletions.
14 changes: 12 additions & 2 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,18 @@ func testCache(t *testing.T, cache cache.Cache) {
}

// TestMemcache runs the shared cache test suite against the memcached
// cache in both its unbatched (default) and batched configurations.
func TestMemcache(t *testing.T) {
	for _, tc := range []struct {
		name string
		cfg  cache.MemcachedConfig
	}{
		{name: "Unbatched", cfg: cache.MemcachedConfig{}},
		{name: "Batched", cfg: cache.MemcachedConfig{
			BatchSize:   10,
			Parallelism: 3,
		}},
	} {
		t.Run(tc.name, func(t *testing.T) {
			testCache(t, cache.NewMemcached(tc.cfg, newMockMemcache()))
		})
	}
}

func TestDiskcache(t *testing.T) {
Expand Down
131 changes: 127 additions & 4 deletions cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ package cache
import (
"context"
"flag"
"sync"
"time"

"github.com/bradfitz/gomemcache/memcache"
opentracing "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
instr "github.com/weaveworks/common/instrument"
"github.com/weaveworks/cortex/pkg/util"
)

var (
Expand All @@ -27,17 +31,25 @@ func init() {
// MemcachedConfig is config to make a Memcached
type MemcachedConfig struct {
	Expiration time.Duration // how long keys stay in the memcache

	// Batching: both must be non-zero to enable batched fetches
	// with background workers (see NewMemcached).
	BatchSize   int // maximum number of keys fetched per GetMulti batch
	Parallelism int // number of background worker goroutines
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) {
	// NOTE: each flag must be registered exactly once; registering
	// "memcached.expiration" twice (as the merged diff residue did)
	// panics with "flag redefined".
	f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long keys stay in the memcache.")
	f.IntVar(&cfg.BatchSize, "memcached.batchsize", 0, "How many keys to fetch in each batch.")
	f.IntVar(&cfg.Parallelism, "memcached.parallelism", 100, "Maximum active requests to memcache.")
}

// Memcached type caches chunks in memcached
type Memcached struct {
	cfg      MemcachedConfig
	memcache MemcachedClient

	// Batching state; inputCh is nil when batching is disabled
	// (BatchSize or Parallelism is zero).
	wg      sync.WaitGroup // tracks the background fetch workers
	inputCh chan *work     // feeds key batches to workers; closed by Stop
}

// NewMemcached makes a new Memcache
Expand All @@ -46,9 +58,46 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient) *Memcached {
cfg: cfg,
memcache: client,
}

if cfg.BatchSize == 0 || cfg.Parallelism == 0 {
return c
}

c.inputCh = make(chan *work)
c.wg.Add(cfg.Parallelism)

for i := 0; i < cfg.Parallelism; i++ {
go func() {
for input := range c.inputCh {
res := &result{
batchID: input.batchID,
}
res.found, res.bufs, res.missed, res.err = c.fetch(input.ctx, input.keys)
input.resultCh <- res
}

c.wg.Done()
}()
}

return c
}

// work is one batch of keys handed to a background fetch worker.
type work struct {
	keys     []string        // the subset of the caller's keys in this batch
	ctx      context.Context // caller's context, passed through to the fetch
	resultCh chan<- *result  // where the worker sends this batch's result
	batchID  int             // For ordering results.
}

// result is the outcome of fetching one batch of keys.
type result struct {
	found   []string // keys present in memcache
	bufs    [][]byte // values, parallel to found
	missed  []string // keys not present (or all keys, on error)
	err     error
	batchID int // For ordering results.
}

func memcacheStatusCode(err error) string {
// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
switch err {
Expand All @@ -63,17 +112,41 @@ func memcacheStatusCode(err error) string {
}
}

// Fetch gets keys from the cache. The keys that are found must be in the order of the keys requested.
func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
	err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(ctx context.Context) error {
		// SpanFromContext returns nil when no span is attached to ctx;
		// guard so tracing never panics the fetch path.
		if sp := opentracing.SpanFromContext(ctx); sp != nil {
			sp.LogFields(otlog.Int("keys requested", len(keys)))
			defer func() {
				sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(missed)))
			}()
		}

		// Shadows the named return on purpose: the outer err is set
		// from TimeRequestHistogramStatus's return value.
		var err error
		if c.cfg.BatchSize == 0 {
			found, bufs, missed, err = c.fetch(ctx, keys)
			return err
		}

		found, bufs, missed, err = c.fetchKeysBatched(ctx, keys)
		return err
	})

	return
}

func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
var items map[string]*memcache.Item
err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
err = UntracedCollectedRequest(ctx, "Memcache.GetMulti", instr.NewHistogramCollector(memcacheRequestDuration), memcacheStatusCode, func(_ context.Context) error {
var err error
items, err = c.memcache.GetMulti(keys)
return err
})

if err != nil {
missed = keys
return
}

for _, key := range keys {
item, ok := items[key]
if ok {
Expand All @@ -86,6 +159,50 @@ func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, b
return
}

// fetchKeysBatched splits keys into BatchSize-sized batches, fans them out
// to the workers started in NewMemcached, and reassembles the results in
// the original key order. Must only be called when batching is configured
// (c.inputCh != nil).
func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
	resultsCh := make(chan *result)
	batchSize := c.cfg.BatchSize

	// Producer: enqueue one work item per batch. batchID (j) records each
	// batch's position so results can be re-ordered on receipt.
	go func() {
		for i, j := 0, 0; i < len(keys); i += batchSize {
			batchKeys := keys[i:util.Min(i+batchSize, len(keys))]
			c.inputCh <- &work{
				keys:     batchKeys,
				ctx:      ctx,
				resultCh: resultsCh,
				batchID:  j,
			}
			j++
		}
	}()

	// Read all values from this channel to avoid blocking upstream.
	// numResults = ceil(len(keys) / batchSize).
	numResults := len(keys) / batchSize
	if len(keys)%batchSize != 0 {
		numResults++
	}

	// We need to order found by the input keys order.
	results := make([]*result, numResults)
	for i := 0; i < numResults; i++ {
		result := <-resultsCh
		results[result.batchID] = result
	}
	close(resultsCh)

	for _, result := range results {
		if result.err != nil {
			// NOTE: only the last batch error survives; earlier
			// errors are overwritten.
			err = result.err
		}

		found = append(found, result.found...)
		bufs = append(bufs, result.bufs...)
		missed = append(missed, result.missed...)
	}

	return
}

// Store stores the key in the cache.
func (c *Memcached) Store(ctx context.Context, key string, buf []byte) error {
return instr.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
Expand All @@ -99,6 +216,12 @@ func (c *Memcached) Store(ctx context.Context, key string, buf []byte) error {
}

// Stop does nothing.
func (*Memcached) Stop() error {
func (c *Memcached) Stop() error {
if c.inputCh == nil {
return nil
}

close(c.inputCh)
c.wg.Wait()
return nil
}
39 changes: 39 additions & 0 deletions cache/memcached_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cache_test

import (
"sync"

"github.com/bradfitz/gomemcache/memcache"
)

// mockMemcache is an in-memory, mutex-guarded stand-in for a real
// memcached client, implementing the GetMulti/Set subset used by the
// cache under test.
type mockMemcache struct {
	sync.RWMutex                   // guards contents
	contents     map[string][]byte // key -> stored value
}

// newMockMemcache returns an empty in-memory mock memcached client.
func newMockMemcache() *mockMemcache {
	m := &mockMemcache{}
	m.contents = make(map[string][]byte)
	return m
}

// GetMulti returns the stored items for the given keys; keys with no
// entry are simply absent from the returned map. It never returns an
// error.
func (m *mockMemcache) GetMulti(keys []string) (map[string]*memcache.Item, error) {
	m.RLock()
	defer m.RUnlock()

	found := map[string]*memcache.Item{}
	for _, key := range keys {
		value, ok := m.contents[key]
		if !ok {
			continue
		}
		found[key] = &memcache.Item{
			Value: value,
		}
	}
	return found, nil
}

// Set stores the item's value under its key, overwriting any previous
// value. It never returns an error.
func (m *mockMemcache) Set(item *memcache.Item) error {
	m.Lock()
	defer m.Unlock()

	m.contents[item.Key] = item.Value
	return nil
}
Loading

0 comments on commit 675e1cc

Please sign in to comment.