From c3b121a7af0471be00ea55ae8602b317b48a0eb2 Mon Sep 17 00:00:00 2001 From: Prateek Chaudhry Date: Thu, 15 Jun 2023 00:34:35 -0700 Subject: [PATCH] dont consume resources for acs stopped tasks --- agent/engine/docker_task_engine.go | 51 ++++++++++++++++++++++++------ agent/engine/task_manager.go | 28 ++++++++++++++++ 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 5f4e86e018a..23263a3c6ab 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -155,9 +155,14 @@ type DockerTaskEngine struct { // all tasks, it must not acquire it for any significant duration // The write mutex should be taken when adding and removing tasks from managedTasks. tasksLock sync.RWMutex - // waitingTasksLock is a mutext for operations on waitingTasksQueue + // waitingTasksLock is a mutex for operations on waitingTasksQueue waitingTasksLock sync.RWMutex + // monitorQueuedTasksLock is a mutex for operations in the monitorQueuedTasks which + // allocate host resources and wakes up waiting host resources. This should be used + // for synchronizing task desired status updates and queue operations + monitorQueuedTasksLock sync.RWMutex + credentialsManager credentials.Manager _time ttime.Time _timeOnce sync.Once @@ -392,15 +397,8 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) { if err != nil { break } - taskHostResources := task.ToHostResources() - consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources) - if err != nil { - engine.failWaitingTask(err) - } - if consumed { - engine.startWaitingTask() - } else { - // not consumed, go to wait + dequeuedTask := engine.tryDequeueWaitingTasks(task) + if !dequeuedTask { break } } @@ -409,6 +407,39 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) { } } +func (engine *DockerTaskEngine) tryDequeueWaitingTasks(task *managedTask) bool { + // Isolate monitorQueuedTasks processing from changes of desired status updates to prevent + // unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks + // For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks + // could be processing + engine.monitorQueuedTasksLock.Lock() + defer engine.monitorQueuedTasksLock.Unlock() + taskDesiredStatus := task.GetDesiredStatus() + if taskDesiredStatus.Terminal() { + logger.Info("Task desired status changed to STOPPED while waiting for host resources, progressing without consuming resources", logger.Fields{field.TaskARN: task.Arn}) + engine.returnWaitingTask() + return true + } + taskHostResources := task.ToHostResources() + consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources) + if err != nil { + engine.failWaitingTask(err) + return true + } + if consumed { + engine.startWaitingTask() + return true + } + return false + // not consumed, go to wait +} + +// To be called when resources are not to be consumed by host resource manager, just dequeues and returns +func (engine *DockerTaskEngine) returnWaitingTask() { + task, _ := engine.dequeueTask() + task.consumedHostResourceEvent <- struct{}{} +} + func (engine *DockerTaskEngine) failWaitingTask(err error) { task, _ := engine.dequeueTask() logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn}) diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index 13f7dcc0991..7fce21994d3 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -204,6 +204,7 @@ func (mtask *managedTask) overseeTask() { // - Waits until host resource manager succesfully 'consume's task resources and returns // - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately // - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately + // - If an ACS StopTask arrives, host resources manager returns immediately. Host resource manager does not consume resources // (resources are later 'release'd on Stopped task emitTaskEvent call) mtask.waitForHostResources() @@ -386,6 +387,14 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool { func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) { // Handle acs message changes this task's desired status to whatever // acs says it should be if it is compatible + + // Isolate change of desired status updates from monitorQueuedTasks processing to prevent + // unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks + // For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks + // could be processing + mtask.engine.monitorQueuedTasksLock.Lock() + defer mtask.engine.monitorQueuedTasksLock.Unlock() + logger.Info("New acs transition", logger.Fields{ field.TaskID: mtask.GetID(), field.DesiredStatus: desiredStatus.String(), @@ -1454,6 +1463,9 @@ func (mtask *managedTask) time() ttime.Time { } func (mtask *managedTask) cleanupTask(taskStoppedDuration time.Duration) { + // In case waitForHostResources returns on an event other than task's consumedHostResourceEvent + // release resources from host_resource_manager + go mtask.discardConsumedHostResourceEvents() taskExecutionCredentialsID := mtask.GetExecutionCredentialsID() cleanupTimeDuration := mtask.GetKnownStatusTime().Add(taskStoppedDuration).Sub(ttime.Now()) cleanupTime := make(<-chan time.Time) @@ -1520,6 +1532,22 @@ func (mtask *managedTask) discardEvents() { } } +func (mtask *managedTask) discardConsumedHostResourceEvents() { + for { + select { + case <-mtask.consumedHostResourceEvent: + logger.Info("Releasing resources in cleanup", logger.Fields{field.TaskARN: mtask.Arn}) + resourcesToRelease := mtask.ToHostResources() + err := mtask.engine.hostResourceManager.release(mtask.Arn, resourcesToRelease) + if err != nil { + logger.Critical("Failed to release resources in discardConsumedHostResourceEvents", logger.Fields{field.TaskARN: mtask.Arn}) + } + case <-mtask.ctx.Done(): + return + } + } +} + // waitForStopReported will wait for the task to be reported stopped and return true, or will time-out and return false. // Messages on the mtask.dockerMessages and mtask.acsMessages channels will be handled while this function is waiting. func (mtask *managedTask) waitForStopReported() bool {