promtail: add multi-tenant support (#1135)
* promtail: added tenant_id support to promtail client

* promtail: added tenant stage to dynamically override the tenant ID

* promtail: documented client's batch struct and improved tenant stage config pointer usage

* Added static value support to tenant stage
pracucci authored and cyriltovena committed Nov 7, 2019
1 parent 3d2c643 commit 04f58c8
Showing 13 changed files with 756 additions and 119 deletions.
26 changes: 25 additions & 1 deletion docs/clients/promtail/configuration.md
@@ -21,6 +21,7 @@ and how to scrape logs from files.
* [metric_counter](#metric_counter)
* [metric_gauge](#metric_gauge)
* [metric_histogram](#metric_histogram)
* [tenant_stage](#tenant_stage)
* [journal_config](#journal_config)
* [relabel_config](#relabel_config)
* [static_config](#static_config)
@@ -135,6 +136,11 @@ Loki:
# URL for the Distributor.
url: <string>
# The tenant ID used by default to push logs to Loki. If omitted or empty
# it assumes Loki is running in single-tenant mode and no X-Scope-OrgID header
# is sent.
[tenant_id: <string>]
# Maximum amount of time to wait before sending a batch, even if that
# batch isn't full.
[batchwait: <duration> | default = 1s]
@@ -275,7 +281,8 @@ set of key-value pairs that is passed around from stage to stage.
<timestamp_stage> |
<output_stage> |
<labels_stage> |
-<metrics_stage>
+<metrics_stage> |
+<tenant_stage>
]
```

@@ -536,6 +543,23 @@ config:
- <int>
```

#### tenant_stage

The tenant stage is an action stage that sets the tenant ID for the log entry,
picking it either from a field in the extracted data map or from a static value.

```yaml
tenant:
  # Name from extracted data whose value should be set as tenant ID.
  # Either source or value config option is required, but not both (they
  # are mutually exclusive).
  [ source: <string> ]

  # Value to use to set the tenant ID when this stage is executed. Useful
  # when this stage is included within a conditional pipeline with "match".
  [ value: <string> ]
```

### journal_config

The `journal_config` block configures reading from the systemd journal from
1 change: 1 addition & 0 deletions docs/clients/promtail/stages/README.md
@@ -20,6 +20,7 @@ Action stages:
* [output](./output.md): Set the log line text.
* [labels](./labels.md): Update the label set for the log entry.
* [metrics](./metrics.md): Calculate metrics based on extracted data.
* [tenant](./tenant.md): Set the tenant ID value to use for the log entry.

Filtering stages:

3 changes: 2 additions & 1 deletion docs/clients/promtail/stages/match.md
@@ -32,7 +32,8 @@ match:
<timestamp_stage> |
<output_stage> |
<labels_stage> |
-<metrics_stage>
+<metrics_stage> |
+<tenant_stage>
]
```
80 changes: 80 additions & 0 deletions docs/clients/promtail/stages/tenant.md
@@ -0,0 +1,80 @@
# `tenant` stage

The tenant stage is an action stage that sets the tenant ID for the log entry,
picking it either from a field in the extracted data map or from a static value.
If the source field is missing, the default promtail client
[`tenant_id`](../configuration.md#client_config) is used.
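
The fallback to the client's default `tenant_id` could be sketched roughly as below. This is an illustration only, not promtail's client code: `resolveTenant` is a hypothetical helper, and `tenantLabel` is an assumed stand-in for the reserved label that the stage writes to (the real name is `constants.ReservedLabelTenantID`, whose value is not shown in this commit excerpt).

```go
package main

import "fmt"

// tenantLabel is an assumed stand-in for the reserved label the tenant stage
// writes to; the real constant lives in promtail's constants package.
const tenantLabel = "__tenant_id__"

// resolveTenant (hypothetical) returns the tenant set by a pipeline stage,
// if any, and otherwise falls back to the client's default tenant_id.
func resolveTenant(labels map[string]string, defaultTenant string) string {
	if v, ok := labels[tenantLabel]; ok && v != "" {
		return v
	}
	return defaultTenant
}

func main() {
	// The stage set a tenant: the override wins.
	fmt.Println(resolveTenant(map[string]string{tenantLabel: "1"}, "fallback"))
	// The stage set nothing: the client default is used.
	fmt.Println(resolveTenant(map[string]string{"app": "api"}, "fallback"))
}
```

An empty default models single-tenant mode, in which case no tenant would be attached to the entry at all.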


## Schema

```yaml
tenant:
  # Name from extracted data whose value should be set as tenant ID.
  # Either source or value config option is required, but not both (they
  # are mutually exclusive).
  [ source: <string> ]

  # Value to use to set the tenant ID when this stage is executed. Useful
  # when this stage is included within a conditional pipeline with "match".
  [ value: <string> ]
```

### Example: extract the tenant ID from a structured log

For the given pipeline:

```yaml
pipeline_stages:
  - json:
      expressions:
        customer_id: customer_id
  - tenant:
      source: customer_id
```

Given the following log line:

```json
{"customer_id":"1","log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}
```

The first stage would extract `customer_id` into the extracted map with a value of
`1`. The tenant stage would set the `X-Scope-OrgID` request header (used by Loki to
identify the tenant) to the value of the `customer_id` extracted data, which is `1`.


### Example: override the tenant ID with the configured value

For the given pipeline:

```yaml
pipeline_stages:
  - json:
      expressions:
        app:
        message:
  - labels:
      app:
  - match:
      selector: '{app="api"}'
      stages:
        - tenant:
            value: "team-api"
  - output:
      source: message
```

Given the following log line:

```json
{"app":"api","log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}
```

The pipeline would:

1. Decode the JSON log.
2. Set the label `app="api"`.
3. Process the `match` stage, checking whether the `{app="api"}` selector
   matches and, if it does, running the sub-stages. The `tenant` sub-stage
   would override the tenant with the value `"team-api"`.
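
The steps above can be mimicked with a small standalone program. This is a simplified simulation under stated assumptions, not promtail's pipeline: `runPipeline` is a hypothetical function, the selector match is reduced to a plain string comparison, and a JSON decoding failure is returned as an error purely for the sake of the sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// runPipeline (hypothetical) mimics the example pipeline and returns the
// tenant ID chosen for the entry. An empty result means "no override",
// i.e. the client's default tenant_id would apply.
func runPipeline(line string) (string, error) {
	// Step 1: decode the JSON log into the extracted data map.
	var extracted map[string]interface{}
	if err := json.Unmarshal([]byte(line), &extracted); err != nil {
		return "", err
	}

	// Step 2: copy the "app" field into the label set.
	labels := map[string]string{}
	if app, ok := extracted["app"].(string); ok {
		labels["app"] = app
	}

	// Step 3: run the tenant sub-stage only when the selector matches.
	tenant := ""
	if labels["app"] == "api" {
		tenant = "team-api"
	}
	return tenant, nil
}

func main() {
	tenant, err := runPipeline(`{"app":"api","log":"log message\n","stream":"stderr"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(tenant)
}
```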
6 changes: 6 additions & 0 deletions pkg/logentry/stages/stage.go
@@ -21,6 +21,7 @@ const (
	StageTypeMatch    = "match"
	StageTypeTemplate = "template"
	StageTypePipeline = "pipeline"
	StageTypeTenant   = "tenant"
)

// Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
@@ -94,6 +95,11 @@ func New(logger log.Logger, jobName *string, stageType string,
		if err != nil {
			return nil, err
		}
	case StageTypeTenant:
		s, err = newTenantStage(logger, cfg)
		if err != nil {
			return nil, err
		}
	default:
		return nil, errors.Errorf("Unknown stage type: %s", stageType)
	}
106 changes: 106 additions & 0 deletions pkg/logentry/stages/tenant.go
@@ -0,0 +1,106 @@
package stages

import (
	"reflect"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/grafana/loki/pkg/promtail/constants"
	"github.com/mitchellh/mapstructure"
	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
)

// Error messages returned when the tenant stage configuration is invalid.
const (
	ErrTenantStageEmptySourceOrValue        = "source or value config are required"
	ErrTenantStageConflictingSourceAndValue = "source and value are mutually exclusive: you should set source or value but not both"
)

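type tenantStage struct {
	cfg    TenantConfig
	logger log.Logger
}

// TenantConfig holds the tenant stage configuration. Exactly one of Source
// or Value must be set.
type TenantConfig struct {
	Source string `mapstructure:"source"`
	Value  string `mapstructure:"value"`
}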

// validateTenantConfig validates the tenant stage configuration
func validateTenantConfig(c TenantConfig) error {
	if c.Source == "" && c.Value == "" {
		return errors.New(ErrTenantStageEmptySourceOrValue)
	}

	if c.Source != "" && c.Value != "" {
		return errors.New(ErrTenantStageConflictingSourceAndValue)
	}

	return nil
}

// newTenantStage creates a new tenant stage to override the tenant ID from extracted data
func newTenantStage(logger log.Logger, configs interface{}) (*tenantStage, error) {
	cfg := TenantConfig{}
	err := mapstructure.Decode(configs, &cfg)
	if err != nil {
		return nil, err
	}

	err = validateTenantConfig(cfg)
	if err != nil {
		return nil, err
	}

	return &tenantStage{
		cfg:    cfg,
		logger: logger,
	}, nil
}

// Process implements Stage
func (s *tenantStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
	var tenantID string

	// Get tenant ID from source or configured value
	if s.cfg.Source != "" {
		tenantID = s.getTenantFromSourceField(extracted)
	} else {
		tenantID = s.cfg.Value
	}

	// Skip an empty tenant ID (ie. failed to get the tenant from the source)
	if tenantID == "" {
		return
	}

	labels[constants.ReservedLabelTenantID] = model.LabelValue(tenantID)
}

// Name implements Stage
func (s *tenantStage) Name() string {
	return StageTypeTenant
}

func (s *tenantStage) getTenantFromSourceField(extracted map[string]interface{}) string {
	// Get the tenant ID from the source data
	value, ok := extracted[s.cfg.Source]
	if !ok {
		if Debug {
			level.Debug(s.logger).Log("msg", "the tenant source does not exist in the extracted data", "source", s.cfg.Source)
		}
		return ""
	}

	// Convert the value to string
	tenantID, err := getString(value)
	if err != nil {
		if Debug {
			level.Debug(s.logger).Log("msg", "failed to convert value to string", "err", err, "type", reflect.TypeOf(value).String())
		}
		return ""
	}

	return tenantID
}
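
One way to exercise the "source or value, but not both" rule enforced by `validateTenantConfig` is a small table-driven check. The sketch below is standalone, so it re-declares a simplified config type and validation function (`tenantConfig` and `validate` are hypothetical names, and the error strings are shortened); it is not the test file that accompanies this commit.

```go
package main

import (
	"errors"
	"fmt"
)

// tenantConfig mirrors the two fields of TenantConfig for this sketch only.
type tenantConfig struct {
	Source string
	Value  string
}

// validate applies the same rule as validateTenantConfig: exactly one of
// Source or Value must be set.
func validate(c tenantConfig) error {
	if c.Source == "" && c.Value == "" {
		return errors.New("source or value config are required")
	}
	if c.Source != "" && c.Value != "" {
		return errors.New("source and value are mutually exclusive")
	}
	return nil
}

func main() {
	cases := []struct {
		name string
		cfg  tenantConfig
	}{
		{"empty", tenantConfig{}},
		{"source only", tenantConfig{Source: "customer_id"}},
		{"value only", tenantConfig{Value: "team-api"}},
		{"both", tenantConfig{Source: "customer_id", Value: "team-api"}},
	}
	for _, tc := range cases {
		fmt.Printf("%-12s valid=%v\n", tc.name, validate(tc.cfg) == nil)
	}
}
```

Only the "source only" and "value only" cases pass validation; the other two are rejected before a stage is created.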