diff --git a/docs/sources/clients/promtail/stages/_index.md b/docs/sources/clients/promtail/stages/_index.md index 5b5e2b833a6d..15969926862e 100644 --- a/docs/sources/clients/promtail/stages/_index.md +++ b/docs/sources/clients/promtail/stages/_index.md @@ -24,6 +24,7 @@ Action stages: - [timestamp](timestamp/): Set the timestamp value for the log entry. - [output](output/): Set the log line text. - [labeldrop](labeldrop/): Drop label set for the log entry. + - [labelallow](labelallow/): Allow label set for the log entry. - [labels](labels/): Update the label set for the log entry. - [metrics](metrics/): Calculate metrics based on extracted data. - [tenant](tenant/): Set the tenant ID value to use for the log entry. diff --git a/docs/sources/clients/promtail/stages/labelallow.md b/docs/sources/clients/promtail/stages/labelallow.md new file mode 100644 index 000000000000..46d07d208278 --- /dev/null +++ b/docs/sources/clients/promtail/stages/labelallow.md @@ -0,0 +1,41 @@ +--- +title: labelallow +--- +# `labelallow` stage + +The labelallow stage is an action stage that allows only the provided labels +to be included in the label set that is sent to Loki with the log entry. + +## Schema + +```yaml +labelallow: + - [] + ... +``` + +### Examples + +For the given pipeline: + +```yaml +kubernetes_sd_configs: + - role: pod +pipeline_stages: +- docker: {} +- labelallow: + - kubernetes_pod_name + - kubernetes_container_name +``` + +Given the following incoming labels: + +- `kubernetes_pod_name`: `"loki-pqrs"` +- `kubernetes_container_name`: `"loki"` +- `kubernetes_pod_template_hash`: `"79f5db67b"` +- `kubernetes_controller_revision_hash`: `"774858987d"` + +Only the below labels would be sent to `loki` + +- `kubernetes_pod_name`: `"loki-pqrs"` +- `contaikubernetes_container_namener`: `"loki"` diff --git a/docs/sources/clients/promtail/stages/labeldrop.md b/docs/sources/clients/promtail/stages/labeldrop.md index 6559fb558c7e..e5b08be2e142 100644 --- a/docs/sources/clients/promtail/stages/labeldrop.md +++ b/docs/sources/clients/promtail/stages/labeldrop.md @@ -3,7 +3,7 @@ title: labeldrop --- # `labeldrop` stage -The labeldrop stage is an action stage that takes drops labels from +The labeldrop stage is an action stage that drops labels from the label set that is sent to Loki with the log entry. ## Schema diff --git a/pkg/logentry/stages/labelallow.go b/pkg/logentry/stages/labelallow.go new file mode 100644 index 000000000000..8ab51c4aa659 --- /dev/null +++ b/pkg/logentry/stages/labelallow.go @@ -0,0 +1,65 @@ +package stages + +import ( + "time" + + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +const ( + // ErrEmptyLabelAllowStageConfig error returned if config is empty + ErrEmptyLabelAllowStageConfig = "labelallow stage config cannot be empty" +) + +// labelallowConfig is a slice of labels to be included +type LabelAllowConfig []string + +func validateLabelAllowConfig(c LabelAllowConfig) error { + if c == nil || len(c) < 1 { + return errors.New(ErrEmptyLabelAllowStageConfig) + } + + return nil +} + +func newLabelAllowStage(configs interface{}) (Stage, error) { + cfgs := &LabelAllowConfig{} + err := mapstructure.Decode(configs, cfgs) + if err != nil { + return nil, err + } + + err = validateLabelAllowConfig(*cfgs) + if err != nil { + return nil, err + } + + labelMap := make(map[string]struct{}) + for _, label := range *cfgs { + labelMap[label] = struct{}{} + } + + return toStage(&labelAllowStage{ + labels: labelMap, + }), nil +} + +type labelAllowStage struct { + labels map[string]struct{} +} + +// Process implements Stage +func (l *labelAllowStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { + for label := range labels { + if _, ok := l.labels[string(label)]; !ok { + delete(labels, label) + } + } +} + +// Name implements Stage +func (l *labelAllowStage) Name() string { + return StageTypeLabelAllow +} diff --git a/pkg/logentry/stages/labelallow_test.go b/pkg/logentry/stages/labelallow_test.go new file mode 100644 index 000000000000..2c982db29544 --- /dev/null +++ b/pkg/logentry/stages/labelallow_test.go @@ -0,0 +1,72 @@ +package stages + +import ( + "testing" + "time" + + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + ww "github.com/weaveworks/common/server" +) + +func Test_addLabelStage_Process(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util_log.InitLogger(cfg) + Debug = true + + tests := []struct { + name string + config *LabelAllowConfig + inputLabels model.LabelSet + expectedLabels model.LabelSet + }{ + { + name: "allow single label", + config: &LabelAllowConfig{"testLabel1"}, + inputLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + }, + expectedLabels: model.LabelSet{ + "testLabel1": "testValue", + }, + }, + { + name: "allow multiple labels", + config: &LabelAllowConfig{"testLabel1", "testLabel2"}, + inputLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + "testLabel3": "testValue", + }, + expectedLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + }, + }, + { + name: "allow non-existing label", + config: &LabelAllowConfig{"foobar"}, + inputLabels: model.LabelSet{ + "testLabel1": "testValue", + "testLabel2": "testValue", + }, + expectedLabels: model.LabelSet{}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + st, err := newLabelAllowStage(test.config) + if err != nil { + t.Fatal(err) + } + out := processEntries(st, newEntry(nil, test.inputLabels, "", time.Now()))[0] + assert.Equal(t, test.expectedLabels, out.Labels) + }) + } +} diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index c3e77f066560..d89c26c2b031 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -12,23 +12,24 @@ import ( ) const ( - StageTypeJSON = "json" - StageTypeRegex = "regex" - StageTypeReplace = "replace" - StageTypeMetric = "metrics" - StageTypeLabel = "labels" - StageTypeLabelDrop = "labeldrop" - StageTypeTimestamp = "timestamp" - StageTypeOutput = "output" - StageTypeDocker = "docker" - StageTypeCRI = "cri" - StageTypeMatch = "match" - StageTypeTemplate = "template" - StageTypePipeline = "pipeline" - StageTypeTenant = "tenant" - StageTypeDrop = "drop" - StageTypeMultiline = "multiline" - StageTypePack = "pack" + StageTypeJSON = "json" + StageTypeRegex = "regex" + StageTypeReplace = "replace" + StageTypeMetric = "metrics" + StageTypeLabel = "labels" + StageTypeLabelDrop = "labeldrop" + StageTypeTimestamp = "timestamp" + StageTypeOutput = "output" + StageTypeDocker = "docker" + StageTypeCRI = "cri" + StageTypeMatch = "match" + StageTypeTemplate = "template" + StageTypePipeline = "pipeline" + StageTypeTenant = "tenant" + StageTypeDrop = "drop" + StageTypeMultiline = "multiline" + StageTypePack = "pack" + StageTypeLabelAllow = "labelallow" ) // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -151,6 +152,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeLabelAllow: + s, err = newLabelAllowStage(cfg) + if err != nil { + return nil, err + } default: return nil, errors.Errorf("Unknown stage type: %s", stageType) }