From 7cf5c99e1412d2e30bd2dcab5005aec09933990f Mon Sep 17 00:00:00 2001 From: SchwarzSail <1424928981@qq.com> Date: Sun, 29 Sep 2024 18:08:41 +0800 Subject: [PATCH] fix: modify workqueue init --- cmd/classroom/main.go | 4 +- cmd/classroom/schedule.go | 96 ++++++++++++++++++++++++++++++++ cmd/classroom/service/service.go | 84 ---------------------------- pkg/logger/logger.go | 10 ++-- 4 files changed, 103 insertions(+), 91 deletions(-) create mode 100644 cmd/classroom/schedule.go diff --git a/cmd/classroom/main.go b/cmd/classroom/main.go index 93eaf7fd..f551db9b 100644 --- a/cmd/classroom/main.go +++ b/cmd/classroom/main.go @@ -8,7 +8,6 @@ import ( "github.com/cloudwego/kitex/server" etcd "github.com/kitex-contrib/registry-etcd" "github.com/west2-online/fzuhelper-server/cmd/classroom/dal" - "github.com/west2-online/fzuhelper-server/cmd/classroom/service" "github.com/west2-online/fzuhelper-server/config" classroom "github.com/west2-online/fzuhelper-server/kitex_gen/classroom/classroomservice" "github.com/west2-online/fzuhelper-server/pkg/constants" @@ -37,6 +36,7 @@ func Init() { func main() { Init() + InitWorkerQueue() r, err := etcd.NewEtcdRegistry([]string{config.Etcd.Addr}) if err != nil { klog.Fatal(err) @@ -73,7 +73,7 @@ func main() { }), ) //提前缓存空教室数据 - go service.CacheEmptyRooms() + go CacheEmptyRooms() if err = svr.Run(); err != nil { panic(err) } diff --git a/cmd/classroom/schedule.go b/cmd/classroom/schedule.go new file mode 100644 index 00000000..e3268708 --- /dev/null +++ b/cmd/classroom/schedule.go @@ -0,0 +1,96 @@ +package main + +import ( + "context" + "github.com/west2-online/fzuhelper-server/cmd/classroom/service" + "github.com/west2-online/fzuhelper-server/config" + "github.com/west2-online/fzuhelper-server/kitex_gen/classroom" + "github.com/west2-online/fzuhelper-server/pkg/constants" + "github.com/west2-online/fzuhelper-server/pkg/logger" + "github.com/west2-online/jwch" + "golang.org/x/time/rate" + "k8s.io/client-go/util/workqueue" + "strconv" + "time" +) + +var WorkQueue workqueue.RateLimitingInterface + +func InitWorkerQueue() { + WorkQueue = workqueue.NewNamedRateLimitingQueue( + workqueue.NewMaxOfRateLimiter( + // For syncRec failures(i.e. doRecommend return err), the retry time is (2*minutes)*2^ + // The maximum retry time is 24 hours + workqueue.NewItemExponentialFailureRateLimiter(constants.FailureRateLimiterBaseDelay, constants.FailureRateLimiterMaxDelay), + // 10 qps, 100 bucket size. This is only for retry speed, it's only the overall factor (not per item) + //每秒最多产生 10 个令牌(允许处理 10 个任务)。 + //100:令牌桶最多存储 100 个令牌,允许积累的最大任务数量 + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + constants.ClassroomService) + for i := 0; i < constants.NumWorkers; i++ { + go worker() + } +} + +// CacheEmptyRooms 缓存所有空教室的数据 +// 缓存一周的所有信息,每两天更新一次 +func CacheEmptyRooms() { + for { + var dates []string + currentTime := time.Now() + //设定一周时间 + for i := 0; i < 7; i++ { + date := currentTime.AddDate(0, 0, i).Format("2006-01-02") + dates = append(dates, date) + } + for _, date := range dates { + for _, campus := range constants.CampusArray { + for startTime := 1; startTime <= 11; startTime++ { + for endTime := startTime; endTime <= 11; endTime++ { + args := &classroom.EmptyRoomRequest{ + Date: date, + Campus: campus, + StartTime: strconv.Itoa(startTime), + EndTime: strconv.Itoa(endTime), + } + WorkQueue.Add(args) + logger.LoggerObj.Debugf("classroom.service.CacheEmptyRooms add task %v", args) + } + } + logger.LoggerObj.Infof("classroom.service.CacheEmptyRooms add all tasks of campus %v in the day %v", campus, date) + } + } + time.Sleep(constants.ScheduledTime) + } +} + +// 从工作队列取出task并处理 +func worker() { + for { + ctx := context.Background() + id, cookies := jwch.NewStudent().WithUser(config.DefaultUser.Account, config.DefaultUser.Password).GetIdentifierAndCookies() + l := service.NewClassroomService(ctx, id, cookies) + task, shutDown := WorkQueue.Get() + if shutDown { + logger.LoggerObj.Debug("classroom.service.worker worker shutDown") + return + } + func(task any) { + defer WorkQueue.Done(task) + args, ok := task.(*classroom.EmptyRoomRequest) + if !ok { + logger.LoggerObj.Errorf("classroom.service.worker task type error: %T", task) + return + } + _, err := l.GetEmptyRooms(args) + if err != nil { + logger.LoggerObj.Errorf("classroom.service.worker GetEmptyRooms failed, args %v: %v", err, args) + return + } + //将任务标记为完成 + WorkQueue.Forget(task) + logger.LoggerObj.Debug("classroom.service.worker task %v done", args) + }(task) + } +} diff --git a/cmd/classroom/service/service.go b/cmd/classroom/service/service.go index 1cacd312..7e87e1d9 100644 --- a/cmd/classroom/service/service.go +++ b/cmd/classroom/service/service.go @@ -3,15 +3,8 @@ package service import ( "context" "github.com/west2-online/fzuhelper-server/config" - "github.com/west2-online/fzuhelper-server/kitex_gen/classroom" - "github.com/west2-online/fzuhelper-server/pkg/constants" - "github.com/west2-online/fzuhelper-server/pkg/logger" "github.com/west2-online/jwch" - "golang.org/x/time/rate" - "k8s.io/client-go/util/workqueue" "net/http" - "strconv" - "time" ) type ClassroomService struct { @@ -37,80 +30,3 @@ func NewClassroomService(ctx context.Context, identifier string, cookies []*http cookies: cookies, } } - -// CacheEmptyRooms 缓存所有空教室的数据 -// 缓存一周的所有信息,每两天更新一次 -func CacheEmptyRooms() { - for { - ctx := context.Background() - id, cookies := jwch.NewStudent().WithUser(config.DefaultUser.Account, config.DefaultUser.Password).GetIdentifierAndCookies() - l := NewClassroomService(ctx, id, cookies) - //使用具有限速功能的工作队列,避免教务处的压力过大 - queue := workqueue.NewNamedRateLimitingQueue( - workqueue.NewMaxOfRateLimiter( - // For syncRec failures(i.e. doRecommend return err), the retry time is (2*minutes)*2^ - // The maximum retry time is 24 hours - workqueue.NewItemExponentialFailureRateLimiter(constants.FailureRateLimiterBaseDelay, constants.FailureRateLimiterMaxDelay), - // 10 qps, 100 bucket size. This is only for retry speed, it's only the overall factor (not per item) - //每秒最多产生 10 个令牌(允许处理 10 个任务)。 - //100:令牌桶最多存储 100 个令牌,允许积累的最大任务数量 - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ), - constants.ClassroomService) - for i := 0; i < constants.NumWorkers; i++ { - go worker(queue, l) - } - var dates []string - currentTime := time.Now() - //设定一周时间 - for i := 0; i < 7; i++ { - date := currentTime.AddDate(0, 0, i).Format("2006-01-02") - dates = append(dates, date) - } - for _, date := range dates { - for _, campus := range constants.CampusArray { - for startTime := 1; startTime <= 11; startTime++ { - for endTime := startTime; endTime <= 11; endTime++ { - args := &classroom.EmptyRoomRequest{ - Date: date, - Campus: campus, - StartTime: strconv.Itoa(startTime), - EndTime: strconv.Itoa(endTime), - } - queue.Add(args) - logger.LoggerObj.Debugf("classroom.service.CacheEmptyRooms add task %v", args) - } - } - logger.LoggerObj.Infof("classroom.service.CacheEmptyRooms add all tasks of campus %v in the day %v", campus, date) - } - } - time.Sleep(constants.ScheduledTime) - } -} - -// 从工作队列取出task并处理 -func worker(queue workqueue.RateLimitingInterface, l *ClassroomService) { - for { - task, shutDown := queue.Get() - if shutDown { - logger.LoggerObj.Debug("classroom.service.worker worker shutDown") - return - } - func(task any) { - defer queue.Done(task) - args, ok := task.(*classroom.EmptyRoomRequest) - if !ok { - logger.LoggerObj.Errorf("classroom.service.worker task type error: %T", task) - return - } - _, err := l.GetEmptyRooms(args) - if err != nil { - logger.LoggerObj.Errorf("classroom.service.worker GetEmptyRooms failed, args %v: %v", err, args) - return - } - //将任务标记为完成 - queue.Forget(task) - logger.LoggerObj.Debug("classroom.service.worker task %v done", args) - }(task) - } -} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index c77cac26..3378ccac 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -12,11 +12,11 @@ var LoggerObj *zap.SugaredLogger func LoggerInit() { // 配置 zap 的日志等级和输出格式 config := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.InfoLevel), // 设置日志等级 - Development: false, // 非开发模式 - Encoding: "console", // 输出格式(json 或 console) - OutputPaths: []string{"stdout"}, // 输出目标 - ErrorOutputPaths: []string{"stderr"}, // 错误输出目标 + Level: zap.NewAtomicLevelAt(zap.DebugLevel), // 设置日志等级 + Development: false, // 非开发模式 + Encoding: "console", // 输出格式(json 或 console) + OutputPaths: []string{"stdout"}, // 输出目标 + ErrorOutputPaths: []string{"stderr"}, // 错误输出目标 EncoderConfig: zapcore.EncoderConfig{ TimeKey: "time", LevelKey: "level",