From 0af0211d5e06ad1290a286c9cadc0b5dbe4ebd15 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 18 Oct 2024 13:23:24 +0200 Subject: [PATCH 1/5] Make list of json fields to mine patterns for configurable --- pkg/loki/modules.go | 1 + pkg/pattern/drain/drain.go | 9 +++-- pkg/pattern/drain/drain_benchmark_test.go | 2 +- pkg/pattern/drain/drain_test.go | 40 ++++++++++++++--------- pkg/pattern/drain/line_tokenizer.go | 16 ++++++--- pkg/pattern/drain/line_tokenizer_test.go | 3 +- pkg/pattern/flush_test.go | 2 +- pkg/pattern/ingester.go | 8 +++++ pkg/pattern/ingester_test.go | 10 ++++++ pkg/pattern/instance.go | 25 +++++++------- pkg/pattern/stream.go | 3 +- pkg/pattern/stream_test.go | 20 ++---------- pkg/util/limiter/combined_limits.go | 2 ++ pkg/validation/limits.go | 9 +++++ 14 files changed, 94 insertions(+), 56 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 24e19d7c58e05..991af6cdc1c5f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -638,6 +638,7 @@ func (t *Loki) initPatternIngester() (_ services.Service, err error) { t.Cfg.Pattern.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort t.PatternIngester, err = pattern.New( t.Cfg.Pattern, + t.Overrides, t.PatternRingClient, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 9e6062432cc6a..05a4f27922e3b 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -47,6 +47,10 @@ type Config struct { MaxAllowedLineLength int } +type Limits interface { + PatternIngesterTokenizableJsonFields(userID string) []string +} + func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache { if maxSize == 0 { maxSize = math.MaxInt @@ -135,7 +139,7 @@ func DefaultConfig() *Config { } } -func New(config *Config, format string, metrics *Metrics) *Drain { +func New(tenantID string, config *Config, limits Limits, format string, metrics *Metrics) *Drain { if config.LogClusterDepth < 3 { panic("depth argument must be at least 3") } @@ -153,7 +157,8 @@ func New(config *Config, format string, metrics *Metrics) *Drain { var tokenizer LineTokenizer switch format { case FormatJSON: - tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength) + fieldsToTokenize := limits.PatternIngesterTokenizableJsonFields(tenantID) + tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength, fieldsToTokenize) case FormatLogfmt: tokenizer = newLogfmtTokenizer(config.ParamString, config.MaxAllowedLineLength) default: diff --git a/pkg/pattern/drain/drain_benchmark_test.go b/pkg/pattern/drain/drain_benchmark_test.go index e7c95f721ed4c..5313f10db396d 100644 --- a/pkg/pattern/drain/drain_benchmark_test.go +++ b/pkg/pattern/drain/drain_benchmark_test.go @@ -35,7 +35,7 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) { line := scanner.Text() lines = append(lines, line) } - drain := New(DefaultConfig(), DetectLogFormat(lines[0]), nil) + drain := New("", DefaultConfig(), &fakeLimits{}, DetectLogFormat(lines[0]), nil) b.ReportAllocs() b.ResetTimer() diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index 9359feb8dd343..e57ffe6acd3d5 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -27,7 +27,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { format string }{ { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/agent-logfmt.txt`, format: FormatLogfmt, patterns: []string{ @@ -56,7 +56,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/ingester-logfmt.txt`, format: FormatLogfmt, patterns: []string{ @@ -66,7 +66,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/drone-json.txt`, format: FormatJSON, patterns: []string{ @@ -79,7 +79,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/distributor-logfmt.txt", format: FormatLogfmt, patterns: []string{ @@ -91,7 +91,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/journald.txt", format: FormatUnknown, patterns: []string{ @@ -211,7 +211,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/kafka.txt", format: FormatUnknown, patterns: []string{ @@ -232,7 +232,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/kubernetes.txt", format: FormatUnknown, patterns: []string{ @@ -273,7 +273,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/vault.txt", format: FormatUnknown, patterns: []string{ @@ -281,7 +281,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/calico.txt", format: FormatUnknown, patterns: []string{ @@ -374,7 +374,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/grafana-ruler.txt", format: FormatLogfmt, patterns: []string{ @@ -470,7 +470,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }{ { name: "should extract patterns that all lines match", - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ "test 1 test test", "test 2 test test", @@ -480,7 +480,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line ends with newlines", - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ `test 1 test test `, @@ -494,7 +494,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line ends with empty space", - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ `test 1 test test `, `test 2 test test `, @@ -504,7 +504,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line starts with empty space", - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ ` test 1 test test`, ` test 2 test test`, @@ -514,7 +514,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "Scheduler patterns are matchable", - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, `ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, @@ -611,7 +611,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) { }{ { name: "should prune old branches", - drain: New(DefaultConfig(), "", nil), + drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ "test test test A", "test test test B", @@ -665,3 +665,11 @@ func countNodes(node *Node) int { } return total } + +type fakeLimits struct { + Limits +} + +func (f *fakeLimits) PatternIngesterTokenizableJsonFields(_ string) []string { + return []string{"log", "message", "msg", "msg_", "_msg", "content"} +} diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 87b98afaea6de..4d758181399b4 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -263,17 +263,23 @@ func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, inter type jsonTokenizer struct { *punctuationTokenizer - varReplace string - maxLineLength int + varReplace string + maxLineLength int + fieldsToTokenize []string } -func newJSONTokenizer(varReplace string, maxLineLength int) *jsonTokenizer { - return &jsonTokenizer{newPunctuationTokenizer(maxLineLength), varReplace, maxLineLength} +func newJSONTokenizer(varReplace string, maxLineLength int, fieldsToTokenize []string) *jsonTokenizer { + return &jsonTokenizer{ + punctuationTokenizer: newPunctuationTokenizer(maxLineLength), + varReplace: varReplace, + maxLineLength: maxLineLength, + fieldsToTokenize: fieldsToTokenize, + } } func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { var found []byte - for _, key := range []string{"log", "message", "msg", "msg_", "_msg", "content"} { + for _, key := range t.fieldsToTokenize { msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key) if err == nil && ty == jsonparser.String { found = msg diff --git a/pkg/pattern/drain/line_tokenizer_test.go b/pkg/pattern/drain/line_tokenizer_test.go index f825a8d86bbc6..a2c8013b14c3b 100644 --- a/pkg/pattern/drain/line_tokenizer_test.go +++ b/pkg/pattern/drain/line_tokenizer_test.go @@ -325,7 +325,8 @@ func TestJsonTokenizer(t *testing.T) { }, } - tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength) + fieldsToTokenize := []string{"log", "message", "msg", "msg_", "_msg", "content"} + tokenizer := newJSONTokenizer(param, DefaultConfig().MaxAllowedLineLength, fieldsToTokenize) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/pattern/flush_test.go b/pkg/pattern/flush_test.go index be6a8f3253335..318492f3406cd 100644 --- a/pkg/pattern/flush_test.go +++ b/pkg/pattern/flush_test.go @@ -41,7 +41,7 @@ func TestSweepInstance(t *testing.T) { ring: fakeRing, } - ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger()) + ing, err := New(defaultIngesterTestConfig(t), &fakeLimits{}, ringClient, "foo", nil, log.NewNopLogger()) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck err = services.StartAndAwaitRunning(context.Background(), ing) diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 3c1bb55b76804..60c71920b7d19 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -148,6 +148,10 @@ func (cfg *Config) Validate() error { return cfg.LifecyclerConfig.Validate() } +type Limits interface { + drain.Limits +} + type Ingester struct { services.Service lifecycler *ring.Lifecycler @@ -156,6 +160,7 @@ type Ingester struct { lifecyclerWatcher *services.FailureWatcher cfg Config + limits Limits registerer prometheus.Registerer logger log.Logger @@ -175,6 +180,7 @@ type Ingester struct { func New( cfg Config, + limits Limits, ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, @@ -189,6 +195,7 @@ func New( i := &Ingester{ cfg: cfg, + limits: limits, ringClient: ringClient, logger: log.With(logger, "component", "pattern-ingester"), registerer: registerer, @@ -416,6 +423,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / i.logger, i.metrics, i.drainCfg, + i.limits, i.ringClient, i.lifecycler.ID, writer, diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index a5dd5cdbaaed4..b9bf6ccfc8aae 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -54,6 +54,7 @@ func TestInstancePushQuery(t *testing.T) { log.NewNopLogger(), newIngesterMetrics(nil, "test"), drain.DefaultConfig(), + &fakeLimits{}, ringClient, ingesterID, mockWriter, @@ -141,6 +142,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) { log.NewNopLogger(), newIngesterMetrics(nil, "test"), drain.DefaultConfig(), + &fakeLimits{}, ringClient, ingesterID, mockWriter, @@ -336,3 +338,11 @@ func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Lab func (m *mockEntryWriter) Stop() { _ = m.Called() } + +type fakeLimits struct { + Limits +} + +func (f *fakeLimits) PatternIngesterTokenizableJsonFields(_ string) []string { + return []string{"log", "message", "msg", "msg_", "_msg", "content"} +} diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 6e3a3de998be5..46c355a9bbcfc 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -32,16 +32,17 @@ const indexShards = 32 // instance is a tenant instance of the pattern ingester. type instance struct { - instanceID string - buf []byte // buffer used to compute fps. - mapper *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free - streams *streamsMap - index *index.BitPrefixInvertedIndex - logger log.Logger - metrics *ingesterMetrics - drainCfg *drain.Config - ringClient RingClient - ingesterID string + instanceID string + buf []byte // buffer used to compute fps. + mapper *ingester.FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free + streams *streamsMap + index *index.BitPrefixInvertedIndex + logger log.Logger + metrics *ingesterMetrics + drainCfg *drain.Config + drainLimits drain.Limits + ringClient RingClient + ingesterID string aggMetricsLock sync.Mutex aggMetricsByStreamAndLevel map[string]map[string]*aggregatedMetrics @@ -59,6 +60,7 @@ func newInstance( logger log.Logger, metrics *ingesterMetrics, drainCfg *drain.Config, + drainLimits drain.Limits, ringClient RingClient, ingesterID string, writer aggregation.EntryWriter, @@ -75,6 +77,7 @@ func newInstance( index: index, metrics: metrics, drainCfg: drainCfg, + drainLimits: drainLimits, ringClient: ringClient, ingesterID: ingesterID, aggMetricsByStreamAndLevel: make(map[string]map[string]*aggregatedMetrics), @@ -220,7 +223,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) firstEntryLine := pushReqStream.Entries[0].Line - s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg) + s, err := newStream(fp, sortedLabels, i.metrics, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg, i.drainLimits) if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 9452def376827..7f53c5777cfeb 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -35,6 +35,7 @@ func newStream( guessedFormat string, instanceID string, drainCfg *drain.Config, + drainLimits drain.Limits, ) (*stream, error) { return &stream{ fp: fp, @@ -42,7 +43,7 @@ func newStream( labelsString: labels.String(), labelHash: labels.Hash(), logger: logger, - patterns: drain.New(drainCfg, guessedFormat, &drain.Metrics{ + patterns: drain.New(instanceID, drainCfg, drainLimits, guessedFormat, &drain.Metrics{ PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"), PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"), PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat), diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index 201a5566e728f..adf18af33972d 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -18,15 +18,7 @@ import ( func TestAddStream(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - stream, err := newStream( - model.Fingerprint(lbs.Hash()), - lbs, - newIngesterMetrics(nil, "test"), - log.NewNopLogger(), - drain.FormatUnknown, - "123", - drain.DefaultConfig(), - ) + stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"), log.NewNopLogger(), drain.FormatUnknown, "123", drain.DefaultConfig(), &fakeLimits{}) require.NoError(t, err) err = stream.Push(context.Background(), []push.Entry{ @@ -54,15 +46,7 @@ func TestAddStream(t *testing.T) { func TestPruneStream(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - stream, err := newStream( - model.Fingerprint(lbs.Hash()), - lbs, - newIngesterMetrics(nil, "test"), - log.NewNopLogger(), - drain.FormatUnknown, - "123", - drain.DefaultConfig(), - ) + stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"), log.NewNopLogger(), drain.FormatUnknown, "123", drain.DefaultConfig(), &fakeLimits{}) require.NoError(t, err) err = stream.Push(context.Background(), []push.Entry{ diff --git a/pkg/util/limiter/combined_limits.go b/pkg/util/limiter/combined_limits.go index 3ea2a230634e8..f29d703a6c68b 100644 --- a/pkg/util/limiter/combined_limits.go +++ b/pkg/util/limiter/combined_limits.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/ingester" + "github.com/grafana/loki/v3/pkg/pattern" querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" queryrange_limits "github.com/grafana/loki/v3/pkg/querier/queryrange/limits" "github.com/grafana/loki/v3/pkg/ruler" @@ -28,4 +29,5 @@ type CombinedLimits interface { bloomgateway.Limits bloomplanner.Limits bloombuilder.Limits + pattern.Limits } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 59d4a3f99e4ec..10ec7e88f43e6 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -225,6 +225,8 @@ type Limits struct { BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"` IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` + + PatternIngesterTokenizableJsonFields dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields" json:"pattern_ingester_tokenizable_json_fields" doc:"description=List of JSON fields that should be tokenized in the ingester."` } type StreamRetention struct { @@ -418,6 +420,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.") f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.") + + _ = l.PatternIngesterTokenizableJsonFields.Set("log,message,msg,msg_,_msg,content") + f.Var(&l.PatternIngesterTokenizableJsonFields, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the ingester.") } // SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels. @@ -1054,6 +1059,10 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int { return o.getOverridesForUser(userID).BlockIngestionStatusCode } +func (o *Overrides) PatternIngesterTokenizableJsonFields(userID string) []string { + return o.getOverridesForUser(userID).PatternIngesterTokenizableJsonFields +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID) From 3ea90ddc9f59cbfef30a58017b694a8f0a0cdbf4 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 18 Oct 2024 13:31:49 +0200 Subject: [PATCH 2/5] docs --- pkg/validation/limits.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 10ec7e88f43e6..fb40d8e6ed2f7 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -226,7 +226,7 @@ type Limits struct { IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` - PatternIngesterTokenizableJsonFields dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields" json:"pattern_ingester_tokenizable_json_fields" doc:"description=List of JSON fields that should be tokenized in the ingester."` + PatternIngesterTokenizableJsonFields dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields" json:"pattern_ingester_tokenizable_json_fields" doc:"hidden"` } type StreamRetention struct { From feeb78393886be621f7f11f1ea13c9834ebb1d01 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 18 Oct 2024 13:38:56 +0200 Subject: [PATCH 3/5] lint issues --- pkg/pattern/drain/drain.go | 4 ++-- pkg/pattern/drain/drain_test.go | 2 +- pkg/pattern/ingester_test.go | 2 +- pkg/validation/limits.go | 10 +++++----- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 05a4f27922e3b..fcfd017884fed 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -48,7 +48,7 @@ type Config struct { } type Limits interface { - PatternIngesterTokenizableJsonFields(userID string) []string + PatternIngesterTokenizableJSONFields(userID string) []string } func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache { @@ -157,7 +157,7 @@ func New(tenantID string, config *Config, limits Limits, format string, metrics var tokenizer LineTokenizer switch format { case FormatJSON: - fieldsToTokenize := limits.PatternIngesterTokenizableJsonFields(tenantID) + fieldsToTokenize := limits.PatternIngesterTokenizableJSONFields(tenantID) tokenizer = newJSONTokenizer(config.ParamString, config.MaxAllowedLineLength, fieldsToTokenize) case FormatLogfmt: tokenizer = newLogfmtTokenizer(config.ParamString, config.MaxAllowedLineLength) diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index e57ffe6acd3d5..bf455f1088c80 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -670,6 +670,6 @@ type fakeLimits struct { Limits } -func (f *fakeLimits) PatternIngesterTokenizableJsonFields(_ string) []string { +func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string { return []string{"log", "message", "msg", "msg_", "_msg", "content"} } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index b9bf6ccfc8aae..effa1c1959437 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -343,6 +343,6 @@ type fakeLimits struct { Limits } -func (f *fakeLimits) PatternIngesterTokenizableJsonFields(_ string) []string { +func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string { return []string{"log", "message", "msg", "msg_", "_msg", "content"} } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index fb40d8e6ed2f7..aa759066374e9 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -226,7 +226,7 @@ type Limits struct { IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` - PatternIngesterTokenizableJsonFields dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields" json:"pattern_ingester_tokenizable_json_fields" doc:"hidden"` + PatternIngesterTokenizableJSONFields dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields" json:"pattern_ingester_tokenizable_json_fields" doc:"hidden"` } type StreamRetention struct { @@ -421,8 +421,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.") - _ = l.PatternIngesterTokenizableJsonFields.Set("log,message,msg,msg_,_msg,content") - f.Var(&l.PatternIngesterTokenizableJsonFields, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the ingester.") + _ = l.PatternIngesterTokenizableJSONFields.Set("log,message,msg,msg_,_msg,content") + f.Var(&l.PatternIngesterTokenizableJSONFields, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the ingester.") } // SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels. @@ -1059,8 +1059,8 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int { return o.getOverridesForUser(userID).BlockIngestionStatusCode } -func (o *Overrides) PatternIngesterTokenizableJsonFields(userID string) []string { - return o.getOverridesForUser(userID).PatternIngesterTokenizableJsonFields +func (o *Overrides) PatternIngesterTokenizableJSONFields(userID string) []string { + return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFields } func (o *Overrides) getOverridesForUser(userID string) *Limits { From d2da933879100aa7277361913af6321972b563f1 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 21 Oct 2024 09:38:07 +0200 Subject: [PATCH 4/5] Default, Append and Delete lists --- pkg/validation/limits.go | 43 ++++++++++++++++++++++--- pkg/validation/limits_test.go | 60 +++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index aa759066374e9..96a1af0693a28 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -226,7 +226,9 @@ type Limits struct { IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` - PatternIngesterTokenizableJSONFields dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields" json:"pattern_ingester_tokenizable_json_fields" doc:"hidden"` + PatternIngesterTokenizableJSONFieldsDefault dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_default" json:"pattern_ingester_tokenizable_json_fields_default" doc:"hidden"` + PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"` + PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"` } type StreamRetention struct { @@ -421,8 +423,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.") - _ = l.PatternIngesterTokenizableJSONFields.Set("log,message,msg,msg_,_msg,content") - f.Var(&l.PatternIngesterTokenizableJSONFields, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the ingester.") + _ = l.PatternIngesterTokenizableJSONFieldsDefault.Set("log,message,msg,msg_,_msg,content") + f.Var(&l.PatternIngesterTokenizableJSONFieldsDefault, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the pattern ingester.") + f.Var(&l.PatternIngesterTokenizableJSONFieldsAppend, "limits.pattern-ingester-tokenizable-json-fields-append", "List of JSON fields that should be appended to the default list of tokenizable fields in the pattern ingester.") + f.Var(&l.PatternIngesterTokenizableJSONFieldsDelete, "limits.pattern-ingester-tokenizable-json-fields-delete", "List of JSON fields that should be deleted from the (default U append) list of tokenizable fields in the pattern ingester.") } // SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels. @@ -1060,7 +1064,38 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int { } func (o *Overrides) PatternIngesterTokenizableJSONFields(userID string) []string { - return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFields + defaultFields := o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDefault + appendFields := o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsAppend + deleteFields := o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDelete + + outputMap := make(map[string]struct{}, len(defaultFields)+len(appendFields)) + + for _, field := range defaultFields { + outputMap[field] = struct{}{} + } + + for _, field := range appendFields { + outputMap[field] = struct{}{} + } + + for _, field := range deleteFields { + delete(outputMap, field) + } + + output := make([]string, 0, len(outputMap)) + for field := range outputMap { + output = append(output, field) + } + + return output +} + +func (o *Overrides) PatternIngesterTokenizableJSONFieldsAppend(userID string) []string { + return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsAppend +} + +func (o *Overrides) PatternIngesterTokenizableJSONFieldsDelete(userID string) []string { + return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDelete } func (o *Overrides) getOverridesForUser(userID string) *Limits { diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 19278c77a342f..bfb522f73a2e6 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -354,3 +354,63 @@ func TestLimitsValidation(t *testing.T) { }) } } + +func Test_PatternIngesterTokenizableJSONFields(t *testing.T) { + for _, tc := range []struct { + name string + yaml string + expected []string + }{ + { + name: "only defaults", + yaml: ` +pattern_ingester_tokenizable_json_fields_default: log,message +`, + expected: []string{"log", "message"}, + }, + { + name: "with append", + yaml: ` +pattern_ingester_tokenizable_json_fields_default: log,message +pattern_ingester_tokenizable_json_fields_append: msg,body +`, + expected: []string{"log", "message", "msg", "body"}, + }, + { + name: "with delete", + yaml: ` +pattern_ingester_tokenizable_json_fields_default: log,message +pattern_ingester_tokenizable_json_fields_delete: message +`, + expected: []string{"log"}, + }, + { + name: "with append and delete from default", + yaml: ` +pattern_ingester_tokenizable_json_fields_default: log,message +pattern_ingester_tokenizable_json_fields_append: msg,body +pattern_ingester_tokenizable_json_fields_delete: message +`, + expected: []string{"log", "msg", "body"}, + }, + { + name: "with append and delete from append", + yaml: ` +pattern_ingester_tokenizable_json_fields_default: log,message +pattern_ingester_tokenizable_json_fields_append: msg,body +pattern_ingester_tokenizable_json_fields_delete: body +`, + expected: []string{"log", "message", "msg"}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + overrides := Overrides{ + defaultLimits: &Limits{}, + } + require.NoError(t, yaml.Unmarshal([]byte(tc.yaml), overrides.defaultLimits)) + + actual := overrides.PatternIngesterTokenizableJSONFields("fake") + require.ElementsMatch(t, tc.expected, actual) + }) + } +} From e754f395a8a8563110c7353be94fe07b3e607731 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 21 Oct 2024 09:41:35 +0200 Subject: [PATCH 5/5] test tenant const --- pkg/pattern/drain/drain_test.go | 37 ++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index bf455f1088c80..c2beda4b44d5f 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -14,12 +14,15 @@ import ( "github.com/grafana/loki/v3/pkg/logql/log/pattern" ) +const ( + testTenant = "fake" +) + func TestDrain_TrainExtractsPatterns(t *testing.T) { t.Parallel() // Set this so the test will print the patterns found, in string slice format for easy copy-paste outputPatternsForTestUpdate := false - tests := []struct { drain *Drain inputFile string @@ -27,7 +30,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { format string }{ { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/agent-logfmt.txt`, format: FormatLogfmt, patterns: []string{ @@ -56,7 +59,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/ingester-logfmt.txt`, format: FormatLogfmt, patterns: []string{ @@ -66,7 +69,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: `testdata/drone-json.txt`, format: FormatJSON, patterns: []string{ @@ -79,7 +82,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/distributor-logfmt.txt", format: FormatLogfmt, patterns: []string{ @@ -91,7 +94,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/journald.txt", format: FormatUnknown, patterns: []string{ @@ -211,7 +214,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/kafka.txt", format: FormatUnknown, patterns: []string{ @@ -232,7 +235,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/kubernetes.txt", format: FormatUnknown, patterns: []string{ @@ -273,7 +276,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/vault.txt", format: FormatUnknown, patterns: []string{ @@ -281,7 +284,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/calico.txt", format: FormatUnknown, patterns: []string{ @@ -374,7 +377,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputFile: "testdata/grafana-ruler.txt", format: FormatLogfmt, patterns: []string{ @@ -470,7 +473,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }{ { name: "should extract patterns that all lines match", - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ "test 1 test test", "test 2 test test", @@ -480,7 +483,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line ends with newlines", - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ `test 1 test test `, @@ -494,7 +497,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line ends with empty space", - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ `test 1 test test `, `test 2 test test `, @@ -504,7 +507,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line starts with empty space", - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ ` test 1 test test`, ` test 2 test test`, @@ -514,7 +517,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "Scheduler patterns are matchable", - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, `ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, @@ -611,7 +614,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) { }{ { name: "should prune old branches", - drain: New("", DefaultConfig(), &fakeLimits{}, "", nil), + drain: New(testTenant, DefaultConfig(), &fakeLimits{}, "", nil), inputLines: []string{ "test test test A", "test test test B",