Support global denylist and per-tenant allowlist of tags for search data #960

Merged · 13 commits · Sep 23, 2021
27 changes: 14 additions & 13 deletions CHANGELOG.md
@@ -1,11 +1,16 @@
## main / unreleased

* [CHANGE] **BREAKING CHANGE** Drop support for v0 and v1 blocks. See [1.1 changelog](https://github.com/grafana/tempo/releases/tag/v1.1.0) for details [#919](https://github.com/grafana/tempo/pull/919) (@joe-elliott)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
* [CHANGE] **BREAKING CHANGE** Consolidate status information onto /status endpoint [#952](https://github.com/grafana/tempo/pull/952) (@zalegrala)
The following endpoints moved.
`/runtime_config` moved to `/status/runtime_config`
`/config` moved to `/status/config`
`/services` moved to `/status/services`
* [FEATURE] Add ability to search ingesters for traces [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [BUGFIX] Fix an issue with WAL replay of zero-length search data when search is disabled. [#968](https://github.com/grafana/tempo/pull/968) (@annanay25)
* [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno)
* [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott)
* [ENHANCEMENT] Added "search blocks" cli option. [#972](https://github.com/grafana/tempo/pull/972) (@joe-elliott)
* [ENHANCEMENT] Added trace ID to the `trace too large` message. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394)
@@ -42,15 +47,11 @@
* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio)
* [ENHANCEMENT] Add search block headers for wal blocks [#963](https://github.com/grafana/tempo/pull/963) (@mdisibio)
* [ENHANCEMENT] Add support for vulture sending long running traces [#951](https://github.com/grafana/tempo/pull/951) (@zalegrala)
* [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394)
* [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover)
* [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala)
* [CHANGE] **BREAKING CHANGE** Consolidate status information onto /status endpoint [#952](https://github.com/grafana/tempo/pull/952) (@zalegrala)
The following endpoints moved.
`/runtime_config` moved to `/status/runtime_config`
`/config` moved to `/status/config`
`/services` moved to `/status/services`
* [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno)
* [ENHANCEMENT] Support global denylist and per-tenant allowlist of tags for search data. [#960](https://github.com/grafana/tempo/pull/960) (@annanay25)
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [BUGFIX] Fix an issue with WAL replay of zero-length search data when search is disabled. [#968](https://github.com/grafana/tempo/pull/968) (@annanay25)

## v1.1.0 / 2021-08-26
* [CHANGE] Upgrade Cortex from v1.9.0 to v1.9.0-131-ga4bf10354 [#841](https://github.com/grafana/tempo/pull/841) (@aknuds1)
4 changes: 4 additions & 0 deletions docs/tempo/website/configuration/_index.md
@@ -123,6 +123,10 @@ distributor:
# note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
[extend_writes: <bool>]

# Optional.
# List of tags that will **not** be extracted from trace data for search lookups.
# This is a global config that applies to all tenants.
[search_tags_deny_list: <list of string> | default = ]
```
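As a sketch of how a deny list like this is typically consumed: the configured slice is converted to a set once at startup, so each per-tag check is O(1) rather than a slice scan. This mirrors the distributor change in this PR, but the names below (`buildDenySet`, `allowTag`) are invented for illustration:

```go
package main

import "fmt"

// buildDenySet converts the configured slice into a set for O(1) lookups.
func buildDenySet(denyList []string) map[string]struct{} {
	set := make(map[string]struct{}, len(denyList))
	for _, tag := range denyList {
		set[tag] = struct{}{}
	}
	return set
}

// allowTag reports whether a tag should be extracted for search data.
func allowTag(denySet map[string]struct{}, tag string) bool {
	_, denied := denySet[tag]
	return !denied
}

func main() {
	deny := buildDenySet([]string{"instance", "version"})
	fmt.Println(allowTag(deny, "instance")) // false
	fmt.Println(allowTag(deny, "service"))  // true
}
```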

## Ingester
23 changes: 13 additions & 10 deletions example/docker-compose/tempo-search/docker-compose.yaml
@@ -5,21 +5,24 @@ services:
image: grafana/tempo:latest
command: [ "-search.enabled=true", "-config.file=/etc/tempo.yaml" ]
volumes:
- ../local/tempo-local.yaml:/etc/tempo.yaml
- ./tempo.yaml:/etc/tempo.yaml
- ./overrides.yaml:/etc/overrides.yaml
- ./tempo-data/:/tmp/tempo
ports:
- "3200:3200" # tempo
- "14268" # jaeger ingest

tempo-query:
image: grafana/tempo-query:latest
command: [ "--grpc-storage-plugin.configuration-file=/etc/tempo-query.yaml" ]
volumes:
- ./tempo-query.yaml:/etc/tempo-query.yaml
ports:
- "16686:16686" # jaeger-ui
depends_on:
- tempo
# Commented out because the Grafana UI has search enabled. Uncomment to use the Jaeger UI instead!
#
# tempo-query:
# image: grafana/tempo-query:latest
# command: [ "--grpc-storage-plugin.configuration-file=/etc/tempo-query.yaml" ]
# volumes:
# - ./tempo-query.yaml:/etc/tempo-query.yaml
# ports:
# - "16686:16686" # jaeger-ui
# depends_on:
# - tempo

synthetic-load-generator:
image: omnition/synthetic-load-generator:1.0.25
12 changes: 12 additions & 0 deletions example/docker-compose/tempo-search/overrides.yaml
@@ -0,0 +1,12 @@
overrides:
"single-tenant":
search_tags_allow_list:
- "instance"
ingestion_rate_strategy: "local"
ingestion_rate_limit_bytes: 15000000
ingestion_burst_size_bytes: 20000000
max_traces_per_user: 10000
max_global_traces_per_user: 0
max_bytes_per_trace: 50000
max_search_bytes_per_trace: 0
block_retention: 0s
51 changes: 51 additions & 0 deletions example/docker-compose/tempo-search/tempo.yaml
@@ -0,0 +1,51 @@
server:
http_listen_port: 3200

distributor:
search_tags_deny_list:
- "instance"
- "version"
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
jaeger: # the receivers all come from the OpenTelemetry collector. more configuration information can
protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
thrift_http: #
grpc: # for a production deployment you should only enable the receivers you need!
thrift_binary:
thrift_compact:
zipkin:
otlp:
protocols:
http:
grpc:
opencensus:

ingester:
trace_idle_period: 10s # the length of time after a trace has not received spans to consider it complete and flush it
max_block_bytes: 1_000_000 # cut the head block when it hits this size or ...
max_block_duration: 5m # this much time passes

compactor:
compaction:
compaction_window: 1h # blocks in this time window will be compacted together
max_block_bytes: 100_000_000 # maximum size of compacted blocks
block_retention: 1h
compacted_block_retention: 10m

storage:
trace:
backend: local # backend configuration to use
block:
bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives
index_downsample_bytes: 1000 # number of bytes per index record
encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
wal:
path: /tmp/tempo/wal # where to store the wal locally
encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
local:
path: /tmp/tempo/blocks
pool:
max_workers: 100 # worker pool determines the number of parallel requests to the object store backend
queue_depth: 10000

overrides:
per_tenant_override_config: /etc/overrides.yaml
2 changes: 2 additions & 0 deletions modules/distributor/config.go
@@ -39,6 +39,8 @@ type Config struct {
// note that setting these two config values reduces tolerance to failures on rollout b/c there is always one guaranteed to be failing replica
ExtendWrites bool `yaml:"extend_writes"`

SearchTagsDenyList []string `yaml:"search_tags_deny_list"`

// For testing.
factory func(addr string) (ring_client.PoolClient, error) `yaml:"-"`
}
29 changes: 26 additions & 3 deletions modules/distributor/distributor.go
@@ -94,7 +94,11 @@ type Distributor struct {
ingestersRing ring.ReadRing
pool *ring_client.Pool
DistributorRing *ring.Ring
searchEnabled bool
overrides *overrides.Overrides

// search
searchEnabled bool
globalTagsToDrop map[string]struct{}

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
@@ -147,6 +151,12 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi

subservices = append(subservices, pool)

// turn list into map for efficient checking
tagsToDrop := map[string]struct{}{}
for _, tag := range cfg.SearchTagsDenyList {
tagsToDrop[tag] = struct{}{}
}

d := &Distributor{
cfg: cfg,
clientCfg: clientCfg,
@@ -155,6 +165,8 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi
DistributorRing: distributorRing,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
searchEnabled: searchEnabled,
globalTagsToDrop: tagsToDrop,
overrides: o,
}

cfgReceivers := cfg.Receivers
@@ -249,10 +261,21 @@ func (d *Distributor) Push(ctx context.Context, req *tempopb.PushRequest) (*temp
return nil, err
}

var searchData [][]byte
if d.searchEnabled {
searchData = extractSearchDataAll(traces, ids)
perTenantAllowedTags := d.overrides.SearchTagsAllowList(userID)
searchData = extractSearchDataAll(traces, ids, func(tag string) bool {
// if in per tenant override, extract
if _, ok := perTenantAllowedTags[tag]; ok {
return true
}
// if in global deny list, drop
if _, ok := d.globalTagsToDrop[tag]; ok {
return false
}
// allow otherwise
return true
})
}

err = d.sendToIngestersViaBytes(ctx, userID, traces, searchData, keys, ids)
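The Push hunk above combines the per-tenant allowlist with the global denylist into a single predicate: a tag on the tenant's allowlist is extracted even when it appears on the global denylist, a globally denied tag is dropped, and anything else is allowed by default. A minimal Go sketch of that precedence (the `tagFilter` name and map shapes are invented here for illustration):

```go
package main

import "fmt"

// tagFilter returns a predicate implementing the precedence above:
// per-tenant allowlist first, then the global denylist, then allow by default.
func tagFilter(allowed, denied map[string]struct{}) func(string) bool {
	return func(tag string) bool {
		if _, ok := allowed[tag]; ok {
			return true // the per-tenant allowlist overrides the global denylist
		}
		if _, ok := denied[tag]; ok {
			return false // globally denied and not allowlisted: drop
		}
		return true // unlisted tags are extracted by default
	}
}

func main() {
	allowed := map[string]struct{}{"instance": {}}
	denied := map[string]struct{}{"instance": {}, "version": {}}
	f := tagFilter(allowed, denied)
	fmt.Println(f("instance")) // true: allowlisted wins over the denylist
	fmt.Println(f("version"))  // false: globally denied
	fmt.Println(f("region"))   // true: unlisted tags pass
}
```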
20 changes: 17 additions & 3 deletions modules/distributor/search_data.go
@@ -10,12 +10,14 @@ import (
"github.com/grafana/tempo/tempodb/search"
)

type extractTagFunc func(tag string) bool

// extractSearchDataAll returns flatbuffer search data for every trace.
func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte) [][]byte {
func extractSearchDataAll(traces []*tempopb.Trace, ids [][]byte, extractTag extractTagFunc) [][]byte {
headers := make([][]byte, len(traces))

for i, t := range traces {
headers[i] = extractSearchData(t, ids[i])
headers[i] = extractSearchData(t, ids[i], extractTag)
}

return headers
@@ -24,7 +26,7 @@
// extractSearchData returns the flatbuffer search data for the given trace. It is extracted here
// in the distributor because this is the only place on the ingest path where the trace is available
// in object form.
func extractSearchData(trace *tempopb.Trace, id []byte) []byte {
func extractSearchData(trace *tempopb.Trace, id []byte, extractTag extractTagFunc) []byte {
data := &tempofb.SearchEntryMutable{}

data.TraceID = id
@@ -33,6 +35,9 @@
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
@@ -49,6 +54,9 @@

// Span attrs
for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(fmt.Sprint(search.RootSpanPrefix, a.Key), s)
}
@@ -57,6 +65,9 @@
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(fmt.Sprint(search.RootSpanPrefix, a.Key), s)
}
@@ -70,6 +81,9 @@
data.SetEndTimeUnixNano(s.EndTimeUnixNano)

for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
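The repeated `extractTag` guards above all follow one pattern: reject the attribute by key before converting its value, so denied tags never reach the flatbuffer builder. A condensed sketch of that pattern, using a plain string map instead of the protobuf attribute types (`filterTags` is an invented name):

```go
package main

import "fmt"

// filterTags keeps only the attributes whose keys the predicate accepts,
// mirroring the extractTag checks inserted above. The real code walks
// protobuf KeyValue slices; a plain map is used here for brevity.
func filterTags(attrs map[string]string, extractTag func(string) bool) map[string]string {
	out := make(map[string]string, len(attrs))
	for k, v := range attrs {
		if !extractTag(k) {
			continue // denied key: skip before any value conversion
		}
		out[k] = v
	}
	return out
}

func main() {
	attrs := map[string]string{"foo": "bar", "bar": "baz"}
	kept := filterTags(attrs, func(tag string) bool { return tag != "foo" })
	fmt.Println(kept) // map[bar:baz]
}
```

This matches the "drops tags in deny list" test case added below, where `foo` is filtered out and `bar` survives.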
45 changes: 43 additions & 2 deletions modules/distributor/search_data_test.go
@@ -20,10 +20,11 @@ func TestExtractSearchData(t *testing.T) {
name string
trace *tempopb.Trace
id []byte
extractTag extractTagFunc
searchData *tempofb.SearchEntryMutable
}{
{
name: "trace with root span",
name: "extracts search tags",
trace: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
@@ -73,12 +74,52 @@
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
extractTag: func(tag string) bool {
return true
},
},
{
name: "drops tags in deny list",
trace: &tempopb.Trace{
Batches: []*v1.ResourceSpans{
{
Resource: &v1_resource.Resource{
Attributes: []*v1_common.KeyValue{
{
Key: "foo",
Value: &v1_common.AnyValue{
Value: &v1_common.AnyValue_StringValue{StringValue: "bar"},
},
},
{
Key: "bar",
Value: &v1_common.AnyValue{
Value: &v1_common.AnyValue_StringValue{StringValue: "baz"},
},
},
},
},
},
},
},
id: traceIDA,
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.SearchDataMap{
"bar": []string{"baz"},
},
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
extractTag: func(tag string) bool {
return tag != "foo"
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id))
assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id, tc.extractTag))
})
}
}
7 changes: 4 additions & 3 deletions modules/overrides/limits.go
@@ -24,9 +24,10 @@ const (
// limits via flags, or per-user limits via yaml config.
type Limits struct {
// Distributor enforced limits.
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"`
IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"`
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"`
IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"`
SearchTagsAllowList ListToMap `yaml:"search_tags_allow_list" json:"search_tags_allow_list"`

// Ingester enforced limits.
MaxLocalTracesPerUser int `yaml:"max_traces_per_user" json:"max_traces_per_user"`
6 changes: 6 additions & 0 deletions modules/overrides/limits_test.go
@@ -38,6 +38,9 @@ func TestLimitsYamlMatchJson(t *testing.T) {
ingestion_rate_strategy: global
ingestion_rate_limit_bytes: 100_000
ingestion_burst_size_bytes: 100_000
search_tags_allow_list:
- a
- b

max_traces_per_user: 1000
max_global_traces_per_user: 1000
@@ -53,6 +56,9 @@ per_tenant_override_period: 1m
"ingestion_rate_strategy": "global",
"ingestion_rate_limit_bytes": 100000,
"ingestion_burst_size_bytes": 100000,
"search_tags_allow_list" : [
"a", "b"
],

"max_traces_per_user": 1000,
"max_global_traces_per_user": 1000,