Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix last collect time conflicts with update time for sync audit … #2583

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions sqle/api/controller/v1/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func CreateInstanceAuditPlan(c echo.Context) error {
AuditPlans: auditPlans,
ActiveStatus: model.ActiveStatusNormal,
}
err = s.Save(ap)
err = s.SaveInstanceAuditPlan(ap)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
Expand Down Expand Up @@ -781,16 +781,18 @@ func GetInstanceAuditPlanOverview(c echo.Context) error {

typeBase := ConvertAuditPlanTypeToRes(v.ID, v.Type)
resAuditPlan := InstanceAuditPlanInfo{
ID: v.ID,
Type: typeBase,
DBType: detail.DBType,
InstanceName: inst.Name,
ExecCmd: execCmd,
RuleTemplate: ruleTemplate,
TotalSQLNums: totalSQLNums,
UnsolvedSQLNums: unsolvedSQLNums,
LastCollectionTime: v.LastCollectionTime,
ActiveStatus: v.ActiveStatus,
ID: v.ID,
Type: typeBase,
DBType: detail.DBType,
InstanceName: inst.Name,
ExecCmd: execCmd,
RuleTemplate: ruleTemplate,
TotalSQLNums: totalSQLNums,
UnsolvedSQLNums: unsolvedSQLNums,
ActiveStatus: v.ActiveStatus,
}
if v.AuditPlanTaskInfo != nil {
resAuditPlan.LastCollectionTime = v.AuditPlanTaskInfo.LastCollectionTime
}
resAuditPlans = append(resAuditPlans, resAuditPlan)
}
Expand Down
85 changes: 63 additions & 22 deletions sqle/model/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,10 @@ func (s *Storage) UpdateInstanceAuditPlanByID(id uint, attrs map[string]interfac
return errors.New(errors.ConnectStorageError, err)
}

