Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Fix tag matching correctness and optimization
Browse files Browse the repository at this point in the history
Previously, tag matching did not correctly handle
numeric types that were written as jsonb strings (#1467).
This commit fixes that by using the new _ps_trace.text_matches
function.

This also fixes the fact that the gin index was not used
on trace searches. This is because the query contained
OR conditions between the span tags on the span table
and the event tags on the event table. Postgres does
not support using indexes for such OR conditions (and
this would be very hard to add support for).

We also tried using UNION ALL to optimize this,
but that approach falls apart when considering
matches on multiple tags (e.g. tag1=val1,tag2=val2)

In the end we switched to a 2 round protocol:
round 1: fetch tags and their types
round 2: add quals for tags only for the types present.

This avoids the problem above for tags whose key/value
pair does not appear on both spans and events. As far
as we know, having the same pair on both is a rare case.

The other downside is that these queries now require 2
database calls. In the future, this can be optimized by
moving the query generation logic into PLPGSQL functions.

Other optimizations:
- if no tags are found in round 1 we can return an empty
  result early
- avoid join to the event table when not needed
- add quals on event times to the query for chunk exclusion
  • Loading branch information
cevian authored and niksajakovljevic committed Aug 29, 2022
1 parent 6ffd6c6 commit 1c9f929
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 84 deletions.
2 changes: 1 addition & 1 deletion EXTENSION_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.4
master
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.12.0 // indirect
github.com/spyzhov/ajson v0.7.1 // indirect
github.com/subosito/gotenv v1.3.0 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PaesslerAG/gval v1.0.0 h1:GEKnRwkWDdf9dOmKcNrar9EA1bz1z9DqPIO1+iLzhd8=
github.com/PaesslerAG/gval v1.0.0/go.mod h1:y/nm5yEyTeX6av0OfKJNp9rBNj2XrGhAf5+v24IBN1I=
github.com/PaesslerAG/jsonpath v0.1.0/go.mod h1:4BzmtoM/PI8fPO4aQGIusjGxGir2BzcV0grWtFzq1Y8=
github.com/PaesslerAG/jsonpath v0.1.1 h1:c1/AToHQMVsduPAa4Vh6xp2U0evy4t8SWp8imEsylIk=
github.com/PaesslerAG/jsonpath v0.1.1/go.mod h1:lVboNxFGal/VwW6d9JzIy56bUsYAP6tH/x80vjnCseY=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ=
Expand Down Expand Up @@ -1065,6 +1070,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=
github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
github.com/spyzhov/ajson v0.7.1 h1:1MDIlPc6x0zjNtpa7tDzRAyFAvRX+X8ZsvtYz5lZg6A=
github.com/spyzhov/ajson v0.7.1/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
10 changes: 9 additions & 1 deletion pkg/jaeger/store/find_trace_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ import (
)

func findTraceIDs(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, q *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
query, params := builder.findTraceIDsQuery(q)
tInfo, err := FindTagInfo(ctx, q, conn)
if err != nil {
return nil, fmt.Errorf("querying trace tags error: %w", err)
}
if tInfo == nil {
//tags cannot be matched
return []model.TraceID{}, nil
}
query, params := builder.findTraceIDsQuery(q, tInfo)
rows, err := conn.Query(ctx, query, params...)
if err != nil {
return nil, fmt.Errorf("querying traces: %w", err)
Expand Down
10 changes: 9 additions & 1 deletion pkg/jaeger/store/find_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ import (
)

func findTraces(ctx context.Context, builder *Builder, conn pgxconn.PgxConn, q *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
query, params := builder.findTracesQuery(q)
tInfo, err := FindTagInfo(ctx, q, conn)
if err != nil {
return nil, fmt.Errorf("querying trace tags error: %w", err)
}
if tInfo == nil {
//tags cannot be matched
return []*model.Trace{}, nil
}
query, params := builder.findTracesQuery(q, tInfo)
rows, err := conn.Query(ctx, query, params...)
if err != nil {
return nil, fmt.Errorf("querying traces error: %w query:\n%s", err, query)
Expand Down
4 changes: 4 additions & 0 deletions pkg/jaeger/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (p *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback t
return res, nil
}

// GetBuilder returns the query Builder held by this Store.
func (p *Store) GetBuilder() *Builder {
return p.builder
}

func logError(err error) error {
if err != nil {
log.Error("msg", "Error in jaeger query GRPC response", "err", err)
Expand Down
168 changes: 168 additions & 0 deletions pkg/jaeger/store/tag_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package store

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jaegertracing/jaeger/storage/spanstore"
	"github.com/timescale/promscale/pkg/pgxconn"
	"go.opentelemetry.io/collector/pdata/ptrace"
	semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
)

// tag is a single key/value tag filter taken from a trace query, together
// with the information about it that is scanned back from the tag lookup query.
type tag struct {
	// k is the tag key as given in the query.
	k string
	// v is the tag value as given in the query.
	v string
	// jsonbPairArray holds the raw values scanned from the lookup query's
	// json_pair_array column; nil means no matching tag was found.
	jsonbPairArray [][]byte
	// isSpan is scanned from the lookup query's is_span_type column.
	isSpan bool
	// isResource is scanned from the lookup query's is_resource_type column.
	isResource bool
	// isEvent is scanned from the lookup query's is_event_type column.
	isEvent bool
}

// tagsInfo collects everything derived from the tag filters of a trace query:
// ready-made SQL qualifiers, the positional parameters they refer to, and the
// tags that need a database lookup.
type tagsInfo struct {
	// spanClauses are qualifiers on the span alias (s.*).
	spanClauses []string
	// operationClauses are qualifiers on the operation alias (op.*).
	operationClauses []string
	// eventClauses are qualifiers on the event alias (e.*).
	eventClauses []string
	// generalTags are the tags that have no dedicated handling and are
	// resolved through the tag lookup query.
	generalTags []*tag
	// params are the values referenced by the $N placeholders in the clauses;
	// a placeholder number is the 1-based position of its value in this slice.
	params []interface{}
}

// FindTagInfo translates the tag filters in q.Tags into a tagsInfo.
//
// Tags with dedicated handling (error, hostname, Jaeger version, span kind,
// W3C trace state and event name) are turned directly into SQL qualifiers with
// positional parameters. All remaining tags are collected as general tags and
// resolved with one extra database query (see BuildTagsQuery).
//
// It returns (nil, nil) when at least one general tag does not exist in the
// database, meaning the tag predicates can never be satisfied. A non-nil error
// is returned when a tag value cannot be encoded or the lookup query fails.
func FindTagInfo(ctx context.Context, q *spanstore.TraceQueryParameters, conn pgxconn.PgxConn) (*tagsInfo, error) {
	// Named info (not tagsInfo) so the local does not shadow the type.
	info := &tagsInfo{}

	for k, v := range q.Tags {
		switch k {
		case TagError:
			var sc ptrace.StatusCode
			switch v {
			case "true":
				sc = ptrace.StatusCodeError
			case "false":
				sc = ptrace.StatusCodeUnset
			default:
				// Ignore tag if we don't match boolean.
				continue
			}
			info.params = append(info.params, statusCodeToInternal(sc))
			qual := fmt.Sprintf(`s.status_code = $%d`, len(info.params))
			info.spanClauses = append(info.spanClauses, qual)
		case TagHostname:
			// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a13030ca6208fa7d4477f0805f3b95e271f32b24/pkg/translator/jaeger/jaegerproto_to_traces.go#L109
			jsonValue, err := jsonbStringLiteral(v)
			if err != nil {
				return nil, fmt.Errorf("encoding value of tag %q: %w", k, err)
			}
			info.params = append(info.params, semconv.AttributeHostName, jsonValue)
			qual := fmt.Sprintf(`s.span_tags @> (SELECT _ps_trace.tag_v_eq_matching_tags($%d::text, $%d::jsonb))`, len(info.params)-1, len(info.params))
			info.spanClauses = append(info.spanClauses, qual)
		case TagJaegerVersion:
			// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a13030ca6208fa7d4477f0805f3b95e271f32b24/pkg/translator/jaeger/jaegerproto_to_traces.go#L119
			jsonValue, err := jsonbStringLiteral("Jaeger-" + v)
			if err != nil {
				return nil, fmt.Errorf("encoding value of tag %q: %w", k, err)
			}
			info.params = append(info.params, "opencensus.exporterversion", jsonValue)
			qual := fmt.Sprintf(`s.span_tags @> (SELECT _ps_trace.tag_v_eq_matching_tags($%d::text, $%d::jsonb))`, len(info.params)-1, len(info.params))
			info.spanClauses = append(info.spanClauses, qual)
		case TagSpanKind:
			info.params = append(info.params, jSpanKindToInternal(v))
			qual := fmt.Sprintf("op.span_kind = $%d", len(info.params))
			info.operationClauses = append(info.operationClauses, qual)
		case TagW3CTraceState:
			info.params = append(info.params, v)
			qual := fmt.Sprintf(`s.trace_state = $%d`, len(info.params))
			info.spanClauses = append(info.spanClauses, qual)
		case TagEventName:
			info.params = append(info.params, v)
			qual := fmt.Sprintf(`e.name = $%d`, len(info.params))
			info.eventClauses = append(info.eventClauses, qual)
		default:
			info.generalTags = append(info.generalTags, &tag{k: k, v: v})
		}
	}

	// Only hit the database when there is something to look up.
	if len(info.generalTags) > 0 {
		query, params := info.BuildTagsQuery(q)
		rows, err := conn.Query(ctx, query, params...)
		if err != nil {
			return nil, fmt.Errorf("querying traces error: %w query:\n%s", err, query)
		}
		defer rows.Close()
		possibleToMatch, err := info.scanTags(rows)
		if err != nil {
			return nil, fmt.Errorf("scanning traces error: %w query:\n%s", err, query)
		}
		if !possibleToMatch {
			return nil, nil
		}
	}

	return info, nil
}

// jsonbStringLiteral encodes s as a JSON string literal suitable for a
// ::jsonb cast. Unlike wrapping s in double quotes by hand, this escapes
// quotes, backslashes and control characters so the result is always valid
// JSON that decodes back to s.
func jsonbStringLiteral(s string) (string, error) {
	encoded, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

// BuildTagsQuery returns the SQL statement, together with its positional
// parameters, that is used to scan the general tags of t. The keys and values
// of the general tags are passed as two parallel text arrays, and the result
// is ordered by the position of each tag in those arrays.
// Exposed for end to end tests.
func (t *tagsInfo) BuildTagsQuery(q *spanstore.TraceQueryParameters) (string, []interface{}) {
	tagCount := len(t.generalTags)
	tagKeys := make([]string, 0, tagCount)
	tagValues := make([]string, 0, tagCount)
	for _, gt := range t.generalTags {
		tagKeys = append(tagKeys, gt.k)
		tagValues = append(tagValues, gt.v)
	}

	// $1 is the key array, $2 the value array.
	args := []interface{}{tagKeys, tagValues}
	keysPos, valuesPos := len(args)-1, len(args)

	const queryTemplate = `
SELECT result.*
FROM unnest ($%d::text[], $%d::text[]) with ordinality as tag_qual(key, value, nr)
INNER JOIN LATERAL
(
SELECT
array_agg(pg_catalog.jsonb_build_object(tag.key_id, tag.id)) json_pair_array,
(bit_or(tag.tag_type) OPERATOR(pg_catalog.&) ps_trace.span_tag_type() != 0) is_span_type,
(bit_or(tag.tag_type) OPERATOR(pg_catalog.&) ps_trace.resource_tag_type() != 0) is_resource_type,
(bit_or(tag.tag_type) OPERATOR(pg_catalog.&) ps_trace.event_tag_type() != 0) is_event_type
FROM
(
SELECT v, _prom_ext.jsonb_digest(v) as digest
FROM unnest(_ps_trace.text_matches(tag_qual.value)) as v
) as matchers
INNER JOIN LATERAL (
SELECT t.*
FROM _ps_trace.tag t
WHERE t.key OPERATOR(pg_catalog.=) tag_qual.key
AND _prom_ext.jsonb_digest(t.value) OPERATOR(pg_catalog.=) matchers.digest
AND t.value OPERATOR(pg_catalog.=) matchers.v
LIMIT 1
) tag on (true)
) as result on (true)
ORDER BY tag_qual.nr`

	return fmt.Sprintf(queryTemplate, keysPos, valuesPos), args
}

// scanTags reads the result of the tag lookup query into t.generalTags.
// The i-th row is stored into the i-th general tag, so rows must arrive in
// the same order as the tags were sent.
//
// It returns false with a nil error when a row carries no matching tag ids,
// which means that tag does not exist and the overall result must be empty.
// It returns an error when iterating or scanning fails, or when the result
// has an unexpected shape (more rows than tags, or NULL type flags on a
// matched tag) instead of panicking.
func (t *tagsInfo) scanTags(rows pgxconn.PgxRows) (bool, error) {
	i := 0
	for rows.Next() {
		if rows.Err() != nil {
			return false, fmt.Errorf("tag row iterator: %w", rows.Err())
		}
		// Guard the index: an extra row would otherwise panic.
		if i >= len(t.generalTags) {
			return false, fmt.Errorf("tag row iterator: received more rows than the %d requested tags", len(t.generalTags))
		}
		current := t.generalTags[i]
		// Pointers so that SQL NULLs can be scanned without an error.
		var isSpan, isResource, isEvent *bool
		if err := rows.Scan(&current.jsonbPairArray, &isSpan, &isResource, &isEvent); err != nil {
			return false, fmt.Errorf("error scanning tags: %w", err)
		}
		if current.jsonbPairArray == nil {
			//such a tag does not exist. That means the tag predicate could not be fulfilled and the result
			//should always be the empty set.
			return false, nil
		}
		if isSpan == nil || isResource == nil || isEvent == nil {
			return false, fmt.Errorf("error scanning tags: unexpected NULL tag type for tag %q", current.k)
		}
		current.isSpan = *isSpan
		current.isResource = *isResource
		current.isEvent = *isEvent
		i++
	}
	if rows.Err() != nil {
		return false, fmt.Errorf("tag row iterator: %w", rows.Err())
	}

	return true, nil
}
Loading

0 comments on commit 1c9f929

Please sign in to comment.