Skip to content

Commit

Permalink
feat(cache): Add Cache-Control: no-cache support for Loki instant q…
Browse files Browse the repository at this point in the history
…ueries. (#12896)

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
  • Loading branch information
kavirajk authored May 19, 2024
1 parent 88c6711 commit 88e545f
Show file tree
Hide file tree
Showing 11 changed files with 626 additions and 167 deletions.
1 change: 1 addition & 0 deletions cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func newQueryClient(app *kingpin.Application) client.Client {
app.Flag("key", "Path to the client certificate key. Can also be set using LOKI_CLIENT_KEY_PATH env var.").Default("").Envar("LOKI_CLIENT_KEY_PATH").StringVar(&client.TLSConfig.KeyFile)
app.Flag("org-id", "adds X-Scope-OrgID to API requests for representing tenant ID. Useful for requesting tenant data when bypassing an auth gateway. Can also be set using LOKI_ORG_ID env var.").Default("").Envar("LOKI_ORG_ID").StringVar(&client.OrgID)
app.Flag("query-tags", "adds X-Query-Tags http header to API requests. This header value will be part of `metrics.go` statistics. Useful for tracking the query. Can also be set using LOKI_QUERY_TAGS env var.").Default("").Envar("LOKI_QUERY_TAGS").StringVar(&client.QueryTags)
app.Flag("nocache", "adds Cache-Control: no-cache http header to API requests. Can also be set using LOKI_NO_CACHE env var.").Default("false").Envar("LOKI_NO_CACHE").BoolVar(&client.NoCache)
app.Flag("bearer-token", "adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN env var.").Default("").Envar("LOKI_BEARER_TOKEN").StringVar(&client.BearerToken)
app.Flag("bearer-token-file", "adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN_FILE env var.").Default("").Envar("LOKI_BEARER_TOKEN_FILE").StringVar(&client.BearerTokenFile)
app.Flag("retries", "How many times to retry each query when getting an error response from Loki. Can also be set using LOKI_CLIENT_RETRIES env var.").Default("0").Envar("LOKI_CLIENT_RETRIES").IntVar(&client.Retries)
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
)

type errTooFarBehind struct {
// original timestmap of the entry itself.
// original timestamp of the entry itself.
entryTs time.Time

// cutoff is the oldest acceptable timstamp of the `stream` that entry belongs to.
Expand Down
15 changes: 13 additions & 2 deletions pkg/logcli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ const (
volumeRangePath = "/loki/api/v1/index/volume_range"
detectedFieldsPath = "/loki/api/v1/detected_fields"
defaultAuthHeader = "Authorization"

// HTTP header keys
HTTPScopeOrgID = "X-Scope-OrgID"
HTTPQueryTags = "X-Query-Tags"
HTTPCacheControl = "Cache-Control"
HTTPCacheControlNoCache = "no-cache"
)

var userAgent = fmt.Sprintf("loki-logcli/%s", build.Version)
Expand Down Expand Up @@ -77,6 +83,7 @@ type DefaultClient struct {
BearerTokenFile string
Retries int
QueryTags string
NoCache bool
AuthHeader string
ProxyURL string
BackoffConfig BackoffConfig
Expand Down Expand Up @@ -372,11 +379,15 @@ func (c *DefaultClient) getHTTPRequestHeader() (http.Header, error) {
h.Set("User-Agent", userAgent)

if c.OrgID != "" {
h.Set("X-Scope-OrgID", c.OrgID)
h.Set(HTTPScopeOrgID, c.OrgID)
}

if c.NoCache {
h.Set(HTTPCacheControl, HTTPCacheControlNoCache)
}

if c.QueryTags != "" {
h.Set("X-Query-Tags", c.QueryTags)
h.Set(HTTPQueryTags, c.QueryTags)
}

if (c.Username != "" || c.Password != "") && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) {
Expand Down
76 changes: 66 additions & 10 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util"
)

Expand All @@ -42,6 +43,7 @@ type Params interface {
Shards() []string
GetExpression() syntax.Expr
GetStoreChunks() *logproto.ChunkRefGroup
CachingOptions() resultscache.CachingOptions
}

func NewLiteralParams(
Expand All @@ -52,22 +54,70 @@ func NewLiteralParams(
limit uint32,
shards []string,
storeChunks *logproto.ChunkRefGroup,
) (LiteralParams, error) {
return newLiteralParams(
qs,
start,
end,
step,
interval,
direction,
limit,
shards,
storeChunks,
resultscache.CachingOptions{},
)
}

func NewLiteralParamsWithCaching(
qs string,
start, end time.Time,
step, interval time.Duration,
direction logproto.Direction,
limit uint32,
shards []string,
storeChunks *logproto.ChunkRefGroup,
cachingOptions resultscache.CachingOptions,
) (LiteralParams, error) {
return newLiteralParams(
qs,
start,
end,
step,
interval,
direction,
limit,
shards,
storeChunks,
cachingOptions,
)
}

func newLiteralParams(
qs string,
start, end time.Time,
step, interval time.Duration,
direction logproto.Direction,
limit uint32,
shards []string,
storeChunks *logproto.ChunkRefGroup,
cachingOptions resultscache.CachingOptions,
) (LiteralParams, error) {
p := LiteralParams{
queryString: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
shards: shards,
storeChunks: storeChunks,
queryString: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
shards: shards,
storeChunks: storeChunks,
cachingOptions: cachingOptions,
}
var err error
p.queryExpr, err = syntax.ParseExpr(qs)
return p, err

}

// LiteralParams impls Params
Expand All @@ -80,6 +130,7 @@ type LiteralParams struct {
shards []string
queryExpr syntax.Expr
storeChunks *logproto.ChunkRefGroup
cachingOptions resultscache.CachingOptions
}

func (p LiteralParams) Copy() LiteralParams { return p }
Expand Down Expand Up @@ -114,6 +165,11 @@ func (p LiteralParams) Shards() []string { return p.shards }
// StoreChunks impls Params
func (p LiteralParams) GetStoreChunks() *logproto.ChunkRefGroup { return p.storeChunks }

// CachingOptions returns whether Loki query created from this params should be cached.
func (p LiteralParams) CachingOptions() resultscache.CachingOptions {
return p.cachingOptions
}

// GetRangeType returns whether a query is an instant query or range query
func GetRangeType(q Params) QueryRangeType {
if q.Start() == q.End() && q.Step() == 0 {
Expand Down
48 changes: 43 additions & 5 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ import (
"github.com/grafana/loki/v3/pkg/util/querylimits"
)

const (
cacheControlHeader = "Cache-Control"
noCacheVal = "no-cache"
)

var DefaultCodec = &Codec{}

type Codec struct{}
Expand Down Expand Up @@ -95,8 +100,6 @@ func (r *LokiRequest) LogToSpan(sp opentracing.Span) {
)
}

func (*LokiRequest) GetCachingOptions() (res queryrangebase.CachingOptions) { return }

func (r *LokiInstantRequest) GetStep() int64 {
return 0
}
Expand Down Expand Up @@ -142,8 +145,6 @@ func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) {
)
}

func (*LokiInstantRequest) GetCachingOptions() (res queryrangebase.CachingOptions) { return }

func (r *LokiSeriesRequest) GetEnd() time.Time {
return r.EndTs
}
Expand Down Expand Up @@ -329,20 +330,29 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

disableCacheReq := false

if strings.ToLower(strings.TrimSpace(r.Header.Get(cacheControlHeader))) == noCacheVal {
disableCacheReq = true
}

switch op := getOperation(r.URL.Path); op {
case QueryRangeOp:
req, err := parseRangeQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return req, nil
case InstantQueryOp:
req, err := parseInstantQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

req.CachingOptions = queryrangebase.CachingOptions{
Disabled: disableCacheReq,
}

return req, nil
case SeriesOp:
req, err := loghttp.ParseAndValidateSeriesQuery(r)
Expand Down Expand Up @@ -1808,6 +1818,10 @@ func (p paramsRangeWrapper) Shards() []string {
return p.GetShards()
}

func (p paramsRangeWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsInstantWrapper struct {
*LokiInstantRequest
}
Expand Down Expand Up @@ -1840,6 +1854,10 @@ func (p paramsInstantWrapper) Shards() []string {
return p.GetShards()
}

func (p paramsInstantWrapper) CachingOptions() resultscache.CachingOptions {
return p.LokiInstantRequest.CachingOptions
}

type paramsSeriesWrapper struct {
*LokiSeriesRequest
}
Expand Down Expand Up @@ -1876,6 +1894,10 @@ func (p paramsSeriesWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsSeriesWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsLabelWrapper struct {
*LabelRequest
}
Expand Down Expand Up @@ -1912,6 +1934,10 @@ func (p paramsLabelWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsLabelWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsStatsWrapper struct {
*logproto.IndexStatsRequest
}
Expand Down Expand Up @@ -1948,6 +1974,10 @@ func (p paramsStatsWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsStatsWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsDetectedFieldsWrapper struct {
*DetectedFieldsRequest
}
Expand Down Expand Up @@ -2040,6 +2070,14 @@ func (p paramsDetectedFieldsWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsDetectedLabelsWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

func (p paramsDetectedFieldsWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrangebase.PrometheusResponseHeader {
var promHeaders []queryrangebase.PrometheusResponseHeader
for h, hv := range httpHeaders {
Expand Down
49 changes: 49 additions & 0 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,55 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
}
}

func Test_codec_DecodeRequest_cacheHeader(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")

tests := []struct {
name string
reqBuilder func() (*http.Request, error)
want queryrangebase.Request
}{
{
"query_instant",
func() (*http.Request, error) {
req, err := http.NewRequest(
http.MethodGet,
fmt.Sprintf(`/v1/query?time=%d&query={foo="bar"}&limit=200&direction=FORWARD`, start.UnixNano()),
nil,
)
if err == nil {
req.Header.Set(cacheControlHeader, noCacheVal)
}
return req, err
},
&LokiInstantRequest{
Query: `{foo="bar"}`,
Limit: 200,
Direction: logproto.FORWARD,
Path: "/v1/query",
TimeTs: start,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{foo="bar"}`),
},
CachingOptions: queryrangebase.CachingOptions{
Disabled: true,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := tt.reqBuilder()
if err != nil {
t.Fatal(err)
}
got, err := DefaultCodec.DecodeRequest(ctx, req, nil)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}

func Test_codec_DecodeResponse(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func ParamsToLokiRequest(params logql.Params) queryrangebase.Request {
Plan: &plan.QueryPlan{
AST: params.GetExpression(),
},
StoreChunks: params.GetStoreChunks(),
StoreChunks: params.GetStoreChunks(),
CachingOptions: params.CachingOptions(),
}
}
return &LokiRequest{
Expand All @@ -61,7 +62,8 @@ func ParamsToLokiRequest(params logql.Params) queryrangebase.Request {
Plan: &plan.QueryPlan{
AST: params.GetExpression(),
},
StoreChunks: params.GetStoreChunks(),
StoreChunks: params.GetStoreChunks(),
CachingOptions: params.CachingOptions(),
}
}

Expand Down
Loading

0 comments on commit 88e545f

Please sign in to comment.