Skip to content

Commit

Permalink
refactor: 优化 survey,移除 All 函数,新增 Flusher 接口,可自行实现其他持久化方式
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Aug 25, 2023
1 parent c6f8c19 commit d9ba1bc
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 93 deletions.
9 changes: 9 additions & 0 deletions utils/log/survey/flusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package survey

// Flusher 用于刷新缓冲区的接口
type Flusher interface {
// Flush 将缓冲区的数据持久化
Flush(records []string)
// Info 返回当前刷新器的信息
Info() string
}
73 changes: 73 additions & 0 deletions utils/log/survey/flusher_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package survey

import (
"bufio"
"fmt"
"github.com/kercylan98/minotaur/utils/log"
"os"
"path/filepath"
"strings"
"time"
)

// NewFileFlusher 创建一个文件刷新器
// - layout 为日志文件名的时间戳格式 (默认为 time.DateOnly)
func NewFileFlusher(filePath string, layout ...string) *FileFlusher {
fn := filepath.Base(filePath)
ext := filepath.Ext(fn)
fn = strings.TrimSuffix(fn, ext)
dir := filepath.Dir(filePath)
fl := &FileFlusher{
dir: dir,
fn: fn,
fe: ext,
layout: time.DateOnly,
layoutLen: len(time.DateOnly),
}
if len(layout) > 0 {
fl.layout = layout[0]
fl.layoutLen = len(fl.layout)
}
return fl
}

type FileFlusher struct {
dir string
fn string
fe string
layout string
layoutLen int
}

func (slf *FileFlusher) Flush(records []string) {
var (
file *os.File
writer *bufio.Writer
err error
last string
)
for _, data := range records {
tick := data[0:slf.layoutLen]
if tick != last {
if file != nil {
_ = writer.Flush()
_ = file.Close()
}
fp := filepath.Join(slf.dir, fmt.Sprintf("%s.%s%s", slf.fn, tick, slf.fe))
file, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatal("Survey", log.String("Action", "DateSwitch"), log.String("FilePath", fp), log.Err(err))
return
}
writer = bufio.NewWriterSize(file, 1024*10240)
last = tick
}
_, _ = writer.WriteString(data)
}
_ = writer.Flush()
_ = file.Close()
}

func (slf *FileFlusher) Info() string {
return fmt.Sprintf("%s/%s.${DATE}%s", slf.dir, slf.fn, slf.fe)
}
54 changes: 6 additions & 48 deletions utils/log/survey/logger.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
package survey

import (
"bufio"
"fmt"
"github.com/kercylan98/minotaur/utils/log"
"os"
"path/filepath"
"sync"
"time"
)

// logger 用于埋点数据的运营日志记录器
type logger struct {
bl sync.Mutex // writer lock
wl sync.Mutex // flush lock
dir string
fn string
fe string
bs []string
layout string
layoutLen int
dataLayout string
dataLayoutLen int
interval time.Duration
bl sync.Mutex // writer lock
wl sync.Mutex // flush lock
bs []string
interval time.Duration
flusher Flusher
}

// flush 将记录器缓冲区的数据写入到文件
Expand All @@ -39,33 +28,7 @@ func (slf *logger) flush() {

slf.wl.Lock()
defer slf.wl.Unlock()

var (
file *os.File
writer *bufio.Writer
err error
last string
)
for _, data := range ds {
tick := data[0:slf.layoutLen]
if tick != last {
if file != nil {
_ = writer.Flush()
_ = file.Close()
}
fp := slf.filePath(tick)
file, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatal("Survey", log.String("Action", "DateSwitch"), log.String("FilePath", fp), log.Err(err))
return
}
writer = bufio.NewWriterSize(file, 1024*10240)
last = tick
}
_, _ = writer.WriteString(data)
}
_ = writer.Flush()
_ = file.Close()
slf.flusher.Flush(ds)
}

// writer 写入数据到记录器缓冲区
Expand All @@ -77,8 +40,3 @@ func (slf *logger) writer(d string) {
slf.flush()
}
}

// filePath 获取文件路径
func (slf *logger) filePath(t string) string {
return filepath.Join(slf.dir, fmt.Sprintf("%s.%s%s", slf.fn, t, slf.fe))
}
9 changes: 0 additions & 9 deletions utils/log/survey/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@ import "time"
// Option 选项
type Option func(logger *logger)

// WithLayout 设置日志文件名的时间戳格式
// - 默认为 time.DateOnly
func WithLayout(layout string) Option {
return func(logger *logger) {
logger.layout = layout
logger.layoutLen = len(layout)
}
}

// WithFlushInterval 设置日志文件刷新间隔
// - 默认为 3s,当日志文件刷新间隔 <= 0 时,将会在每次写入日志时刷新日志文件
func WithFlushInterval(interval time.Duration) Option {
Expand Down
40 changes: 4 additions & 36 deletions utils/log/survey/survey.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"github.com/kercylan98/minotaur/utils/file"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"path/filepath"
"strings"
"sync"
"time"
)
Expand All @@ -19,10 +17,7 @@ var (
)

// Reg 注册一个运营日志记录器
func Reg(name, filePath string, options ...Option) {
fn := filepath.Base(filePath)
ext := filepath.Ext(fn)
fn = strings.TrimSuffix(fn, ext)
func Reg(name string, flusher Flusher, options ...Option) {

timerSurveyLock.Lock()
defer timerSurveyLock.Unlock()
Expand All @@ -31,16 +26,9 @@ func Reg(name, filePath string, options ...Option) {
if exist {
panic(fmt.Errorf("survey %s already exist", name))
}
dir := filepath.Dir(filePath)
logger := &logger{
dir: dir,
fn: fn,
fe: ext,
layout: time.DateOnly,
layoutLen: len(time.DateOnly),
dataLayout: time.DateTime,
dataLayoutLen: len(time.DateTime) + 3,
interval: time.Second * 3,
flusher: flusher,
interval: time.Second * 3,
}
for _, option := range options {
option(logger)
Expand Down Expand Up @@ -68,7 +56,7 @@ func Reg(name, filePath string, options ...Option) {
}

survey[name] = logger
log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("FilePath", dir+"/"+fn+".${DATE}"+ext))
log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("Info", logger.flusher.Info()))
}

// Record 记录一条运营日志
Expand Down Expand Up @@ -125,26 +113,6 @@ func Close(names ...string) {
}
}

// All 处理特定记录器特定日期的所有记录,当发生错误时,会发生 panic
// - handle 为并行执行的,需要自行处理并发安全
func All(name string, t time.Time, handle func(record R)) {
timerSurveyLock.Lock()
logger := survey[name]
timerSurveyLock.Unlock()
if logger == nil {
return
}
fp := logger.filePath(t.Format(logger.layout))
logger.wl.Lock()
defer logger.wl.Unlock()
err := file.ReadLineWithParallel(fp, 1*1024*1024*1024, func(s string) {
handle(R(s))
})
if err != nil {
panic(err)
}
}

// AllWithPath 处理特定记录器特定日期的所有记录,当发生错误时,会发生 panic
// - handle 为并行执行的,需要自行处理并发安全
// - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据
Expand Down

0 comments on commit d9ba1bc

Please sign in to comment.