Merge pull request #916 from redHJ/pdr-8405
tailx/dirx: in newest mode, if new data is written to a file that has already expired, the old data may be read in again along with it
wonderflow authored Jan 24, 2019
2 parents 4e02b9f + 0b7407f commit 98e4c7c
Showing 16 changed files with 739 additions and 100 deletions.
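
At a high level, the fix keeps a per-runner expireMap keyed by "<inode>_<path>": when a watched directory expires while read_from is newest, statLogPath records the current position of each of its files (via utils.GetFiles and utils.UpdateExpireMap), and when an expired file later receives new writes and is reopened, SeqFile.getOffset seeks to the recorded offset instead of 0, so the old contents are not re-read. The sketch below only illustrates that bookkeeping; the helper names and the syscall.Stat_t inode lookup are stand-ins (the repo itself uses utilsos.GetIdentifyIDByPath and utils helpers that are not shown in this diff).

package main

import (
	"fmt"
	"os"
	"syscall"
)

// recordOffsets is a stand-in for the utils.GetFiles/UpdateExpireMap pair:
// when a directory expires, remember how far each file has grown so that a
// later reopen can skip everything already seen.
func recordOffsets(paths []string, expireMap map[string]int64) {
	for _, p := range paths {
		fi, err := os.Stat(p)
		if err != nil {
			continue
		}
		st, ok := fi.Sys().(*syscall.Stat_t) // POSIX-only inode lookup in this sketch
		if !ok {
			continue
		}
		// Same "<inode>_<path>" key shape that SeqFile.getOffset looks up.
		expireMap[fmt.Sprintf("%d_%s", st.Ino, p)] = fi.Size()
	}
}

// resumeOffset mirrors the lookup getOffset performs when a file is reopened:
// a recorded offset means "start here", a missing entry means "start at 0".
func resumeOffset(path string, inode uint64, expireMap map[string]int64) int64 {
	return expireMap[fmt.Sprintf("%d_%s", inode, path)]
}

func main() {
	expireMap := make(map[string]int64)
	recordOffsets([]string{"/var/log/app/file1.log"}, expireMap) // hypothetical path
	fmt.Println(resumeOffset("/var/log/app/file1.log", 123456, expireMap))
}
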
2 changes: 1 addition & 1 deletion reader/cloudtrail/cloudtrail.go
@@ -65,7 +65,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
validFilePattern, _ := conf.GetStringOr(KeyValidFilePattern, "*")
bufSize, _ := conf.GetIntOr(KeyBufSize, reader.DefaultBufSize)
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
sf, err := reader.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest)
sf, err := reader.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest, nil)
if err != nil {
return nil, err
}
4 changes: 3 additions & 1 deletion reader/dirx/dir_reader.go
@@ -265,6 +265,8 @@ type newReaderOptions struct {
Whence string
BufferSize int

expireMap map[string]int64

MsgChan chan<- message
ErrChan chan<- error

@@ -285,7 +287,7 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool) (*dir
if isNewDir && subMeta.IsNotExist() {
opts.Whence = WhenceOldest // read a newly appearing (non pre-existing) file from the beginning on its first read
}
fr, err := reader.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence)
fr, err := reader.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence, opts.expireMap)
if err != nil {
return nil, fmt.Errorf("new sequence file: %v", err)
}
15 changes: 15 additions & 0 deletions reader/dirx/dirx.go
@@ -18,6 +18,7 @@ import (
"github.com/qiniu/logkit/conf"
"github.com/qiniu/logkit/reader"
. "github.com/qiniu/logkit/reader/config"
"github.com/qiniu/logkit/utils"
. "github.com/qiniu/logkit/utils/models"
)

@@ -69,6 +70,8 @@ type Reader struct {
whence string
bufferSize int

expireMap map[string]int64

notFirstTime bool
readSameInode bool
}
@@ -161,6 +164,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
whence: whence,
bufferSize: bufferSize,
readSameInode: readSameInode,
expireMap: make(map[string]int64),
}, nil
}

@@ -262,6 +266,14 @@ func (r *Reader) statLogPath() {

// Do not track expired files, unless they were already being tracked and still have unread log lines
if !r.dirReaders.hasCachedLine(logPath) && HasDirExpired(logPath, r.dirReaders.expire) {
if r.whence == WhenceNewest {
fileMap, err := utils.GetFiles(r.meta.RunnerName, logPath)
if err != nil {
log.Errorf("Runner[%v] get log path %q failed %v, ignored this time", r.meta.RunnerName, logPath, err)
} else {
utils.UpdateExpireMap(r.meta.RunnerName, fileMap, r.expireMap)
}
}
log.Debugf("Runner[%v] log path %q has expired, ignored this time", r.meta.RunnerName, logPath)
continue
}
@@ -280,6 +292,7 @@
MsgChan: r.msgChan,
ErrChan: r.errChan,
ReadSameInode: r.readSameInode,
expireMap: r.expireMap,
}, r.notFirstTime)
if err != nil {
err = fmt.Errorf("create new reader for log path %q failed: %v", logPath, err)
@@ -328,6 +341,7 @@ func (r *Reader) Start() error {
defer ticker.Stop()
for {
r.dirReaders.checkExpiredDirs()
utils.CheckNotExistFile(r.meta.RunnerName, r.expireMap)
r.statLogPath()

select {
@@ -429,6 +443,7 @@ func (r *Reader) Close() error {
}

func (r *Reader) Reset() error {
r.expireMap = make(map[string]int64)
return r.dirReaders.Reset()
}

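
The Start loop above now also calls utils.CheckNotExistFile on every tick, and Reset clears the map. That helper is not part of this diff, so the following is only an assumption about its intent: drop bookkeeping entries whose file has disappeared, so the "<inode>_<path>" map cannot grow without bound. The function name below is hypothetical.

package main

import (
	"fmt"
	"os"
	"strings"
)

// pruneExpireMap is a hypothetical stand-in for utils.CheckNotExistFile: it
// removes entries whose underlying file no longer exists on disk.
func pruneExpireMap(expireMap map[string]int64) {
	for key := range expireMap {
		parts := strings.SplitN(key, "_", 2) // key format: "<inode>_<path>"
		if len(parts) < 2 {
			delete(expireMap, key)
			continue
		}
		if _, err := os.Stat(parts[1]); os.IsNotExist(err) {
			delete(expireMap, key)
		}
	}
}

func main() {
	m := map[string]int64{"123456_/no/such/file.log": 2048} // hypothetical entry
	pruneExpireMap(m)
	fmt.Println(len(m)) // 0: the missing file's entry has been pruned
}
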
109 changes: 109 additions & 0 deletions reader/dirx/dirx_test.go
@@ -28,6 +28,16 @@ func createFileWithContent(filepathn, lines string) {
file.Close()
}

func appendFileWithContent(filepathn, lines string) {
file, err := os.OpenFile(filepathn, os.O_APPEND|os.O_WRONLY, DefaultFilePerm)
if err != nil {
log.Error(err)
return
}
file.WriteString(lines)
file.Close()
}

func createDirWithName(dirx string) {
err := os.MkdirAll(dirx, DefaultDirPerm)
if err != nil {
@@ -47,6 +57,7 @@ func TestStart(t *testing.T) {
"multiReaderSyncMetaOneLineTest": multiReaderSyncMetaOneLineTest,
"multiReaderSyncMetaMutilineTest": multiReaderSyncMetaMutilineTest,
"multiReaderNewestTest": multiReaderNewestTest,
"multiReaderNewestOffsetTest": multiReaderNewestOffsetTest,
"multiReaderSameInodeTest": multiReaderSameInodeTest,
}

@@ -610,6 +621,104 @@ func multiReaderNewestTest(t *testing.T) {
assert.Equal(t, StatsInfo{}, dr.Status())
}

func multiReaderNewestOffsetTest(t *testing.T) {
dirname := "multiReaderNewestOffsetTest"
dir1 := filepath.Join(dirname, "logs/abc")
dir2 := filepath.Join(dirname, "logs/xyz")
dir1file1 := filepath.Join(dir1, "file1.log")
dir2file1 := filepath.Join(dir2, "file1.log")

createDirWithName(dirname)
defer os.RemoveAll(dirname)

createDirWithName(dir1)
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\nabc127\n")
time.Sleep(15 * time.Second)
expectResults := map[string]int{
"abc\nx\n": 1,
"abc\ny\n": 1,
"abc\nz\n": 1,
"abc\na\n": 1,
"abc\nb\n": 1,
"abc\nc\n": 1,
}
actualResults := make(map[string]int)
logPathPattern := filepath.Join(dirname, "logs/*")
c := conf.MapConf{
"log_path": logPathPattern,
"stat_interval": "1s",
"expire": "10s",
"max_open_files": "128",
"read_from": "newest",
"reader_buf_size": "1024",
"meta_path": dirname,
"mode": ModeDirx,
}
meta, err := reader.NewMetaWithConf(c)
assert.NoError(t, err)
r, err := NewReader(meta, c)
assert.NoError(t, err)

err = r.SetMode(ReadModeHeadPatternString, "^abc*")
assert.Nil(t, err)
dr := r.(*Reader)
assert.NoError(t, dr.Start())
t.Log("Reader has started")

assert.Equal(t, 10*time.Second, dr.expire)
assert.Equal(t, 720*time.Hour, dr.submetaExpire)

maxNum := 0
emptyNum := 0
for {
data, err := dr.ReadLine()
assert.Nil(t, err)
if data != "" {
t.Log("Data:", data, dr.Source(), maxNum)
actualResults[data]++
maxNum++
} else {
emptyNum++
}
if emptyNum > 5 {
break
}
}
assert.EqualValues(t, 0, maxNum)
t.Log("Reader has finished reading one")

emptyNum = 0
// ensure the previous reader has expired and the new reader has been detected and created successfully
createDirWithName(dir2)
createFileWithContent(dir2file1, "abc\nx\nabc\ny\nabc\nz\n")
appendFileWithContent(dir1file1, "abc\na\nabc\nb\nabc\nc\n")
time.Sleep(3 * time.Second)
assert.Equal(t, 2, dr.dirReaders.Num(), "Number of readers")

t.Log("Reader has started to read two")
emptyNum = 0
for {
data, err := dr.ReadLine()
if data != "" {
t.Log("Data:", data, dr.Source(), maxNum)
actualResults[data]++
maxNum++
} else {
emptyNum++
}
if err == io.EOF {
break
}
if maxNum >= 6 || emptyNum > 10 {
break
}
}
t.Log("Reader has finished reading two")

assert.EqualValues(t, expectResults, actualResults)
assert.Equal(t, StatsInfo{}, dr.Status())
}

func multiReaderSameInodeTest(t *testing.T) {
dirname := "multiReaderSameInodeTest"
dir1 := filepath.Join(dirname, "logs/abc")
4 changes: 2 additions & 2 deletions reader/reader.go
@@ -159,7 +159,7 @@ func NewFileDirReader(meta *Meta, conf conf.MapConf) (reader Reader, err error)
newfileNewLine, _ := conf.GetBoolOr(KeyNewFileNewLine, false)
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
readSameInode, _ := conf.GetBoolOr(KeyReadSameInode, false)
fr, err := NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence)
fr, err := NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence, nil)
if err != nil {
return
}
@@ -177,7 +177,7 @@ func NewSingleFileReader(meta *Meta, conf conf.MapConf) (reader Reader, err erro
whence, _ := conf.GetStringOr(KeyWhence, WhenceOldest)
errDirectReturn, _ := conf.GetBoolOr(KeyErrDirectReturn, true)

fr, err := NewSingleFile(meta, logpath, whence, errDirectReturn)
fr, err := NewSingleFile(meta, logpath, whence, 0, errDirectReturn)
if err != nil {
return
}
56 changes: 53 additions & 3 deletions reader/seqfile.go
@@ -61,6 +61,8 @@ type SeqFile struct {
lastSyncPath string
lastSyncOffset int64

expireMap map[string]int64

ReadSameInode bool // whether to keep reading a filename_inode that has already been read through
}

@@ -110,7 +112,7 @@ func getStartFile(path, whence string, meta *Meta, sf *SeqFile) (f *os.File, dir
return
}

func NewSeqFile(meta *Meta, path string, ignoreHidden, newFileNewLine bool, suffixes []string, validFileRegex, whence string) (sf *SeqFile, err error) {
func NewSeqFile(meta *Meta, path string, ignoreHidden, newFileNewLine bool, suffixes []string, validFileRegex, whence string, expireMap map[string]int64) (sf *SeqFile, err error) {
sf = &SeqFile{
ignoreFileSuffix: suffixes,
ignoreHidden: ignoreHidden,
@@ -119,6 +121,7 @@ func NewSeqFile(meta *Meta, path string, ignoreHidden, newFileNewLine bool, suff
newFileAsNewLine: newFileNewLine,
meta: meta,
inodeDone: make(map[string]bool),
expireMap: expireMap,
}
// The original for loop has been replaced with a single attempt: if startup fails, report the error to the user directly rather than waiting and retrying.
f, dir, currFile, offset, err := getStartFile(path, whence, meta, sf)
@@ -130,7 +133,9 @@ func NewSeqFile(meta *Meta, path string, ignoreHidden, newFileNewLine bool, suff
err = nil
dir = path
}

if f != nil {
offset = sf.getOffset(f, offset, false)
_, err = f.Seek(offset, io.SeekStart)
if err != nil {
f.Close()
@@ -518,8 +523,9 @@ func (sf *SeqFile) newOpen() (err error) {
} else {
sf.ratereader = f
}
sf.offset = sf.getOffset(f, 0, true)
sf.f = f
sf.offset = 0

sf.inode, err = utilsos.GetIdentifyIDByPath(sf.currFile)
if err != nil {
return
@@ -559,7 +565,7 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
} else {
sf.ratereader = f
}
sf.offset = 0
sf.offset = sf.getOffset(f, 0, true)
sf.inode, err = utilsos.GetIdentifyIDByPath(sf.currFile)
if err != nil {
return err
@@ -651,3 +657,47 @@ type LineSkipper interface {
IsNewOpen() bool
SetSkipped()
}

func (sf *SeqFile) getOffset(f *os.File, offset int64, seek bool) int64 {
if len(sf.expireMap) == 0 || offset != 0 || f == nil {
return offset
}

if sf.meta.IsExist() {
deleteNotExist(filepath.Dir(f.Name()), sf.expireMap)
return offset
}

fileName := f.Name()
inode, err := utilsos.GetIdentifyIDByPath(fileName)
if err != nil {
log.Errorf("Runner[%s] NewSeqFile get file %s inode error %v, ignore...", sf.meta.RunnerName, fileName, err)
return offset
}
inodeStr := strconv.FormatUint(inode, 10)
offset = sf.expireMap[inodeStr+"_"+fileName]
if seek {
_, err = f.Seek(sf.offset, io.SeekStart)
if err != nil {
log.Errorf("Runner[%s] file: %s seek offset: %d failed: %v", sf.meta.RunnerName, f.Name(), sf.offset, err)
}
}
return offset
}

func deleteNotExist(dir string, expireMap map[string]int64) {
if dir == "" {
return
}
var arr []string
for inodeFile := range expireMap {
arr = strings.SplitN(inodeFile, "_", 2)
if len(arr) < 2 {
continue
}
if filepath.Dir(arr[1]) != dir {
continue
}
delete(expireMap, inodeFile)
}
}
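
For reference, a caller-side sketch of the extended NewSeqFile signature; the meta value is assumed to come from reader.NewMetaWithConf, and the path, pattern, and offsets below are illustrative only.

package callersketch

import (
	"github.com/qiniu/logkit/reader"
	. "github.com/qiniu/logkit/reader/config"
)

// openWithResume shows the new trailing parameter: nil keeps the old
// behaviour, while a non-empty map lets getOffset resume expired files
// from their recorded offsets.
func openWithResume(meta *reader.Meta) (*reader.SeqFile, error) {
	expireMap := map[string]int64{
		// "<inode>_<path>" -> bytes already consumed before the file expired
		"123456_/var/log/app/file1.log": 2048,
	}
	return reader.NewSeqFile(meta, "/var/log/app", true, false, nil, "*", WhenceNewest, expireMap)
}
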