diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go new file mode 100644 index 000000000000..032af529ac9c --- /dev/null +++ b/pkg/logentry/stages/multiline.go @@ -0,0 +1,93 @@ +package stages + +import ( + "bytes" + "regexp" + "time" + + "github.com/go-kit/kit/log" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +const ( + ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression" + ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v" +) + +const MultilineDropReason = "multiline collapse" + +// MultilineConfig contains the configuration for a multilineStage +type MultilineConfig struct { + Expression *string `mapstructure:"firstline"` + regex *regexp.Regexp +} + +func validateMultilineConfig(cfg *MultilineConfig) error { + if cfg == nil || + (cfg.Expression == nil) { + return errors.New(ErrMultilineStageEmptyConfig) + } + expr, err := regexp.Compile(*cfg.Expression) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidRegex, err) + } + cfg.regex = expr + + return nil +} + +// newMulitlineStage creates a MulitlineStage from config +func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) { + cfg := &MultilineConfig{} + err := mapstructure.WeakDecode(config, cfg) + if err != nil { + return nil, err + } + err = validateMultilineConfig(cfg) + if err != nil { + return nil, err + } + + return &multilineStage{ + logger: log.With(logger, "component", "stage", "type", "multiline"), + cfg: cfg, + buffer: new(bytes.Buffer), + }, nil +} + +// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed +type multilineStage struct { + logger log.Logger + cfg *MultilineConfig + buffer *bytes.Buffer +} + +// Process implements Stage +func (m *multilineStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { + isFirstLine := m.cfg.regex.MatchString(*entry) + + if isFirstLine { + previous := m.buffer.String() + + m.buffer.Reset() + m.buffer.WriteString(*entry) + + *entry = previous + } else { + // Append block line + if m.buffer.Len() > 0 { + m.buffer.WriteRune('\n') + } + m.buffer.WriteString(*entry) + + // Adds the drop label to not be sent by the api.EntryHandler + labels[dropLabel] = model.LabelValue(MultilineDropReason) + } +} + +// Name implements Stage +func (m *multilineStage) Name() string { + return StageTypeMultiline +} diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go new file mode 100644 index 000000000000..3c6f16dbb9b5 --- /dev/null +++ b/pkg/logentry/stages/multiline_test.go @@ -0,0 +1,43 @@ +package stages + +import ( + "bytes" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + ww "github.com/weaveworks/common/server" +) + +func Test_multilineStage_Process(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + mcfg := &MultilineConfig{Expression: ptrFromString("^START")} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + buffer: new(bytes.Buffer), + } + + stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), ptrFromString("START line 1")) + stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), ptrFromString("not a start line")) + + nextStart := "START line 2" + stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), &nextStart) + + require.Equal(t, "START line 1\nnot a start line", nextStart) + + nextStart = "START line 3" + stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), &nextStart) + + require.Equal(t, "START line 2", nextStart) +} diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 144e006ee14f..8d299601d51f 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -25,6 +25,7 @@ const ( StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" + StageTypeMultiline = "multiline" ) // Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -118,6 +119,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeMultiline: + s, err = newMultilineStage(logger, cfg) + if err != nil { + return nil, err + } default: return nil, errors.Errorf("Unknown stage type: %s", stageType) }