Skip to content

Commit

Permalink
Merge pull request #10 from neelayu/fix-race
Browse files Browse the repository at this point in the history
chore: fix race condition in watcher implementations
  • Loading branch information
srebhan authored Nov 30, 2022
2 parents b08fcc6 + ca3b55c commit 19b97bf
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
14 changes: 7 additions & 7 deletions watch/inotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"fmt"
"os"
"path/filepath"

"github.com/influxdata/tail/util"
"sync/atomic"

"gopkg.in/fsnotify.v1"
"gopkg.in/tomb.v1"

"github.com/influxdata/tail/util"
)

// InotifyFileWatcher uses inotify to monitor file changes.
Expand Down Expand Up @@ -72,14 +73,14 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange
}

changes := NewFileChanges()
fw.Size = pos
atomic.StoreInt64(&fw.Size, pos)

go func() {

events := Events(fw.Filename)

for {
prevSize := fw.Size
prevSize := atomic.LoadInt64(&fw.Size)

var evt fsnotify.Event
var ok bool
Expand Down Expand Up @@ -125,14 +126,13 @@ func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange
// XXX: report this error back to the user
util.Fatal("Failed to stat file %v: %v", fw.Filename, err)
}
fw.Size = fi.Size()
atomic.StoreInt64(&fw.Size, fi.Size())

if prevSize > 0 && prevSize > fw.Size {
if prevSize > 0 && prevSize > atomic.LoadInt64(&fw.Size) {
changes.NotifyTruncated()
} else {
changes.NotifyModified()
}
prevSize = fw.Size
}
}
}()
Expand Down
20 changes: 11 additions & 9 deletions watch/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package watch
import (
"os"
"runtime"
"sync/atomic"
"time"

"github.com/influxdata/tail/util"
"gopkg.in/tomb.v1"

"github.com/influxdata/tail/util"
)

// PollingFileWatcher polls the file for changes.
Expand Down Expand Up @@ -54,10 +56,10 @@ func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange
// XXX: use tomb.Tomb to cleanly manage these goroutines. replace
// the fatal (below) with tomb's Kill.

fw.Size = pos
atomic.StoreInt64(&fw.Size, pos)

go func() {
prevSize := fw.Size
prevSize := atomic.LoadInt64(&fw.Size)
for {
select {
case <-t.Dying():
Expand Down Expand Up @@ -87,19 +89,19 @@ func (fw *PollingFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChange
}

// File got truncated?
fw.Size = fi.Size()
if prevSize > 0 && prevSize > fw.Size {
atomic.StoreInt64(&fw.Size, fi.Size())
if prevSize > 0 && prevSize > atomic.LoadInt64(&fw.Size) {
changes.NotifyTruncated()
prevSize = fw.Size
prevSize = atomic.LoadInt64(&fw.Size)
continue
}
// File got bigger?
if prevSize > 0 && prevSize < fw.Size {
if prevSize > 0 && prevSize < atomic.LoadInt64(&fw.Size) {
changes.NotifyModified()
prevSize = fw.Size
prevSize = atomic.LoadInt64(&fw.Size)
continue
}
prevSize = fw.Size
prevSize = atomic.LoadInt64(&fw.Size)

// File was appended to (changed)?
modTime := fi.ModTime()
Expand Down
5 changes: 3 additions & 2 deletions watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package watch
import (
"errors"
"fmt"
"gopkg.in/tomb.v1"
"io/ioutil"
"os"
"os/exec"
Expand All @@ -12,6 +11,8 @@ import (
"sync"
"testing"
"time"

"gopkg.in/tomb.v1"
)

func TestWatchNotify(t *testing.T) {
Expand Down Expand Up @@ -44,8 +45,8 @@ func TestWatchNotify(t *testing.T) {
var werr error
changes := 0
chanClose := make(chan struct{})
wg.Add(1)
go func() {
wg.Add(1)
changes, werr = watchFile(filePath, test.poll, chanClose)
wg.Done()
}()
Expand Down

0 comments on commit 19b97bf

Please sign in to comment.