Skip to content

Commit

Permalink
Results cache fix improvements (#7444)
Browse files Browse the repository at this point in the history
Move middleware to someplace more sensible and incorporate feedback
missed in the review for this.
  • Loading branch information
MasslessParticle authored Oct 17, 2022
1 parent b59277b commit c1bccac
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 36 deletions.
5 changes: 4 additions & 1 deletion pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []Downstre
}

for _, res := range results {
metadata.JoinHeaders(ctx, res.Headers)
if err := metadata.JoinHeaders(ctx, res.Headers); err != nil {
level.Warn(util_log.Logger).Log("msg", "unable to add headers to results context", "error", err)
break
}
}

return results, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2199,7 +2199,7 @@ func TestEngine_Stats(t *testing.T) {
type metaQuerier struct{}

func (metaQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) {
metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{
_ = metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{
{
Name: "Header",
Values: []string{"value"},
Expand All @@ -2209,7 +2209,7 @@ func (metaQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.Entr
}

func (metaQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) {
metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{
_ = metadata.JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{
{Name: "Header", Values: []string{"value"}},
})
return iter.NoopIterator, nil
Expand Down
21 changes: 16 additions & 5 deletions pkg/logqlmodel/metadata/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package metadata

import (
"context"
"errors"
"sort"
"sync"

Expand All @@ -20,9 +21,13 @@ const (
metadataKey ctxKeyType = "metadata"
)

var (
ErrNoCtxData = errors.New("unable to add headers to context: no existing context data")
)

// Context is the metadata context. It is passed through the query path and accumulates metadata.
type Context struct {
mtx sync.RWMutex
mtx sync.Mutex
headers map[string][]string
}

Expand All @@ -48,8 +53,8 @@ func FromContext(ctx context.Context) *Context {

// Headers returns the cache headers accumulated in the context so far.
func (c *Context) Headers() []*definitions.PrometheusResponseHeader {
c.mtx.RLock()
defer c.mtx.RUnlock()
c.mtx.Lock()
defer c.mtx.Unlock()

headers := make([]*definitions.PrometheusResponseHeader, 0, len(c.headers))
for k, vs := range c.headers {
Expand All @@ -70,13 +75,19 @@ func (c *Context) Headers() []*definitions.PrometheusResponseHeader {
// JoinHeaders merges a Headers with the embedded Headers in a context in a concurrency-safe manner.
// JoinHeaders will consolidate all distinct headers but will override same-named headers in an
// undefined way
func JoinHeaders(ctx context.Context, headers []*definitions.PrometheusResponseHeader) {
context := FromContext(ctx)
func JoinHeaders(ctx context.Context, headers []*definitions.PrometheusResponseHeader) error {
context, ok := ctx.Value(metadataKey).(*Context)
if !ok {
return ErrNoCtxData
}

context.mtx.Lock()
defer context.mtx.Unlock()

for i := range headers {
header := headers[i]
context.headers[header.Name] = header.Values
}

return nil
}
22 changes: 19 additions & 3 deletions pkg/logqlmodel/metadata/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metadata

import (
"context"
"errors"
"testing"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
Expand Down Expand Up @@ -29,13 +30,28 @@ func TestHeaders(t *testing.T) {
}

metadata, ctx := NewContext(context.Background())
JoinHeaders(ctx, h1)
JoinHeaders(ctx, h2)
JoinHeaders(ctx, h3)
err := JoinHeaders(ctx, h1)
require.Nil(t, err)
err = JoinHeaders(ctx, h2)
require.Nil(t, err)
err = JoinHeaders(ctx, h3)
require.Nil(t, err)

require.Equal(t, []*definitions.PrometheusResponseHeader{
{Name: "Header1", Values: []string{"value"}},
{Name: "Header2", Values: []string{"value1"}},
{Name: "Header3", Values: []string{"value2"}},
}, metadata.Headers())
}

func TestHeadersNoKey(t *testing.T) {
ctx := context.Background()
err := JoinHeaders(ctx, []*definitions.PrometheusResponseHeader{
{
Name: "Header1",
Values: []string{"value"},
},
})

require.True(t, errors.Is(err, ErrNoCtxData))
}
4 changes: 3 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strings"
"time"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"

"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -388,7 +390,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
if t.supportIndexDeleteRequest() {
toMerge = append(
toMerge,
serverutil.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader),
queryrangebase.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader),
)
}
httpMiddleware := middleware.Merge(toMerge...)
Expand Down
30 changes: 30 additions & 0 deletions pkg/querier/queryrange/queryrangebase/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package queryrangebase

import (
"net/http"

"github.com/grafana/dskit/tenant"
"github.com/weaveworks/common/middleware"
)

const (
// ResultsCacheGenNumberHeaderName holds name of the header we want to set in http response
ResultsCacheGenNumberHeaderName = "Results-Cache-Gen-Number"
)

func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLoader) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs)

w.Header().Set(ResultsCacheGenNumberHeaderName, cacheGenNumber)
next.ServeHTTP(w, r)
})
})
}
33 changes: 33 additions & 0 deletions pkg/querier/queryrange/queryrangebase/middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package queryrangebase

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/weaveworks/common/user"

"github.com/stretchr/testify/assert"
)

func TestCacheGenNumberHeaderSetterMiddleware(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")
req, _ := http.NewRequestWithContext(ctx, "GET", "http://testing.com", nil)
w := httptest.NewRecorder()
loader := &fakeGenNumberLoader{genNumber: "test-header-value"}

mware := CacheGenNumberHeaderSetterMiddleware(loader).
Wrap(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {}))
mware.ServeHTTP(w, req)

assert.Equal(t, w.Header().Get(ResultsCacheGenNumberHeaderName), "test-header-value")
}

type fakeGenNumberLoader struct {
genNumber string
}

func (l *fakeGenNumberLoader) GetResultsCacheGenNumber(tenantIDs []string) string {
return l.genNumber
}
3 changes: 0 additions & 3 deletions pkg/querier/queryrange/queryrangebase/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ import (
var (
// Value that cacheControlHeader has if the response indicates that the results should not be cached.
noStoreValue = "no-store"

// ResultsCacheGenNumberHeaderName holds name of the header we want to set in http response
ResultsCacheGenNumberHeaderName = "Results-Cache-Gen-Number"
)

const (
Expand Down
21 changes: 0 additions & 21 deletions pkg/util/server/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ package server
import (
"net/http"

"github.com/grafana/dskit/tenant"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)

// NewPrepopulateMiddleware creates a middleware which will parse incoming http forms.
Expand All @@ -34,21 +31,3 @@ func ResponseJSONMiddleware() middleware.Interface {
})
})
}

// middleware for setting cache gen header to let consumer of response know all previous responses could be invalid due to delete operation
func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader queryrangebase.CacheGenNumberLoader) middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs)

w.Header().Set(queryrangebase.ResultsCacheGenNumberHeaderName, cacheGenNumber)
next.ServeHTTP(w, r)
})
})
}

0 comments on commit c1bccac

Please sign in to comment.