diff --git a/cmd/otelbench/logqlbench/logqlbench.go b/cmd/otelbench/logqlbench/logqlbench.go index 0ea91774..462d190b 100644 --- a/cmd/otelbench/logqlbench/logqlbench.go +++ b/cmd/otelbench/logqlbench/logqlbench.go @@ -40,6 +40,7 @@ type LogQLBenchmark struct { client *lokiapi.Client start time.Time end time.Time + limit int } // Setup setups benchmark using given flags. @@ -53,6 +54,7 @@ func (p *LogQLBenchmark) Setup(cmd *cobra.Command) error { if p.end, err = lokihandler.ParseTimestamp(p.EndTime, time.Time{}); err != nil { return errors.Wrap(err, "parse end time") } + p.limit = 1000 p.tracker, err = chtracker.Setup[Query](ctx, "logql", p.TrackerOptions) if err != nil { @@ -131,12 +133,13 @@ func (p *LogQLBenchmark) Run(ctx context.Context) error { var reports []LogQLReportQuery if err := p.tracker.Report(ctx, func(ctx context.Context, tq chtracker.TrackedQuery[Query], queries []chtracker.QueryReport) error { + header := tq.Meta.Header() reports = append(reports, LogQLReportQuery{ - ID: tq.Meta.ID, - Title: tq.Meta.Title, - Description: tq.Meta.Description, - Query: tq.Meta.Query, - Matchers: tq.Meta.Match, + ID: header.ID, + Title: header.Title, + Description: header.Description, + Query: tq.Meta.Query(), + Matchers: tq.Meta.Matchers(), DurationNanos: tq.Duration.Nanoseconds(), Queries: queries, Timeout: tq.Timeout, diff --git a/cmd/otelbench/logqlbench/queries.go b/cmd/otelbench/logqlbench/queries.go index 469cd412..fd49686d 100644 --- a/cmd/otelbench/logqlbench/queries.go +++ b/cmd/otelbench/logqlbench/queries.go @@ -8,39 +8,311 @@ import ( "github.com/go-faster/errors" "github.com/go-faster/yaml" + "github.com/go-faster/oteldb/internal/lokiapi" "github.com/go-faster/oteldb/internal/lokihandler" ) -// ConfigQuery defines LogQL query parameters. -type ConfigQuery struct { - Title string `yaml:"title,omitempty"` - Description string `yaml:"description,omitempty"` - Start string `yaml:"start,omitempty"` - End string `yaml:"end,omitempty"` - Step time.Duration `yaml:"step,omitempty"` - Query string `yaml:"query,omitempty"` - Match []string `yaml:"match,omitempty"` +// Query is a benchmarked query. +type Query interface { + Header() QueryHeader + Query() string + Matchers() []string + Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error } -// Query is a benchmarked query. -type Query struct { - ID int - Type string +var _ = []Query{ + &InstantQuery{}, + &RangeQuery{}, + &SeriesQuery{}, + &LabelsQuery{}, + &LabelValuesQuery{}, +} + +// QueryHeader is common for all queries. +type QueryHeader struct { + ID int `yaml:"-"` + + Title string `yaml:"title,omitempty"` + Description string `yaml:"description,omitempty"` +} + +// InstantQuery is an instant (`/loki/api/v1/query`) query. +type InstantQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + LogQL string `yaml:"query,omitempty"` +} + +// Header returns the query header. +func (q *InstantQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *InstantQuery) Query() string { + return q.LogQL +} + +// Matchers returns selectors for the query. +func (q *InstantQuery) Matchers() []string { + return nil +} + +// Execute executes the instant query. +func (q *InstantQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + + resp, err := client.Query(ctx, lokiapi.QueryParams{ + Time: toLokiTimestamp(start), + Query: q.LogQL, + Limit: lokiapi.NewOptInt(p.limit), + }) + if err != nil { + return errors.Wrap(err, "query") + } + + if isEmptyQueryResponse(resp.Data) && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +// RangeQuery is a range (`/loki/api/v1/query_range`) query. +type RangeQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` + Step time.Duration `yaml:"step,omitempty"` + LogQL string `yaml:"query,omitempty"` +} + +// Header returns the query header. +func (q *RangeQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *RangeQuery) Query() string { + return q.LogQL +} + +// Matchers returns selectors for the query. +func (q *RangeQuery) Matchers() []string { + return nil +} + +// Execute executes the range query. +func (q *RangeQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } + + resp, err := client.QueryRange(ctx, lokiapi.QueryRangeParams{ + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + Query: q.LogQL, + Step: toLokiDuration(q.Step), + Limit: lokiapi.NewOptInt(p.limit), + }) + if err != nil { + return errors.Wrap(err, "query range") + } + + if isEmptyQueryResponse(resp.Data) && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +func isEmptyQueryResponse(data lokiapi.QueryResponseData) bool { + switch typ := data.Type; typ { + case lokiapi.StreamsResultQueryResponseData: + streams := data.StreamsResult + return len(streams.Result) == 0 + case lokiapi.ScalarResultQueryResponseData: + return false + case lokiapi.VectorResultQueryResponseData: + vector := data.VectorResult + return len(vector.Result) == 0 + case lokiapi.MatrixResultQueryResponseData: + matrix := data.MatrixResult + return len(matrix.Result) == 0 + default: + return true + } +} + +// SeriesQuery is a series (`/loki/api/v1/series`) query. +type SeriesQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` + Match []string `yaml:"match,omitempty"` +} + +// Header returns the query header. +func (q *SeriesQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *SeriesQuery) Query() string { + return "" +} + +// Matchers returns selectors for the query. +func (q *SeriesQuery) Matchers() []string { + return q.Match +} + +// Execute executes the series query. +func (q *SeriesQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } - Title string - Description string - Start time.Time - End time.Time - Step time.Duration - Query string - Match []string + resp, err := client.Series(ctx, lokiapi.SeriesParams{ + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + Match: q.Match, + }) + if err != nil { + return errors.Wrap(err, "query series") + } + + if len(resp.Data) == 0 && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +// LabelsQuery is a labels (`/loki/api/v1/labels`) query. +type LabelsQuery struct { + QueryHeader `yaml:"header,inline"` + + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` +} + +// Header returns the query header. +func (q *LabelsQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *LabelsQuery) Query() string { + return "" +} + +// Matchers returns selectors for the query. +func (q *LabelsQuery) Matchers() []string { + return nil +} + +// Execute executes the labels query. +func (q *LabelsQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } + + resp, err := client.Labels(ctx, lokiapi.LabelsParams{ + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + }) + if err != nil { + return errors.Wrap(err, "query labels") + } + + if len(resp.Data) == 0 && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil +} + +// LabelValuesQuery is a label values (`/loki/api/v1/label_values`) query. +type LabelValuesQuery struct { + QueryHeader `yaml:"header,inline"` + + Name string `yaml:"name"` + Start string `yaml:"start,omitempty"` + End string `yaml:"end,omitempty"` + Match string `yaml:"match,omitempty"` +} + +// Header returns the query header. +func (q *LabelValuesQuery) Header() QueryHeader { + return q.QueryHeader +} + +// Query returns the query string. +func (q *LabelValuesQuery) Query() string { + return q.Name +} + +// Matchers returns selectors for the query. +func (q *LabelValuesQuery) Matchers() []string { + return []string{q.Match} +} + +// Execute executes the label values query. +func (q *LabelValuesQuery) Execute(ctx context.Context, client *lokiapi.Client, p *LogQLBenchmark) error { + start, err := lokihandler.ParseTimestamp(q.Start, p.start) + if err != nil { + return errors.Wrap(err, "parse start") + } + end, err := lokihandler.ParseTimestamp(q.End, p.end) + if err != nil { + return errors.Wrap(err, "parse end") + } + var matcher lokiapi.OptString + if q.Match != "" { + matcher.SetTo(q.Match) + } + + resp, err := client.LabelValues(ctx, lokiapi.LabelValuesParams{ + Name: q.Name, + Start: toLokiTimestamp(start), + End: toLokiTimestamp(end), + Query: matcher, + }) + if err != nil { + return errors.Wrap(err, "query label values") + } + + if len(resp.Data) == 0 && !p.AllowEmpty { + return errors.New("unexpected empty data") + } + return nil } // Input defines queries config. type Input struct { - Instant []ConfigQuery `yaml:"instant"` - Range []ConfigQuery `yaml:"range"` - Series []ConfigQuery `yaml:"series"` + Instant []InstantQuery `yaml:"instant"` + Range []RangeQuery `yaml:"range"` + Series []SeriesQuery `yaml:"series"` + Labels []LabelsQuery `yaml:"labels"` + LabelValues []LabelValuesQuery `yaml:"label_values"` } func (p *LogQLBenchmark) each(ctx context.Context, fn func(ctx context.Context, q Query) error) error { @@ -54,58 +326,59 @@ func (p *LogQLBenchmark) each(ctx context.Context, fn func(ctx context.Context, return errors.Wrap(err, "unmarshal input") } - var id int - mapQuery := func(typ string, cq ConfigQuery) (Query, error) { - q := Query{ - Type: typ, - ID: id, - Title: cq.Title, - Description: cq.Description, - Step: cq.Step, - Query: cq.Query, + var ( + id int + nextID = func() (r int) { + r = id + id++ + return } + ) - var err error - q.Start, err = lokihandler.ParseTimestamp(cq.Start, p.start) - if err != nil { - return q, errors.Wrap(err, "parse start") - } - q.End, err = lokihandler.ParseTimestamp(cq.End, p.end) - if err != nil { - return q, errors.Wrap(err, "parse end") - } + for i := range input.Instant { + q := &input.Instant[i] + q.ID = nextID() - id++ - q.ID = id - return q, nil + if err := fn(ctx, q); err != nil { + return errors.Wrapf(err, "instant query %d: %q", i, q.Query()) + } } - for _, cq := range input.Instant { - q, err := mapQuery("instant", cq) - if err != nil { - return err - } + for i := range input.Range { + q := &input.Range[i] + q.ID = nextID() + if err := fn(ctx, q); err != nil { - return errors.Wrap(err, "callback") + return errors.Wrapf(err, "range query %d: %q", i, q.Query()) } } - for _, cq := range input.Range { - q, err := mapQuery("range", cq) - if err != nil { - return err - } + + for i := range input.Series { + q := &input.Series[i] + q.ID = nextID() + if err := fn(ctx, q); err != nil { - return errors.Wrap(err, "callback") + return errors.Wrapf(err, "series query %d: %#v", i, q.Matchers()) } } - for _, cq := range input.Series { - q, err := mapQuery("series", cq) - if err != nil { - return err + + for i := range input.Labels { + q := &input.Labels[i] + q.ID = nextID() + + if err := fn(ctx, q); err != nil { + return errors.Wrapf(err, "labels query %d", i) } + } + + for i := range input.LabelValues { + q := &input.LabelValues[i] + q.ID = nextID() + if err := fn(ctx, q); err != nil { - return errors.Wrap(err, "callback") + return errors.Wrapf(err, "label values query %d: %q", i, q.Query()) } } + return nil } diff --git a/cmd/otelbench/logqlbench/send.go b/cmd/otelbench/logqlbench/send.go index 7f43c56e..f0918698 100644 --- a/cmd/otelbench/logqlbench/send.go +++ b/cmd/otelbench/logqlbench/send.go @@ -2,12 +2,9 @@ package logqlbench import ( "context" - "fmt" "strconv" "time" - "github.com/go-faster/errors" - "github.com/go-faster/oteldb/internal/lokiapi" ) @@ -33,87 +30,5 @@ func (p *LogQLBenchmark) send(ctx context.Context, q Query) error { ctx, cancel := context.WithTimeout(ctx, p.RequestTimeout) defer cancel() - isEmpty := func(data lokiapi.QueryResponseData) bool { - switch typ := data.Type; typ { - case lokiapi.StreamsResultQueryResponseData: - streams := data.StreamsResult - return len(streams.Result) == 0 - case lokiapi.ScalarResultQueryResponseData: - return false - case lokiapi.VectorResultQueryResponseData: - vector := data.VectorResult - return len(vector.Result) == 0 - case lokiapi.MatrixResultQueryResponseData: - matrix := data.MatrixResult - return len(matrix.Result) == 0 - default: - return true - } - } - - const limit = 1000 - switch q.Type { - case "instant": - queryInfo := fmt.Sprintf("instant %q (start: %s, limit: %d)", - q.Query, - q.Start, limit, - ) - - resp, err := p.client.Query(ctx, lokiapi.QueryParams{ - Query: q.Query, - Time: toLokiTimestamp(q.Start), - Limit: lokiapi.NewOptInt(limit), - }) - if err != nil { - return errors.Wrapf(err, "send %s", queryInfo) - } - - if isEmpty(resp.Data) && !p.AllowEmpty { - return errors.Errorf("unexpected empty data: %s", queryInfo) - } - return nil - case "range": - queryInfo := fmt.Sprintf("range %q (start: %s, end: %s, step: %s, limit: %d)", - q.Query, - q.Start, q.End, q.Step, - limit, - ) - - resp, err := p.client.QueryRange(ctx, lokiapi.QueryRangeParams{ - Query: q.Query, - Start: toLokiTimestamp(q.Start), - End: toLokiTimestamp(q.End), - Step: toLokiDuration(q.Step), - Limit: lokiapi.NewOptInt(limit), - }) - if err != nil { - return errors.Wrapf(err, "send %s", queryInfo) - } - - if isEmpty(resp.Data) && !p.AllowEmpty { - return errors.Errorf("unexpected empty data: %s", queryInfo) - } - return nil - case "series": - queryInfo := fmt.Sprintf("series %v (start: %s, end: %s)", - q.Match, - q.Start, q.End, - ) - - resp, err := p.client.Series(ctx, lokiapi.SeriesParams{ - Start: toLokiTimestamp(q.Start), - End: toLokiTimestamp(q.End), - Match: q.Match, - }) - if err != nil { - return errors.Wrapf(err, "send %s", queryInfo) - } - - if len(resp.Data) == 0 && !p.AllowEmpty { - return errors.Errorf("unexpected empty data: %s", queryInfo) - } - return nil - default: - return errors.Errorf("unknown query type %q", q.Type) - } + return q.Execute(ctx, p.client, p) }