From b11baa3653cb6f4532c40649b8c5b0ddc0b12acb Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 28 Nov 2023 11:59:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E4=BA=86=E5=A2=9E?= =?UTF-8?q?=E9=87=8F=E8=AF=BB=E5=8F=96=E5=8A=9F=E8=83=BD=E5=B9=B6=E6=94=B9?= =?UTF-8?q?=E5=96=84=E4=BA=86=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 此提交在文件读取功能上进行了扩展,通过在utils/file/file.go中的ReadLineWithParallel函数和FindLineChunks函数添加“start”参数,实现了从指定位置进行增量读取。此外,当扫描器遇到错误时,utils / file / file.go中的错误处理得到了改善,删除了panic表达式,而是直接返回,让函数继续处理。同时在utils/log/survey/survey.go中实现了来自utils/ file/file.go的功能,以使用新的增量读取功能替换旧功能。 --- utils/file/file.go | 31 +++++++++++++++++++++++-------- utils/file/file_test.go | 13 +++++++++++++ utils/log/survey/survey.go | 4 ++-- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/utils/file/file.go b/utils/file/file.go index 97a1e5a7..cbe6a601 100644 --- a/utils/file/file.go +++ b/utils/file/file.go @@ -2,6 +2,7 @@ package file import ( "bufio" + "github.com/kercylan98/minotaur/utils/slice" "io" "os" "path/filepath" @@ -149,25 +150,34 @@ func Paths(dir string) []string { return paths } -// ReadLineWithParallel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理,当过程中如果产生错误则会发生 panic,过程前发生错误将会返回 error +// ReadLineWithParallelByChannel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理 // - 由于是并行处理,所以处理过程中的顺序是不确定的。 -func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string)) error { +// - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。 +func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) { file, err := os.Open(filename) if err != nil { - return err + return 0, err } defer func() { _ = file.Close() }() - chunks := FindLineChunks(file, chunkSize) + chunks := FindLineChunksByOffset(file, slice.GetValue(start, 0), chunkSize) + var end int64 + var endMutex sync.Mutex var wg sync.WaitGroup for _, chunk := range chunks { wg.Add(1) go func(chunk [2]int64) { defer wg.Done() - r := io.NewSectionReader(file, chunk[0], chunk[1]-chunk[0]) + endMutex.Lock() + e := chunk[1] - chunk[0] + if e > end { + end = e + 1 + } + endMutex.Unlock() + r := io.NewSectionReader(file, chunk[0], e) scanner := bufio.NewScanner(r) for scanner.Scan() { @@ -175,12 +185,12 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str } if err := scanner.Err(); err != nil { - panic(err) + return } }(chunk) } wg.Wait() - return nil + return end, nil } // FindLineChunks 查找文件按照每行划分的分块,每个分块的大小将在 chunkSize 和分割后的分块距离行首及行尾的距离中范围内 @@ -188,6 +198,11 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str // - 当过程中发生错误将会发生 panic // - 返回值的成员是一个长度为 2 的数组,第一个元素是分块的起始位置,第二个元素是分块的结束位置 func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 { + return FindLineChunksByOffset(file, 0, chunkSize) +} + +// FindLineChunksByOffset 该函数与 FindLineChunks 类似,不同的是该函数可以指定 offset 从指定位置开始读取文件 +func FindLineChunksByOffset(file *os.File, offset, chunkSize int64) [][2]int64 { var chunks [][2]int64 fileSize, err := file.Seek(0, io.SeekEnd) @@ -199,7 +214,7 @@ func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 { panic(err) } - currentPos := int64(0) + currentPos := offset for currentPos < fileSize { start := currentPos if start != 0 { // 不是文件的开头 diff --git a/utils/file/file_test.go b/utils/file/file_test.go index ae459984..3e97aec1 100644 --- a/utils/file/file_test.go +++ b/utils/file/file_test.go @@ -5,6 +5,7 @@ import ( "github.com/kercylan98/minotaur/utils/file" "strings" "testing" + "time" ) func TestFilePaths(t *testing.T) { @@ -20,3 +21,15 @@ func TestFilePaths(t *testing.T) { } fmt.Println("total line:", line, "total file:", fileCount) } + +func TestNewIncrementReader(t *testing.T) { + n, _ := file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) { + t.Log(s) + }) + + time.Sleep(time.Second * 3) + n, _ = file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) { + t.Log(s) + }, n) + +} diff --git a/utils/log/survey/survey.go b/utils/log/survey/survey.go index 750c2614..181286fd 100644 --- a/utils/log/survey/survey.go +++ b/utils/log/survey/survey.go @@ -127,7 +127,7 @@ func Close(names ...string) { // - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据 func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report { analyzer := new(Analyzer) - err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { + _, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { handle(analyzer, R(s)) }) if err != nil { @@ -141,7 +141,7 @@ func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) *Report { analyzer := new(Analyzer) for _, filePath := range filePaths { - err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { + _, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { handle(analyzer, R(s)) }) if err != nil {