From 6e70704047e3e908e9063a3e7110887e45b6389f Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 26 Nov 2020 15:02:20 +0100 Subject: [PATCH] Collapse multiline logs based on a start line. Summary: This is a very simple approach based on #1380 to provide multiline or block log entries in promtail. A `multiline` stage is added to pipelines. This stages matches a start line. Once a start line is matched all following lines are appended to an entry and not passed on to downstream stages. Once a new start line is matched the former block of multilines is sent. If now new line arrives withing `max_wait_time` the block is flushed to the next stage and a new block is started. --- pkg/logentry/stages/multiline.go | 125 ++++++++++++++++++++++++++ pkg/logentry/stages/multiline_test.go | 47 ++++++++++ pkg/logentry/stages/stage.go | 6 ++ 3 files changed, 178 insertions(+) create mode 100644 pkg/logentry/stages/multiline.go create mode 100644 pkg/logentry/stages/multiline_test.go diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go new file mode 100644 index 000000000000..383856570389 --- /dev/null +++ b/pkg/logentry/stages/multiline.go @@ -0,0 +1,125 @@ +package stages + +import ( + "bytes" + "regexp" + "time" + + "github.com/go-kit/kit/log" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +const ( + ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression" + ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v" + ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v" +) + +// MultilineConfig contains the configuration for a multilineStage +type MultilineConfig struct { + Expression *string `mapstructure:"firstline"` + MaxWaitTime *string `mapstructure:"max_wait_time"` + maxWait time.Duration + regex *regexp.Regexp +} + +func validateMultilineConfig(cfg *MultilineConfig) error { + if cfg == nil || + (cfg.Expression == nil) { + return errors.New(ErrMultilineStageEmptyConfig) + } + + expr, err := regexp.Compile(*cfg.Expression) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidRegex, err) + } + cfg.regex = expr + + maxWait, err := time.ParseDuration(*cfg.MaxWaitTime) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err) + } + cfg.maxWait = maxWait + + return nil +} + +// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed +type multilineStage struct { + logger log.Logger + cfg *MultilineConfig + buffer *bytes.Buffer + startLineEntry Entry +} + +// newMulitlineStage creates a MulitlineStage from config +func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) { + cfg := &MultilineConfig{} + err := mapstructure.WeakDecode(config, cfg) + if err != nil { + return nil, err + } + err = validateMultilineConfig(cfg) + if err != nil { + return nil, err + } + + return &multilineStage{ + logger: log.With(logger, "component", "stage", "type", "multiline"), + cfg: cfg, + buffer: new(bytes.Buffer), + }, nil +} + +func (m *multilineStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for { + select { + case <- time.After(m.cfg.maxWait): + m.flush(out) + case e, ok := <- in: + if !ok { + return + } + + isFirstLine := m.cfg.regex.MatchString(e.Line) + if isFirstLine { + m.flush(out) + // TODO: we only consider the labels and timestamp from the firt entry. Should merge all entries? + m.startLineEntry = e + } + + // Append block line + if m.buffer.Len() > 0 { + m.buffer.WriteRune('\n') + } + m.buffer.WriteString(e.Line) + } + } + }() + return out +} + +func (m *multilineStage) flush(out chan Entry) { + if m.buffer.Len() == 0 { + return + } + + collapsed := &Entry{ + Labels: m.startLineEntry.Labels, + Extracted: m.startLineEntry.Extracted, + Timestamp: m.startLineEntry.Timestamp, + Line: m.buffer.String(), + } + m.buffer.Reset() + + out <- *collapsed +} + +// Name implements Stage +func (m *multilineStage) Name() string { + return StageTypeMultiline +} \ No newline at end of file diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go new file mode 100644 index 000000000000..02f92c11de97 --- /dev/null +++ b/pkg/logentry/stages/multiline_test.go @@ -0,0 +1,47 @@ +package stages + +import ( + "bytes" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + ww "github.com/weaveworks/common/server" +) + +func Test_multilineStage_Process(t *testing.T) { + + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + mcfg := &MultilineConfig{Expression: ptrFromString("^START")} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + buffer: new(bytes.Buffer), + } + + out := processEntries(stage, simpleEntry("START line 1"), simpleEntry("not a start line"), simpleEntry("START line 2"), simpleEntry("START line 3")) + + require.Equal(t, "START line 1\nnot a start line", out[0].Line) + require.Equal(t, "START line 2", out[1].Line) + require.Equal(t, "START line 3", out[2].Line) +} + +func simpleEntry(line string) Entry { + return Entry{ + Labels: model.LabelSet{}, + Line: ptrFromString(line), + Extracted: map[string]interface{}{}, + Timestamp: ptrFromTime(time.Now()), + } + +} \ No newline at end of file diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 28eb917c2891..82c265074ebc 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -27,6 +27,7 @@ const ( StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" + StageTypeMultiline = "multiline" ) // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -139,6 +140,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeMultiline: + s, err = newMultilineStage(logger, cfg) + if err != nil { + return nil, err + } default: return nil, errors.Errorf("Unknown stage type: %s", stageType) }