diff --git a/utils/log/survey/flusher.go b/utils/log/survey/flusher.go new file mode 100644 index 00000000..eb7b510e --- /dev/null +++ b/utils/log/survey/flusher.go @@ -0,0 +1,9 @@ +package survey + +// Flusher 用于刷新缓冲区的接口 +type Flusher interface { + // Flush 将缓冲区的数据持久化 + Flush(records []string) + // Info 返回当前刷新器的信息 + Info() string +} diff --git a/utils/log/survey/flusher_file.go b/utils/log/survey/flusher_file.go new file mode 100644 index 00000000..184a0a31 --- /dev/null +++ b/utils/log/survey/flusher_file.go @@ -0,0 +1,73 @@ +package survey + +import ( + "bufio" + "fmt" + "github.com/kercylan98/minotaur/utils/log" + "os" + "path/filepath" + "strings" + "time" +) + +// NewFileFlusher 创建一个文件刷新器 +// - layout 为日志文件名的时间戳格式 (默认为 time.DateOnly) +func NewFileFlusher(filePath string, layout ...string) *FileFlusher { + fn := filepath.Base(filePath) + ext := filepath.Ext(fn) + fn = strings.TrimSuffix(fn, ext) + dir := filepath.Dir(filePath) + fl := &FileFlusher{ + dir: dir, + fn: fn, + fe: ext, + layout: time.DateOnly, + layoutLen: len(time.DateOnly), + } + if len(layout) > 0 { + fl.layout = layout[0] + fl.layoutLen = len(fl.layout) + } + return fl +} + +type FileFlusher struct { + dir string + fn string + fe string + layout string + layoutLen int +} + +func (slf *FileFlusher) Flush(records []string) { + var ( + file *os.File + writer *bufio.Writer + err error + last string + ) + for _, data := range records { + tick := data[0:slf.layoutLen] + if tick != last { + if file != nil { + _ = writer.Flush() + _ = file.Close() + } + fp := filepath.Join(slf.dir, fmt.Sprintf("%s.%s%s", slf.fn, tick, slf.fe)) + file, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Fatal("Survey", log.String("Action", "DateSwitch"), log.String("FilePath", fp), log.Err(err)) + return + } + writer = bufio.NewWriterSize(file, 1024*10240) + last = tick + } + _, _ = writer.WriteString(data) + } + _ = writer.Flush() + _ = file.Close() +} + +func (slf *FileFlusher) Info() string { + return fmt.Sprintf("%s/%s.${DATE}%s", slf.dir, slf.fn, slf.fe) +} diff --git a/utils/log/survey/logger.go b/utils/log/survey/logger.go index ddd03a67..0fe66dc7 100644 --- a/utils/log/survey/logger.go +++ b/utils/log/survey/logger.go @@ -1,28 +1,17 @@ package survey import ( - "bufio" - "fmt" - "github.com/kercylan98/minotaur/utils/log" - "os" - "path/filepath" "sync" "time" ) // logger 用于埋点数据的运营日志记录器 type logger struct { - bl sync.Mutex // writer lock - wl sync.Mutex // flush lock - dir string - fn string - fe string - bs []string - layout string - layoutLen int - dataLayout string - dataLayoutLen int - interval time.Duration + bl sync.Mutex // writer lock + wl sync.Mutex // flush lock + bs []string + interval time.Duration + flusher Flusher } // flush 将记录器缓冲区的数据写入到文件 @@ -39,33 +28,7 @@ func (slf *logger) flush() { slf.wl.Lock() defer slf.wl.Unlock() - - var ( - file *os.File - writer *bufio.Writer - err error - last string - ) - for _, data := range ds { - tick := data[0:slf.layoutLen] - if tick != last { - if file != nil { - _ = writer.Flush() - _ = file.Close() - } - fp := slf.filePath(tick) - file, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - log.Fatal("Survey", log.String("Action", "DateSwitch"), log.String("FilePath", fp), log.Err(err)) - return - } - writer = bufio.NewWriterSize(file, 1024*10240) - last = tick - } - _, _ = writer.WriteString(data) - } - _ = writer.Flush() - _ = file.Close() + slf.flusher.Flush(ds) } // writer 写入数据到记录器缓冲区 @@ -77,8 +40,3 @@ func (slf *logger) writer(d string) { slf.flush() } } - -// filePath 获取文件路径 -func (slf *logger) filePath(t string) string { - return filepath.Join(slf.dir, fmt.Sprintf("%s.%s%s", slf.fn, t, slf.fe)) -} diff --git a/utils/log/survey/options.go b/utils/log/survey/options.go index ad33eb91..b160caca 100644 --- a/utils/log/survey/options.go +++ b/utils/log/survey/options.go @@ -5,15 +5,6 @@ import "time" // Option 选项 type Option func(logger *logger) -// WithLayout 设置日志文件名的时间戳格式 -// - 默认为 time.DateOnly -func WithLayout(layout string) Option { - return func(logger *logger) { - logger.layout = layout - logger.layoutLen = len(layout) - } -} - // WithFlushInterval 设置日志文件刷新间隔 // - 默认为 3s,当日志文件刷新间隔 <= 0 时,将会在每次写入日志时刷新日志文件 func WithFlushInterval(interval time.Duration) Option { diff --git a/utils/log/survey/survey.go b/utils/log/survey/survey.go index a3264bea..7675abd5 100644 --- a/utils/log/survey/survey.go +++ b/utils/log/survey/survey.go @@ -5,8 +5,6 @@ import ( "github.com/kercylan98/minotaur/utils/file" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/super" - "path/filepath" - "strings" "sync" "time" ) @@ -19,10 +17,7 @@ var ( ) // Reg 注册一个运营日志记录器 -func Reg(name, filePath string, options ...Option) { - fn := filepath.Base(filePath) - ext := filepath.Ext(fn) - fn = strings.TrimSuffix(fn, ext) +func Reg(name string, flusher Flusher, options ...Option) { timerSurveyLock.Lock() defer timerSurveyLock.Unlock() @@ -31,16 +26,9 @@ func Reg(name, filePath string, options ...Option) { if exist { panic(fmt.Errorf("survey %s already exist", name)) } - dir := filepath.Dir(filePath) logger := &logger{ - dir: dir, - fn: fn, - fe: ext, - layout: time.DateOnly, - layoutLen: len(time.DateOnly), - dataLayout: time.DateTime, - dataLayoutLen: len(time.DateTime) + 3, - interval: time.Second * 3, + flusher: flusher, + interval: time.Second * 3, } for _, option := range options { option(logger) @@ -68,7 +56,7 @@ func Reg(name, filePath string, options ...Option) { } survey[name] = logger - log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("FilePath", dir+"/"+fn+".${DATE}"+ext)) + log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("Info", logger.flusher.Info())) } // Record 记录一条运营日志 @@ -125,26 +113,6 @@ func Close(names ...string) { } } -// All 处理特定记录器特定日期的所有记录,当发生错误时,会发生 panic -// - handle 为并行执行的,需要自行处理并发安全 -func All(name string, t time.Time, handle func(record R)) { - timerSurveyLock.Lock() - logger := survey[name] - timerSurveyLock.Unlock() - if logger == nil { - return - } - fp := logger.filePath(t.Format(logger.layout)) - logger.wl.Lock() - defer logger.wl.Unlock() - err := file.ReadLineWithParallel(fp, 1*1024*1024*1024, func(s string) { - handle(R(s)) - }) - if err != nil { - panic(err) - } -} - // AllWithPath 处理特定记录器特定日期的所有记录,当发生错误时,会发生 panic // - handle 为并行执行的,需要自行处理并发安全 // - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据