Skip to content

Commit

Permalink
fix: task query optimize (#21904)
Browse files Browse the repository at this point in the history
Co-authored-by: Qiu Jian <qiujian@yunionyun.com>
  • Loading branch information
swordqiu and Qiu Jian authored Dec 31, 2024
1 parent af3bb67 commit 34204c5
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 39 deletions.
5 changes: 4 additions & 1 deletion pkg/apis/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ type TaskListInput struct {
IsMulti *bool `json:"is_multi" negative:"is_single" help:"is multi task"`
IsComplete *bool `json:"is_complete" negative:"not_complete" help:"is task completed, either fail or complete"`
IsInit *bool `json:"is_init" negative:"not_init" help:"is task started?"`
Stage []string `json:"stage" help:"task stage"`
Stage []string `json:"stage" help:"tasks in stages"`
NotStage []string `json:"not_stage" help:"tasks not in stages"`
ParentId []string `json:"parent_id" help:"filter tasks by parent_task_id"`
IsRoot *bool `json:"is_root" help:"filter root tasks"`

SubTask *bool `json:"sub_task" help:"show sub task states"`
}

type TaskDetails struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudcommon/db/taskman/subtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
"subtasks",
)}
SubTaskManager.SetVirtualObject(SubTaskManager)
SubTaskManager.TableSpec().AddIndex(true, "task_id", "stage", "subtask_id", "status")
}

