diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go
index 9bb5c902f08..51ac7074289 100644
--- a/agent/engine/docker_task_engine.go
+++ b/agent/engine/docker_task_engine.go
@@ -155,9 +155,14 @@ type DockerTaskEngine struct {
 	// all tasks, it must not acquire it for any significant duration
 	// The write mutex should be taken when adding and removing tasks from managedTasks.
 	tasksLock sync.RWMutex
-	// waitingTasksLock is a mutext for operations on waitingTasksQueue
+	// waitingTasksLock is a mutex for operations on waitingTasksQueue
 	waitingTasksLock sync.RWMutex
 
+	// monitorQueuedTasksLock is a mutex for operations in monitorQueuedTasks, which
+	// allocates host resources and wakes up waiting tasks. It should be used for
+	// synchronizing task desired status updates and queue operations
+	monitorQueuedTasksLock sync.RWMutex
+
 	credentialsManager credentials.Manager
 	_time              ttime.Time
 	_timeOnce          sync.Once
@@ -392,15 +397,8 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
 			if err != nil {
 				break
 			}
-			taskHostResources := task.ToHostResources()
-			consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
-			if err != nil {
-				engine.failWaitingTask(err)
-			}
-			if consumed {
-				engine.startWaitingTask()
-			} else {
-				// not consumed, go to wait
+			dequeuedTask := engine.tryDequeueWaitingTasks(task)
+			if !dequeuedTask {
 				break
 			}
 		}
@@ -409,6 +407,39 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
 	}
 }
 
+func (engine *DockerTaskEngine) tryDequeueWaitingTasks(task *managedTask) bool {
+	// Isolate monitorQueuedTasks processing from desired status updates to prevent
+	// unexpected updates to the host resource manager while tasks are being processed
+	// here, for example when an ACS StopTask event arrives while monitorQueuedTasks
+	// is processing the same task
+	engine.monitorQueuedTasksLock.Lock()
+	defer engine.monitorQueuedTasksLock.Unlock()
+	taskDesiredStatus := task.GetDesiredStatus()
+	if taskDesiredStatus.Terminal() {
+		logger.Info("Task desired status changed to STOPPED while waiting for host resources, progressing without consuming resources", logger.Fields{field.TaskARN: task.Arn})
+		engine.returnWaitingTask()
+		return true
+	}
+	taskHostResources := task.ToHostResources()
+	consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
+	if err != nil {
+		engine.failWaitingTask(err)
+		return true
+	}
+	if consumed {
+		engine.startWaitingTask()
+		return true
+	}
+	// not consumed, go to wait
+	return false
+}
+
+// returnWaitingTask is called when resources are not to be consumed by the host
+// resource manager; it just dequeues the waiting task and signals it to proceed
+func (engine *DockerTaskEngine) returnWaitingTask() {
+	task, _ := engine.dequeueTask()
+	task.consumedHostResourceEvent <- struct{}{}
+}
+
 func (engine *DockerTaskEngine) failWaitingTask(err error) {
 	task, _ := engine.dequeueTask()
 	logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn})
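Below is a minimal, self-contained sketch of the synchronization this lock introduces (all names such as `engineSketch` are invented for illustration and are not the agent's real types): the queue-draining path and the ACS status-update path serialize on one mutex, so a desired status flipped to STOPPED can never land between the status check and the resource consume.

```go
package main

import (
	"fmt"
	"sync"
)

// engineSketch stands in for DockerTaskEngine; mu plays the role of
// monitorQueuedTasksLock.
type engineSketch struct {
	mu             sync.Mutex
	desiredStopped bool
	consumed       bool
}

// tryDequeue mirrors tryDequeueWaitingTasks: under the lock, a task whose
// desired status is already terminal is dequeued without consuming resources.
func (e *engineSketch) tryDequeue() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.desiredStopped {
		return // progress without consuming, like returnWaitingTask
	}
	e.consumed = true // like hostResourceManager.consume
}

// stopTask mirrors handleDesiredStatusChange taking the same lock, so the
// status update can never interleave with the check-then-consume above.
func (e *engineSketch) stopTask() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.desiredStopped = true
}

func main() {
	e := &engineSketch{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); e.stopTask() }()
	go func() { defer wg.Done(); e.tryDequeue() }()
	wg.Wait()
	// Either ordering of the two critical sections is fine; what the lock
	// rules out is a stop landing midway through tryDequeue.
	fmt.Printf("desiredStopped=%v consumed=%v\n", e.desiredStopped, e.consumed)
}
```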
diff --git a/agent/engine/engine_unix_integ_test.go b/agent/engine/engine_unix_integ_test.go
index 6ac1db00c3f..a6bbcc5a553 100644
--- a/agent/engine/engine_unix_integ_test.go
+++ b/agent/engine/engine_unix_integ_test.go
@@ -1227,7 +1227,7 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {
 		testTask := createTestTask(taskArn)
 
 		// create container
-		A := createTestContainerWithImageAndName(baseImageForOS, "A")
+		A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
 		A.EntryPoint = &entryPointForOS
 		A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
 		A.Essential = true
@@ -1288,3 +1288,96 @@ func TestHostResourceManagerResourceUtilization(t *testing.T) {
 
 	waitFinished(t, finished, testTimeout)
 }
+
+// This test verifies resources are properly released for all tasks in the case where
+// a stopTask is received from ACS for a task which is queued up in waitingTasksQueue
+func TestHostResourceManagerStopTaskNotBlockWaitingTasks(t *testing.T) {
+	testTimeout := 1 * time.Minute
+	taskEngine, done, _ := setupWithDefaultConfig(t)
+	defer done()
+
+	stateChangeEvents := taskEngine.StateChangeEvents()
+
+	tasks := []*apitask.Task{}
+	stopTasks := []*apitask.Task{}
+	for i := 0; i < 2; i++ {
+		taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
+		testTask := createTestTask(taskArn)
+		testTask.Memory = int64(768)
+
+		// create container
+		A := createTestContainerWithImageAndName(baseImageForOS, fmt.Sprintf("A-%d", i))
+		A.EntryPoint = &entryPointForOS
+		A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
+		A.Essential = true
+		A.StopTimeout = uint(6)
+		testTask.Containers = []*apicontainer.Container{
+			A,
+		}
+
+		tasks = append(tasks, testTask)
+
+		// Stop task payloads from ACS for the tasks
+		stopTask := createTestTask(fmt.Sprintf("IntegTaskArn-%d", i))
+		stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
+		stopTask.Containers = []*apicontainer.Container{}
+		stopTasks = append(stopTasks, stopTask)
+	}
+
+	// goroutine to schedule tasks
+	go func() {
+		taskEngine.AddTask(tasks[0])
+		time.Sleep(2 * time.Second)
+
+		// single managedTask which should have started
+		assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")
+
+		// stopTask[0] - stop running task[0]; this task will go to STOPPING due to the trap handler defined, and STOPPED after 6s
+		taskEngine.AddTask(stopTasks[0])
+
+		time.Sleep(2 * time.Second)
+
+		// this task (task[1]) goes into waitingTasksQueue because not enough memory is available
+		taskEngine.AddTask(tasks[1])
+
+		time.Sleep(2 * time.Second)
+
+		// stopTask[1] - stop the waiting task - task[1]
+		taskEngine.AddTask(stopTasks[1])
+	}()
+
+	finished := make(chan interface{})
+
+	// goroutine to verify task running order and assertions
+	go func() {
+		// 1st task goes to RUNNING
+		verifyContainerRunningStateChange(t, taskEngine)
+		verifyTaskIsRunning(stateChangeEvents, tasks[0])
+
+		time.Sleep(2500 * time.Millisecond)
+
+		// At this time, task[0] stopTask has been received and SIGTERM sent to the task,
+		// but task[0] is still RUNNING due to the trap handler
+		assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
+		assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 desired status should be STOPPED")
+
+		time.Sleep(2 * time.Second)
+
+		// task[1] stops while in waitingTasksQueue while task[0] is in progress.
+		// This is because it is still waiting to progress, has no containers created,
+		// and does not need to wait for stopTimeout, so it can immediately emit its
+		// STOPPED task state change
+		verifyTaskIsStopped(stateChangeEvents, tasks[1])
+
+		// task[0] stops
+		verifyContainerStoppedStateChange(t, taskEngine)
+		verifyTaskIsStopped(stateChangeEvents, tasks[0])
+
+		// Verify resources are properly released in host resource manager
+		assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[0].Arn), "task 0 resources not released")
+		assert.False(t, taskEngine.(*DockerTaskEngine).hostResourceManager.checkTaskConsumed(tasks[1].Arn), "task 1 resources not released")
+
+		close(finished)
+	}()
+
+	waitFinished(t, finished, testTimeout)
+}
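As a side note, the fixed `time.Sleep` calls above stage the scenario on a typical host; a sketch of a generic polling helper (`waitUntil` is an invented name, not part of this PR or the agent, and it assumes the test file's existing `testing` and `time` imports) that the same assertions could fall back on if the timing proves flaky on slow hosts:

```go
// waitUntil polls cond until it holds or the deadline passes, failing the
// test on timeout. A possible alternative to fixed sleeps in integ tests.
func waitUntil(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
	t.Fatal("condition not met before deadline")
}
```

For example, the desired-status assertion could become `waitUntil(t, 5*time.Second, func() bool { return tasks[0].GetDesiredStatus() == apitaskstatus.TaskStopped })`.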
diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go
index beb559547d2..c710330a36e 100644
--- a/agent/engine/task_manager.go
+++ b/agent/engine/task_manager.go
@@ -204,6 +204,7 @@ func (mtask *managedTask) overseeTask() {
 	// - Waits until host resource manager succesfully 'consume's task resources and returns
 	// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
 	// - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
+	// - If an ACS StopTask arrives, this returns immediately; the host resource manager does not consume resources
 	// (resources are later 'release'd on Stopped task emitTaskEvent call)
 	mtask.waitForHostResources()
 
@@ -386,6 +387,14 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool {
 func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) {
 	// Handle acs message changes this task's desired status to whatever
 	// acs says it should be if it is compatible
+
+	// Isolate desired status updates from monitorQueuedTasks processing to prevent
+	// unexpected updates to the host resource manager while tasks are being processed
+	// by monitorQueuedTasks, for example when an ACS StopTask event arrives while
+	// monitorQueuedTasks is processing the same task
+	mtask.engine.monitorQueuedTasksLock.Lock()
+	defer mtask.engine.monitorQueuedTasksLock.Unlock()
+
 	logger.Info("New acs transition", logger.Fields{
 		field.TaskID:        mtask.GetID(),
 		field.DesiredStatus: desiredStatus.String(),
diff --git a/agent/engine/task_manager_test.go b/agent/engine/task_manager_test.go
index 3cae8cfcb78..d6c87b43374 100644
--- a/agent/engine/task_manager_test.go
+++ b/agent/engine/task_manager_test.go
@@ -1420,7 +1420,8 @@ func TestTaskWaitForExecutionCredentials(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
 	task := &managedTask{
-		ctx: ctx,
+		ctx:    ctx,
+		engine: &DockerTaskEngine{},
 		Task: &apitask.Task{
 			KnownStatusUnsafe:   apitaskstatus.TaskRunning,
 			DesiredStatusUnsafe: apitaskstatus.TaskRunning,
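Finally, a simplified, self-contained sketch of the handshake the overseeTask comments describe (assumed shape only, `managedTaskSketch` is an invented stand-in, not the agent's exact code): waitForHostResources blocks on consumedHostResourceEvent, and monitorQueuedTasks sends on that channel whether it consumed resources (startWaitingTask) or returned the task unconsumed (returnWaitingTask).

```go
package main

import "fmt"

// managedTaskSketch is an invented stand-in for the agent's managedTask.
type managedTaskSketch struct {
	consumedHostResourceEvent chan struct{}
}

// waitForHostResources blocks until the scheduler signals the task may progress.
func (mtask *managedTaskSketch) waitForHostResources() {
	<-mtask.consumedHostResourceEvent
}

func main() {
	// Buffered here so the sketch's send cannot block; the ordering of
	// send and receive is what matters, not the channel capacity.
	mtask := &managedTaskSketch{consumedHostResourceEvent: make(chan struct{}, 1)}

	// Scheduler side: the same signal is sent whether resources were consumed
	// or the task was returned because its desired status is already STOPPED,
	// so a queued-then-stopped task never blocks the tasks queued behind it.
	mtask.consumedHostResourceEvent <- struct{}{}

	mtask.waitForHostResources()
	fmt.Println("task unblocked; overseeTask proceeds (and will emit STOPPED)")
}
```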