Skip to content

Commit

Permalink
Fixed wrong status update while simple update url is invalid (#1313)
Browse files Browse the repository at this point in the history
Co-authored-by: Jeevan Kamkar <68366409+jeevan-kamkar@users.noreply.github.com>
  • Loading branch information
chauberahul1993 and jeevan-kamkar authored Sep 22, 2023
1 parent 0cb730a commit 2401d1c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
55 changes: 31 additions & 24 deletions svc-task/thandle/thandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("Work on the task with Id %v has been halted prior to completion due to an explicit request.", taskID)
// TODO
case "Interrupted":
/* This state shall represent that the operation has been interrupted but is
expected to restart and is therefore not complete.
Expand All @@ -1117,7 +1116,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change nitification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has completed with errors..", taskID)
// TODO
case "New":
/* This state shall represent that this task is newly created but the
operation has not yet started.
Expand All @@ -1127,7 +1125,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change nitification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has started.", taskID)
// TODO
case "Pending":
/*This state shall represent that the operation is pending some condition and
has not yet begun to execute.
Expand All @@ -1136,7 +1133,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change nitification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has completed with errors.", taskID)
// TODO
case "Running":
// This state shall represent that the operation is executing.
if payLoad != nil && payLoad.FinalResponseBody != nil {
Expand All @@ -1147,7 +1143,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".TaskProgressChanged"
taskMessage = fmt.Sprintf("The task with Id %v has changed to progress %v percent complete.", taskID, percentComplete)
// TODO
case "Service":
/* This state shall represent that the operation is now running as a service
and expected to continue operation until stopped or killed.
Expand All @@ -1156,14 +1151,12 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has started.", taskID)
// TODO
case "Starting":
// This state shall represent that the operation is starting.
task.TaskState = taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has started.", taskID)
// TODO
case "Stopping":
/* This state shall represent that the operation is stopping but is not yet
complete.
Expand All @@ -1172,7 +1165,6 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has been paused.", taskID)
// TODO
case "Suspended":
/*This state shall represent that the operation has been suspended but is
expected to restart and is therefore not complete.
Expand All @@ -1182,7 +1174,7 @@ func (ts *TasksRPC) updateTaskUtil(ctx context.Context, taskID string, taskState
// Construct the appropriate messageID for task status change notification
taskEvenMessageID = common.TaskEventType + ".Task" + taskState
taskMessage = fmt.Sprintf("The task with Id %v has completed with errors.", taskID)
// TODO

default:
return fmt.Errorf("error invalid input argument for taskState")
}
Expand Down Expand Up @@ -1243,23 +1235,28 @@ func (ts *TasksRPC) updateParentTask(ctx context.Context, taskID, taskStatus, ta
return err
}
l.LogWithFields(ctx).Debugf("Child ID's associated with parent task %s: %v", task.ParentID, childIDs)
if len(childIDs) < 1 || (len(childIDs) == 1 && taskState == common.Completed && payLoad.StatusCode < http.StatusAccepted) {
l.LogWithFields(ctx).Debugf("All tasks are completed ! Updating Parent task %s to completed state", parentTask.ID)
parentTask.StatusCode = http.StatusOK
if parentTask.TaskFinalResponse != nil {
var resp response.RPC
json.Unmarshal(parentTask.TaskFinalResponse, &resp)
parentTask.Payload.HTTPHeaders = resp.Header
parentTask.TaskFinalResponse = nil
parentTask.StatusCode = http.StatusCreated
}
if value, err := services.GetEventSubscriptionID(ctx, parentTask.ID); err == nil {
tcommon.DeleteSubscription(ctx, value)
if len(childIDs) < 1 || (len(childIDs) == 1 && taskState == common.Completed) {
if payLoad.StatusCode < http.StatusAccepted {
parentTask.StatusCode = http.StatusOK
if parentTask.TaskFinalResponse != nil {
var resp response.RPC
json.Unmarshal(parentTask.TaskFinalResponse, &resp)
parentTask.Payload.HTTPHeaders = resp.Header
parentTask.TaskFinalResponse = nil
parentTask.StatusCode = http.StatusCreated
}
if value, err := services.GetEventSubscriptionID(ctx, parentTask.ID); err == nil {
tcommon.DeleteSubscription(ctx, value)
}
ts.updateTaskToCompleted(parentTask)

} else {
parentTask.StatusCode = payLoad.StatusCode
ts.updateTaskToError(parentTask)
}
ts.updateTaskToCompleted(parentTask)
l.LogWithFields(ctx).Debugf("All tasks are completed ! Updating Parent task %s to completed state", parentTask.ID)
return nil
}

return ts.validateChildTasksAndUpdateParentTask(ctx, childIDs, taskID, parentTask)
}

Expand All @@ -1270,7 +1267,6 @@ func (ts *TasksRPC) validateChildTasksAndUpdateParentTask(ctx context.Context, c
s = append(s, "task:"+v)
}
}

data, _ := ts.GetMultipleTaskKeysModel(ctx, s, common.InMemory)
var isSuccess bool = true
for _, subtask := range *data {
Expand Down Expand Up @@ -1323,6 +1319,17 @@ func (ts *TasksRPC) updateTaskToCompleted(task *tmodel.Task) {
ts.UpdateTaskQueue(task)
}

// updateTaskToCompleted update the task to completed state with success response
func (ts *TasksRPC) updateTaskToError(task *tmodel.Task) {
task.TaskState = common.Completed
task.TaskStatus = common.OK
task.PercentComplete = 100
resp := tcommon.GetTaskResponse(task.StatusCode, response.Failure)
body, _ := json.Marshal(resp.Body)
task.TaskResponse = body
ts.UpdateTaskQueue(task)
}

// ProcessTaskEvents receive the task event from plugins
// The function will find out the ODIM task corresponding to the plugin task ID
// and task progress from the events
Expand Down
6 changes: 3 additions & 3 deletions svc-task/tmodel/tmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func GetTaskStatus(ctx context.Context, taskID string, db common.DbType) (*Task,
connPool, err := common.GetDBConnection(common.InMemory)
if err != nil {
l.LogWithFields(ctx).Error("GetTaskStatus : error while trying to get DB Connection : " + err.Error())
return task, fmt.Errorf("error while trying to connnect to DB: %v", err.Error())
return task, fmt.Errorf("error while trying to connect to DB: %v", err.Error())
}
taskData, err = connPool.Read("task", taskID)
if err != nil {
Expand All @@ -229,14 +229,14 @@ func GetMultipleTaskKeys(ctx context.Context, taskIDs []interface{}, db common.D
connPool, err := common.GetDBConnection(common.InMemory)
if err != nil {
l.LogWithFields(ctx).Error("GetTaskStatus : error while trying to get DB Connection : " + err.Error())
return &task, fmt.Errorf("error while trying to connnect to DB: %v", err.Error())
return &task, fmt.Errorf("error while trying to connect to DB: %v", err.Error())
}
for _, value := range taskIDs {
taskList = append(taskList, value.(string))
}
taskData, err := connPool.ReadMultipleKeys(taskList)
if err != nil {
l.LogWithFields(ctx).Error("GetTaskStatus : Unable to read taskdata from DB: " + err.Error())
l.LogWithFields(ctx).Error("GetTaskStatus : Unable to read task data from DB: " + err.Error())
return &task, fmt.Errorf("error while trying to read from DB: %v", err.Error())
}
for _, data := range taskData {
Expand Down

0 comments on commit 2401d1c

Please sign in to comment.