// GetLatestAuditPlanIds 获取所有变更过的记录,包括删除
// 采集时会更新last_collection_time会同步更新updated_at,此处获取updated_at > last_collection_time的任务,即为配置变更过的任务
// 影响:会查出所有被删除的任务,在syncTask时做一次额外的删除操作
func (s *Storage) GetLatestAuditPlanRecordsV2() ([]*AuditPlanDetail, error) {
// 获取所有变更过的记录,包括删除
func (s *Storage) GetLatestAuditPlanRecordsV2(after time.Time) ([]*AuditPlanDetail, error) {
var aps []*AuditPlanDetail
err := s.db.Unscoped().Model(AuditPlanV2{}).Select("audit_plans_v2.id, audit_plans_v2.updated_at,audit_plans_v2.last_collection_time").
Where("(audit_plans_v2.updated_at > audit_plans_v2.last_collection_time OR last_collection_time IS NULL)").Order("updated_at").Scan(&aps).Error
err := s.db.Unscoped().Model(AuditPlanV2{}).Select("id, updated_at").Where("updated_at > ?", after).Order("updated_at").Find(&aps).Error
return aps, errors.New(errors.ConnectStorageError, err)
}

Expand All @@ -131,9 +128,15 @@ type AuditPlanV2 struct {
HighPriorityParams params.ParamsWithOperator `json:"high_priority_params" gorm:"type:varchar(1000)"`
NeedMarkHighPrioritySQL bool `json:"need_mark_high_priority_sql"`
ActiveStatus string `json:"active_status" gorm:"type:varchar(255)"`
LastCollectionTime *time.Time `json:"last_collection_time" gorm:"type:datetime(3)"`

AuditPlanSQLs []*SQLManageRecord `gorm:"-"`
AuditPlanSQLs []*SQLManageRecord `gorm:"-"`
AuditPlanTaskInfo *AuditPlanTaskInfo `gorm:"foreignkey:AuditPlanID"`
}

type AuditPlanTaskInfo struct {
Model
AuditPlanID uint `json:"audit_plan_id" gorm:"not null"`
LastCollectionTime *time.Time `json:"last_collection_time" gorm:"type:datetime(3)"`
}

func (a AuditPlanV2) TableName() string {
Expand Down Expand Up @@ -306,7 +309,7 @@ func (s *Storage) GetAuditPlanByInstanceIdAndType(instanceAuditPlanID string, au

func (s *Storage) GetInstanceAuditPlanDetail(instanceAuditPlanID string) (*InstanceAuditPlan, bool, error) {
instanceAuditPlan := &InstanceAuditPlan{}
err := s.db.Model(InstanceAuditPlan{}).Where("id = ?", instanceAuditPlanID).Preload("AuditPlans").First(&instanceAuditPlan).Error
err := s.db.Model(InstanceAuditPlan{}).Where("id = ?", instanceAuditPlanID).Preload("AuditPlans").Preload("AuditPlans.AuditPlanTaskInfo").First(&instanceAuditPlan).Error
if err == gorm.ErrRecordNotFound {
return instanceAuditPlan, false, nil
}
Expand All @@ -322,11 +325,42 @@ func (s *Storage) GetAuditPlanTotalSQL(auditPlanID uint) (int64, error) {
return count, errors.ConnectStorageErrWrapper(err)
}

func (s *Storage) SaveInstanceAuditPlan(instAuditPlans *InstanceAuditPlan) error {
return s.Tx(func(txDB *gorm.DB) error {
if err := txDB.Save(instAuditPlans).Error; err != nil {
return err
}
apTaskInfos := make([]*AuditPlanTaskInfo, 0, len(instAuditPlans.AuditPlans))
for _, auditPlan := range instAuditPlans.AuditPlans {
apTaskInfos = append(apTaskInfos, &AuditPlanTaskInfo{
AuditPlanID: auditPlan.ID,
})
}
if err := txDB.Save(apTaskInfos).Error; err != nil {
return err
}
return nil
})
}

func (s *Storage) BatchSaveAuditPlans(auditPlans []*AuditPlanV2) error {
return s.Tx(func(txDB *gorm.DB) error {
for _, auditPlan := range auditPlans {
if err := txDB.Save(auditPlan).Error; err != nil {
return err
// 新增的扫描任务类型需要保存audit task info
if auditPlan.ID == 0 {
if err := txDB.Save(auditPlan).Error; err != nil {
return err
}
apTaskInfo := &AuditPlanTaskInfo{
AuditPlanID: auditPlan.ID,
}
if err := txDB.Save(apTaskInfo).Error; err != nil {
return err
}
} else {
if err := txDB.Save(auditPlan).Error; err != nil {
return err
}
}
}
return nil
Expand All @@ -345,12 +379,14 @@ func (s *Storage) DeleteInstanceAuditPlan(instanceAuditPlanId string) error {
}
err = txDB.Exec(`UPDATE instance_audit_plans iap
LEFT JOIN audit_plans_v2 ap ON iap.id = ap.instance_audit_plan_id
LEFT JOIN sql_manage_records oms ON oms.source_id = ap.instance_audit_plan_id AND oms.source = ap.type
LEFT JOIN sql_manage_record_processes sm ON sm.sql_manage_record_id = oms.id
LEFT JOIN audit_plan_task_infos apti ON apti.audit_plan_id = ap.id
LEFT JOIN sql_manage_records smr ON smr.source_id = ap.instance_audit_plan_id AND smr.source = ap.type
LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id
SET iap.deleted_at = now(),
ap.deleted_at = now(),
oms.deleted_at = now(),
sm.deleted_at = now()
smr.deleted_at = now(),
smrp.deleted_at = now(),
apti.deleted_at = now()
WHERE iap.ID = ?`, instanceAuditPlanId).Error
if err != nil {
return err
Expand All @@ -369,11 +405,13 @@ func (s *Storage) DeleteAuditPlan(auditPlanID int) error {
return err
}
err = txDB.Exec(`UPDATE audit_plans_v2 ap
LEFT JOIN sql_manage_records oms ON oms.source_id = ap.instance_audit_plan_id AND oms.source = ap.type
LEFT JOIN sql_manage_record_processes sm ON sm.sql_manage_record_id = oms.id
LEFT JOIN audit_plan_task_infos apti ON apti.audit_plan_id = ap.id
LEFT JOIN sql_manage_records smr ON smr.source_id = ap.instance_audit_plan_id AND smr.source = ap.type
LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id
SET ap.deleted_at = now(),
oms.deleted_at = now(),
sm.deleted_at = now()
smr.deleted_at = now(),
smrp.deleted_at = now(),
apti.deleted_at = now()
WHERE ap.id = ?`, auditPlanID).Error
if err != nil {
return err
Expand Down Expand Up @@ -456,12 +494,15 @@ func (s *Storage) UpdateManagerSQL(sql *SQLManageRecord) error {

func (s *Storage) UpdateManagerSQLStatus(sql *SQLManageRecord) error {
const query = ` INSERT INTO sql_manage_record_processes (sql_manage_record_id)
SELECT oms.id FROM sql_manage_records oms WHERE oms.sql_id = ?
SELECT smr.id FROM sql_manage_records smr WHERE smr.sql_id = ?
ON DUPLICATE KEY UPDATE sql_manage_record_id = VALUES(sql_manage_record_id);`
return s.db.Exec(query, sql.SQLID).Error
}

func (s *Storage) UpdateAuditPlanLastCollectionTime(auditPlanID uint, collectionTime time.Time) error {
const query = `UPDATE audit_plans_v2 SET last_collection_time = now(3) WHERE id = ?;`
return s.db.Exec(query, auditPlanID).Error
err := s.db.Model(AuditPlanTaskInfo{}).Where("audit_plan_id = ?", auditPlanID).Update("last_collection_time", collectionTime).Error
if err != nil {
return err
}
return nil
}
1 change: 1 addition & 0 deletions sqle/model/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ var autoMigrateList = []interface{}{
&FeishuScheduledRecord{},
&InstanceAuditPlan{},
&AuditPlanV2{},
&AuditPlanTaskInfo{},
&SQLManageRecord{},
&SQLManageRecordProcess{},
&SQLManageQueue{},
Expand Down
3 changes: 2 additions & 1 deletion sqle/server/auditplan/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (mgr *Manager) sync() error {
}
}
// 增量同步智能扫描任务,根据数据库记录的更新时间筛选,更新后将下次筛选的时间为上一次记录的最晚的更新时间。
aps, err := mgr.persist.GetLatestAuditPlanRecordsV2()
aps, err := mgr.persist.GetLatestAuditPlanRecordsV2(*mgr.lastSyncTime)
if err != nil {
return err
}
Expand All @@ -241,6 +241,7 @@ func (mgr *Manager) sync() error {
if err != nil {
mgr.logger.WithField("id", ap.ID).Errorf("sync audit task failed, error: %v", err)
}
mgr.lastSyncTime = &ap.UpdatedAt
}
return nil
}
Expand Down
1 change: 0 additions & 1 deletion sqle/server/auditplan/task_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ func (at *TaskWrapper) extractSQL() {
err = at.persist.UpdateAuditPlanLastCollectionTime(at.ap.ID, collectionTime)
if err != nil {
at.logger.Errorf("update audit plan last collection time failed, error : %v", err)
return
}
if len(sqls) == 0 {
at.logger.Info("extract sql list is empty, skip")
Expand Down
Loading