Skip to content

Commit

Permalink
fix: modify workqueue init
Browse files Browse the repository at this point in the history
  • Loading branch information
SchwarzSail committed Sep 29, 2024
1 parent 076d6f1 commit 7cf5c99
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 91 deletions.
4 changes: 2 additions & 2 deletions cmd/classroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/cloudwego/kitex/server"
etcd "github.com/kitex-contrib/registry-etcd"
"github.com/west2-online/fzuhelper-server/cmd/classroom/dal"
"github.com/west2-online/fzuhelper-server/cmd/classroom/service"
"github.com/west2-online/fzuhelper-server/config"
classroom "github.com/west2-online/fzuhelper-server/kitex_gen/classroom/classroomservice"
"github.com/west2-online/fzuhelper-server/pkg/constants"
Expand Down Expand Up @@ -37,6 +36,7 @@ func Init() {

func main() {
Init()
InitWorkerQueue()
r, err := etcd.NewEtcdRegistry([]string{config.Etcd.Addr})
if err != nil {
klog.Fatal(err)
Expand Down Expand Up @@ -73,7 +73,7 @@ func main() {
}),
)
//提前缓存空教室数据
go service.CacheEmptyRooms()
go CacheEmptyRooms()
if err = svr.Run(); err != nil {
panic(err)
}
Expand Down
96 changes: 96 additions & 0 deletions cmd/classroom/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"context"
"github.com/west2-online/fzuhelper-server/cmd/classroom/service"
"github.com/west2-online/fzuhelper-server/config"
"github.com/west2-online/fzuhelper-server/kitex_gen/classroom"
"github.com/west2-online/fzuhelper-server/pkg/constants"
"github.com/west2-online/fzuhelper-server/pkg/logger"
"github.com/west2-online/jwch"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
"strconv"
"time"
)

var WorkQueue workqueue.RateLimitingInterface

