From 1abe8841cef600678ff7e5de3f07bed91d4d6db3 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 17 Oct 2019 17:08:49 -0400 Subject: [PATCH] Moves request parsing into the loghttp package (#1171) * refactor query requests parsing Signed-off-by: Cyril Tovena * add tests and documentation Signed-off-by: Cyril Tovena --- pkg/loghttp/labels.go | 21 +++ pkg/loghttp/labels_test.go | 65 +++++++++ pkg/loghttp/params.go | 123 +++++++++++++++++ pkg/loghttp/params_test.go | 123 +++++++++++++++++ pkg/loghttp/query.go | 94 +++++++++++++ pkg/loghttp/query_test.go | 112 +++++++++++++++ pkg/loghttp/tail.go | 33 +++++ pkg/loghttp/tail_test.go | 53 +++++++ pkg/querier/http.go | 273 +++++-------------------------------- pkg/querier/http_test.go | 90 ------------ 10 files changed, 658 insertions(+), 329 deletions(-) create mode 100644 pkg/loghttp/labels_test.go create mode 100644 pkg/loghttp/params.go create mode 100644 pkg/loghttp/params_test.go create mode 100644 pkg/loghttp/query_test.go create mode 100644 pkg/loghttp/tail_test.go delete mode 100644 pkg/querier/http_test.go diff --git a/pkg/loghttp/labels.go b/pkg/loghttp/labels.go index 2367f596d301..71b267baf7d6 100644 --- a/pkg/loghttp/labels.go +++ b/pkg/loghttp/labels.go @@ -2,8 +2,12 @@ package loghttp import ( "bytes" + "net/http" "sort" "strconv" + + "github.com/gorilla/mux" + "github.com/grafana/loki/pkg/logproto" ) // LabelResponse represents the http json response to a label query @@ -37,3 +41,20 @@ func (l LabelSet) String() string { b.WriteByte('}') return b.String() } + +// ParseLabelQuery parses a LabelRequest request from an http request. +func ParseLabelQuery(r *http.Request) (*logproto.LabelRequest, error) { + name, ok := mux.Vars(r)["name"] + req := &logproto.LabelRequest{ + Values: ok, + Name: name, + } + + start, end, err := bounds(r) + if err != nil { + return nil, err + } + req.Start = &start + req.End = &end + return req, nil +} diff --git a/pkg/loghttp/labels_test.go b/pkg/loghttp/labels_test.go new file mode 100644 index 000000000000..d2f452e52ced --- /dev/null +++ b/pkg/loghttp/labels_test.go @@ -0,0 +1,65 @@ +package loghttp + +import ( + "net/http" + "reflect" + "testing" + "time" + + "github.com/gorilla/mux" + "github.com/grafana/loki/pkg/logproto" +) + +func TestParseLabelQuery(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + r *http.Request + want *logproto.LabelRequest + wantErr bool + }{ + {"bad start", &http.Request{URL: mustParseURL(`?&start=t`)}, nil, true}, + {"bad end", &http.Request{URL: mustParseURL(`?&start=2016-06-10T21:42:24.760738998Z&end=h`)}, nil, true}, + {"good no name in the pah", + requestWithVar(&http.Request{ + URL: mustParseURL(`?start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z`), + }, "name", "test"), &logproto.LabelRequest{ + Name: "test", + Values: true, + Start: timePtr(time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC)), + End: timePtr(time.Date(2017, 07, 10, 21, 42, 24, 760738998, time.UTC)), + }, false}, + {"good with name", + &http.Request{ + URL: mustParseURL(`?start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z`), + }, &logproto.LabelRequest{ + Name: "", + Values: false, + Start: timePtr(time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC)), + End: timePtr(time.Date(2017, 07, 10, 21, 42, 24, 760738998, time.UTC)), + }, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseLabelQuery(tt.r) + if (err != nil) != tt.wantErr { + t.Errorf("ParseLabelQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParseLabelQuery() = %v, want %v", got, tt.want) + } + }) + } +} + +func timePtr(t time.Time) *time.Time { + return &t +} + +func requestWithVar(req *http.Request, name, value string) *http.Request { + return mux.SetURLVars(req, map[string]string{ + name: value, + }) +} diff --git a/pkg/loghttp/params.go b/pkg/loghttp/params.go new file mode 100644 index 000000000000..3319b03190c9 --- /dev/null +++ b/pkg/loghttp/params.go @@ -0,0 +1,123 @@ +package loghttp + +import ( + "fmt" + "math" + "net/http" + "strconv" + "strings" + "time" + + "github.com/grafana/loki/pkg/logproto" +) + +const ( + defaultQueryLimit = 100 + defaultSince = 1 * time.Hour + defaultStep = 1 // 1 seconds +) + +func limit(r *http.Request) (uint32, error) { + l, err := parseInt(r.URL.Query().Get("limit"), defaultQueryLimit) + if err != nil { + return 0, err + } + return uint32(l), nil +} + +func query(r *http.Request) string { + return r.URL.Query().Get("query") +} + +func ts(r *http.Request) (time.Time, error) { + return parseTimestamp(r.URL.Query().Get("time"), time.Now()) +} + +func direction(r *http.Request) (logproto.Direction, error) { + return parseDirection(r.URL.Query().Get("direction"), logproto.BACKWARD) +} + +func bounds(r *http.Request) (time.Time, time.Time, error) { + now := time.Now() + start, err := parseTimestamp(r.URL.Query().Get("start"), now.Add(-defaultSince)) + if err != nil { + return time.Time{}, time.Time{}, err + } + end, err := parseTimestamp(r.URL.Query().Get("end"), now) + if err != nil { + return time.Time{}, time.Time{}, err + } + return start, end, nil +} + +func step(r *http.Request, start, end time.Time) (time.Duration, error) { + s, err := parseInt(r.URL.Query().Get("step"), defaultQueryRangeStep(start, end)) + if err != nil { + return 0, err + } + return time.Duration(s) * time.Second, nil +} + +// defaultQueryRangeStep returns the default step used in the query range API, +// which is dinamically calculated based on the time range +func defaultQueryRangeStep(start time.Time, end time.Time) int { + return int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1)) +} + +func tailDelay(r *http.Request) (uint32, error) { + l, err := parseInt(r.URL.Query().Get("delay_for"), 0) + if err != nil { + return 0, err + } + return uint32(l), nil +} + +// parseInt parses an int from a string +// if the value is empty it returns a default value passed as second parameter +func parseInt(value string, def int) (int, error) { + if value == "" { + return def, nil + } + return strconv.Atoi(value) +} + +// parseUnixNano parses a ns unix timestamp from a string +// if the value is empty it returns a default value passed as second parameter +func parseTimestamp(value string, def time.Time) (time.Time, error) { + if value == "" { + return def, nil + } + + if strings.Contains(value, ".") { + if t, err := strconv.ParseFloat(value, 64); err == nil { + s, ns := math.Modf(t) + ns = math.Round(ns*1000) / 1000 + return time.Unix(int64(s), int64(ns*float64(time.Second))), nil + } + } + nanos, err := strconv.ParseInt(value, 10, 64) + if err != nil { + if ts, err := time.Parse(time.RFC3339Nano, value); err == nil { + return ts, nil + } + return time.Time{}, err + } + if len(value) <= 10 { + return time.Unix(nanos, 0), nil + } + return time.Unix(0, nanos), nil +} + +// parseDirection parses a logproto.Direction from a string +// if the value is empty it returns a default value passed as second parameter +func parseDirection(value string, def logproto.Direction) (logproto.Direction, error) { + if value == "" { + return def, nil + } + + d, ok := logproto.Direction_value[strings.ToUpper(value)] + if !ok { + return logproto.FORWARD, fmt.Errorf("invalid direction '%s'", value) + } + return logproto.Direction(d), nil +} diff --git a/pkg/loghttp/params_test.go b/pkg/loghttp/params_test.go new file mode 100644 index 000000000000..b6559bd53ba9 --- /dev/null +++ b/pkg/loghttp/params_test.go @@ -0,0 +1,123 @@ +package loghttp + +import ( + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestHttp_defaultQueryRangeStep(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + start time.Time + end time.Time + expected int + }{ + "should not be lower then 1s": { + start: time.Unix(60, 0), + end: time.Unix(60, 0), + expected: 1, + }, + "should return 1s if input time range is 5m": { + start: time.Unix(60, 0), + end: time.Unix(360, 0), + expected: 1, + }, + "should return 14s if input time range is 1h": { + start: time.Unix(60, 0), + end: time.Unix(3660, 0), + expected: 14, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, defaultQueryRangeStep(testData.start, testData.end)) + }) + } +} + +func TestHttp_ParseRangeQuery_Step(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + reqPath string + expected *RangeQuery + }{ + "should set the default step based on the input time range if the step parameter is not provided": { + reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000", + expected: &RangeQuery{ + Query: "{}", + Start: time.Unix(0, 0), + End: time.Unix(3600, 0), + Step: 14 * time.Second, + Limit: 100, + Direction: logproto.BACKWARD, + }, + }, + "should use the input step parameter if provided": { + reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5", + expected: &RangeQuery{ + Query: "{}", + Start: time.Unix(0, 0), + End: time.Unix(3600, 0), + Step: 5 * time.Second, + Limit: 100, + Direction: logproto.BACKWARD, + }, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + req := httptest.NewRequest("GET", testData.reqPath, nil) + actual, err := ParseRangeQuery(req) + + require.NoError(t, err) + assert.Equal(t, testData.expected, actual) + }) + } +} + +func Test_parseTimestamp(t *testing.T) { + + now := time.Now() + + tests := []struct { + name string + value string + def time.Time + want time.Time + wantErr bool + }{ + {"default", "", now, now, false}, + {"unix timestamp", "1571332130", now, time.Unix(1571332130, 0), false}, + {"unix nano timestamp", "1571334162051000000", now, time.Unix(0, 1571334162051000000), false}, + {"unix timestamp with subseconds", "1571332130.934", now, time.Unix(1571332130, 934*1e6), false}, + {"RFC3339 format", "2002-10-02T15:00:00Z", now, time.Date(2002, 10, 02, 15, 0, 0, 0, time.UTC), false}, + {"RFC3339nano format", "2009-11-10T23:00:00.000000001Z", now, time.Date(2009, 11, 10, 23, 0, 0, 1, time.UTC), false}, + {"invalid", "we", now, time.Time{}, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseTimestamp(tt.value, tt.def) + if (err != nil) != tt.wantErr { + t.Errorf("parseTimestamp() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseTimestamp() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go index f69fb1470c62..b5818f55cc1d 100644 --- a/pkg/loghttp/query.go +++ b/pkg/loghttp/query.go @@ -2,13 +2,22 @@ package loghttp import ( "encoding/json" + "errors" "fmt" + "net/http" "strconv" "time" + "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/common/model" ) +var ( + errEndBeforeStart = errors.New("end timestamp must not be before start time") + errNegativeStep = errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") + errStepTooSmall = errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") +) + // QueryStatus holds the status of a query type QueryStatus string @@ -149,3 +158,88 @@ type Vector []model.Sample // Matrix is a slice of SampleStreams type Matrix []model.SampleStream + +// InstantQuery defines a log instant query. +type InstantQuery struct { + Query string + Ts time.Time + Limit uint32 + Direction logproto.Direction +} + +// ParseInstantQuery parses an InstantQuery request from an http request. +func ParseInstantQuery(r *http.Request) (*InstantQuery, error) { + var err error + request := &InstantQuery{ + Query: query(r), + } + request.Limit, err = limit(r) + if err != nil { + return nil, err + } + + request.Ts, err = ts(r) + if err != nil { + return nil, err + } + + request.Direction, err = direction(r) + if err != nil { + return nil, err + } + + return request, nil +} + +// RangeQuery defines a log range query. +type RangeQuery struct { + Start time.Time + End time.Time + Step time.Duration + Query string + Direction logproto.Direction + Limit uint32 +} + +// ParseRangeQuery parses a RangeQuery request from an http request. +func ParseRangeQuery(r *http.Request) (*RangeQuery, error) { + var result RangeQuery + var err error + + result.Query = query(r) + result.Start, result.End, err = bounds(r) + if err != nil { + return nil, err + } + + if result.End.Before(result.Start) { + return nil, errEndBeforeStart + } + + result.Limit, err = limit(r) + if err != nil { + return nil, err + } + + result.Direction, err = direction(r) + if err != nil { + return nil, err + } + + result.Step, err = step(r, result.Start, result.End) + if err != nil { + return nil, err + } + + if result.Step <= 0 { + return nil, errNegativeStep + } + + // For safety, limit the number of returned points per timeseries. + // This is sufficient for 60s resolution for a week or 1h resolution for a year. + if (result.End.Sub(result.Start) / result.Step) > 11000 { + return nil, errStepTooSmall + } + + return &result, nil +} diff --git a/pkg/loghttp/query_test.go b/pkg/loghttp/query_test.go new file mode 100644 index 000000000000..09150c7b20b4 --- /dev/null +++ b/pkg/loghttp/query_test.go @@ -0,0 +1,112 @@ +package loghttp + +import ( + "net/http" + "net/url" + "reflect" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" +) + +func TestParseRangeQuery(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + r *http.Request + want *RangeQuery + wantErr bool + }{ + {"bad start", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=t`)}, nil, true}, + {"bad end", &http.Request{URL: mustParseURL(`?query={foo="bar"}&end=t`)}, nil, true}, + {"end before start", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2015-06-10T21:42:24.760738998Z`)}, nil, true}, + {"bad limit", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=h`)}, nil, true}, + {"bad direction", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=fw`), + }, nil, true}, + {"bad step", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=FORWARD&step=h`), + }, nil, true}, + {"negative step", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=-1`), + }, nil, true}, + {"too small step", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&end=2017-06-10T21:42:24.760738998Z&limit=100&direction=BACKWARD&step=1`), + }, nil, true}, + {"good", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&start=2017-06-10T21:42:24.760738998Z&end=2017-07-10T21:42:24.760738998Z&limit=1000&direction=BACKWARD&step=3600`), + }, &RangeQuery{ + Step: time.Hour, + Query: `{foo="bar"}`, + Direction: logproto.BACKWARD, + Start: time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC), + End: time.Date(2017, 07, 10, 21, 42, 24, 760738998, time.UTC), + Limit: 1000, + }, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseRangeQuery(tt.r) + if (err != nil) != tt.wantErr { + t.Errorf("ParseRangeQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParseRangeQuery() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestParseInstantQuery(t *testing.T) { + + tests := []struct { + name string + r *http.Request + want *InstantQuery + wantErr bool + }{ + {"bad time", &http.Request{URL: mustParseURL(`?query={foo="bar"}&time=t`)}, nil, true}, + {"bad limit", &http.Request{URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=h`)}, nil, true}, + {"bad direction", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=100&direction=fw`), + }, nil, true}, + {"good", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&time=2017-06-10T21:42:24.760738998Z&limit=1000&direction=BACKWARD`), + }, &InstantQuery{ + Query: `{foo="bar"}`, + Direction: logproto.BACKWARD, + Ts: time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC), + Limit: 1000, + }, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseInstantQuery(tt.r) + if (err != nil) != tt.wantErr { + t.Errorf("ParseInstantQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParseInstantQuery() = %v, want %v", got, tt.want) + } + }) + } +} + +func mustParseURL(u string) *url.URL { + url, err := url.Parse(u) + if err != nil { + panic(err) + } + return url +} diff --git a/pkg/loghttp/tail.go b/pkg/loghttp/tail.go index 40205592964b..d6ef5f2c9cf1 100644 --- a/pkg/loghttp/tail.go +++ b/pkg/loghttp/tail.go @@ -3,8 +3,15 @@ package loghttp import ( "encoding/json" "fmt" + "net/http" "strconv" "time" + + "github.com/grafana/loki/pkg/logproto" +) + +const ( + maxDelayForInTailing = 5 ) // TailResponse represents the http json response to a tail query @@ -53,3 +60,29 @@ func (s *DroppedStream) UnmarshalJSON(data []byte) error { return nil } + +// ParseTailQuery parses a TailRequest request from an http request. +func ParseTailQuery(r *http.Request) (*logproto.TailRequest, error) { + var err error + req := logproto.TailRequest{ + Query: query(r), + } + + req.Limit, err = limit(r) + if err != nil { + return nil, err + } + + req.Start, _, err = bounds(r) + if err != nil { + return nil, err + } + req.DelayFor, err = tailDelay(r) + if err != nil { + return nil, err + } + if req.DelayFor > maxDelayForInTailing { + return nil, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing) + } + return &req, nil +} diff --git a/pkg/loghttp/tail_test.go b/pkg/loghttp/tail_test.go new file mode 100644 index 000000000000..36de31d16fc4 --- /dev/null +++ b/pkg/loghttp/tail_test.go @@ -0,0 +1,53 @@ +package loghttp + +import ( + "net/http" + "reflect" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" +) + +func TestParseTailQuery(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + r *http.Request + want *logproto.TailRequest + wantErr bool + }{ + {"bad time", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=t`)}, nil, true}, + {"bad limit", &http.Request{URL: mustParseURL(`?query={foo="bar"}&start=2016-06-10T21:42:24.760738998Z&limit=h`)}, nil, true}, + {"bad delay", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=100&delay_for=fw`), + }, nil, true}, + {"too much delay", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&time=2016-06-10T21:42:24.760738998Z&limit=100&delay_for=20`), + }, nil, true}, + {"good", + &http.Request{ + URL: mustParseURL(`?query={foo="bar"}&start=2017-06-10T21:42:24.760738998Z&limit=1000&delay_for=5`), + }, &logproto.TailRequest{ + Query: `{foo="bar"}`, + DelayFor: 5, + Start: time.Date(2017, 06, 10, 21, 42, 24, 760738998, time.UTC), + Limit: 1000, + }, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseTailQuery(tt.r) + if (err != nil) != tt.wantErr { + t.Errorf("ParseTailQuery() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParseTailQuery() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 26ef87b80dc6..ca232d3b17e8 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -2,239 +2,44 @@ package querier import ( "context" - "fmt" - "math" "net/http" - "net/url" - "strconv" - "strings" "time" "github.com/grafana/loki/pkg/loghttp" loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/marshal" marshal_legacy "github.com/grafana/loki/pkg/logql/marshal/legacy" "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" - "github.com/gorilla/mux" "github.com/gorilla/websocket" - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/httpgrpc/server" ) const ( - defaultQueryLimit = 100 - defaultSince = 1 * time.Hour - wsPingPeriod = 1 * time.Second - maxDelayForInTailing = 5 + wsPingPeriod = 1 * time.Second ) -// nolint -func intParam(values url.Values, name string, def int) (int, error) { - value := values.Get(name) - if value == "" { - return def, nil - } - - return strconv.Atoi(value) -} - -func unixNanoTimeParam(values url.Values, name string, def time.Time) (time.Time, error) { - value := values.Get(name) - if value == "" { - return def, nil - } - - if strings.Contains(value, ".") { - if t, err := strconv.ParseFloat(value, 64); err == nil { - s, ns := math.Modf(t) - ns = math.Round(ns*1000) / 1000 - return time.Unix(int64(s), int64(ns*float64(time.Second))), nil - } - } - nanos, err := strconv.ParseInt(value, 10, 64) - if err != nil { - if ts, err := time.Parse(time.RFC3339Nano, value); err == nil { - return ts, nil - } - return time.Time{}, err - } - if len(value) <= 10 { - return time.Unix(nanos, 0), nil - } - return time.Unix(0, nanos), nil -} - -// nolint -func directionParam(values url.Values, name string, def logproto.Direction) (logproto.Direction, error) { - value := values.Get(name) - if value == "" { - return def, nil - } - - d, ok := logproto.Direction_value[strings.ToUpper(value)] - if !ok { - return logproto.FORWARD, fmt.Errorf("invalid direction '%s'", value) - } - return logproto.Direction(d), nil -} - -// defaultQueryRangeStep returns the default step used in the query range API, -// which is dinamically calculated based on the time range -func defaultQueryRangeStep(start time.Time, end time.Time) int { - return int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1)) -} - -func httpRequestToInstantQueryRequest(httpRequest *http.Request) (*instantQueryRequest, error) { - params := httpRequest.URL.Query() - queryRequest := instantQueryRequest{ - query: params.Get("query"), - } - - limit, err := intParam(params, "limit", defaultQueryLimit) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - queryRequest.limit = uint32(limit) - - queryRequest.ts, err = unixNanoTimeParam(params, "time", time.Now()) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - - queryRequest.direction, err = directionParam(params, "direction", logproto.BACKWARD) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - - return &queryRequest, nil -} - -func httpRequestToRangeQueryRequest(httpRequest *http.Request) (*rangeQueryRequest, error) { - var err error - - params := httpRequest.URL.Query() - queryRequest := rangeQueryRequest{ - query: params.Get("query"), - } - - queryRequest.limit, queryRequest.start, queryRequest.end, err = httpRequestToLookback(httpRequest) - if err != nil { - return nil, err - } - - step, err := intParam(params, "step", defaultQueryRangeStep(queryRequest.start, queryRequest.end)) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - queryRequest.step = time.Duration(step) * time.Second - - queryRequest.direction, err = directionParam(params, "direction", logproto.BACKWARD) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - - return &queryRequest, nil -} - -func httpRequestToTailRequest(httpRequest *http.Request) (*logproto.TailRequest, error) { - params := httpRequest.URL.Query() - tailRequest := logproto.TailRequest{ - Query: params.Get("query"), - } - var err error - tailRequest.Limit, tailRequest.Start, _, err = httpRequestToLookback(httpRequest) - if err != nil { - return nil, err - } - - // delay_for is used to allow server to let slow loggers catch up. - // Entries would be accumulated in a heap until they become older than now()- - delayFor, err := intParam(params, "delay_for", 0) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - - tailRequest.DelayFor = uint32(delayFor) - - return &tailRequest, nil -} - -func httpRequestToLookback(httpRequest *http.Request) (limit uint32, start, end time.Time, err error) { - params := httpRequest.URL.Query() - now := time.Now() - - lim, err := intParam(params, "limit", defaultQueryLimit) - if err != nil { - return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - limit = uint32(lim) - - start, err = unixNanoTimeParam(params, "start", now.Add(-defaultSince)) - if err != nil { - return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - - end, err = unixNanoTimeParam(params, "end", now) - if err != nil { - return 0, time.Now(), time.Now(), httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - return -} - -// parseRegexQuery parses regex and query querystring from httpRequest and returns the combined LogQL query. -// This is used only to keep regexp query string support until it gets fully deprecated. -func parseRegexQuery(httpRequest *http.Request) (string, error) { - params := httpRequest.URL.Query() - query := params.Get("query") - regexp := params.Get("regexp") - if regexp != "" { - expr, err := logql.ParseLogSelector(query) - if err != nil { - return "", err - } - query = logql.NewFilterExpr(expr, labels.MatchRegexp, regexp).String() - } - return query, nil -} - type QueryResponse struct { ResultType promql.ValueType `json:"resultType"` Result promql.Value `json:"result"` } -type rangeQueryRequest struct { - query string - start, end time.Time - step time.Duration - limit uint32 - direction logproto.Direction -} - -type instantQueryRequest struct { - query string - ts time.Time - limit uint32 - direction logproto.Direction -} - // RangeQueryHandler is a http.HandlerFunc for range queries. func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { // Enforce the query timeout while querying backends ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() - request, err := httpRequestToRangeQueryRequest(r) + request, err := loghttp.ParseRangeQuery(r) if err != nil { - server.WriteError(w, err) + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) return } - query := q.engine.NewRangeQuery(q, request.query, request.start, request.end, request.step, request.direction, request.limit) + query := q.engine.NewRangeQuery(q, request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -253,12 +58,12 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() - request, err := httpRequestToInstantQueryRequest(r) + request, err := loghttp.ParseInstantQuery(r) if err != nil { - server.WriteError(w, err) + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) return } - query := q.engine.NewInstantQuery(q, request.query, request.ts, request.direction, request.limit) + query := q.engine.NewInstantQuery(q, request.Query, request.Ts, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -277,18 +82,18 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout)) defer cancel() - request, err := httpRequestToRangeQueryRequest(r) + request, err := loghttp.ParseRangeQuery(r) if err != nil { - server.WriteError(w, err) + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) return } - request.query, err = parseRegexQuery(r) + request.Query, err = parseRegexQuery(r) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) return } - query := q.engine.NewRangeQuery(q, request.query, request.start, request.end, request.step, request.direction, request.limit) + query := q.engine.NewRangeQuery(q, request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -303,27 +108,11 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) { // LabelHandler is a http.HandlerFunc for handling label queries. func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) { - name, ok := mux.Vars(r)["name"] - params := r.URL.Query() - now := time.Now() - req := &logproto.LabelRequest{ - Values: ok, - Name: name, - } - - end, err := unixNanoTimeParam(params, "end", now) - if err != nil { - http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) - return - } - req.End = &end - - start, err := unixNanoTimeParam(params, "start", end.Add(-6*time.Hour)) + req, err := loghttp.ParseLabelQuery(r) if err != nil { http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) return } - req.Start = &start resp, err := q.Label(r.Context(), req) if err != nil { @@ -348,24 +137,18 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { CheckOrigin: func(r *http.Request) bool { return true }, } - tailRequestPtr, err := httpRequestToTailRequest(r) + req, err := loghttp.ParseTailQuery(r) if err != nil { - server.WriteError(w, err) + http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest) return } - tailRequestPtr.Query, err = parseRegexQuery(r) + req.Query, err = parseRegexQuery(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - if tailRequestPtr.DelayFor > maxDelayForInTailing { - server.WriteError(w, fmt.Errorf("delay_for can't be greater than %d", maxDelayForInTailing)) - level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err) - return - } - conn, err := upgrader.Upgrade(w, r, nil) if err != nil { level.Error(util.Logger).Log("msg", "Error in upgrading websocket", "err", err) @@ -378,11 +161,7 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { } }() - // response from httpRequestToQueryRequest is a ptr, if we keep passing pointer down the call then it would stay on - // heap until connection to websocket stays open - tailRequest := *tailRequestPtr - - tailer, err := q.Tail(r.Context(), &tailRequest) + tailer, err := q.Tail(r.Context(), req) if err != nil { if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { level.Error(util.Logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) @@ -437,3 +216,19 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) { } } } + +// parseRegexQuery parses regex and query querystring from httpRequest and returns the combined LogQL query. +// This is used only to keep regexp query string support until it gets fully deprecated. +func parseRegexQuery(httpRequest *http.Request) (string, error) { + params := httpRequest.URL.Query() + query := params.Get("query") + regexp := params.Get("regexp") + if regexp != "" { + expr, err := logql.ParseLogSelector(query) + if err != nil { + return "", err + } + query = logql.NewFilterExpr(expr, labels.MatchRegexp, regexp).String() + } + return query, nil +} diff --git a/pkg/querier/http_test.go b/pkg/querier/http_test.go deleted file mode 100644 index 6a17bb21004f..000000000000 --- a/pkg/querier/http_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package querier - -import ( - "net/http/httptest" - "testing" - "time" - - "github.com/grafana/loki/pkg/logproto" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestHttp_defaultQueryRangeStep(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - start time.Time - end time.Time - expected int - }{ - "should not be lower then 1s": { - start: time.Unix(60, 0), - end: time.Unix(60, 0), - expected: 1, - }, - "should return 1s if input time range is 5m": { - start: time.Unix(60, 0), - end: time.Unix(360, 0), - expected: 1, - }, - "should return 14s if input time range is 1h": { - start: time.Unix(60, 0), - end: time.Unix(3660, 0), - expected: 14, - }, - } - - for testName, testData := range tests { - testData := testData - - t.Run(testName, func(t *testing.T) { - assert.Equal(t, testData.expected, defaultQueryRangeStep(testData.start, testData.end)) - }) - } -} - -func TestHttp_httpRequestToRangeQueryRequest(t *testing.T) { - t.Parallel() - - tests := map[string]struct { - reqPath string - expected *rangeQueryRequest - }{ - "should set the default step based on the input time range if the step parameter is not provided": { - reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000", - expected: &rangeQueryRequest{ - query: "{}", - start: time.Unix(0, 0), - end: time.Unix(3600, 0), - step: 14 * time.Second, - limit: 100, - direction: logproto.BACKWARD, - }, - }, - "should use the input step parameter if provided": { - reqPath: "/loki/api/v1/query_range?query={}&start=0&end=3600000000000&step=5", - expected: &rangeQueryRequest{ - query: "{}", - start: time.Unix(0, 0), - end: time.Unix(3600, 0), - step: 5 * time.Second, - limit: 100, - direction: logproto.BACKWARD, - }, - }, - } - - for testName, testData := range tests { - testData := testData - - t.Run(testName, func(t *testing.T) { - req := httptest.NewRequest("GET", testData.reqPath, nil) - actual, err := httpRequestToRangeQueryRequest(req) - - require.NoError(t, err) - assert.Equal(t, testData.expected, actual) - }) - } -}