From 018b375bf695670e7aee91b5af5de97c210e8159 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Mon, 10 Sep 2018 17:01:22 +0530 Subject: [PATCH 1/4] Batch memcache requests Signed-off-by: Goutham Veeramachaneni --- cache/cache_test.go | 16 ++- cache/memcached.go | 132 +++++++++++++++++- ...ached_test.go => memcached_client_test.go} | 0 3 files changed, 141 insertions(+), 7 deletions(-) rename cache/{memcached_test.go => memcached_client_test.go} (100%) diff --git a/cache/cache_test.go b/cache/cache_test.go index a22fcb56b17e..c2869030d046 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -35,7 +35,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, promChunk[0], ts, @@ -142,8 +142,18 @@ func testCache(t *testing.T, cache cache.Cache) { } func TestMemcache(t *testing.T) { - cache := cache.NewMemcached(cache.MemcachedConfig{}, newMockMemcache()) - testCache(t, cache) + t.Run("Unbatched", func(t *testing.T) { + cache := cache.NewMemcached(cache.MemcachedConfig{}, newMockMemcache()) + testCache(t, cache) + }) + + t.Run("Batched", func(t *testing.T) { + cache := cache.NewMemcached(cache.MemcachedConfig{ + BatchSize: 10, + Parallelism: 3, + }, newMockMemcache()) + testCache(t, cache) + }) } func TestDiskcache(t *testing.T) { diff --git a/cache/memcached.go b/cache/memcached.go index 4e477123826e..4e46c7e65aee 100644 --- a/cache/memcached.go +++ b/cache/memcached.go @@ -3,11 +3,15 @@ package cache import ( "context" "flag" + "sync" "time" "github.com/bradfitz/gomemcache/memcache" + opentracing "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" instr "github.com/weaveworks/common/instrument" + "github.com/weaveworks/cortex/pkg/util" ) var ( @@ -27,17 +31,25 @@ func init() { // MemcachedConfig is config to make a Memcached type MemcachedConfig struct { Expiration time.Duration + + BatchSize int + Parallelism int } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) { - f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") + f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long keys stay in the memcache.") + f.IntVar(&cfg.BatchSize, "memcached.batchsize", 0, "How many keys to fetch in each batch.") + f.IntVar(&cfg.Parallelism, "memcached.parallelism", 100, "Maximum active requests to memcache.") } // Memcached type caches chunks in memcached type Memcached struct { cfg MemcachedConfig memcache MemcachedClient + + wg sync.WaitGroup + inputCh chan *work } // NewMemcached makes a new Memcache @@ -46,9 +58,46 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient) *Memcached { cfg: cfg, memcache: client, } + + if cfg.BatchSize == 0 || cfg.Parallelism == 0 { + return c + } + + c.inputCh = make(chan *work) + c.wg.Add(cfg.Parallelism) + + for i := 0; i < cfg.Parallelism; i++ { + go func() { + for input := range c.inputCh { + res := &result{ + batchID: input.batchID, + } + res.found, res.bufs, res.missed, res.err = c.fetch(input.ctx, input.keys) + input.resultCh <- res + } + + c.wg.Done() + }() + } + return c } +type work struct { + keys []string + ctx context.Context + resultCh chan<- *result + batchID int // For ordering results. 
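+
+	// Each work item dispatched on inputCh produces exactly one result on
+	// resultCh; the consumer relies on that invariant to know how many
+	// results to read. ctx is carried per batch so cancellation and
+	// tracing reach the worker goroutines.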
+}
+
+type result struct {
+	found   []string
+	bufs    [][]byte
+	missed  []string
+	err     error
+	batchID int // For ordering results.
+}
+
 func memcacheStatusCode(err error) string {
 	// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
 	switch err {
@@ -63,14 +112,36 @@ func memcacheStatusCode(err error) string {
 	}
 }
 
-// Fetch gets keys from the cache.
+// Fetch gets keys from the cache. Found keys are returned in the same order as they were requested.
 func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
+	err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(ctx context.Context) error {
+		sp := opentracing.SpanFromContext(ctx)
+		sp.LogFields(otlog.Int("keys requested", len(keys)))
+		defer func() {
+			sp.LogFields(otlog.Int("keys found", len(found)), otlog.Int("keys missing", len(missed)))
+		}()
+
+		var err error
+		if c.cfg.BatchSize == 0 {
+			found, bufs, missed, err = c.fetch(ctx, keys)
+			return err
+		}
+
+		found, bufs, missed, err = c.fetchKeysBatched(ctx, keys)
+		return err
+	})
+
+	return
+}
+
+func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
 	var items map[string]*memcache.Item
-	err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
+	err = UntracedCollectedRequest(ctx, "Memcache.GetMulti", instr.NewHistogramCollector(memcacheRequestDuration), memcacheStatusCode, func(_ context.Context) error {
 		var err error
 		items, err = c.memcache.GetMulti(keys)
 		return err
 	})
+
 	if err != nil {
 		return
 	}
@@ -86,6 +157,53 @@ func (c *Memcached) Fetch(ctx context.Context, keys []string) (found []string, b
 	return
 }
 
+func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) {
+	resultsCh := make(chan *result)
+	batchSize := c.cfg.BatchSize
+
+	go func() {
+		for i, j := 0, 0; i < len(keys); i += batchSize {
+			batchKeys := keys[i:util.Min(i+batchSize, len(keys))]
+			c.inputCh <- &work{
+				keys:     batchKeys,
+				ctx:      ctx,
+				resultCh: resultsCh,
+				batchID:  j,
+			}
+			j++
+		}
+	}()
+
+	// Read every result from this channel so the workers never block.
+	numResults := len(keys) / batchSize
+	if len(keys)%batchSize != 0 {
+		numResults++
+	}
+
+	// Slot results by batchID so found follows the input key order.
+	results := make([]*result, numResults)
+	for i := 0; i < numResults; i++ {
+		result := <-resultsCh
+		results[result.batchID] = result
+	}
+	close(resultsCh)
+
+	for _, result := range results {
+		// TODO(gouthamve): One call may fail while everything else succeeds. Put the
+		// failed call keys in missed then.
+		if result.err != nil {
+			err = result.err
+			continue
+		}
+
+		found = append(found, result.found...)
+		bufs = append(bufs, result.bufs...)
+		missed = append(missed, result.missed...)
+	}
+
+	return
+}
+
 // Store stores the key in the cache.
 func (c *Memcached) Store(ctx context.Context, key string, buf []byte) error {
 	return instr.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
@@ -99,6 +217,12 @@ func (c *Memcached) Store(ctx context.Context, key string, buf []byte) error {
 }
 
-// Stop does nothing.
+// Stop shuts down the batch worker pool, if one was started.
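
The fetchKeysBatched fan-out above is the heart of the series: keys are split into fixed-size batches, each batch is numbered as it is handed to the shared worker pool, and the consumer slots results back by batchID so the flattened output preserves the requested key order. Below is a minimal, self-contained sketch of the same pattern; the names job, res, and fanOutOrdered are illustrative for this note and do not appear in the patch.

package main

import "fmt"

type job struct {
	id   int
	keys []string
}

type res struct {
	id   int
	vals []string
}

// fanOutOrdered splits keys into batches of batchSize, processes the
// batches on nWorkers goroutines, and reassembles the output in the
// original batch order.
func fanOutOrdered(keys []string, batchSize, nWorkers int, process func([]string) []string) []string {
	jobs := make(chan job)
	results := make(chan res)

	// Workers: each job in yields exactly one result out.
	for w := 0; w < nWorkers; w++ {
		go func() {
			for j := range jobs {
				results <- res{id: j.id, vals: process(j.keys)}
			}
		}()
	}

	// Producer: number batches in dispatch order.
	numBatches := (len(keys) + batchSize - 1) / batchSize
	go func() {
		for i, id := 0, 0; i < len(keys); i, id = i+batchSize, id+1 {
			end := i + batchSize
			if end > len(keys) {
				end = len(keys)
			}
			jobs <- job{id: id, keys: keys[i:end]}
		}
		close(jobs)
	}()

	// Fan-in: slot each result by its id, then flatten in order.
	slots := make([]res, numBatches)
	for n := 0; n < numBatches; n++ {
		r := <-results
		slots[r.id] = r
	}

	out := make([]string, 0, len(keys))
	for _, s := range slots {
		out = append(out, s.vals...)
	}
	return out
}

func main() {
	keys := []string{"a", "b", "c", "d", "e"}
	shout := func(batch []string) []string {
		out := make([]string, len(batch))
		for i, k := range batch {
			out[i] = k + "!"
		}
		return out
	}
	fmt.Println(fanOutOrdered(keys, 2, 3, shout)) // [a! b! c! d! e!]
}

The Stop change that follows is the matching teardown: it closes inputCh and waits for the workers to drain.
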
-func (*Memcached) Stop() error { +func (c *Memcached) Stop() error { + if c.inputCh == nil { + return nil + } + + close(c.inputCh) + c.wg.Wait() return nil } diff --git a/cache/memcached_test.go b/cache/memcached_client_test.go similarity index 100% rename from cache/memcached_test.go rename to cache/memcached_client_test.go From f39dff635a4a8f87d7cc2dcc17427049388f1f3b Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 11 Sep 2018 15:45:55 +0530 Subject: [PATCH 2/4] Add a simple test for just the cache. Signed-off-by: Goutham Veeramachaneni --- cache/memcached_test.go | 59 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 cache/memcached_test.go diff --git a/cache/memcached_test.go b/cache/memcached_test.go new file mode 100644 index 000000000000..fdafbdb0ebce --- /dev/null +++ b/cache/memcached_test.go @@ -0,0 +1,59 @@ +package cache_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk/cache" +) + +func TestMemcached(t *testing.T) { + t.Run("unbatched", func(t *testing.T) { + client := newMockMemcache() + memcache := cache.NewMemcached(cache.MemcachedConfig{}, client) + + testMemcache(t, memcache) + }) + + t.Run("batched", func(t *testing.T) { + client := newMockMemcache() + memcache := cache.NewMemcached(cache.MemcachedConfig{ + BatchSize: 10, + Parallelism: 5, + }, client) + + testMemcache(t, memcache) + }) +} + +func testMemcache(t *testing.T, memcache *cache.Memcached) { + numKeys := 1000 + + ctx := context.Background() + keys := make([]string, 0, numKeys) + // Insert 1000 keys skipping all multiples of 5. + for i := 0; i < numKeys; i++ { + keys = append(keys, string(i)) + if i%5 == 0 { + continue + } + + require.NoError(t, memcache.Store(ctx, string(i), []byte(string(i)))) + } + + found, bufs, missing, err := memcache.Fetch(ctx, keys) + require.NoError(t, err) + for i := 0; i < numKeys; i++ { + if i%5 == 0 { + require.Equal(t, string(i), missing[0]) + missing = missing[1:] + continue + } + + require.Equal(t, string(i), found[0]) + require.Equal(t, string(i), string(bufs[0])) + found = found[1:] + bufs = bufs[1:] + } +} From 2867bd5c5aaf6a674b7e4dcbe10315cf1b230a27 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 12 Sep 2018 17:07:15 +0530 Subject: [PATCH 3/4] lint: Make sure we follow go1.10 fmt rules Signed-off-by: Goutham Veeramachaneni --- cache/cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cache/cache_test.go b/cache/cache_test.go index c2869030d046..7f759e3c7833 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -35,7 +35,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, promChunk[0], ts, From a9941d33f4bc462c5dc129cea3e2e6501a8bf63b Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Wed, 12 Sep 2018 18:03:15 +0530 Subject: [PATCH 4/4] Add test for failing memcache calls. 
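
The mock client added here fails every third GetMulti call. The test then
checks the contract end to end: every requested key comes back exactly once,
either in found or in missed, and found stays aligned with bufs. To make that
hold, fetch now reports all of its keys as missed when GetMulti fails, and the
batched path folds every batch into the result, so an errored batch simply
contributes its keys as misses.
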
Signed-off-by: Goutham Veeramachaneni

---
 cache/memcached.go | 5 +--
 cache/memcached_test.go | 87 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/cache/memcached.go b/cache/memcached.go
index 4e46c7e65aee..2f40891e4f93 100644
--- a/cache/memcached.go
+++ b/cache/memcached.go
@@ -143,8 +143,10 @@ func (c *Memcached) fetch(ctx context.Context, keys []string) (found []string, b
 	})
 
 	if err != nil {
+		missed = keys
 		return
 	}
+
 	for _, key := range keys {
 		item, ok := items[key]
 		if ok {
@@ -189,11 +191,8 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
 	close(resultsCh)
 
 	for _, result := range results {
-		// TODO(gouthamve): One call may fail while everything else succeeds. Put the
-		// failed call keys in missed then.
 		if result.err != nil {
 			err = result.err
-			continue
 		}
 
 		found = append(found, result.found...)
diff --git a/cache/memcached_test.go b/cache/memcached_test.go
index fdafbdb0ebce..b003e6a2d07c 100644
--- a/cache/memcached_test.go
+++ b/cache/memcached_test.go
@@ -2,8 +2,11 @@ package cache_test
 
 import (
 	"context"
+	"errors"
+	"sync/atomic"
 	"testing"
 
+	"github.com/bradfitz/gomemcache/memcache"
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/cortex/pkg/chunk/cache"
 )
@@ -57,3 +60,87 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) {
 		bufs = bufs[1:]
 	}
 }
+
+// mockMemcacheFailing wraps mockMemcache and fails every third GetMulti call.
+type mockMemcacheFailing struct {
+	*mockMemcache
+	calls uint64
+}
+
+func newMockMemcacheFailing() *mockMemcacheFailing {
+	return &mockMemcacheFailing{
+		mockMemcache: newMockMemcache(),
+	}
+}
+
+func (c *mockMemcacheFailing) GetMulti(keys []string) (map[string]*memcache.Item, error) {
+	calls := atomic.AddUint64(&c.calls, 1)
+	if calls%3 == 0 {
+		return nil, errors.New("fail")
+	}
+
+	return c.mockMemcache.GetMulti(keys)
+}
+
+func TestMemcacheFailure(t *testing.T) {
+	t.Run("unbatched", func(t *testing.T) {
+		client := newMockMemcacheFailing()
+		memcache := cache.NewMemcached(cache.MemcachedConfig{}, client)
+
+		testMemcacheFailing(t, memcache)
+	})
+
+	t.Run("batched", func(t *testing.T) {
+		client := newMockMemcacheFailing()
+		memcache := cache.NewMemcached(cache.MemcachedConfig{
+			BatchSize:   10,
+			Parallelism: 5,
+		}, client)
+
+		testMemcacheFailing(t, memcache)
+	})
+}
+
+func testMemcacheFailing(t *testing.T, memcache *cache.Memcached) {
+	numKeys := 1000
+
+	ctx := context.Background()
+	keys := make([]string, 0, numKeys)
+	// Track 1000 keys, but only store the ones that are not multiples of 5.
+	for i := 0; i < numKeys; i++ {
+		keys = append(keys, string(i))
+		if i%5 == 0 {
+			continue
+		}
+
+		require.NoError(t, memcache.Store(ctx, string(i), []byte(string(i))))
+	}
+
+	for i := 0; i < 10; i++ {
+		found, bufs, missing, _ := memcache.Fetch(ctx, keys)
+
+		require.Equal(t, len(found), len(bufs))
+		for i := range found {
+			require.Equal(t, found[i], string(bufs[i]))
+		}
+
+		keysReturned := make(map[string]struct{})
+		for _, key := range found {
+			_, ok := keysReturned[key]
+			require.False(t, ok, "duplicate key returned")
+
+			keysReturned[key] = struct{}{}
+		}
+		for _, key := range missing {
+			_, ok := keysReturned[key]
+			require.False(t, ok, "key in both found and missing")
+
+			keysReturned[key] = struct{}{}
+		}
+
+		for _, key := range keys {
+			_, ok := keysReturned[key]
+			require.True(t, ok, "key missing %s", key)
+		}
+	}
+}
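
Taken together with patch 1, this final patch fixes Fetch's failure contract: a failed GetMulti reports its keys as missed instead of dropping them, so even with a non-nil error the caller gets a complete (found, missed) partition of the request. Below is a sketch of how a caller might lean on that contract; Fetcher, getWithFallback, and failingCache are names invented for this illustration, and only the Fetch signature is taken from the diff.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Fetcher matches the Fetch signature used throughout this series.
type Fetcher interface {
	Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error)
}

// getWithFallback reads keys through the cache and fills misses from a
// slower source. Because failed memcache calls now surface their keys in
// missed, a non-nil err still leaves a complete (found, missed) split.
func getWithFallback(ctx context.Context, c Fetcher, keys []string, slow func(string) []byte) map[string][]byte {
	out := make(map[string][]byte, len(keys))

	found, bufs, missed, err := c.Fetch(ctx, keys)
	if err != nil {
		// Treat cache errors as advisory: the missed list already covers
		// every key the failed calls were responsible for.
		fmt.Println("cache fetch error (continuing):", err)
	}
	for i, k := range found {
		out[k] = bufs[i]
	}
	for _, k := range missed {
		out[k] = slow(k)
	}
	return out
}

// failingCache is a stand-in for a memcache outage: it misses everything
// and returns an error, like fetch does after this patch.
type failingCache struct{}

func (failingCache) Fetch(_ context.Context, keys []string) ([]string, [][]byte, []string, error) {
	return nil, nil, keys, errors.New("memcache: connection refused")
}

func main() {
	vals := getWithFallback(context.Background(), failingCache{}, []string{"a", "b"}, func(k string) []byte {
		return []byte("from-slow-store-" + k)
	})
	fmt.Println(string(vals["a"]), string(vals["b"]))
}

Because TestMemcacheFailure pins down exactly this invariant (every key lands in found or missed, never both), the fallback path can treat errors as advisory and never double-reads a key.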