From abf9bd1be1d65a6aa0ba534c11d4b908e9045236 Mon Sep 17 00:00:00 2001 From: Prateek Chaudhry Date: Tue, 13 Jun 2023 15:33:48 -0700 Subject: [PATCH] Change reconcile/container update order on init and waitForHostResources/emitCurrentStatus order (#3747) --- agent/api/task/task.go | 4 ++-- agent/engine/docker_task_engine.go | 11 ++++++++--- agent/engine/host_resource_manager.go | 5 +++-- agent/engine/task_manager.go | 18 +++++++++++++++--- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 47a8a5a49d1..4382cbbfa73 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -3617,8 +3617,8 @@ func (task *Task) ToHostResources() map[string]*ecs.Resource { "taskArn": task.Arn, "CPU": *resources["CPU"].IntegerValue, "MEMORY": *resources["MEMORY"].IntegerValue, - "PORTS_TCP": resources["PORTS_TCP"].StringSetValue, - "PORTS_UDP": resources["PORTS_UDP"].StringSetValue, + "PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue), + "PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue), "GPU": *resources["GPU"].IntegerValue, }) return resources diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 050665467a7..5f4e86e018a 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -597,15 +597,20 @@ func (engine *DockerTaskEngine) synchronizeState() { } tasks := engine.state.AllTasks() + // For normal task progress, overseeTask 'consume's resources through waitForHostResources in host_resource_manager before progressing + // For agent restarts (state restore), we pre-consume resources for tasks that had progressed beyond waitForHostResources stage - + // so these tasks do not wait during 'waitForHostResources' call again - do not go through queuing again + // + // Call reconcileHostResources before + // - filterTasksToStartUnsafe which will reconcile container statuses for the duration the agent was stopped + // - starting managedTask's overseeTask goroutines + engine.reconcileHostResources() tasksToStart := engine.filterTasksToStartUnsafe(tasks) for _, task := range tasks { task.InitializeResources(engine.resourceFields) engine.saveTaskData(task) } - // Before starting managedTask goroutines, pre-allocate resources for tasks which - // which have progressed beyond resource check (waitingTaskQueue) stage - engine.reconcileHostResources() for _, task := range tasksToStart { engine.startTask(task) } diff --git a/agent/engine/host_resource_manager.go b/agent/engine/host_resource_manager.go index e26521f5e5d..90435475dd8 100644 --- a/agent/engine/host_resource_manager.go +++ b/agent/engine/host_resource_manager.go @@ -23,6 +23,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/utils" "github.com/aws/amazon-ecs-agent/ecs-agent/logger" "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" + "github.com/aws/aws-sdk-go/aws" ) const ( @@ -72,8 +73,8 @@ func (h *HostResourceManager) logResources(msg string, taskArn string) { "taskArn": taskArn, "CPU": *h.consumedResource[CPU].IntegerValue, "MEMORY": *h.consumedResource[MEMORY].IntegerValue, - "PORTS_TCP": h.consumedResource[PORTSTCP].StringSetValue, - "PORTS_UDP": h.consumedResource[PORTSUDP].StringSetValue, + "PORTS_TCP": aws.StringValueSlice(h.consumedResource[PORTSTCP].StringSetValue), + "PORTS_UDP": aws.StringValueSlice(h.consumedResource[PORTSUDP].StringSetValue), "GPU": *h.consumedResource[GPU].IntegerValue, }) } diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index fd1a321abb8..13f7dcc0991 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -199,12 +199,17 @@ func (mtask *managedTask) overseeTask() { // `desiredstatus`es which are a construct of the engine used only here, // not present on the backend mtask.UpdateStatus() - // If this was a 'state restore', send all unsent statuses - mtask.emitCurrentStatus() - // Wait for host resources required by this task to become available + // Wait here until enough resources are available on host for the task to progress + // - Waits until host resource manager succesfully 'consume's task resources and returns + // - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately + // - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately + // (resources are later 'release'd on Stopped task emitTaskEvent call) mtask.waitForHostResources() + // If this was a 'state restore', send all unsent statuses + mtask.emitCurrentStatus() + // Main infinite loop. This is where we receive messages and dispatch work. for { if mtask.shouldExit() { @@ -272,6 +277,13 @@ func (mtask *managedTask) emitCurrentStatus() { // the task. It will wait for event on this task's consumedHostResourceEvent // channel from monitorQueuedTasks routine to wake up func (mtask *managedTask) waitForHostResources() { + if mtask.GetKnownStatus().Terminal() { + // Task's known status is STOPPED. No need to wait in this case and proceed to cleanup + // This is relevant when agent restarts and a task has stopped - do not attempt + // to consume resources in host resource manager + return + } + if !mtask.IsInternal && !mtask.engine.hostResourceManager.checkTaskConsumed(mtask.Arn) { // Internal tasks are started right away as their resources are not accounted for mtask.engine.enqueueTask(mtask)