Skip to content

Commit

Permalink
feat: 添加协议服务器状态
Browse files Browse the repository at this point in the history
  • Loading branch information
godLei6 committed Oct 22, 2024
1 parent 0d1609b commit b22f080
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 16 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ go 1.21.4

require (
gitee.com/unitedrhino/core v1.0.0
gitee.com/unitedrhino/share v1.0.8
gitee.com/unitedrhino/share v1.0.9
gitee.com/unitedrhino/squirrel v1.20.5
github.com/dgraph-io/ristretto v0.1.0
github.com/dop251/goja v0.0.0-20230402114112-623f9dda9079
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ import (
"time"
)

type StaticHandle struct {
type HalfHourHandle struct {
svcCtx *svc.ServiceContext
ctx context.Context
logx.Logger
}

func NewStaticHandle(ctx context.Context, svcCtx *svc.ServiceContext) *StaticHandle {
return &StaticHandle{
func NewHalfHourHandle(ctx context.Context, svcCtx *svc.ServiceContext) *HalfHourHandle {
return &HalfHourHandle{
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
ctx: ctx,
}
}

func (l *StaticHandle) Handle() error { //产品品类设备数量统计
func (l *HalfHourHandle) Handle() error { //产品品类设备数量统计
w := sync.WaitGroup{}
w.Add(6)
utils.Go(l.ctx, func() {
Expand Down Expand Up @@ -86,7 +86,7 @@ func (l *StaticHandle) Handle() error { //产品品类设备数量统计
w.Wait()
return nil
}
func (l *StaticHandle) AreaDeviceStatic() error { //区域下的设备数量统计
func (l *HalfHourHandle) AreaDeviceStatic() error { //区域下的设备数量统计
ret, err := l.svcCtx.AreaM.AreaInfoIndex(l.ctx, &sys.AreaInfoIndexReq{})
if err != nil {
return err
Expand All @@ -101,7 +101,7 @@ func (l *StaticHandle) AreaDeviceStatic() error { //区域下的设备数量统

var count atomic.Int64

func (l *StaticHandle) DeviceOnlineFix() error { //设备在线修复
func (l *HalfHourHandle) DeviceOnlineFix() error { //设备在线修复
nc := count.Add(1)
if nc/2 == 1 { //1小时处理一次
return nil
Expand Down Expand Up @@ -130,7 +130,7 @@ func (l *StaticHandle) DeviceOnlineFix() error { //设备在线修复
return nil
}

func (l *StaticHandle) DeviceExp() error { //设备过期处理
func (l *HalfHourHandle) DeviceExp() error { //设备过期处理
{ //有效期到了之后不启用
err := relationDB.NewDeviceInfoRepo(l.ctx).UpdateWithField(l.ctx,
relationDB.DeviceFilter{ExpTime: stores.CmpAnd(stores.CmpLte(time.Now()), stores.CmpIsNull(false))},
Expand All @@ -149,7 +149,7 @@ func (l *StaticHandle) DeviceExp() error { //设备过期处理
}
return nil
}
func (l *StaticHandle) DeviceAbnormalRecover() error { //设备上下线异常恢复
func (l *HalfHourHandle) DeviceAbnormalRecover() error { //设备上下线异常恢复
now := time.Now()
dis, err := relationDB.NewDeviceInfoRepo(l.ctx).FindByFilter(l.ctx, relationDB.DeviceFilter{
Statuses: []int64{def.DeviceStatusAbnormal},
Expand Down Expand Up @@ -189,7 +189,7 @@ func (l *StaticHandle) DeviceAbnormalRecover() error { //设备上下线异常
return nil
}

func (l *StaticHandle) DeviceAbnormalSet() error { //设备上下线异常设置
func (l *HalfHourHandle) DeviceAbnormalSet() error { //设备上下线异常设置
now := time.Now()
dis, err := relationDB.NewDeviceInfoRepo(l.ctx).FindByFilter(l.ctx, relationDB.DeviceFilter{
LastLoginTime: &def.TimeRange{
Expand Down Expand Up @@ -232,7 +232,7 @@ func (l *StaticHandle) DeviceAbnormalSet() error { //设备上下线异常设置
return nil
}

func (l *StaticHandle) DeviceMsgCount() error { //产品品类设备数量统计
func (l *HalfHourHandle) DeviceMsgCount() error { //产品品类设备数量统计
end := time.Now()
var fm = end.Minute() / 30 * 30
var countData []*relationDB.DmDeviceMsgCount
Expand Down Expand Up @@ -273,7 +273,7 @@ func (l *StaticHandle) DeviceMsgCount() error { //产品品类设备数量统计
return nil
}

func (l *StaticHandle) ProductCategoryStatic() error { //产品品类设备数量统计
func (l *HalfHourHandle) ProductCategoryStatic() error { //产品品类设备数量统计
pcDB := relationDB.NewProductCategoryRepo(l.ctx)
pcs, err := pcDB.FindByFilter(l.ctx, relationDB.ProductCategoryFilter{}, nil)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions service/dmsvr/internal/event/staticEvent/oneMinute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package staticEvent

import (
"context"
"gitee.com/unitedrhino/share/stores"
"gitee.com/unitedrhino/things/service/dmsvr/internal/repo/relationDB"
"gitee.com/unitedrhino/things/service/dmsvr/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)

type OneMinuteHandle struct {
svcCtx *svc.ServiceContext
ctx context.Context
logx.Logger
}

func NewOneMinuteHandle(ctx context.Context, svcCtx *svc.ServiceContext) *OneMinuteHandle {
return &OneMinuteHandle{
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
ctx: ctx,
}
}

func (l *OneMinuteHandle) Handle() error { //产品品类设备数量统计
err := stores.WithNoDebug(l.ctx, relationDB.NewProtocolServiceRepo).DownStatus(l.ctx)
if err != nil {
l.Error(err)
}
return nil
}
8 changes: 8 additions & 0 deletions service/dmsvr/internal/repo/relationDB/protocolService.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package relationDB

import (
"context"
"gitee.com/unitedrhino/share/def"
"gitee.com/unitedrhino/share/stores"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"time"
)

/*
Expand Down Expand Up @@ -82,6 +84,12 @@ func (p ProtocolServiceRepo) Update(ctx context.Context, data *DmProtocolService
return stores.ErrFmt(err)
}

func (p ProtocolServiceRepo) DownStatus(ctx context.Context) error {
err := p.db.WithContext(ctx).Model(DmProtocolService{}).Where("status = ? and updated_time<?", def.True, time.Now().Add(-time.Minute*5)).
Update("status", def.False).Error
return stores.ErrFmt(err)
}

func (p ProtocolServiceRepo) DeleteByFilter(ctx context.Context, f ProtocolServiceFilter) error {
db := p.fmtFilter(ctx, f)
err := db.Delete(&DmProtocolService{}).Error
Expand Down
25 changes: 21 additions & 4 deletions service/dmsvr/internal/startup/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,14 @@ func InitEventBus(svcCtx *svc.ServiceContext) {
if t.Before(time.Now().Add(-time.Second * 2)) { //2秒之前的跳过
return nil
}
return staticEvent.NewStaticHandle(ctxs.WithRoot(ctx), svcCtx).Handle()
return staticEvent.NewHalfHourHandle(ctxs.WithRoot(ctx), svcCtx).Handle()
})
logx.Must(err)
err = svcCtx.FastEvent.QueueSubscribe(eventBus.DmDeviceStaticOneMinute, func(ctx context.Context, t time.Time, body []byte) error {
if t.Before(time.Now().Add(-time.Second * 2)) { //2秒之前的跳过
return nil
}
return staticEvent.NewOneMinuteHandle(ctxs.WithRoot(ctx), svcCtx).Handle()
})
logx.Must(err)
err = svcCtx.FastEvent.Start()
Expand All @@ -396,7 +403,7 @@ func TimerInit(svcCtx *svc.ServiceContext) {
_, err := svcCtx.TimedM.TaskInfoCreate(ctx, &timedmanage.TaskInfo{
GroupCode: def.TimedUnitedRhinoQueueGroupCode, //组编码
Type: 1, //任务类型 1 定时任务 2 延时任务
Name: "iThings ota升级定时任务", // 任务名称
Name: "联犀 ota升级定时任务", // 任务名称
Code: "iThingsOtaDeviceUpgradePush", //任务编码
Params: fmt.Sprintf(`{"topic":"%s","payload":""}`, eventBus.DmOtaDeviceUpgradePush), // 任务参数,延时任务如果没有传任务参数会拿数据库的参数来执行
CronExpr: "@every 5s", // cron执行表达式
Expand All @@ -409,7 +416,7 @@ func TimerInit(svcCtx *svc.ServiceContext) {
_, err = svcCtx.TimedM.TaskInfoCreate(ctx, &timedmanage.TaskInfo{
GroupCode: def.TimedUnitedRhinoQueueGroupCode, //组编码
Type: 1, //任务类型 1 定时任务 2 延时任务
Name: "iThings 设备在线状态改变处理", // 任务名称
Name: "联犀 设备在线状态改变处理", // 任务名称
Code: "dmDeviceOnlineStatusChange", //任务编码
Params: fmt.Sprintf(`{"topic":"%s","payload":""}`, eventBus.DmDeviceOnlineStatusChange), // 任务参数,延时任务如果没有传任务参数会拿数据库的参数来执行
CronExpr: "@every 1s", // cron执行表达式
Expand All @@ -422,13 +429,23 @@ func TimerInit(svcCtx *svc.ServiceContext) {
_, err = svcCtx.TimedM.TaskInfoCreate(ctx, &timedmanage.TaskInfo{
GroupCode: def.TimedUnitedRhinoQueueGroupCode, //组编码
Type: 1, //任务类型 1 定时任务 2 延时任务
Name: "iThings 设备半小时统计", // 任务名称
Name: "联犀 设备半小时统计", // 任务名称
Code: "dmDeviceStaticHalfHour", //任务编码
Params: fmt.Sprintf(`{"topic":"%s","payload":""}`, eventBus.DmDeviceStaticHalfHour), // 任务参数,延时任务如果没有传任务参数会拿数据库的参数来执行
CronExpr: "@every 30m", // cron执行表达式
Status: def.StatusWaitRun, // 状态
Priority: 3, //优先级: 10:critical 最高优先级 3: default 普通优先级 1:low 低优先级
})
_, err = svcCtx.TimedM.TaskInfoCreate(ctx, &timedmanage.TaskInfo{
GroupCode: def.TimedUnitedRhinoQueueGroupCode, //组编码
Type: 1, //任务类型 1 定时任务 2 延时任务
Name: "联犀 设备1分钟统计", // 任务名称
Code: "dmDeviceStaticOneMinute", //任务编码
Params: fmt.Sprintf(`{"topic":"%s","payload":""}`, eventBus.DmDeviceStaticOneMinute), // 任务参数,延时任务如果没有传任务参数会拿数据库的参数来执行
CronExpr: "@every 1m", // cron执行表达式
Status: def.StatusWaitRun, // 状态
Priority: 3, //优先级: 10:critical 最高优先级 3: default 普通优先级 1:low 低优先级
})
if err != nil && !errors.Cmp(errors.Fmt(err), errors.Duplicate) {
logx.Must(err)
}
Expand Down

0 comments on commit b22f080

Please sign in to comment.