diff --git a/pkg/promtail/api/constants.go b/pkg/promtail/api/constants.go new file mode 100644 index 000000000000..cc1e29e73024 --- /dev/null +++ b/pkg/promtail/api/constants.go @@ -0,0 +1,5 @@ +package api + +const ( + FilenameLabel = "__filename__" +) diff --git a/pkg/promtail/concat/concat.go b/pkg/promtail/concat/concat.go new file mode 100644 index 000000000000..3e697dd2c1b8 --- /dev/null +++ b/pkg/promtail/concat/concat.go @@ -0,0 +1,132 @@ +package concat + +import ( + "fmt" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/prometheus/common/model" + "regexp" + "sync" + "time" +) + +type Config struct { + // Empty string results in disabling + MultilineStartRegexpString string `yaml:"multiline_start_regexp` + // 0 for wait forever, else duration in seconds + Timeout time.Duration `yaml:"flush_timeout"` +} + +type logEntry struct { + labels model.LabelSet + time time.Time + line string + mutex sync.Mutex +} + +type Concat struct { + next api.EntryHandler + logger log.Logger + multilineStartRegexp *regexp.Regexp + cfg Config + bufferMutex sync.Mutex + // Map of filename to log entry + buffer map[string]*logEntry + quit chan struct{} + wg sync.WaitGroup +} + +func New(logger log.Logger, next api.EntryHandler, cfg Config) (*Concat, error) { + multilineStartRegexp, err := regexp.Compile(cfg.MultilineStartRegexpString) + if err != nil { + return nil, err + } + c := &Concat { + next: next, + logger: logger, + multilineStartRegexp: multilineStartRegexp, + cfg: cfg, + bufferMutex: sync.Mutex{}, + buffer: map[string]*logEntry{}, + quit: make(chan struct{}), + } + c.wg.Add(1) + return c, nil +} + +// Handle implement EntryHandler +func (c *Concat) Handle(labels model.LabelSet, t time.Time, line string) error { + // Disabled concat if regex is empty + if c.cfg.MultilineStartRegexpString == "" { + return c.next.Handle(labels, t, line) + } + + filenameLabel, ok := labels[api.FilenameLabel] + if !ok { + return fmt.Errorf("unable to find %s label in message", api.FilenameLabel) + } + filename := string(filenameLabel) + + c.bufferMutex.Lock() + existingMessage, messageExists := c.buffer[filename] + shouldFlush := c.multilineStartRegexp.MatchString(line) + + if !shouldFlush && !messageExists { + level.Warn(c.logger).Log("msg", "encountered non-multiline-starting line in empty buffer, indicates tailer started in middle of multiline entry", "file", filename) + } + + // If the buffer exists and we shouldn't flush, we should append to the existing buffer + if messageExists && !shouldFlush { + c.buffer[filename].line += "\n" + line + } else { + c.buffer[filename] = &logEntry { + labels: labels, + time: t, + line: line, + } + } + // At this point, we're okay to unlock the mutex. + c.bufferMutex.Unlock() + + // Flush the old entry if necessary + if shouldFlush && messageExists { + return c.next.Handle(existingMessage.labels, existingMessage.time, existingMessage.line) + } + + return nil +} + +func (c *Concat) flushLoop(forceFlush bool) { + c.bufferMutex.Lock() + for filename, logEntry := range c.buffer { + // Check whether the log entry should be force-flushed + if forceFlush || time.Now().Sub(logEntry.time) > c.cfg.Timeout { + c.next.Handle(logEntry.labels, logEntry.time, logEntry.line) + } + delete(c.buffer, filename) + } + c.bufferMutex.Unlock() +} + +func (c *Concat) Run() { + defer func() { + c.flushLoop(true) + c.wg.Done() + }() + // Set up timer loop to poll buffer every second + timer := time.NewTicker(1*time.Second) + for { + select { + case <-timer.C: + c.flushLoop(false) + case <-c.quit: + return + } + } +} + +func (c *Concat) Stop() { + close(c.quit) + c.wg.Wait() +} diff --git a/pkg/promtail/concat/concat_test.go b/pkg/promtail/concat/concat_test.go new file mode 100644 index 000000000000..7b583ccb9374 --- /dev/null +++ b/pkg/promtail/concat/concat_test.go @@ -0,0 +1,192 @@ +package concat + +import ( + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "os" + "sync" + "testing" + "time" +) + +type testCase struct { + name string + config Config + input map[string][]string + output map[string][]string + shortWait bool +} + +var disabled = Config { + MultilineStartRegexpString: "", +} + +var enabled = Config { + MultilineStartRegexpString: "\\d{2}:\\d{2}:\\d{2}", + Timeout: time.Duration(1 * time.Second), +} + +var testCases = []testCase { + { + name: "disabled concat", + config: disabled, + input: map[string][]string { + "path1": { + "path1 line 1", + "path1 line 2", + }, + "path2": { + "path2 line 1", + "path2 line 2", + }, + }, + output: map[string][]string { + "path1": { + "path1 line 1", + "path1 line 2", + }, + "path2": { + "path2 line 1", + "path2 line 2", + }, + }, + }, { + name: "base concat case", + config: enabled, + input: map[string][]string { + "path1": { + "00:00:00 path1 line 1", + "path1 subline 1", + "00:00:00 path1 line 2", + }, + "path2": { + "00:00:00 path2 line 1", + "00:00:00 path2 line 2", + }, + }, + output: map[string][]string { + "path1": { + "00:00:00 path1 line 1\npath1 subline 1", + "00:00:00 path1 line 2", + }, + "path2": { + "00:00:00 path2 line 1", + "00:00:00 path2 line 2", + }, + }, + shortWait: false, + }, { + name: "base concat case", + config: enabled, + input: map[string][]string { + "path1": { + "00:00:00 path1 line 1", + "path1 subline 1", + "00:00:00 path1 line 2", + }, + "path2": { + "00:00:00 path2 line 1", + "00:00:00 path2 line 2", + }, + }, + output: map[string][]string { + "path1": { + "00:00:00 path1 line 1\npath1 subline 1", + "00:00:00 path1 line 2", + }, + "path2": { + "00:00:00 path2 line 1", + "00:00:00 path2 line 2", + }, + }, + shortWait: false, + }, { + name: "test that message buffers if no timeout", + config: enabled, + input: map[string][]string { + "path1": { + "00:00:00 path1 line 1", + "path1 subline 1", + }, + "path2": { + "00:00:00 path2 line 1", + "00:00:00 path2 line 2", + }, + }, + output: map[string][]string { + // Everything is buffered in path1, and path2 gets the first message through + "path2": { + "00:00:00 path2 line 1", + }, + }, + shortWait: true, + }, +} + +type testHandler struct { + receivedMap map[string][]string + mutex sync.Mutex +} + +func (h *testHandler) Handle(labels model.LabelSet, time time.Time, entry string) error { + filename := string(labels[api.FilenameLabel]) + h.mutex.Lock() + lines, ok := h.receivedMap[filename] + if !ok { + lines = []string{} + } + h.receivedMap[filename] = append(lines, entry) + h.mutex.Unlock() + return nil +} + +func feed(wg *sync.WaitGroup, concat *Concat, filename string, lines []string) error { + defer wg.Done() + + labels := model.LabelSet { + api.FilenameLabel: model.LabelValue(filename), + } + for _, line := range lines { + err := concat.Handle(labels, time.Now(), line) + if err != nil { + return err + } + } + return nil +} + +func Test(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + logger = level.NewFilter(logger, level.AllowInfo()) + + for _, testCase := range testCases { + handler := &testHandler{ + receivedMap: map[string][]string{}, + mutex: sync.Mutex{}, + } + concat, err := New(logger, handler, testCase.config) + if err != nil { + t.Fatal("Unexpected concat initialization error for test case \"", testCase.name, "\"\nerror", err) + } + go concat.Run() + + var wg sync.WaitGroup + wg.Add(len(testCase.input)) + for filename, lines := range testCase.input { + go feed(&wg, concat, filename, lines) + } + wg.Wait() + + if !testCase.shortWait { + // Wait for flush interval + 1 second + time.Sleep(testCase.config.Timeout + 1 * time.Second) + } + + assert.Equal(t, testCase.output, handler.receivedMap, "Lines don't match in test case \"%s\"", testCase.name) + concat.Stop() + } +} diff --git a/pkg/promtail/scrape/scrape.go b/pkg/promtail/scrape/scrape.go index 932b51e43388..4cf82b66fdb1 100644 --- a/pkg/promtail/scrape/scrape.go +++ b/pkg/promtail/scrape/scrape.go @@ -2,7 +2,7 @@ package scrape import ( "fmt" - + "github.com/grafana/loki/pkg/promtail/concat" sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/pkg/relabel" @@ -15,11 +15,16 @@ type Config struct { EntryParser api.EntryParser `yaml:"entry_parser"` RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"` ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"` + ConcatConfig concat.Config `yaml:"concat_config"` } // DefaultScrapeConfig is the default Config. var DefaultScrapeConfig = Config{ - EntryParser: api.Docker, + EntryParser: api.Docker, + ConcatConfig: concat.Config { + MultilineStartRegexpString: "", + Timeout: 0, + }, } // UnmarshalYAML implements the yaml.Unmarshaler interface. diff --git a/pkg/promtail/targets/filetarget.go b/pkg/promtail/targets/filetarget.go index de5bba46d234..9918a6c040a8 100644 --- a/pkg/promtail/targets/filetarget.go +++ b/pkg/promtail/targets/filetarget.go @@ -13,7 +13,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" - fsnotify "gopkg.in/fsnotify.v1" + "gopkg.in/fsnotify.v1" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/promtail/api" @@ -40,10 +40,6 @@ var ( }) ) -const ( - filenameLabel = "__filename__" -) - // Config describes behavior for Target type Config struct { SyncPeriod time.Duration `yaml:"sync_period"` diff --git a/pkg/promtail/targets/filetargetmanager.go b/pkg/promtail/targets/filetargetmanager.go index 4ba4e7f6cc20..5101020d1f05 100644 --- a/pkg/promtail/targets/filetargetmanager.go +++ b/pkg/promtail/targets/filetargetmanager.go @@ -2,6 +2,9 @@ package targets import ( "context" + "fmt" + "github.com/grafana/loki/pkg/promtail/concat" + "github.com/pkg/errors" "os" "strings" @@ -73,13 +76,18 @@ func NewFileTargetManager( config := map[string]sd_config.ServiceDiscoveryConfig{} for _, cfg := range scrapeConfigs { + concat, err := concat.New(logger, client, cfg.ConcatConfig) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("error parsing concat config for %s", cfg.JobName)) + } s := &syncer{ log: logger, positions: positions, relabelConfig: cfg.RelabelConfigs, targets: map[string]*FileTarget{}, hostname: hostname, - entryHandler: cfg.EntryParser.Wrap(client), + concat: concat, + entryHandler: cfg.EntryParser.Wrap(concat), targetConfig: targetConfig, } tm.syncers[cfg.JobName] = s @@ -114,6 +122,7 @@ type syncer struct { log log.Logger positions *positions.Positions entryHandler api.EntryHandler + concat *concat.Concat hostname string targets map[string]*FileTarget @@ -199,6 +208,7 @@ func (s *syncer) stop() { level.Info(s.log).Log("msg", "Removing target", "key", key) target.Stop() delete(s.targets, key) + s.concat.Stop() } } diff --git a/pkg/promtail/targets/tailer.go b/pkg/promtail/targets/tailer.go index 8be8ef7e1444..dd93b60fa118 100644 --- a/pkg/promtail/targets/tailer.go +++ b/pkg/promtail/targets/tailer.go @@ -60,7 +60,7 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions *positions tailer := &tailer{ logger: logger, - handler: api.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler), + handler: api.AddLabelsMiddleware(model.LabelSet{api.FilenameLabel: model.LabelValue(path)}).Wrap(handler), positions: positions, path: path,