feat: add incremental reading and improve error handling
This commit extends the file-reading functionality: the ReadLineWithParallel and FindLineChunks functions in utils/file/file.go gain a "start" parameter, enabling incremental reads from a given position. Error handling in utils/file/file.go is also improved for the case where the scanner encounters an error: the panic expression is removed in favor of returning directly, letting the function carry on with its work. Finally, utils/log/survey/survey.go adopts the functionality from utils/file/file.go, replacing the old usage with the new incremental reading.
kercylan98 committed Nov 28, 2023
1 parent c10494d commit b11baa3
Showing 3 changed files with 38 additions and 10 deletions.
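
Before the per-file diffs, here is a minimal sketch of the call pattern that the new signature enables (not part of the commit; the "./app.log" path, the 64 MiB chunk size, and the github.com/kercylan98/minotaur import path are assumptions based on the diff below):

// Sketch of the incremental-read pattern introduced by the start parameter.
package main

import (
    "fmt"

    "github.com/kercylan98/minotaur/utils/file"
)

func main() {
    const chunkSize = 64 * 1024 * 1024 // illustrative chunk size

    // First pass: read from the beginning of the file. The handler may be
    // invoked from several goroutines, so it must be safe for concurrent use.
    n, err := file.ReadLineWithParallel("./app.log", chunkSize, func(line string) {
        fmt.Println(line)
    })
    if err != nil {
        panic(err)
    }

    // Later pass: resume from the position reported by the first pass, so
    // only content past that offset is handled.
    if _, err = file.ReadLineWithParallel("./app.log", chunkSize, func(line string) {
        fmt.Println(line)
    }, n); err != nil {
        panic(err)
    }
}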
31 changes: 23 additions & 8 deletions utils/file/file.go
@@ -2,6 +2,7 @@ package file
 
 import (
     "bufio"
+    "github.com/kercylan98/minotaur/utils/slice"
     "io"
     "os"
     "path/filepath"
@@ -149,45 +150,59 @@ func Paths(dir string) []string {
     return paths
 }
 
-// ReadLineWithParallel reads a file line by line and processes the lines in parallel, passing each line's content to handlerFunc; an error during processing causes a panic, while an error beforehand is returned as error
+// ReadLineWithParallel reads a file line by line and processes the lines in parallel, passing each line's content to handlerFunc
 //   - since processing happens in parallel, the order in which lines are handled is not deterministic.
-func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string)) error {
+//   - the start parameter sets the position to begin reading from; when omitted, reading starts at the beginning of the file.
+func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) {
     file, err := os.Open(filename)
     if err != nil {
-        return err
+        return 0, err
     }
     defer func() {
         _ = file.Close()
     }()
 
-    chunks := FindLineChunks(file, chunkSize)
+    chunks := FindLineChunksByOffset(file, slice.GetValue(start, 0), chunkSize)
+    var end int64
+    var endMutex sync.Mutex
     var wg sync.WaitGroup
     for _, chunk := range chunks {
         wg.Add(1)
         go func(chunk [2]int64) {
             defer wg.Done()
 
-            r := io.NewSectionReader(file, chunk[0], chunk[1]-chunk[0])
+            endMutex.Lock()
+            e := chunk[1] - chunk[0]
+            if e > end {
+                end = e + 1
+            }
+            endMutex.Unlock()
+            r := io.NewSectionReader(file, chunk[0], e)
 
             scanner := bufio.NewScanner(r)
             for scanner.Scan() {
                 handlerFunc(scanner.Text())
             }
 
             if err := scanner.Err(); err != nil {
-                panic(err)
+                return
             }
         }(chunk)
     }
     wg.Wait()
-    return nil
+    return end, nil
 }
 
 // FindLineChunks finds the chunks of a file split along line boundaries; each chunk's size stays within chunkSize adjusted by the distance from the split point to the nearest line start and line end
 //   - chunks returned by this function always contain complete lines; a line is never split across chunks
 //   - a panic is raised if an error occurs during the process
 //   - each element of the result is an array of length 2: the first element is the chunk's start position, the second its end position
 func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
+    return FindLineChunksByOffset(file, 0, chunkSize)
+}
+
+// FindLineChunksByOffset is similar to FindLineChunks, except that an offset can be given to start reading the file from the specified position
+func FindLineChunksByOffset(file *os.File, offset, chunkSize int64) [][2]int64 {
     var chunks [][2]int64
 
     fileSize, err := file.Seek(0, io.SeekEnd)
@@ -199,7 +214,7 @@ func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
         panic(err)
     }
 
-    currentPos := int64(0)
+    currentPos := offset
     for currentPos < fileSize {
         start := currentPos
         if start != 0 { // not at the start of the file
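
To make the chunking contract above concrete, a short sketch (not part of the commit; the file name and chunk size are placeholders) that enumerates the line-aligned chunks of a file:

// Sketch: list the line-aligned chunks of a file starting at offset 0.
package main

import (
    "fmt"
    "os"

    "github.com/kercylan98/minotaur/utils/file"
)

func main() {
    f, err := os.Open("./app.log") // placeholder path
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // Each element holds a start and an end position; the boundaries are
    // aligned so that no line is ever split across two chunks.
    for _, c := range file.FindLineChunksByOffset(f, 0, 64*1024*1024) {
        fmt.Printf("chunk: bytes %d to %d (%d bytes)\n", c[0], c[1], c[1]-c[0])
    }
}

Each pair can then be handed to an io.SectionReader, exactly as ReadLineWithParallel does.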
13 changes: 13 additions & 0 deletions utils/file/file_test.go
@@ -5,6 +5,7 @@ import (
     "github.com/kercylan98/minotaur/utils/file"
     "strings"
     "testing"
+    "time"
 )
 
 func TestFilePaths(t *testing.T) {
@@ -20,3 +21,15 @@ func TestFilePaths(t *testing.T) {
     }
     fmt.Println("total line:", line, "total file:", fileCount)
 }
+
+func TestNewIncrementReader(t *testing.T) {
+    n, _ := file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) {
+        t.Log(s)
+    })
+
+    time.Sleep(time.Second * 3)
+    n, _ = file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) {
+        t.Log(s)
+    }, n)
+
+}
4 changes: 2 additions & 2 deletions utils/log/survey/survey.go
@@ -127,7 +127,7 @@ func Close(names ...string) {
 //   - suitable for log-file reads by an external process; note, however, that the log file may be in the middle of being written, so invalid data may be read
 func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report {
     analyzer := new(Analyzer)
-    err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
+    _, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
         handle(analyzer, R(s))
     })
     if err != nil {
@@ -141,7 +141,7 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) *Report {
 func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) *Report {
     analyzer := new(Analyzer)
     for _, filePath := range filePaths {
-        err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
+        _, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
             handle(analyzer, R(s))
         })
         if err != nil {
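
For reference, a hedged sketch of calling the updated Analyze (not part of the commit; the log path and the record counter are illustrative, and the package is assumed to be importable as github.com/kercylan98/minotaur/utils/log/survey):

// Sketch: count records while analyzing a log file with the survey package.
package main

import (
    "fmt"
    "sync/atomic"

    "github.com/kercylan98/minotaur/utils/log/survey"
)

func main() {
    // ReadLineWithParallel runs the handler on several goroutines, so any
    // shared state in the handler must be synchronized.
    var records atomic.Int64
    report := survey.Analyze("./logs/app.log", func(a *survey.Analyzer, r survey.R) {
        records.Add(1) // a real handler would inspect r and feed data into a
    })
    fmt.Println("records analyzed:", records.Load())
    _ = report // the aggregated Report would normally be consumed here
}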
