From 1a0f402380f2b10242675716fcb5b6502b304f37 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 15 Sep 2021 17:52:57 +0530 Subject: [PATCH 1/9] [search] Add global deny list of tags Signed-off-by: Annanay --- modules/distributor/config.go | 2 ++ modules/distributor/distributor.go | 15 ++++++++--- modules/distributor/search_data.go | 18 +++++++++++--- modules/distributor/search_data_test.go | 33 +++++++++++++++++++++++-- 4 files changed, 60 insertions(+), 8 deletions(-) diff --git a/modules/distributor/config.go b/modules/distributor/config.go index 94180f37e15..4b12214827b 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -39,6 +39,8 @@ type Config struct { // note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica ExtendWrites bool `yaml:"extend_writes"` + SearchTagsDenyList []string `yaml:"search_tags_deny_list"` + // For testing. factory func(addr string) (ring_client.PoolClient, error) `yaml:"-"` } diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index cced4b2722b..cf48aed0230 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -94,7 +94,10 @@ type Distributor struct { ingestersRing ring.ReadRing pool *ring_client.Pool DistributorRing *ring.Ring - searchEnabled bool + + // search + searchEnabled bool + tagsToDrop map[string]struct{} // Per-user rate limiter. 
ingestionRateLimiter *limiter.RateLimiter @@ -147,6 +150,12 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi subservices = append(subservices, pool) + // turn list into map for efficient checking + tagsToDrop := map[string]struct{}{} + for _, tag := range cfg.SearchTagsDenyList { + tagsToDrop[tag] = struct{}{} + } + d := &Distributor{ cfg: cfg, clientCfg: clientCfg, @@ -155,6 +164,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi DistributorRing: distributorRing, ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), searchEnabled: searchEnabled, + tagsToDrop: tagsToDrop, } cfgReceivers := cfg.Receivers @@ -249,10 +259,9 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp return nil, err } - //var var searchData [][]byte if d.searchEnabled { - searchData = extractSearchDataAll(traces, ids) + searchData = extractSearchDataAll(traces, ids, d.tagsToDrop) } err = d.sendToIngestersViaBytes(ctx, userID, traces, searchData, keys, ids) diff --git a/modules/distributor/search_data.go b/modules/distributor/search_data.go index 2686165dcaf..7739470b5ce 100644 --- a/modules/distributor/search_data.go +++ b/modules/distributor/search_data.go @@ -11,11 +11,11 @@ import ( ) // extractSearchDataAll returns flatbuffer search data for every trace. -func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte) [][]byte { +func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte, tagsToDrop map[string]struct{}) [][]byte { headers := make([][]byte, len(traces)) for i, t := range traces { - headers[i] = extractSearchData(t, ids[i]) + headers[i] = extractSearchData(t, ids[i], tagsToDrop) } return headers @@ -24,7 +24,7 @@ func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte) [][]byte { // extractSearchData returns the flatbuffer search data for the given trace. 
It is extracted here // in the distributor because this is the only place on the ingest path where the trace is available // in object form. -func extractSearchData(trace *tempopb.Trace, id []byte) []byte { +func extractSearchData(trace *tempopb.Trace, id []byte, tagsToDrop map[string]struct{}) []byte { data := &tempofb.SearchEntryMutable{} data.TraceID = id @@ -33,6 +33,9 @@ func extractSearchData(trace *tempopb.Trace, id []byte) []byte { // Batch attrs if b.Resource != nil { for _, a := range b.Resource.Attributes { + if _, exists := tagsToDrop[a.Key]; exists { + continue + } if s, ok := extractValueAsString(a.Value); ok { data.AddTag(a.Key, s) } @@ -49,6 +52,9 @@ func extractSearchData(trace *tempopb.Trace, id []byte) []byte { // Span attrs for _, a := range s.Attributes { + if _, exists := tagsToDrop[a.Key]; exists { + continue + } if s, ok := extractValueAsString(a.Value); ok { data.AddTag(fmt.Sprint(search.RootSpanPrefix, a.Key), s) } @@ -57,6 +63,9 @@ func extractSearchData(trace *tempopb.Trace, id []byte) []byte { // Batch attrs if b.Resource != nil { for _, a := range b.Resource.Attributes { + if _, exists := tagsToDrop[a.Key]; exists { + continue + } if s, ok := extractValueAsString(a.Value); ok { data.AddTag(fmt.Sprint(search.RootSpanPrefix, a.Key), s) } @@ -70,6 +79,9 @@ func extractSearchData(trace *tempopb.Trace, id []byte) []byte { data.SetEndTimeUnixNano(s.EndTimeUnixNano) for _, a := range s.Attributes { + if _, exists := tagsToDrop[a.Key]; exists { + continue + } if s, ok := extractValueAsString(a.Value); ok { data.AddTag(a.Key, s) } diff --git a/modules/distributor/search_data_test.go b/modules/distributor/search_data_test.go index ebb2927ad11..8259e8b1fcd 100644 --- a/modules/distributor/search_data_test.go +++ b/modules/distributor/search_data_test.go @@ -20,10 +20,11 @@ func TestExtractSearchData(t *testing.T) { name string trace *tempopb.Trace id []byte + tagsToDrop map[string]struct{} searchData *tempofb.SearchEntryMutable }{ { - name: 
"trace with root span", + name: "extracts search tags", trace: &tempopb.Trace{ Batches: []*v1.ResourceSpans{ { @@ -73,12 +74,40 @@ func TestExtractSearchData(t *testing.T) { StartTimeUnixNano: 0, EndTimeUnixNano: 0, }, + tagsToDrop: map[string]struct{}{}, + }, + { + name: "drops tags in deny list", + trace: &tempopb.Trace{ + Batches: []*v1.ResourceSpans{ + { + Resource: &v1_resource.Resource{ + Attributes: []*v1_common.KeyValue{ + { + Key: "foo", + Value: &v1_common.AnyValue{ + Value: &v1_common.AnyValue_StringValue{StringValue: "bar"}, + }, + }, + }, + }, + }, + }, + }, + id: traceIDA, + searchData: &tempofb.SearchEntryMutable{ + TraceID: traceIDA, + Tags: tempofb.SearchDataMap{}, + StartTimeUnixNano: 0, + EndTimeUnixNano: 0, + }, + tagsToDrop: map[string]struct{}{"foo": {}}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id)) + assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id, tc.tagsToDrop)) }) } } From 7405e9166f1932f1e7d27674c193b1f33b22ae27 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 15 Sep 2021 21:10:37 +0530 Subject: [PATCH 2/9] Move CHANGELOG entries around, add docs Signed-off-by: Annanay --- CHANGELOG.md | 15 ++++++++------- docs/tempo/website/configuration/_index.md | 4 ++++ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9c7ab76060..6e9a85849e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ ## main / unreleased * [CHANGE] **BREAKING CHANGE** Drop support for v0 and v1 blocks. See [1.1 changelog](https://github.com/grafana/tempo/releases/tag/v1.1.0) for details [#919](https://github.com/grafana/tempo/pull/919) (@joe-elliott) +* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. 
This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394) +* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover) +* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala) * [FEATURE] Add ability to search ingesters for traces [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio) -* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala) -* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio) -* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio) +* [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno) * [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott) * [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394) * [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk) @@ -35,10 +36,10 @@ * [ENHANCEMENT] Implement trace comparison in Vulture [#904](https://github.com/grafana/tempo/pull/904) (@zalegrala) * [ENHANCEMENT] Dedupe search records while replaying WAL [#940](https://github.com/grafana/tempo/pull/940) (@annanay25) * [ENHANCEMENT] Add status endpoint to list the available endpoints [#938](https://github.com/grafana/tempo/pull/938) (@zalegrala) -* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. 
This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394) -* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover) -* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala) -* [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno) +* [ENHANCEMENT] Support global denylist of tags to be dropped from search data. [#960](https://github.com/grafana/tempo/pull/960) (@annanay25) +* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala) +* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio) +* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio) ## v1.1.0 / 2021-08-26 * [CHANGE] Upgrade Cortex from v1.9.0 to v1.9.0-131-ga4bf10354 [#841](https://github.com/grafana/tempo/pull/841) (@aknuds1) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index ae96a0306e8..3e40ab45d17 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -123,6 +123,10 @@ distributor: # note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica [extend_writes: <bool>] + # Optional. 
+ # List of tags that will **not** be extracted from trace data for search lookups + # This is a global config that will apply to all tenants + [search_tags_deny_list: <list of string> | default = ] ``` ## Ingester From a349d7cb985e97a347332a03c9aa0e2783712526 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 15 Sep 2021 21:14:35 +0530 Subject: [PATCH 3/9] Fix entry Signed-off-by: Annanay --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd9ea9bae19..cc9c16ebdbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ * [ENHANCEMENT] Implement trace comparison in Vulture [#904](https://github.com/grafana/tempo/pull/904) (@zalegrala) * [ENHANCEMENT] Dedupe search records while replaying WAL [#940](https://github.com/grafana/tempo/pull/940) (@annanay25) * [ENHANCEMENT] Add status endpoint to list the available endpoints [#938](https://github.com/grafana/tempo/pull/938) (@zalegrala) +* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio) * [ENHANCEMENT] Support global denylist of tags to be dropped from search data. 
[#960](https://github.com/grafana/tempo/pull/960) (@annanay25) * [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala) * [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio) From 7409d4bb05d4531597a6fd0448ef1741a4d0f54c Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 21 Sep 2021 16:02:18 +0530 Subject: [PATCH 4/9] Rework around adding a custom type and yaml/json marshaller Signed-off-by: Annanay --- modules/distributor/distributor.go | 22 +++++++++++--- modules/distributor/search_data.go | 16 +++++----- modules/distributor/search_data_test.go | 24 +++++++++++---- modules/overrides/limits.go | 7 +++-- modules/overrides/limits_test.go | 6 ++++ modules/overrides/list_to_map.go | 40 +++++++++++++++++++++++++ modules/overrides/overrides.go | 5 ++++ 7 files changed, 100 insertions(+), 20 deletions(-) create mode 100644 modules/overrides/list_to_map.go diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index cf48aed0230..1de4f16f591 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -94,10 +94,11 @@ type Distributor struct { ingestersRing ring.ReadRing pool *ring_client.Pool DistributorRing *ring.Ring + overrides *overrides.Overrides // search - searchEnabled bool - tagsToDrop map[string]struct{} + searchEnabled bool + globalTagsToDrop map[string]struct{} // Per-user rate limiter. 
ingestionRateLimiter *limiter.RateLimiter @@ -164,7 +165,8 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi DistributorRing: distributorRing, ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), searchEnabled: searchEnabled, - tagsToDrop: tagsToDrop, + globalTagsToDrop: tagsToDrop, + overrides: o, } cfgReceivers := cfg.Receivers @@ -261,7 +263,19 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp var searchData [][]byte if d.searchEnabled { - searchData = extractSearchDataAll(traces, ids, d.tagsToDrop) + perTenantAllowedTags := d.overrides.SearchTagsAllowList(userID) + searchData = extractSearchDataAll(traces, ids, func(tag string) bool { + // if in per tenant override, extract + if _, ok := perTenantAllowedTags.GetMap()[tag]; ok { + return true + } + // if in global deny list, drop + if _, ok := d.globalTagsToDrop[tag]; ok { + return false + } + // allow otherwise + return true + }) } err = d.sendToIngestersViaBytes(ctx, userID, traces, searchData, keys, ids) diff --git a/modules/distributor/search_data.go b/modules/distributor/search_data.go index 7739470b5ce..c4e9efaf1b4 100644 --- a/modules/distributor/search_data.go +++ b/modules/distributor/search_data.go @@ -10,12 +10,14 @@ import ( "github.com/grafana/tempo/tempodb/search" ) +type extractTagFunc func(tag string) bool + // extractSearchDataAll returns flatbuffer search data for every trace. 
-func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte, tagsToDrop map[string]struct{}) [][]byte { +func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte, extractTag extractTagFunc) [][]byte { headers := make([][]byte, len(traces)) for i, t := range traces { - headers[i] = extractSearchData(t, ids[i], tagsToDrop) + headers[i] = extractSearchData(t, ids[i], extractTag) } return headers @@ -24,7 +26,7 @@ func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte, tagsToDrop map[ // extractSearchData returns the flatbuffer search data for the given trace. It is extracted here // in the distributor because this is the only place on the ingest path where the trace is available // in object form. -func extractSearchData(trace *tempopb.Trace, id []byte, tagsToDrop map[string]struct{}) []byte { +func extractSearchData(trace *tempopb.Trace, id []byte, extractTag extractTagFunc) []byte { data := &tempofb.SearchEntryMutable{} data.TraceID = id @@ -33,7 +35,7 @@ func extractSearchData(trace *tempopb.Trace, id []byte, tagsToDrop map[string]st // Batch attrs if b.Resource != nil { for _, a := range b.Resource.Attributes { - if _, exists := tagsToDrop[a.Key]; exists { + if !extractTag(a.Key) { continue } if s, ok := extractValueAsString(a.Value); ok { @@ -52,7 +54,7 @@ func extractSearchData(trace *tempopb.Trace, id []byte, tagsToDrop map[string]st // Span attrs for _, a := range s.Attributes { - if _, exists := tagsToDrop[a.Key]; exists { + if !extractTag(a.Key) { continue } if s, ok := extractValueAsString(a.Value); ok { @@ -63,7 +65,7 @@ func extractSearchData(trace *tempopb.Trace, id []byte, tagsToDrop map[string]st // Batch attrs if b.Resource != nil { for _, a := range b.Resource.Attributes { - if _, exists := tagsToDrop[a.Key]; exists { + if !extractTag(a.Key) { continue } if s, ok := extractValueAsString(a.Value); ok { @@ -79,7 +81,7 @@ func extractSearchData(trace *tempopb.Trace, id []byte, tagsToDrop map[string]st 
data.SetEndTimeUnixNano(s.EndTimeUnixNano) for _, a := range s.Attributes { - if _, exists := tagsToDrop[a.Key]; exists { + if !extractTag(a.Key) { continue } if s, ok := extractValueAsString(a.Value); ok { diff --git a/modules/distributor/search_data_test.go b/modules/distributor/search_data_test.go index 8259e8b1fcd..6abf0c31c07 100644 --- a/modules/distributor/search_data_test.go +++ b/modules/distributor/search_data_test.go @@ -20,7 +20,7 @@ func TestExtractSearchData(t *testing.T) { name string trace *tempopb.Trace id []byte - tagsToDrop map[string]struct{} + extractTag extractTagFunc searchData *tempofb.SearchEntryMutable }{ { @@ -74,7 +74,9 @@ func TestExtractSearchData(t *testing.T) { StartTimeUnixNano: 0, EndTimeUnixNano: 0, }, - tagsToDrop: map[string]struct{}{}, + extractTag: func(tag string) bool { + return true + }, }, { name: "drops tags in deny list", @@ -89,6 +91,12 @@ func TestExtractSearchData(t *testing.T) { Value: &v1_common.AnyValue_StringValue{StringValue: "bar"}, }, }, + { + Key: "bar", + Value: &v1_common.AnyValue{ + Value: &v1_common.AnyValue_StringValue{StringValue: "baz"}, + }, + }, }, }, }, @@ -96,18 +104,22 @@ func TestExtractSearchData(t *testing.T) { }, id: traceIDA, searchData: &tempofb.SearchEntryMutable{ - TraceID: traceIDA, - Tags: tempofb.SearchDataMap{}, + TraceID: traceIDA, + Tags: tempofb.SearchDataMap{ + "bar": []string{"baz"}, + }, StartTimeUnixNano: 0, EndTimeUnixNano: 0, }, - tagsToDrop: map[string]struct{}{"foo": {}}, + extractTag: func(tag string) bool { + return tag != "foo" + }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id, tc.tagsToDrop)) + assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id, tc.extractTag)) }) } } diff --git a/modules/overrides/limits.go b/modules/overrides/limits.go index 7c4d49999e3..9442aafd0f1 100644 --- a/modules/overrides/limits.go +++ b/modules/overrides/limits.go @@ 
-24,9 +24,10 @@ const ( // limits via flags, or per-user limits via yaml config. type Limits struct { // Distributor enforced limits. - IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` - IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"` - IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"` + IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` + IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"` + IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"` + SearchTagsAllowList ListToMap `yaml:"search_tags_allow_list" json:"search_tags_allow_list"` // Ingester enforced limits. MaxLocalTracesPerUser int `yaml:"max_traces_per_user" json:"max_traces_per_user"` diff --git a/modules/overrides/limits_test.go b/modules/overrides/limits_test.go index 878be15f243..ed759b72b82 100644 --- a/modules/overrides/limits_test.go +++ b/modules/overrides/limits_test.go @@ -38,6 +38,9 @@ func TestLimitsYamlMatchJson(t *testing.T) { ingestion_rate_strategy: global ingestion_rate_limit_bytes: 100_000 ingestion_burst_size_bytes: 100_000 +search_tags_allow_list: +- a +- b max_traces_per_user: 1000 max_global_traces_per_user: 1000 @@ -53,6 +56,9 @@ per_tenant_override_period: 1m "ingestion_rate_strategy": "global", "ingestion_rate_limit_bytes": 100000, "ingestion_burst_size_bytes": 100000, + "search_tags_allow_list" : [ + "a", "b" + ], "max_traces_per_user": 1000, "max_global_traces_per_user": 1000, diff --git a/modules/overrides/list_to_map.go b/modules/overrides/list_to_map.go new file mode 100644 index 00000000000..6e857285af0 --- /dev/null +++ b/modules/overrides/list_to_map.go @@ -0,0 +1,40 @@ +package overrides + +import "encoding/json" + +type ListToMap struct { + m map[string]struct{} +} + +// UnmarshalYAML implements the 
Unmarshaler interface of the yaml pkg. +func (l *ListToMap) UnmarshalYAML(unmarshal func(interface{}) error) error { + list := make([]string, 0) + err := unmarshal(&list) + if err != nil { + return err + } + + l.m = make(map[string]struct{}) + for _, element := range list { + l.m[element] = struct{}{} + } + return nil +} + +func (l *ListToMap) UnmarshalJSON(b []byte) error { + list := make([]string, 0) + err := json.Unmarshal(b, &list) + if err != nil { + return err + } + + l.m = make(map[string]struct{}) + for _, element := range list { + l.m[element] = struct{}{} + } + return nil +} + +func (l *ListToMap) GetMap() map[string]struct{} { + return l.m +} diff --git a/modules/overrides/overrides.go b/modules/overrides/overrides.go index 0678d461950..3092b5c4aeb 100644 --- a/modules/overrides/overrides.go +++ b/modules/overrides/overrides.go @@ -243,6 +243,11 @@ func (o *Overrides) IngestionBurstSizeBytes(userID string) int { return o.getOverridesForUser(userID).IngestionBurstSizeBytes } +// SearchTagsAllowList is the list of tags to be extracted for search, for this tenant +func (o *Overrides) SearchTagsAllowList(userID string) *ListToMap { + return &o.getOverridesForUser(userID).SearchTagsAllowList +} + // BlockRetention is the duration of the block retention for this tenant func (o *Overrides) BlockRetention(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).BlockRetention) From c97995c44fb574f569eb22312cb3628016a61e73 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 22 Sep 2021 17:52:50 +0530 Subject: [PATCH 5/9] Checkpoint: Implemented marshal funcs and added test Signed-off-by: Annanay --- modules/overrides/list_to_map.go | 33 +++++++++++++++- modules/overrides/list_to_map_test.go | 55 +++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 modules/overrides/list_to_map_test.go diff --git a/modules/overrides/list_to_map.go b/modules/overrides/list_to_map.go index 6e857285af0..0ba95d25a69 
100644 --- a/modules/overrides/list_to_map.go +++ b/modules/overrides/list_to_map.go @@ -1,11 +1,31 @@ package overrides -import "encoding/json" +import ( + "encoding/json" + + "gopkg.in/yaml.v2" +) type ListToMap struct { m map[string]struct{} } +var _ yaml.Marshaler = (*ListToMap)(nil) +var _ yaml.Unmarshaler = (*ListToMap)(nil) +var _ json.Marshaler = (*ListToMap)(nil) +var _ json.Unmarshaler = (*ListToMap)(nil) + +// MarshalYAML implements the Marshal interface of the yaml pkg. +func (l ListToMap) MarshalYAML() (interface{}, error) { + list := make([]string, 0) + for k := range l.m { + list = append(list, k) + } + + b, err := yaml.Marshal(&list) + return b, err +} + // UnmarshalYAML implements the Unmarshaler interface of the yaml pkg. func (l *ListToMap) UnmarshalYAML(unmarshal func(interface{}) error) error { list := make([]string, 0) @@ -21,6 +41,17 @@ func (l *ListToMap) UnmarshalYAML(unmarshal func(interface{}) error) error { return nil } +// MarshalJSON implements the Marshal interface of the json pkg. +func (l ListToMap) MarshalJSON() ([]byte, error) { + list := make([]string, 0) + for k := range l.m { + list = append(list, k) + } + + return json.Marshal(&list) +} + +// UnmarshalJSON implements the Unmarshal interface of the json pkg. 
func (l *ListToMap) UnmarshalJSON(b []byte) error { list := make([]string, 0) err := json.Unmarshal(b, &list) diff --git a/modules/overrides/list_to_map_test.go b/modules/overrides/list_to_map_test.go new file mode 100644 index 00000000000..66ccade7897 --- /dev/null +++ b/modules/overrides/list_to_map_test.go @@ -0,0 +1,55 @@ +package overrides + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +func TestListToMapMarshalOperations(t *testing.T) { + testCases := []struct { + name string + original ListToMap + marshalledYAML string + marshalledJSON string + }{ + { + name: "empty map", + original: ListToMap{}, + marshalledYAML: "", + marshalledJSON: "[]", + }, + { + name: "map with entries", + original: ListToMap{ + m: map[string]struct{}{ + "foo": {}, + }, + }, + marshalledYAML: "- foo", + marshalledJSON: "[\"foo\"]", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + bytes, err := yaml.Marshal(tc.original) + assert.NoError(t, err) + assert.Equal(t, tc.marshalledYAML, string(bytes)) + + l := ListToMap{} + assert.NoError(t, yaml.Unmarshal([]byte(tc.marshalledYAML), &l)) + assert.Equal(t, tc.original, l) + + bytes, err = json.Marshal(tc.original) + assert.NoError(t, err) + assert.Equal(t, tc.marshalledJSON, string(bytes)) + + l2 := ListToMap{} + assert.NoError(t, json.Unmarshal([]byte(tc.marshalledJSON), &l2)) + assert.Equal(t, tc.original, l2) + }) + } +} From 0266c3990c1e770ca6a4f52155f0cab731e0c3e3 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 22 Sep 2021 20:52:47 +0530 Subject: [PATCH 6/9] Improve tests, handle edge cases Signed-off-by: Annanay --- modules/overrides/list_to_map.go | 9 ++++- modules/overrides/list_to_map_test.go | 57 ++++++++++++++++++--------- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/modules/overrides/list_to_map.go b/modules/overrides/list_to_map.go index 0ba95d25a69..d1d6916547c 100644 --- a/modules/overrides/list_to_map.go +++ 
b/modules/overrides/list_to_map.go @@ -22,8 +22,10 @@ func (l ListToMap) MarshalYAML() (interface{}, error) { list = append(list, k) } - b, err := yaml.Marshal(&list) - return b, err + if len(list) == 0 { + return nil, nil + } + return list, nil } // UnmarshalYAML implements the Unmarshaler interface of the yaml pkg. @@ -67,5 +69,8 @@ func (l *ListToMap) UnmarshalJSON(b []byte) error { } func (l *ListToMap) GetMap() map[string]struct{} { + if l.m == nil { + l.m = map[string]struct{}{} + } return l.m } diff --git a/modules/overrides/list_to_map_test.go b/modules/overrides/list_to_map_test.go index 66ccade7897..1990aa23fe4 100644 --- a/modules/overrides/list_to_map_test.go +++ b/modules/overrides/list_to_map_test.go @@ -10,46 +10,65 @@ import ( func TestListToMapMarshalOperations(t *testing.T) { testCases := []struct { - name string - original ListToMap - marshalledYAML string - marshalledJSON string + name string + inputYAML string + inputJSON string + expectedListToMapYAML ListToMap + expectedListToMapJSON ListToMap + marshalledYAML string + marshalledJSON string }{ { - name: "empty map", - original: ListToMap{}, - marshalledYAML: "", + name: "empty map", + inputYAML: "null", + expectedListToMapYAML: ListToMap{}, + marshalledYAML: "null\n", + inputJSON: "[]", + expectedListToMapJSON: ListToMap{ + m: map[string]struct{}{}, + }, marshalledJSON: "[]", }, { - name: "map with entries", - original: ListToMap{ + name: "map with entries", + inputYAML: "- foo", + expectedListToMapYAML: ListToMap{ + m: map[string]struct{}{ + "foo": {}, + }, + }, + marshalledYAML: "- foo\n", + inputJSON: "[\"foo\"]", + expectedListToMapJSON: ListToMap{ m: map[string]struct{}{ "foo": {}, }, }, - marshalledYAML: "- foo", marshalledJSON: "[\"foo\"]", }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - bytes, err := yaml.Marshal(tc.original) + // YAML to struct + var l ListToMap + assert.NoError(t, yaml.Unmarshal([]byte(tc.inputYAML), &l)) + assert.Equal(t, 
tc.expectedListToMapYAML, l) + + // struct to YAML + bytes, err := yaml.Marshal(tc.expectedListToMapYAML) assert.NoError(t, err) assert.Equal(t, tc.marshalledYAML, string(bytes)) - l := ListToMap{} - assert.NoError(t, yaml.Unmarshal([]byte(tc.marshalledYAML), &l)) - assert.Equal(t, tc.original, l) + // JSON to struct + var l2 ListToMap + assert.NoError(t, json.Unmarshal([]byte(tc.inputJSON), &l2)) + assert.Equal(t, tc.expectedListToMapJSON, l2) - bytes, err = json.Marshal(tc.original) + // struct to JSON + bytes, err = json.Marshal(tc.expectedListToMapJSON) assert.NoError(t, err) assert.Equal(t, tc.marshalledJSON, string(bytes)) - - l2 := ListToMap{} - assert.NoError(t, json.Unmarshal([]byte(tc.marshalledJSON), &l2)) - assert.Equal(t, tc.original, l2) }) } } From b06385a5e025d84a5d0aea367eb1309e100bc671 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 23 Sep 2021 13:21:23 +0530 Subject: [PATCH 7/9] Address comments Signed-off-by: Annanay --- modules/distributor/distributor.go | 2 +- modules/overrides/list_to_map.go | 22 +++++----- modules/overrides/list_to_map_test.go | 61 ++++++++++++++++----------- modules/overrides/overrides.go | 4 +- 4 files changed, 50 insertions(+), 39 deletions(-) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 1de4f16f591..ba1e55050ed 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -266,7 +266,7 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp perTenantAllowedTags := d.overrides.SearchTagsAllowList(userID) searchData = extractSearchDataAll(traces, ids, func(tag string) bool { // if in per tenant override, extract - if _, ok := perTenantAllowedTags.GetMap()[tag]; ok { + if _, ok := perTenantAllowedTags[tag]; ok { return true } // if in global deny list, drop diff --git a/modules/overrides/list_to_map.go b/modules/overrides/list_to_map.go index d1d6916547c..c4bd393e7d7 100644 --- a/modules/overrides/list_to_map.go +++ 
b/modules/overrides/list_to_map.go @@ -6,9 +6,7 @@ import ( "gopkg.in/yaml.v2" ) -type ListToMap struct { - m map[string]struct{} -} +type ListToMap map[string]struct{} var _ yaml.Marshaler = (*ListToMap)(nil) var _ yaml.Unmarshaler = (*ListToMap)(nil) @@ -18,7 +16,7 @@ var _ json.Unmarshaler = (*ListToMap)(nil) // MarshalYAML implements the Marshal interface of the yaml pkg. func (l ListToMap) MarshalYAML() (interface{}, error) { list := make([]string, 0) - for k := range l.m { + for k := range l { list = append(list, k) } @@ -36,9 +34,9 @@ func (l *ListToMap) UnmarshalYAML(unmarshal func(interface{}) error) error { return err } - l.m = make(map[string]struct{}) + *l = make(map[string]struct{}) for _, element := range list { - l.m[element] = struct{}{} + (*l)[element] = struct{}{} } return nil } @@ -46,7 +44,7 @@ func (l *ListToMap) UnmarshalYAML(unmarshal func(interface{}) error) error { // MarshalJSON implements the Marshal interface of the json pkg. func (l ListToMap) MarshalJSON() ([]byte, error) { list := make([]string, 0) - for k := range l.m { + for k := range l { list = append(list, k) } @@ -61,16 +59,16 @@ func (l *ListToMap) UnmarshalJSON(b []byte) error { return err } - l.m = make(map[string]struct{}) + *l = make(map[string]struct{}) for _, element := range list { - l.m[element] = struct{}{} + (*l)[element] = struct{}{} } return nil } func (l *ListToMap) GetMap() map[string]struct{} { - if l.m == nil { - l.m = map[string]struct{}{} + if *l == nil { + *l = map[string]struct{}{} } - return l.m + return *l } diff --git a/modules/overrides/list_to_map_test.go b/modules/overrides/list_to_map_test.go index 1990aa23fe4..69c62fcabc1 100644 --- a/modules/overrides/list_to_map_test.go +++ b/modules/overrides/list_to_map_test.go @@ -8,43 +8,26 @@ import ( "gopkg.in/yaml.v2" ) -func TestListToMapMarshalOperations(t *testing.T) { +func TestListToMapMarshalOperationsYAML(t *testing.T) { testCases := []struct { name string inputYAML string - inputJSON string 
expectedListToMapYAML ListToMap - expectedListToMapJSON ListToMap marshalledYAML string - marshalledJSON string }{ { name: "empty map", - inputYAML: "null", + inputYAML: "", expectedListToMapYAML: ListToMap{}, marshalledYAML: "null\n", - inputJSON: "[]", - expectedListToMapJSON: ListToMap{ - m: map[string]struct{}{}, - }, - marshalledJSON: "[]", }, { name: "map with entries", inputYAML: "- foo", expectedListToMapYAML: ListToMap{ - m: map[string]struct{}{ - "foo": {}, - }, + "foo": {}, }, marshalledYAML: "- foo\n", - inputJSON: "[\"foo\"]", - expectedListToMapJSON: ListToMap{ - m: map[string]struct{}{ - "foo": {}, - }, - }, - marshalledJSON: "[\"foo\"]", }, } @@ -53,20 +36,50 @@ func TestListToMapMarshalOperations(t *testing.T) { // YAML to struct var l ListToMap assert.NoError(t, yaml.Unmarshal([]byte(tc.inputYAML), &l)) + assert.NotNil(t, l.GetMap()) assert.Equal(t, tc.expectedListToMapYAML, l) // struct to YAML bytes, err := yaml.Marshal(tc.expectedListToMapYAML) assert.NoError(t, err) assert.Equal(t, tc.marshalledYAML, string(bytes)) + }) + } +} +func TestListToMapMarshalOperationsJSON(t *testing.T) { + testCases := []struct { + name string + inputJSON string + expectedListToMapJSON ListToMap + marshalledJSON string + }{ + { + name: "empty map", + inputJSON: "[]", + expectedListToMapJSON: ListToMap{}, + marshalledJSON: "[]", + }, + { + name: "map with entries", + inputJSON: "[\"foo\"]", + expectedListToMapJSON: ListToMap{ + "foo": {}, + }, + marshalledJSON: "[\"foo\"]", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { // JSON to struct - var l2 ListToMap - assert.NoError(t, json.Unmarshal([]byte(tc.inputJSON), &l2)) - assert.Equal(t, tc.expectedListToMapJSON, l2) + var l ListToMap + assert.NoError(t, json.Unmarshal([]byte(tc.inputJSON), &l)) + assert.NotNil(t, l.GetMap()) + assert.Equal(t, tc.expectedListToMapJSON, l) // struct to JSON - bytes, err = json.Marshal(tc.expectedListToMapJSON) + bytes, err := 
json.Marshal(tc.expectedListToMapJSON) assert.NoError(t, err) assert.Equal(t, tc.marshalledJSON, string(bytes)) }) diff --git a/modules/overrides/overrides.go b/modules/overrides/overrides.go index 3092b5c4aeb..320d5db1ae8 100644 --- a/modules/overrides/overrides.go +++ b/modules/overrides/overrides.go @@ -244,8 +244,8 @@ func (o *Overrides) IngestionBurstSizeBytes(userID string) int { } // SearchTagsAllowList is the list of tags to be extracted for search, for this tenant -func (o *Overrides) SearchTagsAllowList(userID string) *ListToMap { - return &o.getOverridesForUser(userID).SearchTagsAllowList +func (o *Overrides) SearchTagsAllowList(userID string) map[string]struct{} { + return o.getOverridesForUser(userID).SearchTagsAllowList.GetMap() } // BlockRetention is the duration of the block retention for this tenant From d3745b21c718332f608321ea2b79afe71254dbb2 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 23 Sep 2021 15:25:20 +0530 Subject: [PATCH 8/9] Test denylist/allowlist in docker-compose tempo-search example Signed-off-by: Annanay --- .../tempo-search/docker-compose.yaml | 23 +++++---- .../tempo-search/overrides.yaml | 9 ++++ .../docker-compose/tempo-search/tempo.yaml | 51 +++++++++++++++++++ modules/overrides/list_to_map_test.go | 8 +++ 4 files changed, 81 insertions(+), 10 deletions(-) create mode 100644 example/docker-compose/tempo-search/overrides.yaml create mode 100644 example/docker-compose/tempo-search/tempo.yaml diff --git a/example/docker-compose/tempo-search/docker-compose.yaml b/example/docker-compose/tempo-search/docker-compose.yaml index 44316fc8374..4cfe906792d 100644 --- a/example/docker-compose/tempo-search/docker-compose.yaml +++ b/example/docker-compose/tempo-search/docker-compose.yaml @@ -5,21 +5,24 @@ services: image: grafana/tempo:latest command: [ "-search.enabled=true", "-config.file=/etc/tempo.yaml" ] volumes: - - ../local/tempo-local.yaml:/etc/tempo.yaml + - ./tempo.yaml:/etc/tempo.yaml + - ./overrides.yaml:/etc/overrides.yaml - 
./tempo-data/:/tmp/tempo ports: - "3200:3200" # tempo - "14268" # jaeger ingest - tempo-query: - image: grafana/tempo-query:latest - command: [ "--grpc-storage-plugin.configuration-file=/etc/tempo-query.yaml" ] - volumes: - - ./tempo-query.yaml:/etc/tempo-query.yaml - ports: - - "16686:16686" # jaeger-ui - depends_on: - - tempo +# Commenting out because Grafana UI has search enabled. Uncomment if you want to use the Jaeger UI! +# +# tempo-query: +# image: grafana/tempo-query:latest +# command: [ "--grpc-storage-plugin.configuration-file=/etc/tempo-query.yaml" ] +# volumes: +# - ./tempo-query.yaml:/etc/tempo-query.yaml +# ports: +# - "16686:16686" # jaeger-ui +# depends_on: +# - tempo synthetic-load-generator: image: omnition/synthetic-load-generator:1.0.25 diff --git a/example/docker-compose/tempo-search/overrides.yaml b/example/docker-compose/tempo-search/overrides.yaml new file mode 100644 index 00000000000..ad5df574b37 --- /dev/null +++ b/example/docker-compose/tempo-search/overrides.yaml @@ -0,0 +1,9 @@ +overrides: + "single-tenant": + ingestion_rate_strategy: "local" + ingestion_rate_limit_bytes: 5_000_000 + ingestion_burst_size_bytes: 5_000_000 + max_traces_per_user: 15_000 + max_bytes_per_trace: 5_000_000 + max_search_bytes_per_trace: 1_000_000 + block_retention: 1h diff --git a/example/docker-compose/tempo-search/tempo.yaml b/example/docker-compose/tempo-search/tempo.yaml new file mode 100644 index 00000000000..74c190b3668 --- /dev/null +++ b/example/docker-compose/tempo-search/tempo.yaml @@ -0,0 +1,51 @@ +server: + http_listen_port: 3200 + +distributor: + search_tags_deny_list: + - "instance" + - "version" + receivers: # this configuration will listen on all ports and protocols that tempo is capable of. + jaeger: # the receivers all come from the OpenTelemetry collector.
more configuration information can + protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver + thrift_http: # + grpc: # for a production deployment you should only enable the receivers you need! + thrift_binary: + thrift_compact: + zipkin: + otlp: + protocols: + http: + grpc: + opencensus: + +ingester: + trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it + max_block_bytes: 1_000_000 # cut the head block when it hits this size or ... + max_block_duration: 5m # this much time passes + +compactor: + compaction: + compaction_window: 1h # blocks in this time window will be compacted together + max_block_bytes: 100_000_000 # maximum size of compacted blocks + block_retention: 1h + compacted_block_retention: 10m + +storage: + trace: + backend: local # backend configuration to use + block: + bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives + index_downsample_bytes: 1000 # number of bytes per index record + encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2 + wal: + path: /tmp/tempo/wal # where to store the wal locally + encoding: snappy # wal encoding/compression.
options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2 + local: + path: /tmp/tempo/blocks + pool: + max_workers: 100 # worker pool determines the number of parallel requests to the object store backend + queue_depth: 10000 + +overrides: + per_tenant_override_config: /etc/overrides.yaml \ No newline at end of file diff --git a/modules/overrides/list_to_map_test.go b/modules/overrides/list_to_map_test.go index 69c62fcabc1..62926903b76 100644 --- a/modules/overrides/list_to_map_test.go +++ b/modules/overrides/list_to_map_test.go @@ -29,6 +29,14 @@ func TestListToMapMarshalOperationsYAML(t *testing.T) { }, marshalledYAML: "- foo\n", }, + { + name: "explicit string entries", + inputYAML: "- \"foo\"", + expectedListToMapYAML: ListToMap{ + "foo": {}, + }, + marshalledYAML: "- foo\n", + }, } for _, tc := range testCases { From a87ae2042eedcaf43507f67184ce3abd85fb7f51 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 23 Sep 2021 15:35:29 +0530 Subject: [PATCH 9/9] Fix single-tenant overrides Signed-off-by: Annanay --- .../docker-compose/tempo-search/overrides.yaml | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/example/docker-compose/tempo-search/overrides.yaml b/example/docker-compose/tempo-search/overrides.yaml index ad5df574b37..72971ca395f 100644 --- a/example/docker-compose/tempo-search/overrides.yaml +++ b/example/docker-compose/tempo-search/overrides.yaml @@ -1,9 +1,12 @@ overrides: "single-tenant": + search_tags_allow_list: + - "instance" ingestion_rate_strategy: "local" - ingestion_rate_limit_bytes: 5_000_000 - ingestion_burst_size_bytes: 5_000_000 - max_traces_per_user: 15_000 - max_bytes_per_trace: 5_000_000 - max_search_bytes_per_trace: 1_000_000 - block_retention: 1h + ingestion_rate_limit_bytes: 15000000 + ingestion_burst_size_bytes: 20000000 + max_traces_per_user: 10000 + max_global_traces_per_user: 0 + max_bytes_per_trace: 50000 + max_search_bytes_per_trace: 0 + block_retention: 0s