From 4784ea2388e586bada370bd26e1c4558b3fe7ecc Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Tue, 28 Sep 2021 14:56:31 +0530 Subject: [PATCH] add logfmt promtail stage to be able to extract data from logfmt formatted log (#4346) --- clients/pkg/logentry/stages/logfmt.go | 154 +++++++++++ clients/pkg/logentry/stages/logfmt_test.go | 293 +++++++++++++++++++++ clients/pkg/logentry/stages/stage.go | 6 + 3 files changed, 453 insertions(+) create mode 100644 clients/pkg/logentry/stages/logfmt.go create mode 100644 clients/pkg/logentry/stages/logfmt_test.go diff --git a/clients/pkg/logentry/stages/logfmt.go b/clients/pkg/logentry/stages/logfmt.go new file mode 100644 index 000000000000..fd5066391acb --- /dev/null +++ b/clients/pkg/logentry/stages/logfmt.go @@ -0,0 +1,154 @@ +package stages + +import ( + "fmt" + "reflect" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/go-logfmt/logfmt" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +// Config Errors +const ( + ErrMappingRequired = "logfmt mapping is required" + ErrEmptyLogfmtStageConfig = "empty logfmt stage configuration" + ErrEmptyLogfmtStageSource = "empty source" +) + +// LogfmtConfig represents a logfmt Stage configuration +type LogfmtConfig struct { + Mapping map[string]string `mapstructure:"mapping"` + Source *string `mapstructure:"source"` +} + +// validateLogfmtConfig validates a logfmt stage config and returns an inverse mapping of configured mapping. +// Mapping inverse is done to make lookup easier. The key would be the key from parsed logfmt and +// value would be the key with which the data in extracted map would be set. +func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) { + if c == nil { + return nil, errors.New(ErrEmptyLogfmtStageConfig) + } + + if len(c.Mapping) == 0 { + return nil, errors.New(ErrMappingRequired) + } + + if c.Source != nil && *c.Source == "" { + return nil, errors.New(ErrEmptyLogfmtStageSource) + } + + inverseMapping := make(map[string]string) + for k, v := range c.Mapping { + // if value is not set, use the key for setting data in extracted map. + if v == "" { + v = k + } + inverseMapping[v] = k + } + + return inverseMapping, nil +} + +// logfmtStage sets extracted data using logfmt parser +type logfmtStage struct { + cfg *LogfmtConfig + inverseMapping map[string]string + logger log.Logger +} + +// newLogfmtStage creates a new logfmt pipeline stage from a config. +func newLogfmtStage(logger log.Logger, config interface{}) (Stage, error) { + cfg, err := parseLogfmtConfig(config) + if err != nil { + return nil, err + } + + // inverseMapping would hold the mapping in inverse which would make lookup easier. + // To explain it simply, the key would be the key from parsed logfmt and value would be the key with which the data in extracted map would be set. + inverseMapping, err := validateLogfmtConfig(cfg) + if err != nil { + return nil, err + } + + return toStage(&logfmtStage{ + cfg: cfg, + inverseMapping: inverseMapping, + logger: log.With(logger, "component", "stage", "type", "logfmt"), + }), nil +} + +func parseLogfmtConfig(config interface{}) (*LogfmtConfig, error) { + cfg := &LogfmtConfig{} + err := mapstructure.Decode(config, cfg) + if err != nil { + return nil, err + } + return cfg, nil +} + +// Process implements Stage +func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { + // If a source key is provided, the logfmt stage should process it + // from the extracted map, otherwise should fallback to the entry + input := entry + + if j.cfg.Source != nil { + if _, ok := extracted[*j.cfg.Source]; !ok { + if Debug { + level.Debug(j.logger).Log("msg", "source does not exist in the set of extracted values", "source", *j.cfg.Source) + } + return + } + + value, err := getString(extracted[*j.cfg.Source]) + if err != nil { + if Debug { + level.Debug(j.logger).Log("msg", "failed to convert source value to string", "source", *j.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[*j.cfg.Source])) + } + return + } + + input = &value + } + + if input == nil { + if Debug { + level.Debug(j.logger).Log("msg", "cannot parse a nil entry") + } + return + } + decoder := logfmt.NewDecoder(strings.NewReader(*input)) + extractedEntriesCount := 0 + for decoder.ScanRecord() { + for decoder.ScanKeyval() { + mapKey, ok := j.inverseMapping[string(decoder.Key())] + if ok { + extracted[mapKey] = string(decoder.Value()) + extractedEntriesCount++ + } + } + } + + if decoder.Err() != nil { + level.Error(j.logger).Log("msg", "failed to decode logfmt", "err", decoder.Err()) + return + } + + if Debug { + if extractedEntriesCount != len(j.inverseMapping) { + level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", extractedEntriesCount, len(j.inverseMapping))) + } + level.Debug(j.logger).Log("msg", "extracted data debug in logfmt stage", "extracted data", fmt.Sprintf("%v", extracted)) + } +} + +// Name implements Stage +func (j *logfmtStage) Name() string { + return StageTypeLogfmt +} diff --git a/clients/pkg/logentry/stages/logfmt_test.go b/clients/pkg/logentry/stages/logfmt_test.go new file mode 100644 index 000000000000..a4b2edf78b09 --- /dev/null +++ b/clients/pkg/logentry/stages/logfmt_test.go @@ -0,0 +1,293 @@ +package stages + +import ( + "testing" + "time" + + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +var testLogfmtYamlSingleStageWithoutSource = ` +pipeline_stages: +- logfmt: + mapping: + out: message + app: + duration: + unknown: +` + +var testLogfmtYamlMultiStageWithSource = ` +pipeline_stages: +- logfmt: + mapping: + extra: +- logfmt: + mapping: + user: + source: extra +` + +func TestPipeline_Logfmt(t *testing.T) { + var testLogfmtLogLine = ` + time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="this is a log line" extra="user=foo"" + ` + t.Parallel() + + tests := map[string]struct { + config string + entry string + expectedExtract map[string]interface{} + }{ + "successfully run a pipeline with 1 logfmt stage without source": { + testLogfmtYamlSingleStageWithoutSource, + testLogfmtLogLine, + map[string]interface{}{ + "out": "this is a log line", + "app": "loki", + "duration": "125", + }, + }, + "successfully run a pipeline with 2 logfmt stages with source": { + testLogfmtYamlMultiStageWithSource, + testLogfmtLogLine, + map[string]interface{}{ + "extra": "user=foo", + "user": "foo", + }, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + assert.NoError(t, err) + out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0] + assert.Equal(t, testData.expectedExtract, out.Extracted) + }) + } +} + +var testLogfmtCfg = `logfmt: + mapping: + foo1: bar1 + foo2:` + +// nolint +func TestLogfmtYamlMapStructure(t *testing.T) { + t.Parallel() + + // testing that we can use yaml data into mapstructure. + var mapstruct map[interface{}]interface{} + assert.NoError(t, yaml.Unmarshal([]byte(testLogfmtCfg), &mapstruct)) + p, ok := mapstruct["logfmt"].(map[interface{}]interface{}) + assert.True(t, ok) + got, err := parseLogfmtConfig(p) + assert.NoError(t, err) + want := &LogfmtConfig{ + Mapping: map[string]string{ + "foo1": "bar1", + "foo2": "", + }, + } + assert.EqualValues(t, want, got) +} + +func TestLogfmtConfig_validate(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + config interface{} + wantMappingCount int + err error + }{ + "empty config": { + nil, + 0, + errors.New(ErrMappingRequired), + }, + "no mapping": { + map[string]interface{}{}, + 0, + errors.New(ErrMappingRequired), + }, + "empty source": { + map[string]interface{}{ + "mapping": map[string]string{ + "extr1": "expr", + }, + "source": "", + }, + 0, + errors.New(ErrEmptyLogfmtStageSource), + }, + "valid without source": { + map[string]interface{}{ + "mapping": map[string]string{ + "foo1": "foo", + "foo2": "", + }, + }, + 2, + nil, + }, + "valid with source": { + map[string]interface{}{ + "mapping": map[string]string{ + "foo1": "foo", + "foo2": "", + }, + "source": "log", + }, + 2, + nil, + }, + } + for tName, tt := range tests { + tt := tt + t.Run(tName, func(t *testing.T) { + c, err := parseLogfmtConfig(tt.config) + assert.NoError(t, err) + got, err := validateLogfmtConfig(c) + if tt.err != nil { + assert.EqualError(t, err, tt.err.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantMappingCount, len(got)) + }) + } +} + +var testLogfmtLogFixture = ` + time=2012-11-01T22:08:41+00:00 + app=loki + level=WARN + nested="child=value" + message="this is a log line" +` + +func TestLogfmtParser_Parse(t *testing.T) { + t.Parallel() + tests := map[string]struct { + config interface{} + extracted map[string]interface{} + entry string + expectedExtract map[string]interface{} + }{ + "successfully decode logfmt on entry": { + map[string]interface{}{ + "mapping": map[string]string{ + "time": "", + "app": "", + "level": "", + "nested": "", + "message": "", + }, + }, + map[string]interface{}{}, + testLogfmtLogFixture, + map[string]interface{}{ + "time": "2012-11-01T22:08:41+00:00", + "app": "loki", + "level": "WARN", + "nested": "child=value", + "message": "this is a log line", + }, + }, + "successfully decode logfmt on extracted[source]": { + map[string]interface{}{ + "mapping": map[string]string{ + "time": "", + "app": "", + "level": "", + "nested": "", + "message": "", + }, + "source": "log", + }, + map[string]interface{}{ + "log": testLogfmtLogFixture, + }, + "{}", + map[string]interface{}{ + "time": "2012-11-01T22:08:41+00:00", + "app": "loki", + "level": "WARN", + "nested": "child=value", + "message": "this is a log line", + "log": testLogfmtLogFixture, + }, + }, + "missing extracted[source]": { + map[string]interface{}{ + "mapping": map[string]string{ + "app": "", + }, + "source": "log", + }, + map[string]interface{}{}, + testLogfmtLogFixture, + map[string]interface{}{}, + }, + "invalid logfmt on entry": { + map[string]interface{}{ + "mapping": map[string]string{ + "expr1": "", + }, + }, + map[string]interface{}{}, + "{\"invalid\":\"logfmt\"}", + map[string]interface{}{}, + }, + "invalid logfmt on extracted[source]": { + map[string]interface{}{ + "mapping": map[string]string{ + "app": "", + }, + "source": "log", + }, + map[string]interface{}{ + "log": "not logfmt", + }, + testLogfmtLogFixture, + map[string]interface{}{ + "log": "not logfmt", + }, + }, + "nil source": { + map[string]interface{}{ + "mapping": map[string]string{ + "app": "", + }, + "source": "log", + }, + map[string]interface{}{ + "log": nil, + }, + testLogfmtLogFixture, + map[string]interface{}{ + "log": nil, + }, + }, + } + for tName, tt := range tests { + tt := tt + t.Run(tName, func(t *testing.T) { + t.Parallel() + p, err := New(util_log.Logger, nil, StageTypeLogfmt, tt.config, nil) + assert.NoError(t, err) + out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0] + + assert.Equal(t, tt.expectedExtract, out.Extracted) + }) + } +} diff --git a/clients/pkg/logentry/stages/stage.go b/clients/pkg/logentry/stages/stage.go index 39b081613063..3c025bb43368 100644 --- a/clients/pkg/logentry/stages/stage.go +++ b/clients/pkg/logentry/stages/stage.go @@ -16,6 +16,7 @@ import ( const ( StageTypeJSON = "json" + StageTypeLogfmt = "logfmt" StageTypeRegex = "regex" StageTypeReplace = "replace" StageTypeMetric = "metrics" @@ -121,6 +122,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeLogfmt: + s, err = newLogfmtStage(logger, cfg) + if err != nil { + return nil, err + } case StageTypeRegex: s, err = newRegexStage(logger, cfg) if err != nil {