Skip to content

Commit

Permalink
Merge branch 'main' into issue-97/sorting-variables
Browse files Browse the repository at this point in the history
  • Loading branch information
hagen1778 authored Nov 20, 2024
2 parents 77bcf9d + 7d55c66 commit c2e125b
Show file tree
Hide file tree
Showing 20 changed files with 1,173 additions and 49 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## tip

* FEATURE: add support for the `/select/logsql/stats_query` and `/select/logsql/stats_query_range` API calls. This feature helps to build different panels with statistic data. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/61).

* BUGFIX: implemented sorting for variables in Grafana to correct numerical data ordering. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/97).

## v0.7.0
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ golang-ci-lint: install-golang-ci-lint
golangci-lint run ./pkg/...

install-golang-ci-lint:
which golangci-lint || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.51.2
which golangci-lint || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin v1.60.3

fmt:
gofmt -l -w -s ./pkg
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The VictoriaLogs datasource plugin allows you to query and visualize
[VictoriaLogs](https://docs.victoriametrics.com/victorialogs/) data in [Grafana](https://grafana.com).

* [Installation](#installation)
* [Getting started development](#getting-started-development)
* [How to make new release](#how-to-make-new-release)
* [Notes](#notes)
* [License](#license)
Expand Down Expand Up @@ -145,7 +146,7 @@ sidecar:

See more about chart settings [here](https://github.com/grafana/helm-charts/blob/541d97051de87a309362e02d08741ffc868cfcd6/charts/grafana/values.yaml)

Option 4 would be to build custom Grafana image with plugin based on same installation instructions.
Option 4. would be to build custom Grafana image with plugin based on same installation instructions.

#### Grafana operator

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/invopop/yaml v0.2.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jszwedko/go-datemath v0.1.1-0.20230526204004-640a500621d6 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/magefile/mage v1.15.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jszwedko/go-datemath v0.1.1-0.20230526204004-640a500621d6 h1:SwcnSwBR7X/5EHJQlXBockkJVIMRVt5yKaesBPMtyZQ=
github.com/jszwedko/go-datemath v0.1.1-0.20230526204004-640a500621d6/go.mod h1:WrYiIuiXUMIvTDAQw97C+9l0CnBmCcvosPjN3XDqS/o=
github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand Down
34 changes: 20 additions & 14 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ type Datasource struct {
// managed channel path – thus plugin can check subscribe permissions and communicate
// options with Grafana Core. As soon as first subscriber joins channel RunStream
// will be called.
func (d *Datasource) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
func (d *Datasource) SubscribeStream(_ context.Context, _ *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusOK,
}, nil
}

// PublishStream called when a user tries to publish to a plugin/datasource
// managed channel path.
func (d *Datasource) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
func (d *Datasource) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
return &backend.PublishStreamResponse{
Status: backend.PublishStreamStatusPermissionDenied,
}, nil
Expand Down Expand Up @@ -129,11 +129,17 @@ func (d *Datasource) QueryData(ctx context.Context, req *backend.QueryDataReques

var wg sync.WaitGroup
for _, q := range req.Queries {
rawQuery, err := d.getQueryFromRaw(q.JSON)
if err != nil {
return nil, err
}
rawQuery.DataQuery = q

wg.Add(1)
go func(q backend.DataQuery) {
go func(rawQuery *Query) {
defer wg.Done()
response.Responses[q.RefID] = d.query(ctx, req.PluginContext, q)
}(q)
response.Responses[rawQuery.RefID] = d.query(ctx, req.PluginContext, rawQuery)
}(rawQuery)
}
wg.Wait()

Expand Down Expand Up @@ -221,14 +227,7 @@ func (d *Datasource) datasourceQuery(ctx context.Context, q *Query, isStream boo
}

// query sends a query to the datasource and returns the result.
func (d *Datasource) query(ctx context.Context, _ backend.PluginContext, query backend.DataQuery) backend.DataResponse {
q, err := d.getQueryFromRaw(query.JSON)
if err != nil {
return newResponseError(err, backend.StatusBadRequest)
}

q.TimeRange = TimeRange(query.TimeRange)

func (d *Datasource) query(ctx context.Context, _ backend.PluginContext, q *Query) backend.DataResponse {
r, err := d.datasourceQuery(ctx, q, false)
if err != nil {
return newResponseError(err, backend.StatusInternal)
Expand All @@ -240,7 +239,14 @@ func (d *Datasource) query(ctx context.Context, _ backend.PluginContext, query b
}
}()

return parseInstantResponse(r)
switch QueryType(q.QueryType) {
case QueryTypeStats:
return parseStatsResponse(r, q)
case QueryTypeStatsRange:
return parseStatsResponse(r, q)
default:
return parseInstantResponse(r)
}
}

// CheckHealth handles health checks sent from Grafana to the plugin.
Expand Down
8 changes: 4 additions & 4 deletions pkg/plugin/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestQueryData(t *testing.T) {
context.Background(),
&backend.QueryDataRequest{
Queries: []backend.DataQuery{
{RefID: "A"},
{RefID: "A", JSON: []byte(`{"expr":"*", "intervalMs":5000, "maxDataPoints":984, "maxLines":1000, "queryType":"instant", "refId":"A"}`)},
},
},
)
Expand Down Expand Up @@ -104,7 +104,8 @@ func TestDatasourceQueryRequest(t *testing.T) {
rsp, gotErr := datasource.QueryData(ctx, &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
RefID: "A",
RefID: "A",
QueryType: "instant",
JSON: []byte(`{
"datasourceId":27,
"expr":".*",
Expand All @@ -113,7 +114,6 @@ func TestDatasourceQueryRequest(t *testing.T) {
"legendFormat":"",
"maxDataPoints":984,
"maxLines":1000,
"queryType":"range",
"refId":"A"
}`),
},
Expand Down Expand Up @@ -860,7 +860,7 @@ func TestDatasourceStreamTailProcess(t *testing.T) {
t.Fatalf("should not be called")
})

mux.HandleFunc("/select/logsql/tail", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/select/logsql/tail", func(w http.ResponseWriter, _ *http.Request) {
// we send 3 messages with 20ms delay
// simulate tail stream response
for i := 0; i < 3; i++ {
Expand Down
200 changes: 187 additions & 13 deletions pkg/plugin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,54 @@ import (
"fmt"
"net/url"
"path"
"regexp"
"sort"
"strconv"
"strings"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"

"github.com/VictoriaMetrics/victorialogs-datasource/pkg/utils"
)

const (
instantQueryPath = "/select/logsql/query"
tailQueryPath = "/select/logsql/tail"
defaultMaxLines = 1000
instantQueryPath = "/select/logsql/query"
tailQueryPath = "/select/logsql/tail"
statsQueryPath = "/select/logsql/stats_query"
statsQueryRangePath = "/select/logsql/stats_query_range"
defaultMaxLines = 1000
legendFormatAuto = "__auto"
metricsName = "__name__"
defaultInterval = 15 * time.Second
)

// QueryType represents query type
type QueryType string

const (
// QueryTypeInstant represents instant query type
QueryTypeInstant QueryType = "instant"
// QueryTypeStats represents stats query type
QueryTypeStats QueryType = "stats"
// QueryTypeStatsRange represents stats range query type
QueryTypeStatsRange QueryType = "statsRange"
)

// Query represents backend query object
type Query struct {
RefID string `json:"refId"`
backend.DataQuery `json:"inline"`

Expr string `json:"expr"`
LegendFormat string `json:"legendFormat"`
TimeInterval string `json:"timeInterval"`
Interval string `json:"interval"`
IntervalMs int64 `json:"intervalMs"`
MaxLines int `json:"maxLines"`
TimeRange TimeRange
IntervalMs int `json:"intervalMs"`
url *url.URL
}

// TimeRange represents time range backend object
type TimeRange struct {
From time.Time
To time.Time
}

// GetQueryURL calculates step and clear expression from template variables,
// and after builds query url depends on query type
func (q *Query) getQueryURL(rawURL string, queryParams string) (string, error) {
Expand All @@ -50,7 +69,18 @@ func (q *Query) getQueryURL(rawURL string, queryParams string) (string, error) {

q.url = u

return q.queryInstantURL(params), nil
switch QueryType(q.QueryType) {
case QueryTypeStats:
return q.statsQueryURL(params), nil
case QueryTypeStatsRange:
minInterval, err := q.calculateMinInterval()
if err != nil {
return "", fmt.Errorf("failed to calculate minimal interval: %w", err)
}
return q.statsQueryRangeURL(params, minInterval), nil
default:
return q.queryInstantURL(params), nil
}
}

// queryInstantURL prepare query url for instant query
Expand Down Expand Up @@ -117,3 +147,147 @@ func (q *Query) queryInstantURL(queryParams url.Values) string {
q.url.RawQuery = values.Encode()
return q.url.String()
}

// statsQueryURL prepare query url for querying log stats
func (q *Query) statsQueryURL(queryParams url.Values) string {
q.url.Path = path.Join(q.url.Path, statsQueryPath)
values := q.url.Query()

for k, vl := range queryParams {
for _, v := range vl {
values.Add(k, v)
}
}

now := time.Now()
if q.TimeRange.From.IsZero() {
q.TimeRange.From = now.Add(-time.Minute * 5)
}

q.Expr = utils.ReplaceTemplateVariable(q.Expr, q.IntervalMs)
values.Set("query", q.Expr)
values.Set("time", strconv.FormatInt(q.TimeRange.To.Unix(), 10))

q.url.RawQuery = values.Encode()
return q.url.String()
}

// statsQueryRangeURL prepare query url for querying log range stats
func (q *Query) statsQueryRangeURL(queryParams url.Values, minInterval time.Duration) string {
q.url.Path = path.Join(q.url.Path, statsQueryRangePath)
values := q.url.Query()

for k, vl := range queryParams {
for _, v := range vl {
values.Add(k, v)
}
}

if q.MaxLines <= 0 {
q.MaxLines = defaultMaxLines
}

from := q.TimeRange.From
to := q.TimeRange.To

now := time.Now()
if q.TimeRange.From.IsZero() {
q.TimeRange.From = now.Add(-time.Minute * 5)
}
if q.TimeRange.To.IsZero() {
q.TimeRange.To = now
}

q.Expr = utils.ReplaceTemplateVariable(q.Expr, q.IntervalMs)
step := utils.CalculateStep(minInterval, from, to, q.MaxDataPoints)

values.Set("query", q.Expr)
values.Set("start", strconv.FormatInt(q.TimeRange.From.Unix(), 10))
values.Set("end", strconv.FormatInt(q.TimeRange.To.Unix(), 10))
values.Set("step", step.String())

q.url.RawQuery = values.Encode()
return q.url.String()
}

func (q *Query) addMetadataToMultiFrame(frame *data.Frame) {
if len(frame.Fields) < 2 {
return
}

customName := q.parseLegend(frame.Fields[1].Labels)
if customName != "" {
frame.Fields[1].Config = &data.FieldConfig{DisplayNameFromDS: customName}
}

frame.Name = customName
}

var legendReplacer = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)

func (q *Query) parseLegend(labels data.Labels) string {

switch {
case q.LegendFormat == legendFormatAuto:
return q.Expr
case q.LegendFormat != "":
result := legendReplacer.ReplaceAllStringFunc(q.LegendFormat, func(in string) string {
labelName := strings.Replace(in, "{{", "", 1)
labelName = strings.Replace(labelName, "}}", "", 1)
labelName = strings.TrimSpace(labelName)
if val, ok := labels[labelName]; ok {
return val
}
return ""
})
if result == "" {
return q.Expr
}
return result
default:
// If legend is empty brackets, use query expression
legend := labelsToString(labels)
if legend == "{}" {
return q.Expr
}
return legend
}
}

func labelsToString(labels data.Labels) string {
if labels == nil {
return "{}"
}

var labelStrings []string
for label, value := range labels {
if label == metricsName {
continue
}
labelStrings = append(labelStrings, fmt.Sprintf("%s=%q", label, value))
}

var metricName string
mn, ok := labels[metricsName]
if ok {
metricName = mn
}

if len(labelStrings) < 1 {
return metricName
}

sort.Strings(labelStrings)
lbs := strings.Join(labelStrings, ",")

return fmt.Sprintf("%s{%s}", metricName, lbs)
}

// calculateMinInterval tries to calculate interval from requested params
// in duration representation or return error if
func (q *Query) calculateMinInterval() (time.Duration, error) {
if utils.WithIntervalVariable(q.Interval) {
q.Interval = ""
}
return utils.GetIntervalFrom(q.TimeInterval, q.Interval, q.IntervalMs, defaultInterval)
}
Loading

0 comments on commit c2e125b

Please sign in to comment.