Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-line detection support to promtail #399

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/promtail/api/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package api

const (
FilenameLabel = "__filename__"
)
132 changes: 132 additions & 0 deletions pkg/promtail/concat/concat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package concat

import (
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/common/model"
"regexp"
"sync"
"time"
)

type Config struct {
// Empty string results in disabling
MultilineStartRegexpString string `yaml:"multiline_start_regexp`
// 0 for wait forever, else duration in seconds
Timeout time.Duration `yaml:"flush_timeout"`
}

type logEntry struct {
labels model.LabelSet
time time.Time
line string
mutex sync.Mutex
}

type Concat struct {
next api.EntryHandler
logger log.Logger
multilineStartRegexp *regexp.Regexp
cfg Config
bufferMutex sync.Mutex
// Map of filename to log entry
buffer map[string]*logEntry
quit chan struct{}
wg sync.WaitGroup
}

func New(logger log.Logger, next api.EntryHandler, cfg Config) (*Concat, error) {
multilineStartRegexp, err := regexp.Compile(cfg.MultilineStartRegexpString)
if err != nil {
return nil, err
}
c := &Concat {
next: next,
logger: logger,
multilineStartRegexp: multilineStartRegexp,
cfg: cfg,
bufferMutex: sync.Mutex{},
buffer: map[string]*logEntry{},
quit: make(chan struct{}),
}
c.wg.Add(1)
return c, nil
}

// Handle implement EntryHandler
func (c *Concat) Handle(labels model.LabelSet, t time.Time, line string) error {
// Disabled concat if regex is empty
if c.cfg.MultilineStartRegexpString == "" {
return c.next.Handle(labels, t, line)
}

filenameLabel, ok := labels[api.FilenameLabel]
if !ok {
return fmt.Errorf("unable to find %s label in message", api.FilenameLabel)
}
filename := string(filenameLabel)

c.bufferMutex.Lock()
existingMessage, messageExists := c.buffer[filename]
shouldFlush := c.multilineStartRegexp.MatchString(line)

if !shouldFlush && !messageExists {
level.Warn(c.logger).Log("msg", "encountered non-multiline-starting line in empty buffer, indicates tailer started in middle of multiline entry", "file", filename)
}

// If the buffer exists and we shouldn't flush, we should append to the existing buffer
if messageExists && !shouldFlush {
c.buffer[filename].line += "\n" + line
} else {
c.buffer[filename] = &logEntry {
labels: labels,
time: t,
line: line,
}
}
// At this point, we're okay to unlock the mutex.
c.bufferMutex.Unlock()

// Flush the old entry if necessary
if shouldFlush && messageExists {
return c.next.Handle(existingMessage.labels, existingMessage.time, existingMessage.line)
}

return nil
}

func (c *Concat) flushLoop(forceFlush bool) {
c.bufferMutex.Lock()
for filename, logEntry := range c.buffer {
// Check whether the log entry should be force-flushed
if forceFlush || time.Now().Sub(logEntry.time) > c.cfg.Timeout {
c.next.Handle(logEntry.labels, logEntry.time, logEntry.line)
}
delete(c.buffer, filename)
}
c.bufferMutex.Unlock()
}

func (c *Concat) Run() {
defer func() {
c.flushLoop(true)
c.wg.Done()
}()
// Set up timer loop to poll buffer every second
timer := time.NewTicker(1*time.Second)
for {
select {
case <-timer.C:
c.flushLoop(false)
case <-c.quit:
return
}
}
}

func (c *Concat) Stop() {
close(c.quit)
c.wg.Wait()
}
192 changes: 192 additions & 0 deletions pkg/promtail/concat/concat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package concat

import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"os"
"sync"
"testing"
"time"
)

type testCase struct {
name string
config Config
input map[string][]string
output map[string][]string
shortWait bool
}

var disabled = Config {
MultilineStartRegexpString: "",
}

var enabled = Config {
MultilineStartRegexpString: "\\d{2}:\\d{2}:\\d{2}",
Timeout: time.Duration(1 * time.Second),
}

