Skip to content

Commit

Permalink
Merge branch 'main' into mapped_grpc_cancellation_error_to_internal_s…
Browse files Browse the repository at this point in the history
…erver_error
  • Loading branch information
vlad-diachenko committed Feb 2, 2022
2 parents 567c726 + 3fb572a commit 95c58a8
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 159 deletions.
17 changes: 9 additions & 8 deletions clients/pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ func NewFileTarget(
targetEventHandler: targetEventHandler,
}

err := t.sync()
if err != nil {
return nil, errors.Wrap(err, "filetarget.sync")
}

go t.run()
return t, nil
}
Expand Down Expand Up @@ -159,7 +154,13 @@ func (t *FileTarget) run() {
close(t.done)
}()

err := t.sync()
if err != nil {
level.Error(t.logger).Log("msg", "error running sync function", "error", err)
}

ticker := time.NewTicker(t.targetConfig.SyncPeriod)
defer ticker.Stop()

for {
select {
Expand Down Expand Up @@ -248,7 +249,7 @@ func (t *FileTarget) startWatching(dirs map[string]struct{}) {
if _, ok := t.watches[dir]; ok {
continue
}
level.Debug(t.logger).Log("msg", "watching new directory", "directory", dir)
level.Info(t.logger).Log("msg", "watching new directory", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStart,
Expand All @@ -261,7 +262,7 @@ func (t *FileTarget) stopWatching(dirs map[string]struct{}) {
if _, ok := t.watches[dir]; !ok {
continue
}
level.Debug(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStop,
Expand All @@ -280,7 +281,7 @@ func (t *FileTarget) startTailing(ps []string) {
continue
}
if fi.IsDir() {
level.Error(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p)
level.Info(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p)
continue
}
level.Debug(t.logger).Log("msg", "tailing new file", "filename", p)
Expand Down
150 changes: 68 additions & 82 deletions clients/pkg/promtail/targets/file/filetarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.uber.org/atomic"
Expand All @@ -17,25 +19,17 @@ import (

"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/positions"
"github.com/grafana/loki/clients/pkg/promtail/targets/testutils"
)

func TestFileTargetSync(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

testutils.InitRandom()
dirName := "/tmp/" + testutils.RandName()
positionsFileName := dirName + "/positions.yml"
logDir1 := dirName + "/log1"
logDir1File1 := logDir1 + "/test1.log"
logDir1File2 := logDir1 + "/test2.log"

err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.RemoveAll(dirName) }()
dirName := newTestLogDirectories(t)
positionsFileName := filepath.Join(dirName, "positions.yml")
logDir1 := filepath.Join(dirName, "log1")
logDir1File1 := filepath.Join(logDir1, "test1.log")
logDir1File2 := filepath.Join(logDir1, "test2.log")

// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
Expand Down Expand Up @@ -74,11 +68,9 @@ func TestFileTargetSync(t *testing.T) {
}()
path := logDir1 + "/*.log"
target, err := NewFileTarget(metrics, logger, client, ps, path, nil, nil, &Config{
SyncPeriod: 10 * time.Second,
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
}, nil, fakeHandler)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)

// Start with nothing watched.
if len(target.watches) != 0 {
Expand All @@ -89,12 +81,12 @@ func TestFileTargetSync(t *testing.T) {
}

// Create the base dir, still nothing watched.
if err = os.MkdirAll(logDir1, 0750); err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
err = os.MkdirAll(logDir1, 0750)
assert.NoError(t, err)

err = target.sync()
assert.NoError(t, err)

if len(target.watches) != 0 {
t.Fatal("Expected watches to be 0 at this point in the test...")
}
Expand All @@ -104,64 +96,64 @@ func TestFileTargetSync(t *testing.T) {

// Add a file, which should create a watcher and a tailer.
_, err = os.Create(logDir1File1)
if err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 1 {
t.Fatal("Expected watches to be 1 at this point in the test...")
}
if len(target.tails) != 1 {
t.Fatal("Expected tails to be 1 at this point in the test...")
}
assert.NoError(t, err)

// Delay sync() call to make sure the filesystem watch event does not fire during sync()
time.Sleep(10 * time.Millisecond)
err = target.sync()
assert.NoError(t, err)

assert.Equal(t, 1, len(target.watches),
"Expected watches to be 1 at this point in the test...",
)
assert.Equal(t, 1, len(target.tails),
"Expected tails to be 1 at this point in the test...",
)
require.Eventually(t, func() bool {
return receivedStartWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...")

// Add another file, should get another tailer.
_, err = os.Create(logDir1File2)
if err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 1 {
t.Fatal("Expected watches to be 1 at this point in the test...")
}
if len(target.tails) != 2 {
t.Fatal("Expected tails to be 2 at this point in the test...")
}
assert.NoError(t, err)

err = target.sync()
assert.NoError(t, err)

assert.Equal(t, 1, len(target.watches),
"Expected watches to be 1 at this point in the test...",
)
assert.Equal(t, 2, len(target.tails),
"Expected tails to be 2 at this point in the test...",
)

// Remove one of the files, tailer should stop.
if err = os.Remove(logDir1File1); err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 1 {
t.Fatal("Expected watches to be 1 at this point in the test...")
}
if len(target.tails) != 1 {
t.Fatal("Expected tails to be 1 at this point in the test...")
}
err = os.Remove(logDir1File1)
assert.NoError(t, err)

err = target.sync()
assert.NoError(t, err)

assert.Equal(t, 1, len(target.watches),
"Expected watches to be 1 at this point in the test...",
)
assert.Equal(t, 1, len(target.tails),
"Expected tails to be 1 at this point in the test...",
)

// Remove the entire directory, other tailer should stop and watcher should go away.
if err = os.RemoveAll(logDir1); err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 0 {
t.Fatal("Expected watches to be 0 at this point in the test...")
}
if len(target.tails) != 0 {
t.Fatal("Expected tails to be 0 at this point in the test...")
}
err = os.RemoveAll(logDir1)
assert.NoError(t, err)

err = target.sync()
assert.NoError(t, err)

assert.Equal(t, 0, len(target.watches),
"Expected watches to be 0 at this point in the test...",
)
assert.Equal(t, 0, len(target.tails),
"Expected tails to be 0 at this point in the test...",
)
require.Eventually(t, func() bool {
return receivedStartWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...")
Expand All @@ -177,18 +169,12 @@ func TestHandleFileCreationEvent(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

testutils.InitRandom()
dirName := "/tmp/" + testutils.RandName()
positionsFileName := dirName + "/positions.yml"
logDir := dirName + "/log"
logFile := logDir + "/test1.log"
dirName := newTestLogDirectories(t)
positionsFileName := filepath.Join(dirName, "positions.yml")
logDir := filepath.Join(dirName, "log")
logFile := filepath.Join(logDir, "test1.log")

err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.RemoveAll(dirName) }()
if err = os.MkdirAll(logDir, 0750); err != nil {
if err := os.MkdirAll(logDir, 0750); err != nil {
t.Fatal(err)
}

Expand Down
42 changes: 33 additions & 9 deletions clients/pkg/promtail/targets/file/filetargetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ const (
type FileTargetManager struct {
log log.Logger
quit context.CancelFunc
done chan struct{}
syncers map[string]*targetSyncer
manager *discovery.Manager

watcher *fsnotify.Watcher
targetEventHandler chan fileTargetEvent

wg sync.WaitGroup
}

// NewFileTargetManager creates a new TargetManager.
Expand All @@ -70,7 +71,6 @@ func NewFileTargetManager(
tm := &FileTargetManager{
log: logger,
quit: quit,
done: make(chan struct{}),
watcher: watcher,
targetEventHandler: make(chan fileTargetEvent),
syncers: map[string]*targetSyncer{},
Expand Down Expand Up @@ -134,14 +134,19 @@ func NewFileTargetManager(
configs[cfg.JobName] = cfg.ServiceDiscoveryConfig.Configs()
}

tm.wg.Add(3)
go tm.run(ctx)
go tm.watch(ctx)
go tm.watchTargetEvents(ctx)
go tm.watchFsEvents(ctx)

go util.LogError("running target manager", tm.manager.Run)

return tm, tm.manager.ApplyConfig(configs)
}

func (tm *FileTargetManager) watch(ctx context.Context) {
func (tm *FileTargetManager) watchTargetEvents(ctx context.Context) {
defer tm.wg.Done()

for {
select {
case event := <-tm.targetEventHandler:
Expand All @@ -155,9 +160,21 @@ func (tm *FileTargetManager) watch(ctx context.Context) {
level.Error(tm.log).Log("msg", " failed to remove directory from watcher", "error", err)
}
}
case <-ctx.Done():
return
}
}
}

func (tm *FileTargetManager) watchFsEvents(ctx context.Context) {
defer tm.wg.Done()

for {
select {
case event := <-tm.watcher.Events:
// we only care about Create events
if event.Op == fsnotify.Create {
level.Info(tm.log).Log("msg", "received file watcher event", "name", event.Name, "op", event.Op.String())
for _, s := range tm.syncers {
s.sendFileCreateEvent(event)
}
Expand All @@ -171,7 +188,8 @@ func (tm *FileTargetManager) watch(ctx context.Context) {
}

func (tm *FileTargetManager) run(ctx context.Context) {
defer close(tm.done)
defer tm.wg.Done()

for {
select {
case targetGroups := <-tm.manager.SyncCh():
Expand All @@ -197,7 +215,8 @@ func (tm *FileTargetManager) Ready() bool {
// Stop the TargetManager.
func (tm *FileTargetManager) Stop() {
tm.quit()
<-tm.done
tm.wg.Wait()

for _, s := range tm.syncers {
s.stop()
}
Expand Down Expand Up @@ -307,9 +326,14 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group, targetEventHandler chan
}

level.Info(s.log).Log("msg", "Adding target", "key", key)
watcher := make(chan fsnotify.Event)
s.fileEventWatchers[string(path)] = watcher
t, err := s.newTarget(string(path), labels, discoveredLabels, watcher, targetEventHandler)

wkey := string(path)
watcher, ok := s.fileEventWatchers[wkey]
if !ok {
watcher = make(chan fsnotify.Event)
s.fileEventWatchers[wkey] = watcher
}
t, err := s.newTarget(wkey, labels, discoveredLabels, watcher, targetEventHandler)
if err != nil {
dropped = append(dropped, target.NewDroppedTarget(fmt.Sprintf("Failed to create target: %s", err.Error()), discoveredLabels))
level.Error(s.log).Log("msg", "Failed to create target", "key", key, "error", err)
Expand Down
Loading

0 comments on commit 95c58a8

Please sign in to comment.