frontend: implement cache control #1974

Merged
merged 12 commits on Jan 17, 2020
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -2,8 +2,8 @@

## master / unreleased

* [CHANGE] The frontend component no longer caches results when the downstream response carries a `Cache-Control` header whose values include `no-store` (a short sketch follows this hunk). #1974
* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled.

* [CHANGE] Flags changed with transition to upstream Prometheus rules manager:
* `ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url`
* `ruler.group-timeout` has been removed
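A short sketch of the behavior described in the CHANGELOG entry above: a downstream querier can opt a particular response out of frontend result caching by setting the header on its reply. This is illustrative only and not part of the change; the handler name, endpoint path, and port are made up.

package main

import (
	"log"
	"net/http"
)

// queryRangeHandler is a hypothetical downstream handler. Setting
// "Cache-Control: no-store" on its response tells a frontend with this
// change applied not to cache the result.
func queryRangeHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Cache-Control", "no-store")
	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`))
}

func main() {
	http.HandleFunc("/api/v1/query_range", queryRangeHandler)
	log.Fatal(http.ListenAndServe(":9090", nil))
}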
7 changes: 7 additions & 0 deletions pkg/querier/queryrange/query_range.go
@@ -33,6 +33,9 @@ var (

// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
PrometheusCodec Codec = &prometheusCodec{}

// Name of the cache control header.
cachecontrolHeader = "Cache-Control"
)

// Codec is used to encode/decode query range requests and responses so they can be passed down to middlewares.
@@ -221,6 +224,10 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}

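// Copy every HTTP response header into the decoded response so that
// downstream middlewares, such as the results cache, can inspect them.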
for h, hv := range r.Header {
resp.Headers = append(resp.Headers, &PrometheusResponseHeader{Name: h, Values: hv})
}
return &resp, nil
}

4 changes: 3 additions & 1 deletion pkg/querier/queryrange/query_range_test.go
@@ -74,13 +74,15 @@ func TestRequest(t *testing.T) {
}

func TestResponse(t *testing.T) {
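// DecodeResponse now attaches the HTTP headers to the decoded response,
// so the expected value has to carry the canned respHeaders as well.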
r := *parsedResponse
r.Headers = respHeaders
for i, tc := range []struct {
body string
expected *PrometheusResponse
}{
{
body: responseBody,
expected: parsedResponse,
expected: &r,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
471 changes: 414 additions & 57 deletions pkg/querier/queryrange/queryrange.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/querier/queryrange/queryrange.proto
@@ -21,11 +21,17 @@ message PrometheusRequest {
string query = 6;
}

message PrometheusResponseHeader {
string Name = 1 [(gogoproto.jsontag) = "-"];
repeated string Values = 2 [(gogoproto.jsontag) = "-"];
}

message PrometheusResponse {
string Status = 1 [(gogoproto.jsontag) = "status"];
PrometheusData Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data,omitempty"];
string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"];
string Error = 4 [(gogoproto.jsontag) = "error,omitempty"];
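// Headers carries the downstream HTTP response headers for middlewares such as
// the results cache; the "-" json tag keeps them out of the serialized JSON body.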
repeated PrometheusResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"];
}

message PrometheusData {
35 changes: 35 additions & 0 deletions pkg/querier/queryrange/results_cache.go
@@ -22,6 +22,11 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

var (
// Value of the cachecontrolHeader that indicates the results should not be cached.
noCacheValue = "no-store"
)

// ResultsCacheConfig is the config for the results cache.
type ResultsCacheConfig struct {
CacheConfig cache.Config `yaml:"cache"`
@@ -59,6 +64,7 @@ var PrometheusResponseExtractor = ExtractorFunc(func(start, end int64, from Resp
ResultType: promRes.Data.ResultType,
Result: extractMatrix(start, end, promRes.Data.Result),
},
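// Carry the original response headers through extraction so cache-control
// information is preserved.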
Headers: promRes.Headers,
}
})

@@ -133,12 +139,41 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
return response, err
}

// shouldCacheResponse reports whether the response should be cached.
func shouldCacheResponse(r Response) bool {
if promResp, ok := r.(*PrometheusResponse); ok {
shouldCache := true
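// Scan every header; a single no-store value in Cache-Control is enough to
// disable caching.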
outer:
for _, hv := range promResp.Headers {
if hv == nil {
continue
}
if hv.Name != cachecontrolHeader {
continue
}
for _, v := range hv.Values {
if v == noCacheValue {
shouldCache = false
break outer
}
}
}
return shouldCache
}
return true
}

func (s resultsCache) handleMiss(ctx context.Context, r Request) (Response, []Extent, error) {
response, err := s.next.Do(ctx, r)
if err != nil {
return nil, nil, err
}

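// Respect Cache-Control: no-store from the downstream response: return the
// result to the caller but do not store any extents in the cache.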
if !shouldCacheResponse(response) {
level.Debug(s.logger).Log("msg", fmt.Sprintf("%s header in response contains %s, not caching the response", cachecontrolHeader, noCacheValue))
return response, []Extent{}, nil
}

extent, err := toExtent(ctx, r, response)
if err != nil {
return nil, nil, err
57 changes: 57 additions & 0 deletions pkg/querier/queryrange/results_cache_test.go
@@ -31,6 +31,12 @@ var (
Step: 120 * 1e3,
Query: "sum(container_memory_rss) by (namespace)",
}
respHeaders = []*PrometheusResponseHeader{
{
Name: "Content-Type",
Values: []string{"application/json"},
},
}
parsedResponse = &PrometheusResponse{
Status: "success",
Data: PrometheusData{
@@ -108,6 +114,57 @@ func mkExtent(start, end int64) Extent {
}
}

func TestShouldCache(t *testing.T) {
for i, tc := range []struct {
input Response
expected bool
}{
// Does not contain the Cache-Control header.
{
input: Response(&PrometheusResponse{
Headers: []*PrometheusResponseHeader{
{
Name: "meaninglessheader",
Values: []string{},
},
},
}),
expected: true,
},
// Contains the Cache-Control header with the no-store value.
{
input: Response(&PrometheusResponse{
Headers: []*PrometheusResponseHeader{
{
Name: cachecontrolHeader,
Values: []string{noCacheValue},
},
},
}),
expected: false,
},
// The Cache-Control header carries other values alongside no-store; caching is still disabled.
{
input: Response(&PrometheusResponse{
Headers: []*PrometheusResponseHeader{
{
Name: cachecontrolHeader,
Values: []string{"foo", noCacheValue},
},
},
}),
expected: false,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
ret := shouldCacheResponse(tc.input)
require.Equal(t, tc.expected, ret)
})
}
}

func TestPartiton(t *testing.T) {
for i, tc := range []struct {
input Request