Skip to content

Commit

Permalink
Support labelallow stage in Promtail (#3468)
Browse files Browse the repository at this point in the history
* add labelallow stage

* added docs

* add docs

* fix lint

* update stage name in _index.md

(cherry picked from commit 0f5fcee)
  • Loading branch information
adityacs authored and slim-bean committed Apr 6, 2021
1 parent a27c799 commit 3b30483
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 18 deletions.
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/stages/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Action stages:
- [timestamp](timestamp/): Set the timestamp value for the log entry.
- [output](output/): Set the log line text.
- [labeldrop](labeldrop/): Drop label set for the log entry.
- [labelallow](labelallow/): Allow label set for the log entry.
- [labels](labels/): Update the label set for the log entry.
- [metrics](metrics/): Calculate metrics based on extracted data.
- [tenant](tenant/): Set the tenant ID value to use for the log entry.
Expand Down
41 changes: 41 additions & 0 deletions docs/sources/clients/promtail/stages/labelallow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
title: labelallow
---
# `labelallow` stage

The labelallow stage is an action stage that allows only the provided labels
to be included in the label set that is sent to Loki with the log entry.

## Schema

```yaml
labelallow:
- [<string>]
...
```

### Examples

For the given pipeline:

```yaml
kubernetes_sd_configs:
- role: pod
pipeline_stages:
- docker: {}
- labelallow:
- kubernetes_pod_name
- kubernetes_container_name
```
Given the following incoming labels:
- `kubernetes_pod_name`: `"loki-pqrs"`
- `kubernetes_container_name`: `"loki"`
- `kubernetes_pod_template_hash`: `"79f5db67b"`
- `kubernetes_controller_revision_hash`: `"774858987d"`

Only the below labels would be sent to `loki`

- `kubernetes_pod_name`: `"loki-pqrs"`
- `contaikubernetes_container_namener`: `"loki"`
2 changes: 1 addition & 1 deletion docs/sources/clients/promtail/stages/labeldrop.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ title: labeldrop
---
# `labeldrop` stage

The labeldrop stage is an action stage that takes drops labels from
The labeldrop stage is an action stage that drops labels from
the label set that is sent to Loki with the log entry.

## Schema
Expand Down
65 changes: 65 additions & 0 deletions pkg/logentry/stages/labelallow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package stages

import (
"time"

"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

const (
// ErrEmptyLabelAllowStageConfig error returned if config is empty
ErrEmptyLabelAllowStageConfig = "labelallow stage config cannot be empty"
)

// labelallowConfig is a slice of labels to be included
type LabelAllowConfig []string

func validateLabelAllowConfig(c LabelAllowConfig) error {
if c == nil || len(c) < 1 {
return errors.New(ErrEmptyLabelAllowStageConfig)
}

return nil
}

func newLabelAllowStage(configs interface{}) (Stage, error) {
cfgs := &LabelAllowConfig{}
err := mapstructure.Decode(configs, cfgs)
if err != nil {
return nil, err
}

err = validateLabelAllowConfig(*cfgs)
if err != nil {
return nil, err
}

labelMap := make(map[string]struct{})
for _, label := range *cfgs {
labelMap[label] = struct{}{}
}

return toStage(&labelAllowStage{
labels: labelMap,
}), nil
}

type labelAllowStage struct {
labels map[string]struct{}
}

// Process implements Stage
func (l *labelAllowStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for label := range labels {
if _, ok := l.labels[string(label)]; !ok {
delete(labels, label)
}
}
}

// Name implements Stage
func (l *labelAllowStage) Name() string {
return StageTypeLabelAllow
}
72 changes: 72 additions & 0 deletions pkg/logentry/stages/labelallow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package stages

import (
"testing"
"time"

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ww "github.com/weaveworks/common/server"
)

func Test_addLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg)
Debug = true

tests := []struct {
name string
config *LabelAllowConfig
inputLabels model.LabelSet
expectedLabels model.LabelSet
}{
{
name: "allow single label",
config: &LabelAllowConfig{"testLabel1"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel1": "testValue",
},
},
{
name: "allow multiple labels",
config: &LabelAllowConfig{"testLabel1", "testLabel2"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
"testLabel3": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
},
{
name: "allow non-existing label",
config: &LabelAllowConfig{"foobar"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
st, err := newLabelAllowStage(test.config)
if err != nil {
t.Fatal(err)
}
out := processEntries(st, newEntry(nil, test.inputLabels, "", time.Now()))[0]
assert.Equal(t, test.expectedLabels, out.Labels)
})
}
}
40 changes: 23 additions & 17 deletions pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@ import (
)

const (
StageTypeJSON = "json"
StageTypeRegex = "regex"
StageTypeReplace = "replace"
StageTypeMetric = "metrics"
StageTypeLabel = "labels"
StageTypeLabelDrop = "labeldrop"
StageTypeTimestamp = "timestamp"
StageTypeOutput = "output"
StageTypeDocker = "docker"
StageTypeCRI = "cri"
StageTypeMatch = "match"
StageTypeTemplate = "template"
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeJSON = "json"
StageTypeRegex = "regex"
StageTypeReplace = "replace"
StageTypeMetric = "metrics"
StageTypeLabel = "labels"
StageTypeLabelDrop = "labeldrop"
StageTypeTimestamp = "timestamp"
StageTypeOutput = "output"
StageTypeDocker = "docker"
StageTypeCRI = "cri"
StageTypeMatch = "match"
StageTypeTemplate = "template"
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeLabelAllow = "labelallow"
)

// Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
Expand Down Expand Up @@ -151,6 +152,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeLabelAllow:
s, err = newLabelAllowStage(cfg)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("Unknown stage type: %s", stageType)
}
Expand Down

0 comments on commit 3b30483

Please sign in to comment.