Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed wrong status update in task service #95

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions svc-task/thandle/thandle.go
Original file line number Diff line number Diff line change
@@ -1104,7 +1104,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("Work on the task with Id %v has been halted prior to completion due to an explicit request.", taskID)
// TODO
case "Interrupted":
/* This state shall represent that the operation has been interrupted but is
expected to restart and is therefore not complete.
@@ -1117,7 +1116,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change nitification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has completed with errors..", taskID)
// TODO
case "New":
/* This state shall represent that this task is newly created but the
operation has not yet started.
@@ -1127,7 +1125,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change nitification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has started.", taskID)
// TODO
case "Pending":
/*This state shall represent that the operation is pending some condition and
has not yet begun to execute.
@@ -1136,7 +1133,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change nitification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has completed with errors.", taskID)
// TODO
case "Running":
// This state shall represent that the operation is executing.
if payLoad != nil && payLoad.FinalResponseBody != nil {
@@ -1147,7 +1143,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".TaskProgressChanged"
taskMessage = fmt.Sprintf("The task with Id %v has changed to progress %v percent complete.", taskID, percentComplete)
// TODO
case "Service":
/* This state shall represent that the operation is now running as a service
and expected to continue operation until stopped or killed.
@@ -1156,14 +1151,12 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has started.", taskID)
// TODO
case "Starting":
// This state shall represent that the operation is starting.
task.TaskState = taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has started.", taskID)
// TODO
case "Stopping":
/* This state shall represent that the operation is stopping but is not yet
complete.
@@ -1172,7 +1165,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has been paused.", taskID)
// TODO
case "Suspended":
/*This state shall represent that the operation has been suspended but is
expected to restart and is therefore not complete.
@@ -1182,7 +1174,7 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has completed with errors.", taskID)
// TODO

default:
return fmt.Errorf("error invalid input argument for taskState")
}
@@ -1243,23 +1235,28 @@ func (ts *TasksRPC) updateParentTask(ctx context.Context, taskID, taskStatus, ta
return err
}
l.LogWithFields(ctx).Debugf("Child ID's associated with parent task %s: %v", task.ParentID, childIDs)
if len(childIDs) < 1 || (len(childIDs) == 1 && taskState == common.Completed && payLoad.StatusCode < http.StatusAccepted) {
l.LogWithFields(ctx).Debugf("All tasks are completed ! Updating Parent task %s to completed state", parentTask.ID)
parentTask.StatusCode = http.StatusOK
if parentTask.TaskFinalResponse != nil {
var resp response.RPC
json.Unmarshal(parentTask.TaskFinalResponse, &resp)
parentTask.Payload.HTTPHeaders = resp.Header
parentTask.TaskFinalResponse = nil
parentTask.StatusCode = http.StatusCreated
}
if value, err := services.GetEventSubscriptionID(ctx, parentTask.ID); err == nil {
tcommon.DeleteSubscription(ctx, value)
if len(childIDs) < 1 || (len(childIDs) == 1 && taskState == common.Completed) {
if payLoad.StatusCode < http.StatusAccepted {
parentTask.StatusCode = http.StatusOK
if parentTask.TaskFinalResponse != nil {
var resp response.RPC
json.Unmarshal(parentTask.TaskFinalResponse, &resp)
parentTask.Payload.HTTPHeaders = resp.Header
parentTask.TaskFinalResponse = nil
parentTask.StatusCode = http.StatusCreated
}
if value, err := services.GetEventSubscriptionID(ctx, parentTask.ID); err == nil {
tcommon.DeleteSubscription(ctx, value)
}
ts.updateTaskToCompleted(parentTask)

} else {
parentTask.StatusCode = payLoad.StatusCode
ts.updateTaskToError(parentTask)
}
ts.updateTaskToCompleted(parentTask)
l.LogWithFields(ctx).Debugf("All tasks are completed ! Updating Parent task %s to completed state", parentTask.ID)
return nil
}

return ts.validateChildTasksAndUpdateParentTask(ctx, childIDs, taskID, parentTask)
}

@@ -1270,7 +1267,6 @@ func (ts *TasksRPC) validateChildTasksAndUpdateParentTask(ctx context.Context, c
s = append(s, "task:"+v)
}
}

data, _ := ts.GetMultipleTaskKeysModel(ctx, s, common.InMemory)
var isSuccess bool = true
for _, subtask := range *data {
@@ -1323,6 +1319,17 @@ func (ts *TasksRPC) updateTaskToCompleted(task *tmodel.Task) {
ts.UpdateTaskQueue(task)
}

// updateTaskToCompleted update the task to completed state with success response
func (ts *TasksRPC) updateTaskToError(task *tmodel.Task) {
task.TaskState = common.Completed
task.TaskStatus = common.OK
task.PercentComplete = 100
resp := tcommon.GetTaskResponse(task.StatusCode, response.Failure)
body, _ := json.Marshal(resp.Body)
task.TaskResponse = body
ts.UpdateTaskQueue(task)
}

// ProcessTaskEvents receive the task event from plugins
// The function will find out the ODIM task corresponding to the plugin task ID
// and task progress from the events
6 changes: 3 additions & 3 deletions svc-task/tmodel/tmodel.go
Original file line number Diff line number Diff line change
@@ -208,7 +208,7 @@ func GetTaskStatus(ctx context.Context, taskID string, db common.DbType) (*Task,
connPool, err := common.GetDBConnection(common.InMemory)
if err != nil {
l.LogWithFields(ctx).Error("GetTaskStatus : error while trying to get DB Connection : " + err.Error())
return task, fmt.Errorf("error while trying to connnect to DB: %v", err.Error())
return task, fmt.Errorf("error while trying to connect to DB: %v", err.Error())
}
taskData, err = connPool.Read("task", taskID)
if err != nil {
@@ -229,14 +229,14 @@ func GetMultipleTaskKeys(ctx context.Context, taskIDs []interface{}, db common.D
connPool, err := common.GetDBConnection(common.InMemory)
if err != nil {
l.LogWithFields(ctx).Error("GetTaskStatus : error while trying to get DB Connection : " + err.Error())
return &task, fmt.Errorf("error while trying to connnect to DB: %v", err.Error())
return &task, fmt.Errorf("error while trying to connect to DB: %v", err.Error())
}
for _, value := range taskIDs {
taskList = append(taskList, value.(string))
}
taskData, err := connPool.ReadMultipleKeys(taskList)
if err != nil {
l.LogWithFields(ctx).Error("GetTaskStatus : Unable to read taskdata from DB: " + err.Error())
l.LogWithFields(ctx).Error("GetTaskStatus : Unable to read task data from DB: " + err.Error())
return &task, fmt.Errorf("error while trying to read from DB: %v", err.Error())
}
for _, data := range taskData {