From 5b3b0263cb1d5f6e5000b117f3d66b17cbcc0f89 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Wed, 4 Sep 2024 11:21:58 +0800 Subject: [PATCH] fix: fix last collect time conflicts with update time for sync audit plan --- sqle/api/controller/v1/instance_audit_plan.go | 24 +++--- sqle/model/instance_audit_plan.go | 85 ++++++++++++++----- sqle/model/utils.go | 1 + sqle/server/auditplan/manager.go | 3 +- sqle/server/auditplan/task_wrap.go | 1 - 5 files changed, 79 insertions(+), 35 deletions(-) diff --git a/sqle/api/controller/v1/instance_audit_plan.go b/sqle/api/controller/v1/instance_audit_plan.go index e02579fcc..23e523b47 100644 --- a/sqle/api/controller/v1/instance_audit_plan.go +++ b/sqle/api/controller/v1/instance_audit_plan.go @@ -191,7 +191,7 @@ func CreateInstanceAuditPlan(c echo.Context) error { AuditPlans: auditPlans, ActiveStatus: model.ActiveStatusNormal, } - err = s.Save(ap) + err = s.SaveInstanceAuditPlan(ap) if err != nil { return controller.JSONBaseErrorReq(c, err) } @@ -781,16 +781,18 @@ func GetInstanceAuditPlanOverview(c echo.Context) error { typeBase := ConvertAuditPlanTypeToRes(v.ID, v.Type) resAuditPlan := InstanceAuditPlanInfo{ - ID: v.ID, - Type: typeBase, - DBType: detail.DBType, - InstanceName: inst.Name, - ExecCmd: execCmd, - RuleTemplate: ruleTemplate, - TotalSQLNums: totalSQLNums, - UnsolvedSQLNums: unsolvedSQLNums, - LastCollectionTime: v.LastCollectionTime, - ActiveStatus: v.ActiveStatus, + ID: v.ID, + Type: typeBase, + DBType: detail.DBType, + InstanceName: inst.Name, + ExecCmd: execCmd, + RuleTemplate: ruleTemplate, + TotalSQLNums: totalSQLNums, + UnsolvedSQLNums: unsolvedSQLNums, + ActiveStatus: v.ActiveStatus, + } + if v.AuditPlanTaskInfo != nil { + resAuditPlan.LastCollectionTime = v.AuditPlanTaskInfo.LastCollectionTime } resAuditPlans = append(resAuditPlans, resAuditPlan) } diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 637b90926..51ab3c0e4 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -111,13 +111,10 @@ func (s *Storage) UpdateInstanceAuditPlanByID(id uint, attrs map[string]interfac return errors.New(errors.ConnectStorageError, err) } -// GetLatestAuditPlanIds 获取所有变更过的记录,包括删除 -// 采集时会更新last_collection_time会同步更新updated_at,此处获取updated_at > last_collection_time的任务,即为配置变更过的任务 -// 影响:会查出所有被删除的任务,在syncTask时做一次额外的删除操作 -func (s *Storage) GetLatestAuditPlanRecordsV2() ([]*AuditPlanDetail, error) { +// 获取所有变更过的记录,包括删除 +func (s *Storage) GetLatestAuditPlanRecordsV2(after time.Time) ([]*AuditPlanDetail, error) { var aps []*AuditPlanDetail - err := s.db.Unscoped().Model(AuditPlanV2{}).Select("audit_plans_v2.id, audit_plans_v2.updated_at,audit_plans_v2.last_collection_time"). - Where("(audit_plans_v2.updated_at > audit_plans_v2.last_collection_time OR last_collection_time IS NULL)").Order("updated_at").Scan(&aps).Error + err := s.db.Unscoped().Model(AuditPlanV2{}).Select("id, updated_at").Where("updated_at > ?", after).Order("updated_at").Find(&aps).Error return aps, errors.New(errors.ConnectStorageError, err) } @@ -131,9 +128,15 @@ type AuditPlanV2 struct { HighPriorityParams params.ParamsWithOperator `json:"high_priority_params" gorm:"type:varchar(1000)"` NeedMarkHighPrioritySQL bool `json:"need_mark_high_priority_sql"` ActiveStatus string `json:"active_status" gorm:"type:varchar(255)"` - LastCollectionTime *time.Time `json:"last_collection_time" gorm:"type:datetime(3)"` - AuditPlanSQLs []*SQLManageRecord `gorm:"-"` + AuditPlanSQLs []*SQLManageRecord `gorm:"-"` + AuditPlanTaskInfo *AuditPlanTaskInfo `gorm:"foreignkey:AuditPlanID"` +} + +type AuditPlanTaskInfo struct { + Model + AuditPlanID uint `json:"audit_plan_id" gorm:"not null"` + LastCollectionTime *time.Time `json:"last_collection_time" gorm:"type:datetime(3)"` } func (a AuditPlanV2) TableName() string { @@ -306,7 +309,7 @@ func (s *Storage) GetAuditPlanByInstanceIdAndType(instanceAuditPlanID string, au func (s *Storage) GetInstanceAuditPlanDetail(instanceAuditPlanID string) (*InstanceAuditPlan, bool, error) { instanceAuditPlan := &InstanceAuditPlan{} - err := s.db.Model(InstanceAuditPlan{}).Where("id = ?", instanceAuditPlanID).Preload("AuditPlans").First(&instanceAuditPlan).Error + err := s.db.Model(InstanceAuditPlan{}).Where("id = ?", instanceAuditPlanID).Preload("AuditPlans").Preload("AuditPlans.AuditPlanTaskInfo").First(&instanceAuditPlan).Error if err == gorm.ErrRecordNotFound { return instanceAuditPlan, false, nil } @@ -322,11 +325,42 @@ func (s *Storage) GetAuditPlanTotalSQL(auditPlanID uint) (int64, error) { return count, errors.ConnectStorageErrWrapper(err) } +func (s *Storage) SaveInstanceAuditPlan(instAuditPlans *InstanceAuditPlan) error { + return s.Tx(func(txDB *gorm.DB) error { + if err := txDB.Save(instAuditPlans).Error; err != nil { + return err + } + apTaskInfos := make([]*AuditPlanTaskInfo, 0, len(instAuditPlans.AuditPlans)) + for _, auditPlan := range instAuditPlans.AuditPlans { + apTaskInfos = append(apTaskInfos, &AuditPlanTaskInfo{ + AuditPlanID: auditPlan.ID, + }) + } + if err := txDB.Save(apTaskInfos).Error; err != nil { + return err + } + return nil + }) +} + func (s *Storage) BatchSaveAuditPlans(auditPlans []*AuditPlanV2) error { return s.Tx(func(txDB *gorm.DB) error { for _, auditPlan := range auditPlans { - if err := txDB.Save(auditPlan).Error; err != nil { - return err + // 新增的扫描任务类型需要保存audit task info + if auditPlan.ID == 0 { + if err := txDB.Save(auditPlan).Error; err != nil { + return err + } + apTaskInfo := &AuditPlanTaskInfo{ + AuditPlanID: auditPlan.ID, + } + if err := txDB.Save(apTaskInfo).Error; err != nil { + return err + } + } else { + if err := txDB.Save(auditPlan).Error; err != nil { + return err + } } } return nil @@ -345,12 +379,14 @@ func (s *Storage) DeleteInstanceAuditPlan(instanceAuditPlanId string) error { } err = txDB.Exec(`UPDATE instance_audit_plans iap LEFT JOIN audit_plans_v2 ap ON iap.id = ap.instance_audit_plan_id - LEFT JOIN sql_manage_records oms ON oms.source_id = ap.instance_audit_plan_id AND oms.source = ap.type - LEFT JOIN sql_manage_record_processes sm ON sm.sql_manage_record_id = oms.id + LEFT JOIN audit_plan_task_infos apti ON apti.audit_plan_id = ap.id + LEFT JOIN sql_manage_records smr ON smr.source_id = ap.instance_audit_plan_id AND smr.source = ap.type + LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id SET iap.deleted_at = now(), ap.deleted_at = now(), - oms.deleted_at = now(), - sm.deleted_at = now() + smr.deleted_at = now(), + smrp.deleted_at = now(), + apti.deleted_at = now() WHERE iap.ID = ?`, instanceAuditPlanId).Error if err != nil { return err @@ -369,11 +405,13 @@ func (s *Storage) DeleteAuditPlan(auditPlanID int) error { return err } err = txDB.Exec(`UPDATE audit_plans_v2 ap - LEFT JOIN sql_manage_records oms ON oms.source_id = ap.instance_audit_plan_id AND oms.source = ap.type - LEFT JOIN sql_manage_record_processes sm ON sm.sql_manage_record_id = oms.id + LEFT JOIN audit_plan_task_infos apti ON apti.audit_plan_id = ap.id + LEFT JOIN sql_manage_records smr ON smr.source_id = ap.instance_audit_plan_id AND smr.source = ap.type + LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id SET ap.deleted_at = now(), - oms.deleted_at = now(), - sm.deleted_at = now() + smr.deleted_at = now(), + smrp.deleted_at = now(), + apti.deleted_at = now() WHERE ap.id = ?`, auditPlanID).Error if err != nil { return err @@ -456,12 +494,15 @@ func (s *Storage) UpdateManagerSQL(sql *SQLManageRecord) error { func (s *Storage) UpdateManagerSQLStatus(sql *SQLManageRecord) error { const query = ` INSERT INTO sql_manage_record_processes (sql_manage_record_id) - SELECT oms.id FROM sql_manage_records oms WHERE oms.sql_id = ? + SELECT smr.id FROM sql_manage_records smr WHERE smr.sql_id = ? ON DUPLICATE KEY UPDATE sql_manage_record_id = VALUES(sql_manage_record_id);` return s.db.Exec(query, sql.SQLID).Error } func (s *Storage) UpdateAuditPlanLastCollectionTime(auditPlanID uint, collectionTime time.Time) error { - const query = `UPDATE audit_plans_v2 SET last_collection_time = now(3) WHERE id = ?;` - return s.db.Exec(query, auditPlanID).Error + err := s.db.Model(AuditPlanTaskInfo{}).Where("audit_plan_id = ?", auditPlanID).Update("last_collection_time", collectionTime).Error + if err != nil { + return err + } + return nil } diff --git a/sqle/model/utils.go b/sqle/model/utils.go index 322da010a..438ea30ec 100644 --- a/sqle/model/utils.go +++ b/sqle/model/utils.go @@ -160,6 +160,7 @@ var autoMigrateList = []interface{}{ &FeishuScheduledRecord{}, &InstanceAuditPlan{}, &AuditPlanV2{}, + &AuditPlanTaskInfo{}, &SQLManageRecord{}, &SQLManageRecordProcess{}, &SQLManageQueue{}, diff --git a/sqle/server/auditplan/manager.go b/sqle/server/auditplan/manager.go index ce5987106..f7ebec274 100644 --- a/sqle/server/auditplan/manager.go +++ b/sqle/server/auditplan/manager.go @@ -230,7 +230,7 @@ func (mgr *Manager) sync() error { } } // 增量同步智能扫描任务,根据数据库记录的更新时间筛选,更新后将下次筛选的时间为上一次记录的最晚的更新时间。 - aps, err := mgr.persist.GetLatestAuditPlanRecordsV2() + aps, err := mgr.persist.GetLatestAuditPlanRecordsV2(*mgr.lastSyncTime) if err != nil { return err } @@ -241,6 +241,7 @@ func (mgr *Manager) sync() error { if err != nil { mgr.logger.WithField("id", ap.ID).Errorf("sync audit task failed, error: %v", err) } + mgr.lastSyncTime = &ap.UpdatedAt } return nil } diff --git a/sqle/server/auditplan/task_wrap.go b/sqle/server/auditplan/task_wrap.go index 08878837f..bda2a50c3 100644 --- a/sqle/server/auditplan/task_wrap.go +++ b/sqle/server/auditplan/task_wrap.go @@ -230,7 +230,6 @@ func (at *TaskWrapper) extractSQL() { err = at.persist.UpdateAuditPlanLastCollectionTime(at.ap.ID, collectionTime) if err != nil { at.logger.Errorf("update audit plan last collection time failed, error : %v", err) - return } if len(sqls) == 0 { at.logger.Info("extract sql list is empty, skip")