diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go
index 51ac7074289..9bb5c902f08 100644
--- a/agent/engine/docker_task_engine.go
+++ b/agent/engine/docker_task_engine.go
@@ -155,14 +155,9 @@ type DockerTaskEngine struct {
 	// all tasks, it must not acquire it for any significant duration
 	// The write mutex should be taken when adding and removing tasks from managedTasks.
 	tasksLock sync.RWMutex
-	// waitingTasksLock is a mutex for operations on waitingTasksQueue
+	// waitingTasksLock is a mutext for operations on waitingTasksQueue
 	waitingTasksLock sync.RWMutex
-	// monitorQueuedTasksLock is a mutex for operations in the monitorQueuedTasks which
-	// allocate host resources and wakes up waiting host resources. This should be used
-	// for synchronizing task desired status updates and queue operations
-	monitorQueuedTasksLock sync.RWMutex
-
 	credentialsManager credentials.Manager
 	_time              ttime.Time
 	_timeOnce          sync.Once
@@ -397,8 +392,15 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
 			if err != nil {
 				break
 			}
-			dequeuedTask := engine.tryDequeueWaitingTasks(task)
-			if !dequeuedTask {
+			taskHostResources := task.ToHostResources()
+			consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
+			if err != nil {
+				engine.failWaitingTask(err)
+			}
+			if consumed {
+				engine.startWaitingTask()
+			} else {
+				// not consumed, go to wait
 				break
 			}
 		}
@@ -407,39 +409,6 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
 	}
 }
 
-func (engine *DockerTaskEngine) tryDequeueWaitingTasks(task *managedTask) bool {
-	// Isolate monitorQueuedTasks processing from changes of desired status updates to prevent
-	// unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
-	// For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
-	// could be processing
-	engine.monitorQueuedTasksLock.Lock()
-	defer engine.monitorQueuedTasksLock.Unlock()
-	taskDesiredStatus := task.GetDesiredStatus()
-	if taskDesiredStatus.Terminal() {
-		logger.Info("Task desired status changed to STOPPED while waiting for host resources, progressing without consuming resources", logger.Fields{field.TaskARN: task.Arn})
-		engine.returnWaitingTask()
-		return true
-	}
-	taskHostResources := task.ToHostResources()
-	consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
-	if err != nil {
-		engine.failWaitingTask(err)
-		return true
-	}
-	if consumed {
-		engine.startWaitingTask()
-		return true
-	}
-	return false
-	// not consumed, go to wait
-}
-
-// To be called when resources are not to be consumed by host resource manager, just dequeues and returns
-func (engine *DockerTaskEngine) returnWaitingTask() {
-	task, _ := engine.dequeueTask()
-	task.consumedHostResourceEvent <- struct{}{}
-}
-
 func (engine *DockerTaskEngine) failWaitingTask(err error) {
 	task, _ := engine.dequeueTask()
 	logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn})
diff --git a/agent/engine/engine_unix_integ_test.go b/agent/engine/engine_unix_integ_test.go
index a6bbcc5a553..6ac1db00c3f 100644
--- a/agent/engine/engine_unix_integ_test.go
+++ b/agent/engine/engine_unix_integ_test.go
@@ -1227,7 +1227,7 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {
 	testTask := createTestTask(taskArn)
 
 	// create container
-	A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
+	A := createTestContainerWithImageAndName(baseImageForOS, "A")
 	A.EntryPoint = &entryPointForOS
 	A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
 	A.Essential = true
@@ -1288,96 +1288,3 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {
 
 	waitFinished(t, finished, testTimeout)
 }
-
-// This task verifies resources are properly released for all tasks for the case where
-// stopTask is received from ACS for a task which is queued up in waitingTasksQueue
-func TestHostResourceManagerStopTaskNotBlockWaitingTasks(t *testing.T) {
-	testTimeout := 1 * time.Minute
-	taskEngine, done, _ := setupWithDefaultConfig(t)
-	defer done()
-
-	stateChangeEvents := taskEngine.StateChangeEvents()
-
-	tasks := []*apitask.Task{}
-	stopTasks := []*apitask.Task{}
-	for i := 0; i < 2; i++ {
-		taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
-		testTask := createTestTask(taskArn)
-		testTask.Memory = int64(768)
-
-		// create container
-		A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
-		A.EntryPoint = &entryPointForOS
-		A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
-		A.Essential = true
-		A.StopTimeout = uint(6)
-		testTask.Containers = []*apicontainer.Container{
-			A,
-		}
-
-		tasks = append(tasks, testTask)
-
-		// Stop task payloads from ACS for the tasks
-		stopTask := createTestTask(fmt.Sprintf("IntegTaskArn-%d", i))
-		stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
-		stopTask.Containers = []*apicontainer.Container{}
-		stopTasks = append(stopTasks, stopTask)
-	}
-
-	// goroutine to schedule tasks
-	go func() {
-		taskEngine.AddTask(tasks[0])
-		time.Sleep(2 * time.Second)
-
-		// single managedTask which should have started
-		assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")
-
-		// stopTask[0] - stop running task[0], this task will go to STOPPING due to trap handler defined and STOPPED after 6s
-		taskEngine.AddTask(stopTasks[0])
-
-		time.Sleep(2 * time.Second)
-
-		// this task (task[1]) goes in waitingTasksQueue because not enough memory available
-		taskEngine.AddTask(tasks[1])
-
-		time.Sleep(2 * time.Second)
-
-		// stopTask[1] - stop waiting task - task[1]
-		taskEngine.AddTask(stopTasks[1])
-	}()
-
-	finished := make(chan interface{})
-
-	// goroutine to verify task running order and verify assertions
-	go func() {
-		// 1st task goes to RUNNING
-		verifyContainerRunningStateChange(t, taskEngine)
-		verifyTaskIsRunning(stateChangeEvents, tasks[0])
-
-		time.Sleep(2500 * time.Millisecond)
-
-		// At this time, task[0] stopTask is received, and SIGTERM sent to task
-		// but the task[0] is still RUNNING due to trap handler
-		assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
-		assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED")
-
-		time.Sleep(2 * time.Second)
-
-		// task[1] stops while in waitingTasksQueue while task[0] is in progress
-		// This is because it is still waiting to progress, has no containers created
-		// and does not need to wait for stopTimeout, can immediately STSC out
-		verifyTaskIsStopped(stateChangeEvents, tasks[1])
-
-		// task[0] stops
-		verifyContainerStoppedStateChange(t, taskEngine)
-		verifyTaskIsStopped(stateChangeEvents, tasks[0])
-
-		// Verify resources are properly released in host resource manager
-		assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[0].Arn), "task 0 resources not released")
-		assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[1].Arn), "task 1 resources not released")
-
-		close(finished)
-	}()
-
-	waitFinished(t, finished, testTimeout)
-}
diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go
index c710330a36e..beb559547d2 100644
--- a/agent/engine/task_manager.go
+++ b/agent/engine/task_manager.go
@@ -204,7 +204,6 @@ func (mtask *managedTask) overseeTask() {
 	// - Waits until host resource manager succesfully 'consume's task resources and returns
 	// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
 	// - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
-	// - If an ACS StopTask arrives, host resources manager returns immediately. Host resource manager does not consume resources
 	// (resources are later 'release'd on Stopped task emitTaskEvent call)
 	mtask.waitForHostResources()
 
@@ -387,14 +386,6 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool {
 func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) {
 	// Handle acs message changes this task's desired status to whatever
 	// acs says it should be if it is compatible
-
-	// Isolate change of desired status updates from monitorQueuedTasks processing to prevent
-	// unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
-	// For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
-	// could be processing
-	mtask.engine.monitorQueuedTasksLock.Lock()
-	defer mtask.engine.monitorQueuedTasksLock.Unlock()
-
 	logger.Info("New acs transition", logger.Fields{
 		field.TaskID:        mtask.GetID(),
 		field.DesiredStatus: desiredStatus.String(),
diff --git a/agent/engine/task_manager_test.go b/agent/engine/task_manager_test.go
index d6c87b43374..3cae8cfcb78 100644
--- a/agent/engine/task_manager_test.go
+++ b/agent/engine/task_manager_test.go
@@ -1420,8 +1420,7 @@ func TestTaskWaitForExecutionCredentials(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.TODO())
 			defer cancel()
 			task := &managedTask{
-				ctx:    ctx,
-				engine: &DockerTaskEngine{},
+				ctx: ctx,
 				Task: &apitask.Task{
 					KnownStatusUnsafe:   apitaskstatus.TaskRunning,
 					DesiredStatusUnsafe: apitaskstatus.TaskRunning,
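For reviewers: after this revert, the `monitorQueuedTasks` body reduces to a plain head-of-queue admission loop — consume host resources for the task at the front of `waitingTasksQueue`, start it on success, fail it on an invalid config, and stop scanning at the first task that does not fit. Below is a minimal, self-contained sketch of that pattern, under stated assumptions: `hostResources`, `queuedTask`, and `tryStartQueued` are illustrative stand-ins (a single memory dimension, a slice as the queue), not the agent's real `hostResourceManager` API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// hostResources is a simplified stand-in for the agent's hostResourceManager,
// tracking a single MEMORY dimension behind a mutex.
type hostResources struct {
	mu       sync.Mutex
	availMiB int64
}

// consume reserves memory for a task. It returns an error for an invalid
// request, false when the task does not currently fit, and true on success.
func (h *hostResources) consume(arn string, memMiB int64) (bool, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if memMiB <= 0 {
		return false, errors.New("invalid resource request for " + arn)
	}
	if memMiB > h.availMiB {
		return false, nil
	}
	h.availMiB -= memMiB
	return true, nil
}

type queuedTask struct {
	arn    string
	memMiB int64
}

// tryStartQueued mirrors the post-revert monitorQueuedTasks scan: walk the
// waiting queue in FIFO order, starting each task whose resources can be
// consumed, and stop at the first task that cannot proceed.
func tryStartQueued(h *hostResources, queue []queuedTask) []queuedTask {
	for len(queue) > 0 {
		head := queue[0]
		consumed, err := h.consume(head.arn, head.memMiB)
		if err != nil {
			// Invalid task config: dequeue it and stop this scan. In the
			// engine, failWaitingTask dequeues the task, and the loop then
			// breaks because consumed is false.
			fmt.Println("failing task:", err)
			return queue[1:]
		}
		if !consumed {
			// Not enough headroom for the head of the queue: it, and
			// everything behind it, waits for the next queue event.
			return queue
		}
		fmt.Println("starting task:", head.arn)
		queue = queue[1:]
	}
	return queue
}

func main() {
	h := &hostResources{availMiB: 1024}
	queue := []queuedTask{{"task-0", 768}, {"task-1", 768}}
	queue = tryStartQueued(h, queue)
	// task-0 starts (768 of 1024 MiB consumed); task-1 still needs 768 MiB
	// but only 256 MiB remain, so it stays queued.
	fmt.Println("tasks still waiting:", len(queue))
}
```

With the `Terminal()` desired-status check gone from the dequeue path, there is no longer a desired-status read to synchronize against queue processing, which is presumably why the revert can drop `monitorQueuedTasksLock` from both `docker_task_engine.go` and `handleDesiredStatusChange`; a task stopped while waiting now simply consumes and later releases its resources through the normal stop path.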