var testCases = []testCase {
{
name: "disabled concat",
config: disabled,
input: map[string][]string {
"path1": {
"path1 line 1",
"path1 line 2",
},
"path2": {
"path2 line 1",
"path2 line 2",
},
},
output: map[string][]string {
"path1": {
"path1 line 1",
"path1 line 2",
},
"path2": {
"path2 line 1",
"path2 line 2",
},
},
}, {
name: "base concat case",
config: enabled,
input: map[string][]string {
"path1": {
"00:00:00 path1 line 1",
"path1 subline 1",
"00:00:00 path1 line 2",
},
"path2": {
"00:00:00 path2 line 1",
"00:00:00 path2 line 2",
},
},
output: map[string][]string {
"path1": {
"00:00:00 path1 line 1\npath1 subline 1",
"00:00:00 path1 line 2",
},
"path2": {
"00:00:00 path2 line 1",
"00:00:00 path2 line 2",
},
},
shortWait: false,
}, {
name: "base concat case",
config: enabled,
input: map[string][]string {
"path1": {
"00:00:00 path1 line 1",
"path1 subline 1",
"00:00:00 path1 line 2",
},
"path2": {
"00:00:00 path2 line 1",
"00:00:00 path2 line 2",
},
},
output: map[string][]string {
"path1": {
"00:00:00 path1 line 1\npath1 subline 1",
"00:00:00 path1 line 2",
},
"path2": {
"00:00:00 path2 line 1",
"00:00:00 path2 line 2",
},
},
shortWait: false,
}, {
name: "test that message buffers if no timeout",
config: enabled,
input: map[string][]string {
"path1": {
"00:00:00 path1 line 1",
"path1 subline 1",
},
"path2": {
"00:00:00 path2 line 1",
"00:00:00 path2 line 2",
},
},
output: map[string][]string {
// Everything is buffered in path1, and path2 gets the first message through
"path2": {
"00:00:00 path2 line 1",
},
},
shortWait: true,
},
}

type testHandler struct {
receivedMap map[string][]string
mutex sync.Mutex
}

func (h *testHandler) Handle(labels model.LabelSet, time time.Time, entry string) error {
filename := string(labels[api.FilenameLabel])
h.mutex.Lock()
lines, ok := h.receivedMap[filename]
if !ok {
lines = []string{}
}
h.receivedMap[filename] = append(lines, entry)
h.mutex.Unlock()
return nil
}

func feed(wg *sync.WaitGroup, concat *Concat, filename string, lines []string) error {
defer wg.Done()

labels := model.LabelSet {
api.FilenameLabel: model.LabelValue(filename),
}
for _, line := range lines {
err := concat.Handle(labels, time.Now(), line)
if err != nil {
return err
}
}
return nil
}

func Test(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
logger = level.NewFilter(logger, level.AllowInfo())

for _, testCase := range testCases {
handler := &testHandler{
receivedMap: map[string][]string{},
mutex: sync.Mutex{},
}
concat, err := New(logger, handler, testCase.config)
if err != nil {
t.Fatal("Unexpected concat initialization error for test case \"", testCase.name, "\"\nerror", err)
}
go concat.Run()

var wg sync.WaitGroup
wg.Add(len(testCase.input))
for filename, lines := range testCase.input {
go feed(&wg, concat, filename, lines)
}
wg.Wait()

if !testCase.shortWait {
// Wait for flush interval + 1 second
time.Sleep(testCase.config.Timeout + 1 * time.Second)
}

assert.Equal(t, testCase.output, handler.receivedMap, "Lines don't match in test case \"%s\"", testCase.name)
concat.Stop()
}
}
9 changes: 7 additions & 2 deletions pkg/promtail/scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package scrape

import (
"fmt"

"github.com/grafana/loki/pkg/promtail/concat"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/pkg/relabel"

Expand All @@ -15,11 +15,16 @@ type Config struct {
EntryParser api.EntryParser `yaml:"entry_parser"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
ConcatConfig concat.Config `yaml:"concat_config"`
}

// DefaultScrapeConfig is the default Config.
var DefaultScrapeConfig = Config{
EntryParser: api.Docker,
EntryParser: api.Docker,
ConcatConfig: concat.Config {
MultilineStartRegexpString: "",
Timeout: 0,
},
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand Down
6 changes: 1 addition & 5 deletions pkg/promtail/targets/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
fsnotify "gopkg.in/fsnotify.v1"
"gopkg.in/fsnotify.v1"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/promtail/api"
Expand All @@ -40,10 +40,6 @@ var (
})
)

const (
filenameLabel = "__filename__"
)

// Config describes behavior for Target
type Config struct {
SyncPeriod time.Duration `yaml:"sync_period"`
Expand Down
Loading