From 6e70704047e3e908e9063a3e7110887e45b6389f Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 26 Nov 2020 15:02:20 +0100 Subject: [PATCH 01/15] Collapse multiline logs based on a start line. Summary: This is a very simple approach based on #1380 to provide multiline or block log entries in promtail. A `multiline` stage is added to pipelines. This stage matches a start line. Once a start line is matched, all following lines are appended to an entry and not passed on to downstream stages. Once a new start line is matched the former block of multilines is sent. If no new line arrives within `max_wait_time` the block is flushed to the next stage and a new block is started. --- pkg/logentry/stages/multiline.go | 125 ++++++++++++++++++++++++++ pkg/logentry/stages/multiline_test.go | 47 ++++++++++ pkg/logentry/stages/stage.go | 6 ++ 3 files changed, 178 insertions(+) create mode 100644 pkg/logentry/stages/multiline.go create mode 100644 pkg/logentry/stages/multiline_test.go diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go new file mode 100644 index 000000000000..383856570389 --- /dev/null +++ b/pkg/logentry/stages/multiline.go @@ -0,0 +1,125 @@ +package stages + +import ( + "bytes" + "regexp" + "time" + + "github.com/go-kit/kit/log" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +const ( + ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression" + ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v" + ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v" +) + +// MultilineConfig contains the configuration for a multilineStage +type MultilineConfig struct { + Expression *string `mapstructure:"firstline"` + MaxWaitTime *string `mapstructure:"max_wait_time"` + maxWait time.Duration + regex *regexp.Regexp +} + +func validateMultilineConfig(cfg *MultilineConfig) error { + if cfg == nil || 
(cfg.Expression == nil) { + return errors.New(ErrMultilineStageEmptyConfig) + } + + expr, err := regexp.Compile(*cfg.Expression) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidRegex, err) + } + cfg.regex = expr + + maxWait, err := time.ParseDuration(*cfg.MaxWaitTime) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err) + } + cfg.maxWait = maxWait + + return nil +} + +// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed +type multilineStage struct { + logger log.Logger + cfg *MultilineConfig + buffer *bytes.Buffer + startLineEntry Entry +} + +// newMulitlineStage creates a MulitlineStage from config +func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) { + cfg := &MultilineConfig{} + err := mapstructure.WeakDecode(config, cfg) + if err != nil { + return nil, err + } + err = validateMultilineConfig(cfg) + if err != nil { + return nil, err + } + + return &multilineStage{ + logger: log.With(logger, "component", "stage", "type", "multiline"), + cfg: cfg, + buffer: new(bytes.Buffer), + }, nil +} + +func (m *multilineStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for { + select { + case <- time.After(m.cfg.maxWait): + m.flush(out) + case e, ok := <- in: + if !ok { + return + } + + isFirstLine := m.cfg.regex.MatchString(e.Line) + if isFirstLine { + m.flush(out) + // TODO: we only consider the labels and timestamp from the firt entry. Should merge all entries? 
+ m.startLineEntry = e + } + + // Append block line + if m.buffer.Len() > 0 { + m.buffer.WriteRune('\n') + } + m.buffer.WriteString(e.Line) + } + } + }() + return out +} + +func (m *multilineStage) flush(out chan Entry) { + if m.buffer.Len() == 0 { + return + } + + collapsed := &Entry{ + Labels: m.startLineEntry.Labels, + Extracted: m.startLineEntry.Extracted, + Timestamp: m.startLineEntry.Timestamp, + Line: m.buffer.String(), + } + m.buffer.Reset() + + out <- *collapsed +} + +// Name implements Stage +func (m *multilineStage) Name() string { + return StageTypeMultiline +} \ No newline at end of file diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go new file mode 100644 index 000000000000..02f92c11de97 --- /dev/null +++ b/pkg/logentry/stages/multiline_test.go @@ -0,0 +1,47 @@ +package stages + +import ( + "bytes" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + ww "github.com/weaveworks/common/server" +) + +func Test_multilineStage_Process(t *testing.T) { + + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + mcfg := &MultilineConfig{Expression: ptrFromString("^START")} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + buffer: new(bytes.Buffer), + } + + out := processEntries(stage, simpleEntry("START line 1"), simpleEntry("not a start line"), simpleEntry("START line 2"), simpleEntry("START line 3")) + + require.Equal(t, "START line 1\nnot a start line", out[0].Line) + require.Equal(t, "START line 2", out[1].Line) + require.Equal(t, "START line 3", out[2].Line) +} + +func simpleEntry(line string) Entry { + return Entry{ + Labels: model.LabelSet{}, + Line: ptrFromString(line), + Extracted: map[string]interface{}{}, + Timestamp: ptrFromTime(time.Now()), + } + +} \ No 
newline at end of file diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 28eb917c2891..82c265074ebc 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -27,6 +27,7 @@ const ( StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" + StageTypeMultiline = "multiline" ) // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -139,6 +140,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeMultiline: + s, err = newMultilineStage(logger, cfg) + if err != nil { + return nil, err + } default: return nil, errors.Errorf("Unknown stage type: %s", stageType) } From 7f707623a04588f075bc175f363a3ad55b584582 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 26 Nov 2020 15:53:16 +0100 Subject: [PATCH 02/15] Test multiline stage process. --- pkg/logentry/stages/multiline.go | 21 +++++++-- pkg/logentry/stages/multiline_test.go | 68 +++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index 383856570389..d6e45b4fdb9a 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -2,10 +2,14 @@ package stages import ( "bytes" + "fmt" "regexp" "time" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" ) @@ -25,8 +29,7 @@ type MultilineConfig struct { } func validateMultilineConfig(cfg *MultilineConfig) error { - if cfg == nil || - (cfg.Expression == nil) { + if cfg == nil || cfg.Expression == nil || cfg.MaxWaitTime == nil { return errors.New(ErrMultilineStageEmptyConfig) } @@ -79,9 +82,12 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { for { select { case <- 
time.After(m.cfg.maxWait): + level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", m.buffer.String()) m.flush(out) case e, ok := <- in: if !ok { + level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", m.buffer.String()) + m.flush(out) return } @@ -105,14 +111,19 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { func (m *multilineStage) flush(out chan Entry) { if m.buffer.Len() == 0 { + level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", m.buffer.Len()) return } collapsed := &Entry{ - Labels: m.startLineEntry.Labels, Extracted: m.startLineEntry.Extracted, - Timestamp: m.startLineEntry.Timestamp, - Line: m.buffer.String(), + Entry: api.Entry{ + Labels: m.startLineEntry.Entry.Labels, + Entry: logproto.Entry{ + Timestamp: m.startLineEntry.Entry.Entry.Timestamp, + Line: m.buffer.String(), + }, + }, } m.buffer.Reset() diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go index 02f92c11de97..bcc3d2cb5d54 100644 --- a/pkg/logentry/stages/multiline_test.go +++ b/pkg/logentry/stages/multiline_test.go @@ -2,10 +2,13 @@ package stages import ( "bytes" + "sync" "testing" "time" "github.com/cortexproject/cortex/pkg/util" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ww "github.com/weaveworks/common/server" @@ -19,7 +22,7 @@ func Test_multilineStage_Process(t *testing.T) { util.InitLogger(cfg) Debug = true - mcfg := &MultilineConfig{Expression: ptrFromString("^START")} + mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")} err := validateMultilineConfig(mcfg) require.NoError(t, err) @@ -31,17 +34,74 @@ func Test_multilineStage_Process(t *testing.T) { out := processEntries(stage, simpleEntry("START line 1"), simpleEntry("not a start line"), simpleEntry("START line 2"), 
simpleEntry("START line 3")) + require.Len(t, out, 3) require.Equal(t, "START line 1\nnot a start line", out[0].Line) require.Equal(t, "START line 2", out[1].Line) require.Equal(t, "START line 3", out[2].Line) } +func Test_multilineStage_MaxWaitTime(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + maxWait := time.Duration(2 * time.Second) + mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString(maxWait.String())} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + buffer: new(bytes.Buffer), + } + + in := make(chan Entry, 2) + out := stage.Run(in) + + // Accumulate result + mu := new(sync.Mutex) + var res []Entry + go func() { + for e := range out { + mu.Lock() + t.Logf("appending %s", e.Line) + res = append(res, e) + mu.Unlock() + } + return + }() + + // Write input with a delay + go func() { + in <- simpleEntry("START line") + + // Trigger flush due to max wait timeout + time.Sleep(2 * maxWait) + + in <- simpleEntry("not a start line hitting timeout") + + // Signal pipeline we are done. 
+ close(in) + return + }() + + require.Eventually(t, func() bool {mu.Lock(); defer mu.Unlock(); return len(res) == 2;}, time.Duration(3 * maxWait), time.Second) + require.Equal(t, "START line", res[0].Line) + require.Equal(t, "not a start line hitting timeout", res[1].Line) +} func simpleEntry(line string) Entry { return Entry{ - Labels: model.LabelSet{}, - Line: ptrFromString(line), Extracted: map[string]interface{}{}, - Timestamp: ptrFromTime(time.Now()), + Entry: api.Entry{ + Labels: model.LabelSet{}, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: line, + }, + }, } } \ No newline at end of file From 3db919d0255a242fad2e57a11ac7bacf9cca7624 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 26 Nov 2020 16:42:51 +0100 Subject: [PATCH 03/15] Format code. --- pkg/logentry/stages/multiline.go | 22 +++++++++++----------- pkg/logentry/stages/multiline_test.go | 24 ++++++++++++------------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index d6e45b4fdb9a..f7afc8ae005c 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -15,15 +15,15 @@ import ( ) const ( - ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression" - ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v" + ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression" + ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v" ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v" ) // MultilineConfig contains the configuration for a multilineStage type MultilineConfig struct { Expression *string `mapstructure:"firstline"` - MaxWaitTime *string `mapstructure:"max_wait_time"` + MaxWaitTime *string `mapstructure:"max_wait_time"` maxWait time.Duration regex *regexp.Regexp } @@ -50,9 +50,9 @@ func 
validateMultilineConfig(cfg *MultilineConfig) error { // dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed type multilineStage struct { - logger log.Logger - cfg *MultilineConfig - buffer *bytes.Buffer + logger log.Logger + cfg *MultilineConfig + buffer *bytes.Buffer startLineEntry Entry } @@ -81,14 +81,14 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { defer close(out) for { select { - case <- time.After(m.cfg.maxWait): + case <-time.After(m.cfg.maxWait): level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", m.buffer.String()) m.flush(out) - case e, ok := <- in: + case e, ok := <-in: if !ok { level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", m.buffer.String()) m.flush(out) - return + return } isFirstLine := m.cfg.regex.MatchString(e.Line) @@ -121,7 +121,7 @@ func (m *multilineStage) flush(out chan Entry) { Labels: m.startLineEntry.Entry.Labels, Entry: logproto.Entry{ Timestamp: m.startLineEntry.Entry.Entry.Timestamp, - Line: m.buffer.String(), + Line: m.buffer.String(), }, }, } @@ -133,4 +133,4 @@ func (m *multilineStage) flush(out chan Entry) { // Name implements Stage func (m *multilineStage) Name() string { return StageTypeMultiline -} \ No newline at end of file +} diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go index bcc3d2cb5d54..0d2bd26d4bb9 100644 --- a/pkg/logentry/stages/multiline_test.go +++ b/pkg/logentry/stages/multiline_test.go @@ -87,21 +87,21 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) { return }() - require.Eventually(t, func() bool {mu.Lock(); defer mu.Unlock(); return len(res) == 2;}, time.Duration(3 * maxWait), time.Second) + require.Eventually(t, func() bool { mu.Lock(); defer mu.Unlock(); return len(res) == 2 }, time.Duration(3*maxWait), time.Second) require.Equal(t, "START line", res[0].Line) require.Equal(t, "not a 
start line hitting timeout", res[1].Line) } func simpleEntry(line string) Entry { return Entry{ - Extracted: map[string]interface{}{}, - Entry: api.Entry{ - Labels: model.LabelSet{}, - Entry: logproto.Entry{ - Timestamp: time.Now(), - Line: line, - }, - }, - } - -} \ No newline at end of file + Extracted: map[string]interface{}{}, + Entry: api.Entry{ + Labels: model.LabelSet{}, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: line, + }, + }, + } + +} From 851b91eb72534ef201cdfbac4e63f89aee5ddc31 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 26 Nov 2020 18:12:26 +0100 Subject: [PATCH 04/15] Flush multiline block after `max_lines`. --- pkg/logentry/stages/multiline.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index f7afc8ae005c..31b5547246ec 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -20,12 +20,15 @@ const ( ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v" ) +const maxLineDefault uint64 = 128 + // MultilineConfig contains the configuration for a multilineStage type MultilineConfig struct { Expression *string `mapstructure:"firstline"` + regex *regexp.Regexp + MaxLines *uint64 `mapstructure:"max_lines"` MaxWaitTime *string `mapstructure:"max_wait_time"` maxWait time.Duration - regex *regexp.Regexp } func validateMultilineConfig(cfg *MultilineConfig) error { @@ -45,6 +48,11 @@ func validateMultilineConfig(cfg *MultilineConfig) error { } cfg.maxWait = maxWait + if cfg.MaxLines == nil { + cfg.MaxLines = new(uint64) + *cfg.MaxLines = maxLineDefault + } + return nil } @@ -54,6 +62,7 @@ type multilineStage struct { cfg *MultilineConfig buffer *bytes.Buffer startLineEntry Entry + currentLines uint64 } // newMulitlineStage creates a MulitlineStage from config @@ -103,6 +112,11 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { m.buffer.WriteRune('\n') } 
m.buffer.WriteString(e.Line) + m.currentLines++ + + if m.currentLines == *m.cfg.MaxLines { + m.flush(out) + } } } }() @@ -126,6 +140,7 @@ func (m *multilineStage) flush(out chan Entry) { }, } m.buffer.Reset() + m.currentLines = 0 out <- *collapsed } From 503a0d2ded4879920699060b2bc88c83830f8974 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Fri, 27 Nov 2020 09:50:34 +0100 Subject: [PATCH 05/15] Capture internal state of the stage. --- pkg/logentry/stages/multiline.go | 69 ++++++++++++++++----------- pkg/logentry/stages/multiline_test.go | 3 -- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index 31b5547246ec..ea4dc250a714 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -26,7 +26,7 @@ const maxLineDefault uint64 = 128 type MultilineConfig struct { Expression *string `mapstructure:"firstline"` regex *regexp.Regexp - MaxLines *uint64 `mapstructure:"max_lines"` + MaxLines *uint64 `mapstructure:"max_lines"` MaxWaitTime *string `mapstructure:"max_wait_time"` maxWait time.Duration } @@ -58,11 +58,15 @@ func validateMultilineConfig(cfg *MultilineConfig) error { // dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed type multilineStage struct { - logger log.Logger - cfg *MultilineConfig - buffer *bytes.Buffer - startLineEntry Entry - currentLines uint64 + logger log.Logger + cfg *MultilineConfig +} + +// multilineState captures the internal state of a running multiline stage. +type multilineState struct { + buffer *bytes.Buffer // The lines of the current multiline block. + startLineEntry Entry // The entry of the start line of a multiline block. + currentLines uint64 // The number of lines of the current multiline block. 
} // newMulitlineStage creates a MulitlineStage from config @@ -80,7 +84,6 @@ func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) { return &multilineStage{ logger: log.With(logger, "component", "stage", "type", "multiline"), cfg: cfg, - buffer: new(bytes.Buffer), }, nil } @@ -88,34 +91,42 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { out := make(chan Entry) go func() { defer close(out) + + state := &multilineState{ + buffer: new(bytes.Buffer), + currentLines: 0, + } + for { select { case <-time.After(m.cfg.maxWait): - level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", m.buffer.String()) - m.flush(out) + level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String()) + m.flush(out, state) case e, ok := <-in: if !ok { - level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", m.buffer.String()) - m.flush(out) + level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String()) + m.flush(out, state) return } isFirstLine := m.cfg.regex.MatchString(e.Line) if isFirstLine { - m.flush(out) - // TODO: we only consider the labels and timestamp from the firt entry. Should merge all entries? - m.startLineEntry = e + m.flush(out, state) + + // The start line entry is used to set timestamp and labels in the flush method. + // The timestamps for following lines are ignored for now. 
+ state.startLineEntry = e } // Append block line - if m.buffer.Len() > 0 { - m.buffer.WriteRune('\n') + if state.buffer.Len() > 0 { + state.buffer.WriteRune('\n') } - m.buffer.WriteString(e.Line) - m.currentLines++ + state.buffer.WriteString(e.Line) + state.currentLines++ - if m.currentLines == *m.cfg.MaxLines { - m.flush(out) + if state.currentLines == *m.cfg.MaxLines { + m.flush(out, state) } } } @@ -123,24 +134,24 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { return out } -func (m *multilineStage) flush(out chan Entry) { - if m.buffer.Len() == 0 { - level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", m.buffer.Len()) +func (m *multilineStage) flush(out chan Entry, s *multilineState) { + if s.buffer.Len() == 0 { + level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) return } collapsed := &Entry{ - Extracted: m.startLineEntry.Extracted, + Extracted: s.startLineEntry.Extracted, Entry: api.Entry{ - Labels: m.startLineEntry.Entry.Labels, + Labels: s.startLineEntry.Entry.Labels, Entry: logproto.Entry{ - Timestamp: m.startLineEntry.Entry.Entry.Timestamp, - Line: m.buffer.String(), + Timestamp: s.startLineEntry.Entry.Entry.Timestamp, + Line: s.buffer.String(), }, }, } - m.buffer.Reset() - m.currentLines = 0 + s.buffer.Reset() + s.currentLines = 0 out <- *collapsed } diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go index 0d2bd26d4bb9..4ba9541ca1c5 100644 --- a/pkg/logentry/stages/multiline_test.go +++ b/pkg/logentry/stages/multiline_test.go @@ -1,7 +1,6 @@ package stages import ( - "bytes" "sync" "testing" "time" @@ -29,7 +28,6 @@ func Test_multilineStage_Process(t *testing.T) { stage := &multilineStage{ cfg: mcfg, logger: util.Logger, - buffer: new(bytes.Buffer), } out := processEntries(stage, simpleEntry("START line 1"), simpleEntry("not a start line"), simpleEntry("START line 2"), simpleEntry("START line 3")) @@ -54,7 +52,6 @@ func Test_multilineStage_MaxWaitTime(t 
*testing.T) { stage := &multilineStage{ cfg: mcfg, logger: util.Logger, - buffer: new(bytes.Buffer), } in := make(chan Entry, 2) From dc715cb3a56c16fa2a7bde75227e99b8a4406feb Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Fri, 27 Nov 2020 17:10:55 +0100 Subject: [PATCH 06/15] Process different multiline streams in parallel. --- pkg/logentry/stages/multiline.go | 103 +++++++++++++++++--------- pkg/logentry/stages/multiline_test.go | 60 +++++++++++++-- 2 files changed, 123 insertions(+), 40 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index ea4dc250a714..485678b40be8 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "regexp" + "sync" "time" "github.com/go-kit/kit/log" @@ -12,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/promtail/api" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" + "github.com/prometheus/common/model" ) const ( @@ -92,48 +94,79 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { go func() { defer close(out) - state := &multilineState{ - buffer: new(bytes.Buffer), - currentLines: 0, - } + streams := make(map[model.Fingerprint](chan Entry)) + wg := new(sync.WaitGroup) - for { - select { - case <-time.After(m.cfg.maxWait): - level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String()) - m.flush(out, state) - case e, ok := <-in: - if !ok { - level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String()) - m.flush(out, state) - return - } - - isFirstLine := m.cfg.regex.MatchString(e.Line) - if isFirstLine { - m.flush(out, state) - - // The start line entry is used to set timestamp and labels in the flush method. - // The timestamps for following lines are ignored for now. 
- state.startLineEntry = e - } - - // Append block line - if state.buffer.Len() > 0 { - state.buffer.WriteRune('\n') - } - state.buffer.WriteString(e.Line) - state.currentLines++ - - if state.currentLines == *m.cfg.MaxLines { - m.flush(out, state) - } + for e := range in { + key := e.Labels.FastFingerprint() + s, ok := streams[key] + if !ok { + level.Debug(m.logger).Log("msg", "creating new stream", "stream", key) + s = make(chan Entry) + streams[key] = s + + wg.Add(1) + go m.runMultiline(s, out, wg) } + level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line) + s <- e } + + // Close all streams and wait for them to finish being processed. + for _, s := range streams { + close(s) + } + wg.Wait() }() return out } +func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.WaitGroup) { + defer wg.Done() + + state := &multilineState{ + buffer: new(bytes.Buffer), + currentLines: 0, + } + + for { + select { + case <-time.After(m.cfg.maxWait): + level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String()) + m.flush(out, state) + case e, ok := <-in: + level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint()) + + if !ok { + level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + m.flush(out, state) + return + } + + isFirstLine := m.cfg.regex.MatchString(e.Line) + if isFirstLine { + level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + m.flush(out, state) + + // The start line entry is used to set timestamp and labels in the flush method. + // The timestamps for following lines are ignored for now. 
+ state.startLineEntry = e + } + + // Append block line + if state.buffer.Len() > 0 { + state.buffer.WriteRune('\n') + } + state.buffer.WriteString(e.Line) + state.currentLines++ + + if state.currentLines == *m.cfg.MaxLines { + m.flush(out, state) + } + } + } +} + func (m *multilineStage) flush(out chan Entry, s *multilineState) { if s.buffer.Len() == 0 { level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go index 4ba9541ca1c5..767d26461396 100644 --- a/pkg/logentry/stages/multiline_test.go +++ b/pkg/logentry/stages/multiline_test.go @@ -1,6 +1,7 @@ package stages import ( + "sort" "sync" "testing" "time" @@ -30,13 +31,62 @@ func Test_multilineStage_Process(t *testing.T) { logger: util.Logger, } - out := processEntries(stage, simpleEntry("START line 1"), simpleEntry("not a start line"), simpleEntry("START line 2"), simpleEntry("START line 3")) + out := processEntries(stage, + simpleEntry("START line 1", "label"), + simpleEntry("not a start line", "label"), + simpleEntry("START line 2", "label"), + simpleEntry("START line 3", "label")) require.Len(t, out, 3) require.Equal(t, "START line 1\nnot a start line", out[0].Line) require.Equal(t, "START line 2", out[1].Line) require.Equal(t, "START line 3", out[2].Line) } +func Test_multilineStage_MultiStreams(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + } + + out := processEntries(stage, + simpleEntry("START line 1", "one"), + simpleEntry("not a start line 1", "one"), + simpleEntry("START line 1", "two"), + simpleEntry("not a start line 2", "one"), + simpleEntry("START line 2", 
"two"), + simpleEntry("START line 2", "one"), + simpleEntry("not a start line 1", "one"), + ) + + sort.Slice(out, func(l, r int) bool { + return out[l].Timestamp.Before(out[r].Timestamp) + }) + + require.Len(t, out, 4) + + require.Equal(t, "START line 1\nnot a start line 1\nnot a start line 2", out[0].Line) + require.Equal(t, model.LabelValue("one"), out[0].Labels["value"]) + + require.Equal(t, "START line 1", out[1].Line) + require.Equal(t, model.LabelValue("two"), out[1].Labels["value"]) + + require.Equal(t, "START line 2", out[2].Line) + require.Equal(t, model.LabelValue("two"), out[2].Labels["value"]) + + require.Equal(t, "START line 2\nnot a start line 1", out[3].Line) + require.Equal(t, model.LabelValue("one"), out[3].Labels["value"]) +} + func Test_multilineStage_MaxWaitTime(t *testing.T) { // Enable debug logging cfg := &ww.Config{} @@ -72,12 +122,12 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) { // Write input with a delay go func() { - in <- simpleEntry("START line") + in <- simpleEntry("START line", "label") // Trigger flush due to max wait timeout time.Sleep(2 * maxWait) - in <- simpleEntry("not a start line hitting timeout") + in <- simpleEntry("not a start line hitting timeout", "label") // Signal pipeline we are done. close(in) @@ -89,11 +139,11 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) { require.Equal(t, "not a start line hitting timeout", res[1].Line) } -func simpleEntry(line string) Entry { +func simpleEntry(line, label string) Entry { return Entry{ Extracted: map[string]interface{}{}, Entry: api.Entry{ - Labels: model.LabelSet{}, + Labels: model.LabelSet{"value": model.LabelValue(label)}, Entry: logproto.Entry{ Timestamp: time.Now(), Line: line, From 6ac637597b96c470361e41f917cad624babc8eaf Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 09:02:44 +0100 Subject: [PATCH 07/15] Start documenting multiline stage. 
--- docs/sources/clients/promtail/pipelines.md | 1 + .../clients/promtail/stages/multiline.md | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 docs/sources/clients/promtail/stages/multiline.md diff --git a/docs/sources/clients/promtail/pipelines.md b/docs/sources/clients/promtail/pipelines.md index ee1da3cf71da..765717ce897a 100644 --- a/docs/sources/clients/promtail/pipelines.md +++ b/docs/sources/clients/promtail/pipelines.md @@ -206,6 +206,7 @@ Parsing stages: Transform stages: + - [multiline](../stages/multiline/): Merges multiple lines, e.g. stack traces, into multiline blocks. - [template](../stages/template/): Use Go templates to modify extracted data. Action stages: diff --git a/docs/sources/clients/promtail/stages/multiline.md b/docs/sources/clients/promtail/stages/multiline.md new file mode 100644 index 000000000000..e0193ef182bc --- /dev/null +++ b/docs/sources/clients/promtail/stages/multiline.md @@ -0,0 +1,21 @@ +--- +title: multiline +--- + +# `multiline` stage + +... + +## Schema + +```yaml +multiline: + # TODO: document + firstline: + + # TODO: document + max_lines: + + # TODO: document + max_wait_time: +``` \ No newline at end of file From 48b09d6026490ace21510bba27bb65e76fafe2db Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 11:48:50 +0100 Subject: [PATCH 08/15] Give an example configuration for `multiline` stage. --- .../clients/promtail/stages/multiline.md | 55 +++++++++++++++++-- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/docs/sources/clients/promtail/stages/multiline.md b/docs/sources/clients/promtail/stages/multiline.md index e0193ef182bc..93b924f2ae89 100644 --- a/docs/sources/clients/promtail/stages/multiline.md +++ b/docs/sources/clients/promtail/stages/multiline.md @@ -4,18 +4,63 @@ title: multiline # `multiline` stage -... +The `multiline` stage merges multiple lines into a multiline block before passing it on to the next stage in the pipeline. 
+ +A new block is identified by the `firstline` regular expression. Any line that does *not* match the expression is considered to be part of the block of the previous match. ## Schema ```yaml multiline: - # TODO: document + # RE2 regular expression, if matched will start a new multiline block. + # This expression must be provided. firstline: - # TODO: document + # The maximum wait time will be parsed as a Go duration: https://golang.org/pkg/time/#ParseDuration. + # If no new logs arrive within this maximum wait time the current block will be sent on. + # This is useful if the observed application dies with e.g. an exception. No new logs will arrive and the exception + # block is sent *after* the maximum wait time expired. + max_wait_time: + + # Maximum number of lines a block can have. If a block has more lines, a new block is started. + # The default is 128 lines. max_lines: +``` - # TODO: document - max_wait_time: +## Examples + +Let's say we have the following logs from a very simple [flask](https://flask.palletsprojects.com) service. 
+ +``` +[2020-12-03 11:36:20] "GET /hello HTTP/1.1" 200 - +[2020-12-03 11:36:23] ERROR in app: Exception on /error [GET] +Traceback (most recent call last): + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app + response = self.full_dispatch_request() + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request + rv = self.handle_user_exception(e) + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception + reraise(exc_type, exc_value, tb) + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise + raise value + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request + rv = self.dispatch_request() + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request + return self.view_functions[rule.endpoint](**req.view_args) + File "/home/pallets/src/deployment_tools/hello.py", line 10, in error + raise Exception("Sorry, this route always breaks") +Exception: Sorry, this route always breaks +[2020-12-03 11:36:23] "GET /error HTTP/1.1" 500 - +[2020-12-03 11:36:26] "GET /hello HTTP/1.1" 200 - +[2020-12-03 11:36:27] "GET /hello HTTP/1.1" 200 - +``` + +We would like to collapse all lines of the traceback into one multiline block. All blocks start with a timestamp in brackets. Thus we configure a `multiline` stage with the `firstline` regular expression `^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]`. This will match the start of the traceback but not the following lines until `Exception: Sorry, this route always breaks`. These will be part of a multiline block and one log entry in Loki. + +```yaml +multiline: + # Identify timestamps as first line of a multiline block. 
+ firstline: '^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]' + + max_wait_time: 3s ``` \ No newline at end of file From 79c8c2758cc52abb95a6d8140953f404b3f1c5a2 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 12:48:48 +0100 Subject: [PATCH 09/15] Make linter happy. --- pkg/logentry/stages/multiline.go | 5 +++-- pkg/logentry/stages/multiline_test.go | 11 +++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index 485678b40be8..c7796dd01531 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -9,11 +9,12 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/promtail/api" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" ) const ( diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go index 767d26461396..1c4d1d1a3b6d 100644 --- a/pkg/logentry/stages/multiline_test.go +++ b/pkg/logentry/stages/multiline_test.go @@ -7,11 +7,12 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util" - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/promtail/api" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ww "github.com/weaveworks/common/server" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" ) func Test_multilineStage_Process(t *testing.T) { @@ -94,7 +95,7 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) { util.InitLogger(cfg) Debug = true - maxWait := time.Duration(2 * time.Second) + maxWait := 2 * time.Second mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString(maxWait.String())} err := validateMultilineConfig(mcfg) require.NoError(t, err) @@ -117,7
+118,6 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) { res = append(res, e) mu.Unlock() } - return }() // Write input with a delay @@ -131,10 +131,9 @@ func Test_multilineStage_MaxWaitTime(t *testing.T) { // Signal pipeline we are done. close(in) - return }() - require.Eventually(t, func() bool { mu.Lock(); defer mu.Unlock(); return len(res) == 2 }, time.Duration(3*maxWait), time.Second) + require.Eventually(t, func() bool { mu.Lock(); defer mu.Unlock(); return len(res) == 2 }, 3*maxWait, time.Second) require.Equal(t, "START line", res[0].Line) require.Equal(t, "not a start line hitting timeout", res[1].Line) } From f9ed1339d4059bceded4a2642d391e63c1c07dd6 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 14:58:56 +0100 Subject: [PATCH 10/15] Pass through entries until first start line. --- pkg/logentry/stages/multiline.go | 7 +++++++ pkg/logentry/stages/multiline_test.go | 12 ++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index c7796dd01531..ee88ac8b0353 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -102,6 +102,13 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { key := e.Labels.FastFingerprint() s, ok := streams[key] if !ok { + // Pass through entries until we hit first start line. 
+ if !m.cfg.regex.MatchString(e.Line) { + level.Debug(m.logger).Log("msg", "pass through entry", "stream", key) + out <- e + continue + } + level.Debug(m.logger).Log("msg", "creating new stream", "stream", key) s = make(chan Entry) streams[key] = s diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go index 1c4d1d1a3b6d..9b58bca094af 100644 --- a/pkg/logentry/stages/multiline_test.go +++ b/pkg/logentry/stages/multiline_test.go @@ -33,15 +33,19 @@ func Test_multilineStage_Process(t *testing.T) { } out := processEntries(stage, + simpleEntry("not a start line before 1", "label"), + simpleEntry("not a start line before 2", "label"), simpleEntry("START line 1", "label"), simpleEntry("not a start line", "label"), simpleEntry("START line 2", "label"), simpleEntry("START line 3", "label")) - require.Len(t, out, 3) - require.Equal(t, "START line 1\nnot a start line", out[0].Line) - require.Equal(t, "START line 2", out[1].Line) - require.Equal(t, "START line 3", out[2].Line) + require.Len(t, out, 5) + require.Equal(t, "not a start line before 1", out[0].Line) + require.Equal(t, "not a start line before 2", out[1].Line) + require.Equal(t, "START line 1\nnot a start line", out[2].Line) + require.Equal(t, "START line 2", out[3].Line) + require.Equal(t, "START line 3", out[4].Line) } func Test_multilineStage_MultiStreams(t *testing.T) { // Enable debug logging From ca7ad81413d5667508d527a9307c1188672b9431 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 15:51:27 +0100 Subject: [PATCH 11/15] Update pkg/logentry/stages/multiline.go Co-authored-by: Cyril Tovena --- pkg/logentry/stages/multiline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index ee88ac8b0353..b037c66db1d8 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -175,7 +175,7 @@ func (m *multilineStage) runMultiline(in chan Entry, out 
chan Entry, wg *sync.Wa } } -func (m *multilineStage) flush(out chan Entry, s *multilineState) { +flush(out chan Entry, s *multilineState) { if s.buffer.Len() == 0 { level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) return From c07875ececaee466d255bd5b6626fa0afa7803d7 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 15:51:46 +0100 Subject: [PATCH 12/15] Default maxz wait time to 3 seconds. --- .../clients/promtail/stages/multiline.md | 1 + pkg/logentry/stages/multiline.go | 23 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/docs/sources/clients/promtail/stages/multiline.md b/docs/sources/clients/promtail/stages/multiline.md index 93b924f2ae89..9755cb1f0edf 100644 --- a/docs/sources/clients/promtail/stages/multiline.md +++ b/docs/sources/clients/promtail/stages/multiline.md @@ -20,6 +20,7 @@ multiline: # If no new logs arrive within this maximum wait time the current block will be sent on. # This is useful if the observed application dies with e.g. an exception. No new logs will arrive and the exception # block is sent *after* the maximum wait time expired. + # It defaults to 3s. max_wait_time: # Maximum number of lines a block can have. If a block has more lines, a new block is started.
diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index b037c66db1d8..c156b2290717 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -23,7 +23,10 @@ const ( ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v" ) -const maxLineDefault uint64 = 128 +const ( + maxLineDefault uint64 = 128 + maxWaitDefault = 3 * time.Second +) // MultilineConfig contains the configuration for a multilineStage type MultilineConfig struct { @@ -35,7 +38,7 @@ type MultilineConfig struct { } func validateMultilineConfig(cfg *MultilineConfig) error { - if cfg == nil || cfg.Expression == nil || cfg.MaxWaitTime == nil { + if cfg == nil || cfg.Expression == nil { return errors.New(ErrMultilineStageEmptyConfig) } @@ -45,11 +48,15 @@ func validateMultilineConfig(cfg *MultilineConfig) error { } cfg.regex = expr - maxWait, err := time.ParseDuration(*cfg.MaxWaitTime) - if err != nil { - return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err) + if cfg.MaxWaitTime != nil { + maxWait, err := time.ParseDuration(*cfg.MaxWaitTime) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err) + } + cfg.maxWait = maxWait + } else { + cfg.maxWait = maxWaitDefault } - cfg.maxWait = maxWait if cfg.MaxLines == nil { cfg.MaxLines = new(uint64) @@ -181,7 +188,7 @@ flush(out chan Entry, s *multilineState) { return } - collapsed := &Entry{ + collapsed := Entry{ Extracted: s.startLineEntry.Extracted, Entry: api.Entry{ Labels: s.startLineEntry.Entry.Labels, @@ -194,7 +201,7 @@ flush(out chan Entry, s *multilineState) { s.buffer.Reset() s.currentLines = 0 - out <- *collapsed + out <- collapsed } // Name implements Stage From 2c4dc8ed25dc3eff7247fe62ea744be961801ca5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 3 Dec 2020 09:57:50 -0500 Subject: [PATCH 13/15] Update pkg/logentry/stages/multiline.go --- pkg/logentry/stages/multiline.go | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index c156b2290717..2c23d258e7c9 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -182,7 +182,7 @@ func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.Wa } } -flush(out chan Entry, s *multilineState) { +func flush(out chan Entry, s *multilineState) { if s.buffer.Len() == 0 { level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) return From bb1cf59128c115bfe0c0a728da7b7bb2ebed7e54 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 16:11:57 +0100 Subject: [PATCH 14/15] Wrap all debug logs. --- pkg/logentry/stages/multiline.go | 34 +++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index 2c23d258e7c9..3467a39f857c 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -111,19 +111,25 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { if !ok { // Pass through entries until we hit first start line. 
if !m.cfg.regex.MatchString(e.Line) { - level.Debug(m.logger).Log("msg", "pass through entry", "stream", key) + if Debug { + level.Debug(m.logger).Log("msg", "pass through entry", "stream", key) + } out <- e continue } - level.Debug(m.logger).Log("msg", "creating new stream", "stream", key) + if Debug { + level.Debug(m.logger).Log("msg", "creating new stream", "stream", key) + } s = make(chan Entry) streams[key] = s wg.Add(1) go m.runMultiline(s, out, wg) } - level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line) + if Debug { + level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line) + } s <- e } @@ -147,20 +153,28 @@ func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.Wa for { select { case <-time.After(m.cfg.maxWait): - level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String()) + if Debug { + level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String()) + } m.flush(out, state) case e, ok := <-in: - level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint()) + if Debug { + level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint()) + } if !ok { - level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + if Debug { + level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + } m.flush(out, state) return } isFirstLine := m.cfg.regex.MatchString(e.Line) if isFirstLine { - level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + if Debug { + level.Debug(m.logger).Log("msg", "flush 
multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + } m.flush(out, state) // The start line entry is used to set timestamp and labels in the flush method. @@ -182,9 +196,11 @@ func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.Wa } } -func flush(out chan Entry, s *multilineState) { +func(m *multilineStage) flush(out chan Entry, s *multilineState) { if s.buffer.Len() == 0 { - level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) + if Debug { + level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) + } return } From 138371ba1c8897ed96884ea7156e19b28755a4a7 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 3 Dec 2020 16:17:09 +0100 Subject: [PATCH 15/15] Format code. --- pkg/logentry/stages/multiline.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go index 3467a39f857c..731d3ae31c9e 100644 --- a/pkg/logentry/stages/multiline.go +++ b/pkg/logentry/stages/multiline.go @@ -25,7 +25,7 @@ const ( const ( maxLineDefault uint64 = 128 - maxWaitDefault = 3 * time.Second + maxWaitDefault = 3 * time.Second ) // MultilineConfig contains the configuration for a multilineStage @@ -196,7 +196,7 @@ func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.Wa } } -func(m *multilineStage) flush(out chan Entry, s *multilineState) { +func (m *multilineStage) flush(out chan Entry, s *multilineState) { if s.buffer.Len() == 0 { if Debug { level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len())