Support global denylist and per-tenant allowlist of tags for search data #960

Merged · 13 commits · Sep 23, 2021
27 changes: 14 additions & 13 deletions CHANGELOG.md
@@ -1,11 +1,16 @@
## main / unreleased

* [CHANGE] **BREAKING CHANGE** Drop support for v0 and v1 blocks. See [1.1 changelog](https://github.com/grafana/tempo/releases/tag/v1.1.0) for details [#919](https://github.com/grafana/tempo/pull/919) (@joe-elliott)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
* [CHANGE] **BREAKING CHANGE** Consolidate status information onto /status endpoint [#952](https://github.com/grafana/tempo/pull/952) (@zalegrala)
The following endpoints moved.
`/runtime_config` moved to `/status/runtime_config`
`/config` moved to `/status/config`
`/services` moved to `/status/services`
* [FEATURE] Add ability to search ingesters for traces [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [BUGFIX] Fix an issue with WAL replay of zero-length search data when search is disabled. [#968](https://github.com/grafana/tempo/pull/968) (@annanay25)
* [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno)
* [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott)
* [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394)
* [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk)
@@ -40,15 +45,11 @@
* [ENHANCEMENT] Compression updates: Added s2, improved snappy performance [#961](https://github.com/grafana/tempo/pull/961) (@joe-elliott)
* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio)
* [ENHANCEMENT] Add search block headers for wal blocks [#963](https://github.com/grafana/tempo/pull/963) (@mdisibio)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
* [CHANGE] **BREAKING CHANGE** Consolidate status information onto /status endpoint [#952](https://github.com/grafana/tempo/pull/952) (@zalegrala)
The following endpoints moved.
`/runtime_config` moved to `/status/runtime_config`
`/config` moved to `/status/config`
`/services` moved to `/status/services`
* [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno)
* [ENHANCEMENT] Support global denylist and per-tenant allowlist of tags for search data. [#960](https://github.com/grafana/tempo/pull/960) (@annanay25)
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [BUGFIX] Fix an issue with WAL replay of zero-length search data when search is disabled. [#968](https://github.com/grafana/tempo/pull/968) (@annanay25)

## v1.1.0 / 2021-08-26
* [CHANGE] Upgrade Cortex from v1.9.0 to v1.9.0-131-ga4bf10354 [#841](https://github.com/grafana/tempo/pull/841) (@aknuds1)
4 changes: 4 additions & 0 deletions docs/tempo/website/configuration/_index.md
@@ -123,6 +123,10 @@ distributor:
# note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
[extend_writes: <bool>]

# Optional.
# List of tags that will **not** be extracted from trace data for search lookups.
# This is a global config that applies to all tenants.
[search_tags_deny_list: <list of string> | default = ]
```

## Ingester
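A minimal configuration sketch of the new global deny list, assuming the distributor block shown in the docs above; the tag names are placeholders:

```yaml
# Illustrative only: drop these tags from search data for every tenant.
# "instance.id" and "internal.debug" are placeholder tag names.
distributor:
  search_tags_deny_list:
    - instance.id
    - internal.debug
```

Tags listed here are not extracted into search data for any tenant unless a tenant re-allows them via the per-tenant allow list introduced further down in this PR.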
2 changes: 2 additions & 0 deletions modules/distributor/config.go
@@ -39,6 +39,8 @@ type Config struct {
// note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
ExtendWrites bool `yaml:"extend_writes"`

SearchTagsDenyList []string `yaml:"search_tags_deny_list"`

// For testing.
factory func(addr string) (ring_client.PoolClient, error) `yaml:"-"`
}
29 changes: 26 additions & 3 deletions modules/distributor/distributor.go
@@ -94,7 +94,11 @@
ingestersRing ring.ReadRing
pool *ring_client.Pool
DistributorRing *ring.Ring
searchEnabled bool
overrides *overrides.Overrides

// search
searchEnabled bool
globalTagsToDrop map[string]struct{}

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
@@ -147,6 +151,12 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi

subservices = append(subservices, pool)

// turn list into map for efficient checking
tagsToDrop := map[string]struct{}{}
for _, tag := range cfg.SearchTagsDenyList {
tagsToDrop[tag] = struct{}{}
}

d := &Distributor{
cfg: cfg,
clientCfg: clientCfg,
@@ -155,6 +165,8 @@
DistributorRing: distributorRing,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
searchEnabled: searchEnabled,
globalTagsToDrop: tagsToDrop,
overrides: o,
}

cfgReceivers := cfg.Receivers
@@ -249,10 +261,21 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp
return nil, err
}

var searchData [][]byte
if d.searchEnabled {
searchData = extractSearchDataAll(traces, ids)
perTenantAllowedTags := d.overrides.SearchTagsAllowList(userID)
searchData = extractSearchDataAll(traces, ids, func(tag string) bool {
// if in per tenant override, extract
if _, ok := perTenantAllowedTags.GetMap()[tag]; ok {
return true
}
// if in global deny list, drop
if _, ok := d.globalTagsToDrop[tag]; ok {
return false
}
// allow otherwise
return true
})
}

err = d.sendToIngestersViaBytes(ctx, userID, traces, searchData, keys, ids)
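The closure above encodes the precedence rule for this PR: a tag named in the tenant's allow list is always extracted, a tag in the global deny list is dropped, and everything else is extracted. A standalone sketch of that rule (hypothetical tag names and set contents, not the actual Tempo API):

```go
package main

import "fmt"

// shouldExtract mirrors the precedence used in the Push closure above:
// the per-tenant allow list wins, then the global deny list, then allow.
func shouldExtract(tag string, tenantAllow, globalDeny map[string]struct{}) bool {
	if _, ok := tenantAllow[tag]; ok {
		return true // explicitly re-allowed for this tenant
	}
	if _, ok := globalDeny[tag]; ok {
		return false // globally denied and not re-allowed
	}
	return true // default: extract the tag
}

func main() {
	globalDeny := map[string]struct{}{"instance.id": {}}
	tenantAllow := map[string]struct{}{"instance.id": {}}

	fmt.Println(shouldExtract("instance.id", nil, globalDeny))         // false: denied globally
	fmt.Println(shouldExtract("instance.id", tenantAllow, globalDeny)) // true: tenant override wins
	fmt.Println(shouldExtract("http.method", nil, globalDeny))         // true: not denied
}
```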
20 changes: 17 additions & 3 deletions modules/distributor/search_data.go
@@ -10,12 +10,14 @@ import (
"github.com/grafana/tempo/tempodb/search"
)

type extractTagFunc func(tag string) bool

// extractSearchDataAll returns flatbuffer search data for every trace.
func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte) [][]byte {
func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte, extractTag extractTagFunc) [][]byte {
headers := make([][]byte, len(traces))

for i, t := range traces {
headers[i] = extractSearchData(t, ids[i])
headers[i] = extractSearchData(t, ids[i], extractTag)
}

return headers
@@ -24,7 +26,7 @@ func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte) [][]byte {
// extractSearchData returns the flatbuffer search data for the given trace. It is extracted here
// in the distributor because this is the only place on the ingest path where the trace is available
// in object form.
func extractSearchData(trace *tempopb.Trace, id []byte) []byte {
func extractSearchData(trace *tempopb.Trace, id []byte, extractTag extractTagFunc) []byte {
data := &tempofb.SearchEntryMutable{}

data.TraceID = id
@@ -33,6 +35,9 @@ func extractSearchData(trace *tempopb.Trace, id []byte) []byte {
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
@@ -49,6 +54,9 @@

// Span attrs
for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(fmt.Sprint(search.RootSpanPrefix, a.Key), s)
}
@@ -57,6 +65,9 @@
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(fmt.Sprint(search.RootSpanPrefix, a.Key), s)
}
@@ -70,6 +81,9 @@
data.SetEndTimeUnixNano(s.EndTimeUnixNano)

for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
45 changes: 43 additions & 2 deletions modules/distributor/search_data_test.go
@@ -20,10 +20,11 @@ func TestExtractSearchData(t *testing.T) {
name string
trace *tempopb.Trace
id []byte
extractTag extractTagFunc
searchData *tempofb.SearchEntryMutable
}{
{
name: "trace with root span",
name: "extracts search tags",
trace: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
@@ -73,12 +74,52 @@
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
extractTag: func(tag string) bool {
return true
},
},
{
name: "drops tags in deny list",
trace: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
Resource: &v1_resource.Resource{
Attributes: []*v1_common.KeyValue{
{
Key: "foo",
Value: &v1_common.AnyValue{
Value: &v1_common.AnyValue_StringValue{StringValue: "bar"},
},
},
{
Key: "bar",
Value: &v1_common.AnyValue{
Value: &v1_common.AnyValue_StringValue{StringValue: "baz"},
},
},
},
},
},
},
},
id: traceIDA,
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.SearchDataMap{
"bar": []string{"baz"},
},
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
extractTag: func(tag string) bool {
return tag != "foo"
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id))
assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id, tc.extractTag))
})
}
}
7 changes: 4 additions & 3 deletions modules/overrides/limits.go
@@ -24,9 +24,10 @@ const (
// limits via flags, or per-user limits via yaml config.
type Limits struct {
// Distributor enforced limits.
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"`
IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"`
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"`
IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"`
SearchTagsAllowList ListToMap `yaml:"search_tags_allow_list" json:"search_tags_allow_list"`

// Ingester enforced limits.
MaxLocalTracesPerUser int `yaml:"max_traces_per_user" json:"max_traces_per_user"`
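On the per-tenant side, the new `SearchTagsAllowList` limit is read through the overrides module (see `d.overrides.SearchTagsAllowList(userID)` in the distributor above). A hedged sketch of a per-tenant overrides file, assuming Tempo's usual `overrides:` layout; the tenant ID and tag names are placeholders:

```yaml
# Illustrative only: re-allow a globally denied tag for one tenant.
overrides:
  tenant-a:
    search_tags_allow_list:
      - instance.id
      - cluster
```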
6 changes: 6 additions & 0 deletions modules/overrides/limits_test.go
@@ -38,6 +38,9 @@ func TestLimitsYamlMatchJson(t *testing.T) {
ingestion_rate_strategy: global
ingestion_rate_limit_bytes: 100_000
ingestion_burst_size_bytes: 100_000
search_tags_allow_list:
- a
- b

max_traces_per_user: 1000
max_global_traces_per_user: 1000
@@ -53,6 +56,9 @@ per_tenant_override_period: 1m
"ingestion_rate_strategy": "global",
"ingestion_rate_limit_bytes": 100000,
"ingestion_burst_size_bytes": 100000,
"search_tags_allow_list" : [
"a", "b"
],

"max_traces_per_user": 1000,
"max_global_traces_per_user": 1000,
76 changes: 76 additions & 0 deletions modules/overrides/list_to_map.go
@@ -0,0 +1,76 @@
package overrides

import (
"encoding/json"

"gopkg.in/yaml.v2"
)

type ListToMap struct {
m map[string]struct{}
}

var _ yaml.Marshaler = (*ListToMap)(nil)
var _ yaml.Unmarshaler = (*ListToMap)(nil)
var _ json.Marshaler = (*ListToMap)(nil)
var _ json.Unmarshaler = (*ListToMap)(nil)

// MarshalYAML implements the Marshal interface of the yaml pkg.
func (l ListToMap) MarshalYAML() (interface{}, error) {
list := make([]string, 0)
for k := range l.m {
list = append(list, k)
}

if len(list) == 0 {
return nil, nil
}
return list, nil
}

// UnmarshalYAML implements the Unmarshaler interface of the yaml pkg.
func (l *ListToMap) UnmarshalYAML(unmarshal func(interface{}) error) error {
list := make([]string, 0)
err := unmarshal(&list)
if err != nil {
return err
}

l.m = make(map[string]struct{})
for _, element := range list {
l.m[element] = struct{}{}
}
return nil
}

// MarshalJSON implements the Marshal interface of the json pkg.
func (l ListToMap) MarshalJSON() ([]byte, error) {
list := make([]string, 0)
for k := range l.m {
list = append(list, k)
}

return json.Marshal(&list)
}

// UnmarshalJSON implements the Unmarshal interface of the json pkg.
func (l *ListToMap) UnmarshalJSON(b []byte) error {
list := make([]string, 0)
err := json.Unmarshal(b, &list)
if err != nil {
return err
}

l.m = make(map[string]struct{})
for _, element := range list {
l.m[element] = struct{}{}
}
return nil
}

func (l *ListToMap) GetMap() map[string]struct{} {
if l.m == nil {
l.m = map[string]struct{}{}
}
return l.m
}
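A short usage sketch of the new type from a hypothetical caller; the import path follows the repository layout shown in this diff, and the YAML input mirrors the limits_test.go fixture above:

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/modules/overrides"
	"gopkg.in/yaml.v2"
)

func main() {
	// Hypothetical config struct embedding the set-like ListToMap type.
	var cfg struct {
		AllowList overrides.ListToMap `yaml:"search_tags_allow_list"`
	}

	// Placeholder input matching the test fixture above.
	input := []byte("search_tags_allow_list:\n  - a\n  - b\n")
	if err := yaml.Unmarshal(input, &cfg); err != nil {
		panic(err)
	}

	// GetMap exposes the underlying set for O(1) membership checks,
	// as used by the distributor's tag filter.
	_, ok := cfg.AllowList.GetMap()["a"]
	fmt.Println(ok) // true
}
```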