Skip to content

Commit

Permalink
feat: Tune Patterns query drain instance (#13137)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Jun 5, 2024
1 parent 6e119aa commit 30df31e
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)

func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
tokens := deduplicatePlaceholders(d.tokenizer.Tokenize(content), d.config.ParamString)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
// Match no existing log cluster
if matchCluster == nil {
d.clustersCounter++
Expand Down
24 changes: 16 additions & 8 deletions pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ type IngesterQuerier struct {

ringClient *RingClient

registerer prometheus.Registerer
registerer prometheus.Registerer
ingesterQuerierMetrics *ingesterQuerierMetrics
}

func NewIngesterQuerier(
Expand All @@ -36,10 +37,11 @@ func NewIngesterQuerier(
logger log.Logger,
) (*IngesterQuerier, error) {
return &IngesterQuerier{
logger: log.With(logger, "component", "pattern-ingester-querier"),
ringClient: ringClient,
cfg: cfg,
registerer: prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer),
logger: log.With(logger, "component", "pattern-ingester-querier"),
ringClient: ringClient,
cfg: cfg,
registerer: prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer),
ingesterQuerierMetrics: newIngesterQuerierMetrics(registerer, metricsNamespace),
}, nil
}

Expand All @@ -63,11 +65,15 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
if err != nil {
return nil, err
}
return prunePatterns(resp, minClusterSize), nil
return prunePatterns(resp, minClusterSize, q.ingesterQuerierMetrics), nil
}

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse {
d := drain.New(drain.DefaultConfig(), nil)
func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
pruneConfig := drain.DefaultConfig()
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them

patternsBefore := len(resp.Series)
d := drain.New(pruneConfig, nil)
for _, p := range resp.Series {
d.TrainPattern(p.Pattern, p.Samples)
}
Expand All @@ -86,6 +92,8 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *lo
Samples: cluster.Samples(),
})
}
metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series)))
metrics.patternsRetainedTotal.Add(float64(len(resp.Series)))
return resp
}

Expand Down
43 changes: 35 additions & 8 deletions pkg/pattern/ingester_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"os"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/logproto"
)

func Test_prunePatterns(t *testing.T) {
file, err := os.Open(`testdata/patterns.txt`)
file, err := os.Open("testdata/patterns.txt")
require.NoError(t, err)
defer file.Close()

Expand All @@ -24,15 +25,40 @@ func Test_prunePatterns(t *testing.T) {
})
}
require.NoError(t, scanner.Err())
prunePatterns(resp, 0)

startingPatterns := len(resp.Series)
prunePatterns(resp, 0, newIngesterQuerierMetrics(prometheus.DefaultRegisterer, `test`))

expectedPatterns := []string{
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=<_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=<_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=<_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=<_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=<_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=<_> handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_>`,
`<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" <_> partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=0, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=1, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=2, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=3, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=4, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=5, <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=6, <_> +0000 UTC, <_>`,
`<_> caller=batcher.go:155 level=info msg="batcher:processing aggregation result" result="user=9960, partitionID=7, <_> +0000 UTC, <_>`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> +0000 UTC" <_> +0000 UTC" <_> currentBuckets="unsupported value type"`,
`<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_>`,
}

Expand All @@ -43,4 +69,5 @@ func Test_prunePatterns(t *testing.T) {
slices.Sort(patterns)

require.Equal(t, expectedPatterns, patterns)
require.Less(t, len(patterns), startingPatterns, `prunePatterns should remove duplicates`)
}
22 changes: 22 additions & 0 deletions pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,25 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
}),
}
}

type ingesterQuerierMetrics struct {
patternsPrunedTotal prometheus.Counter
patternsRetainedTotal prometheus.Counter
}

func newIngesterQuerierMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterQuerierMetrics {
return &ingesterQuerierMetrics{
patternsPrunedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "query_pruned_total",
Help: "The total number of patterns removed at query time by the pruning Drain instance",
}),
patternsRetainedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "query_retained_total",
Help: "The total number of patterns retained at query time by the pruning Drain instance",
}),
}
}

0 comments on commit 30df31e

Please sign in to comment.