func InitWorkerQueue() {
WorkQueue = workqueue.NewNamedRateLimitingQueue(
workqueue.NewMaxOfRateLimiter(
// For syncRec failures(i.e. doRecommend return err), the retry time is (2*minutes)*2^<num-failures>
// The maximum retry time is 24 hours
workqueue.NewItemExponentialFailureRateLimiter(constants.FailureRateLimiterBaseDelay, constants.FailureRateLimiterMaxDelay),
// 10 qps, 100 bucket size. This is only for retry speed, it's only the overall factor (not per item)
//每秒最多产生 10 个令牌(允许处理 10 个任务)。
//100:令牌桶最多存储 100 个令牌,允许积累的最大任务数量
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
constants.ClassroomService)
for i := 0; i < constants.NumWorkers; i++ {
go worker()
}
}

// CacheEmptyRooms 缓存所有空教室的数据
// 缓存一周的所有信息,每两天更新一次
func CacheEmptyRooms() {
for {
var dates []string
currentTime := time.Now()
//设定一周时间
for i := 0; i < 7; i++ {
date := currentTime.AddDate(0, 0, i).Format("2006-01-02")
dates = append(dates, date)
}
for _, date := range dates {
for _, campus := range constants.CampusArray {
for startTime := 1; startTime <= 11; startTime++ {
for endTime := startTime; endTime <= 11; endTime++ {
args := &classroom.EmptyRoomRequest{
Date: date,
Campus: campus,
StartTime: strconv.Itoa(startTime),
EndTime: strconv.Itoa(endTime),
}
WorkQueue.Add(args)
logger.LoggerObj.Debugf("classroom.service.CacheEmptyRooms add task %v", args)
}
}
logger.LoggerObj.Infof("classroom.service.CacheEmptyRooms add all tasks of campus %v in the day %v", campus, date)
}
}
time.Sleep(constants.ScheduledTime)
}
}

// 从工作队列取出task并处理
func worker() {
for {
ctx := context.Background()
id, cookies := jwch.NewStudent().WithUser(config.DefaultUser.Account, config.DefaultUser.Password).GetIdentifierAndCookies()
l := service.NewClassroomService(ctx, id, cookies)
task, shutDown := WorkQueue.Get()
if shutDown {
logger.LoggerObj.Debug("classroom.service.worker worker shutDown")
return
}
func(task any) {
defer WorkQueue.Done(task)
args, ok := task.(*classroom.EmptyRoomRequest)
if !ok {
logger.LoggerObj.Errorf("classroom.service.worker task type error: %T", task)
return
}
_, err := l.GetEmptyRooms(args)
if err != nil {
logger.LoggerObj.Errorf("classroom.service.worker GetEmptyRooms failed, args %v: %v", err, args)
return
}
//将任务标记为完成
WorkQueue.Forget(task)
logger.LoggerObj.Debug("classroom.service.worker task %v done", args)
}(task)
}
}
84 changes: 0 additions & 84 deletions cmd/classroom/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,8 @@ package service
import (
"context"
"github.com/west2-online/fzuhelper-server/config"
"github.com/west2-online/fzuhelper-server/kitex_gen/classroom"
"github.com/west2-online/fzuhelper-server/pkg/constants"
"github.com/west2-online/fzuhelper-server/pkg/logger"
"github.com/west2-online/jwch"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
"net/http"
"strconv"
"time"
)

type ClassroomService struct {
Expand All @@ -37,80 +30,3 @@ func NewClassroomService(ctx context.Context, identifier string, cookies []*http
cookies: cookies,
}
}

// CacheEmptyRooms 缓存所有空教室的数据
// 缓存一周的所有信息,每两天更新一次
func CacheEmptyRooms() {
for {
ctx := context.Background()
id, cookies := jwch.NewStudent().WithUser(config.DefaultUser.Account, config.DefaultUser.Password).GetIdentifierAndCookies()
l := NewClassroomService(ctx, id, cookies)
//使用具有限速功能的工作队列,避免教务处的压力过大
queue := workqueue.NewNamedRateLimitingQueue(
workqueue.NewMaxOfRateLimiter(
// For syncRec failures(i.e. doRecommend return err), the retry time is (2*minutes)*2^<num-failures>
// The maximum retry time is 24 hours
workqueue.NewItemExponentialFailureRateLimiter(constants.FailureRateLimiterBaseDelay, constants.FailureRateLimiterMaxDelay),
// 10 qps, 100 bucket size. This is only for retry speed, it's only the overall factor (not per item)
//每秒最多产生 10 个令牌(允许处理 10 个任务)。
//100:令牌桶最多存储 100 个令牌,允许积累的最大任务数量
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
constants.ClassroomService)
for i := 0; i < constants.NumWorkers; i++ {
go worker(queue, l)
}
var dates []string
currentTime := time.Now()
//设定一周时间
for i := 0; i < 7; i++ {
date := currentTime.AddDate(0, 0, i).Format("2006-01-02")
dates = append(dates, date)
}
for _, date := range dates {
for _, campus := range constants.CampusArray {
for startTime := 1; startTime <= 11; startTime++ {
for endTime := startTime; endTime <= 11; endTime++ {
args := &classroom.EmptyRoomRequest{
Date: date,
Campus: campus,
StartTime: strconv.Itoa(startTime),
EndTime: strconv.Itoa(endTime),
}
queue.Add(args)
logger.LoggerObj.Debugf("classroom.service.CacheEmptyRooms add task %v", args)
}
}
logger.LoggerObj.Infof("classroom.service.CacheEmptyRooms add all tasks of campus %v in the day %v", campus, date)
}
}
time.Sleep(constants.ScheduledTime)
}
}

// 从工作队列取出task并处理
func worker(queue workqueue.RateLimitingInterface, l *ClassroomService) {
for {
task, shutDown := queue.Get()
if shutDown {
logger.LoggerObj.Debug("classroom.service.worker worker shutDown")
return
}
func(task any) {
defer queue.Done(task)
args, ok := task.(*classroom.EmptyRoomRequest)
if !ok {
logger.LoggerObj.Errorf("classroom.service.worker task type error: %T", task)
return
}
_, err := l.GetEmptyRooms(args)
if err != nil {
logger.LoggerObj.Errorf("classroom.service.worker GetEmptyRooms failed, args %v: %v", err, args)
return
}
//将任务标记为完成
queue.Forget(task)
logger.LoggerObj.Debug("classroom.service.worker task %v done", args)
}(task)
}
}
10 changes: 5 additions & 5 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ var LoggerObj *zap.SugaredLogger
func LoggerInit() {
// 配置 zap 的日志等级和输出格式
config := zap.Config{
Level: zap.NewAtomicLevelAt(zap.InfoLevel), // 设置日志等级
Development: false, // 非开发模式
Encoding: "console", // 输出格式(json 或 console)
OutputPaths: []string{"stdout"}, // 输出目标
ErrorOutputPaths: []string{"stderr"}, // 错误输出目标
Level: zap.NewAtomicLevelAt(zap.DebugLevel), // 设置日志等级
Development: false, // 非开发模式
Encoding: "console", // 输出格式(json 或 console)
OutputPaths: []string{"stdout"}, // 输出目标
ErrorOutputPaths: []string{"stderr"}, // 错误输出目标
EncoderConfig: zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
Expand Down

0 comments on commit 7cf5c99

Please sign in to comment.