type SSubTask struct {
Expand Down
11 changes: 10 additions & 1 deletion pkg/cloudcommon/db/taskman/taskobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ type STaskObjectManager struct {
var TaskObjectManager *STaskObjectManager

func init() {
TaskObjectManager = &STaskObjectManager{SModelBaseManager: db.NewModelBaseManager(STaskObject{}, "taskobjects_tbl", "taskobject", "taskobjects")}
TaskObjectManager = &STaskObjectManager{
SModelBaseManager: db.NewModelBaseManager(
STaskObject{},
"taskobjects_tbl",
"taskobject",
"taskobjects",
),
}
TaskObjectManager.SetVirtualObject(TaskObjectManager)
TaskObjectManager.TableSpec().AddIndex(true, "task_id", "obj_id", "tenant_id", "domain_id")
}

type STaskObject struct {
Expand Down
108 changes: 71 additions & 37 deletions pkg/cloudcommon/db/taskman/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func init() {
userCredWidthLimit, _ = strconv.Atoi(widthStr)
}
}
TaskManager.TableSpec().AddIndex(true, "id", "created_at", "tenant_id", "domain_id", "parent_task_id", "obj_id", "stage")
}

type STask struct {
Expand All @@ -118,23 +119,23 @@ type STask struct {
ObjType string `old_name:"obj_name" json:"obj_type" width:"128" charset:"utf8" nullable:"true" list:"user"`
Object string `json:"object" width:"128" charset:"utf8" nullable:"true" list:"user"` // Column(VARCHAR(128, charset='utf8'), nullable=False)
ObjId string `width:"128" charset:"ascii" nullable:"false" list:"user" index:"true"` // Column(VARCHAR(ID_LENGTH, charset='ascii'), nullable=False)
TaskName string `width:"64" charset:"ascii" nullable:"false" list:"user"` // Column(VARCHAR(64, charset='ascii'), nullable=False)
TaskName string `width:"64" charset:"ascii" nullable:"false" list:"user" index:"true"` // Column(VARCHAR(64, charset='ascii'), nullable=False)

UserCred mcclient.TokenCredential `width:"1024" charset:"utf8" nullable:"false" get:"user"` // Column(VARCHAR(1024, charset='ascii'), nullable=False)
// OwnerCred string `width:"512" charset:"ascii" nullable:"true"` // Column(VARCHAR(512, charset='ascii'), nullable=True)
Params *jsonutils.JSONDict `charset:"utf8" length:"medium" nullable:"false" get:"user"` // Column(MEDIUMTEXT(charset='ascii'), nullable=False)

Stage string `width:"64" charset:"ascii" nullable:"false" default:"on_init" list:"user"` // Column(VARCHAR(64, charset='ascii'), nullable=False, default='on_init')
Stage string `width:"64" charset:"ascii" nullable:"false" default:"on_init" list:"user" index:"true"` // Column(VARCHAR(64, charset='ascii'), nullable=False, default='on_init')

// 父任务Id
ParentTaskId string `width:"36" charset:"ascii" list:"user" index:"true" json:"parent_task_id"`

taskObject db.IStandaloneModel `ignore:"true"`
taskObjects []db.IStandaloneModel `ignore:"true"`

SubTaskCount int `ignore:"true"`
FailSubTaskCnt int `ignore:"true"`
SUccSubTaskCnt int `ignore:"true"`
SubTaskCount int `ignore:"true" json:"sub_task_count"`
FailSubTaskCnt int `ignore:"true" json:"fail_sub_task_cnt"`
SuccSubTaskCnt int `ignore:"true" json:"succ_sub_task_cnt"`
}

func (manager *STaskManager) CreateByInsertOrUpdate() bool {
Expand Down Expand Up @@ -182,23 +183,55 @@ func (task *STask) GetOwnerId() mcclient.IIdentityProvider {
return task.SProjectizedResourceBase.GetOwnerId()
}

func (manager *STaskManager) FilterByOwner(ctx context.Context, q *sqlchemy.SQuery, man db.FilterByOwnerProvider, userCred mcclient.TokenCredential, owner mcclient.IIdentityProvider, scope rbacscope.TRbacScope) *sqlchemy.SQuery {
objectQAltered := false
objectQ := TaskObjectManager.Query().Snapshot()
objectQ = TaskObjectManager.SProjectizedResourceBaseManager.FilterByOwner(ctx, objectQ, man, userCred, owner, scope)
objectQAltered = objectQ.IsAltered()
objectQ = objectQ.AppendField(sqlchemy.DISTINCT("task_id", objectQ.Field("task_id")))

singleTaskQAltered := false
singleTaskQ := TaskManager.Query().NotEquals("obj_id", MULTI_OBJECTS_ID).Snapshot()
singleTaskQ = TaskManager.SProjectizedResourceBaseManager.FilterByOwner(ctx, singleTaskQ, man, userCred, owner, scope)
singleTaskQAltered = singleTaskQ.IsAltered()
singleTaskQ = singleTaskQ.AppendField(sqlchemy.DISTINCT("task_id", singleTaskQ.Field("id")))

if objectQAltered || singleTaskQAltered {
subQ := sqlchemy.Union(objectQ, singleTaskQ).Query().SubQuery()
q = q.Join(subQ, sqlchemy.Equals(q.Field("id"), subQ.Field("task_id")))
func getExtendTaskObjectQuery(fields ...string) *sqlchemy.SSubQuery {
taskQ := TaskManager.Query("id", "obj_id", "tenant_id", "domain_id").SubQuery()
objectQ := TaskObjectManager.Query().SubQuery()

taskTenantQ := taskQ.Query()
taskTenantQ = taskTenantQ.LeftJoin(objectQ, sqlchemy.Equals(taskQ.Field("id"), objectQ.Field("task_id")))
if utils.IsInArray("task_id", fields) {
taskTenantQ = taskTenantQ.AppendField(taskQ.Field("id").Label("task_id"))
}
if utils.IsInArray("tenant_id", fields) {
taskTenantQ = taskTenantQ.AppendField(sqlchemy.NewFunction(
sqlchemy.NewCase().
When(sqlchemy.Equals(taskQ.Field("obj_id"), MULTI_OBJECTS_ID), objectQ.Field("tenant_id")).Else(taskQ.Field("tenant_id")),
"tenant_id",
true,
))
}
if utils.IsInArray("domain_id", fields) {
taskTenantQ = taskTenantQ.AppendField(sqlchemy.NewFunction(
sqlchemy.NewCase().
When(sqlchemy.Equals(taskQ.Field("obj_id"), MULTI_OBJECTS_ID), objectQ.Field("domain_id")).Else(taskQ.Field("domain_id")),
"domain_id",
true,
))
}
if utils.IsInArray("obj_id", fields) {
taskTenantQ = taskTenantQ.AppendField(sqlchemy.NewFunction(
sqlchemy.NewCase().
When(sqlchemy.Equals(taskQ.Field("obj_id"), MULTI_OBJECTS_ID), objectQ.Field("obj_id")).Else(taskQ.Field("obj_id")),
"obj_id",
true,
))
}
return taskTenantQ.SubQuery()
}

func (manager *STaskManager) FilterByOwner(ctx context.Context, q *sqlchemy.SQuery, man db.FilterByOwnerProvider, userCred mcclient.TokenCredential, owner mcclient.IIdentityProvider, scope rbacscope.TRbacScope) *sqlchemy.SQuery {
taskTenantSubQ := getExtendTaskObjectQuery("task_id", "tenant_id", "domain_id").Query()

taskTenantSubQ = TaskManager.SProjectizedResourceBaseManager.FilterByOwner(ctx, taskTenantSubQ, man, userCred, owner, scope)
taskTenantSubQ = taskTenantSubQ.SubQuery().Query()
taskTenantSubQ = taskTenantSubQ.AppendField(taskTenantSubQ.Field("task_id"))
taskTenantSubQ = taskTenantSubQ.AppendField(taskTenantSubQ.Field("tenant_id"))
taskTenantSubQ = taskTenantSubQ.AppendField(taskTenantSubQ.Field("domain_id"))
taskTenantSubQ = taskTenantSubQ.GroupBy("task_id", "tenant_id", "domain_id")
taskTenantTaskIdQ := taskTenantSubQ.Distinct().SubQuery()

q = q.Join(taskTenantTaskIdQ, sqlchemy.Equals(q.Field("id"), taskTenantTaskIdQ.Field("task_id")))

return q
}

Expand Down Expand Up @@ -460,14 +493,6 @@ func (manager *STaskManager) fetchTask(idStr string) *STask {
return task
}

/*func (manager *STaskManager) getTaskName(taskId string) string {
baseTask := manager.fetchTask(taskId)
if baseTask == nil {
return ""
}
return baseTask.TaskName
}*/

func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) {
baseTask := manager.fetchTask(taskId)
if baseTask == nil {
Expand Down Expand Up @@ -884,7 +909,7 @@ func (task *STask) IsCurrentStageComplete() bool {
totalSubtasksCnt, _ := SubTaskManager.GetTotalSubtasksCount(task.Id, task.Stage)
initSubtasksCnt, _ := SubTaskManager.GetInitSubtasksCount(task.Id, task.Stage)
log.Debugf("Task %s IsCurrentStageComplete totalSubtasks %d initSubtasks %d ", task.String(), totalSubtasksCnt, initSubtasksCnt)
task.SetProgress(float32(totalSubtasksCnt-initSubtasksCnt) / float32(totalSubtasksCnt))
task.SetProgress(float32(totalSubtasksCnt-initSubtasksCnt) * 100 / float32(totalSubtasksCnt))
if totalSubtasksCnt > 0 && initSubtasksCnt == 0 {
return true
} else {
Expand Down Expand Up @@ -1089,12 +1114,17 @@ func (manager *STaskManager) ListItemFilter(
}

if len(input.ObjId) > 0 {
taskObjsQ := TaskObjectManager.Query().In("obj_id", input.ObjId)
taskObjsQ = taskObjsQ.AppendField(sqlchemy.DISTINCT("task_id", taskObjsQ.Field("task_id")))
tasksQ := TaskManager.Query().In("obj_id", input.ObjId)
tasksQ = tasksQ.AppendField(tasksQ.Field("id").Label("task_id"))
taskIdQ := sqlchemy.Union(taskObjsQ, tasksQ).Query().SubQuery()
q = q.Join(taskIdQ, sqlchemy.Equals(taskIdQ.Field("task_id"), q.Field("id")))
taskExtSubQ := getExtendTaskObjectQuery("task_id", "obj_id").Query()

taskExtSubQ = taskExtSubQ.In("obj_id", input.ObjId)

taskExtSubQ = taskExtSubQ.SubQuery().Query()
taskExtSubQ = taskExtSubQ.AppendField(taskExtSubQ.Field("task_id"))
taskExtSubQ = taskExtSubQ.AppendField(taskExtSubQ.Field("obj_id"))
taskExtSubQ = taskExtSubQ.GroupBy("task_id", "obj_id")
taskObjTaskIdQ := taskExtSubQ.Distinct().SubQuery()

q = q.Join(taskObjTaskIdQ, sqlchemy.Equals(taskObjTaskIdQ.Field("task_id"), q.Field("id")))
}

if len(input.ObjName) > 0 {
Expand All @@ -1113,6 +1143,10 @@ func (manager *STaskManager) ListItemFilter(
q = q.In("stage", input.Stage)
}

if len(input.NotStage) > 0 {
q = q.NotIn("stage", input.NotStage)
}

if len(input.ParentId) > 0 {
q = q.In("parent_task_id", input.ParentId)
}
Expand Down Expand Up @@ -1149,7 +1183,7 @@ func (manager *STaskManager) ListItemFilter(
}
}

if input.Details != nil && *input.Details {
if input.SubTask != nil && *input.SubTask {
subSQFunc := func(status string, cntField string) *sqlchemy.SSubQuery {
subQ := SubTaskManager.Query()
if len(status) > 0 {
Expand Down

0 comments on commit 34204c5

Please sign in to comment.