Skip to content

Commit

Permalink
dont consume resources for acs stopped tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
prateekchaudhry committed Jun 15, 2023
1 parent abf9bd1 commit 376dc60
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 10 deletions.
51 changes: 41 additions & 10 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,14 @@ type DockerTaskEngine struct {
// all tasks, it must not acquire it for any significant duration
// The write mutex should be taken when adding and removing tasks from managedTasks.
tasksLock sync.RWMutex
// waitingTasksLock is a mutext for operations on waitingTasksQueue
// waitingTasksLock is a mutex for operations on waitingTasksQueue
waitingTasksLock sync.RWMutex

// monitorQueuedTasksLock is a mutex for operations in the monitorQueuedTasks which
// allocate host resources and wakes up waiting host resources. This should be used
// for synchronizing task desired status updates and queue operations
monitorQueuedTasksLock sync.RWMutex

credentialsManager credentials.Manager
_time ttime.Time
_timeOnce sync.Once
Expand Down Expand Up @@ -392,15 +397,8 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
if err != nil {
break
}
taskHostResources := task.ToHostResources()
consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
if err != nil {
engine.failWaitingTask(err)
}
if consumed {
engine.startWaitingTask()
} else {
// not consumed, go to wait
dequeuedTask := engine.tryDequeueWaitingTasks(task)
if !dequeuedTask {
break
}
}
Expand All @@ -409,6 +407,39 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
}
}

func (engine *DockerTaskEngine) tryDequeueWaitingTasks(task *managedTask) bool {
// Isolate monitorQueuedTasks processing from changes of desired status updates to prevent
// unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
// For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
// could be processing
engine.monitorQueuedTasksLock.Lock()
defer engine.monitorQueuedTasksLock.Unlock()
taskDesiredStatus := task.GetDesiredStatus()
if taskDesiredStatus.Terminal() {
logger.Info("Task desired status changed to STOPPED while waiting for host resources, progressing without consuming resources", logger.Fields{field.TaskARN: task.Arn})
engine.returnWaitingTask()
return true
}
taskHostResources := task.ToHostResources()
consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
if err != nil {
engine.failWaitingTask(err)
return true
}
if consumed {
engine.startWaitingTask()
return true
}
return false
// not consumed, go to wait
}

// To be called when resources are not to be consumed by host resource manager, just dequeues and returns
func (engine *DockerTaskEngine) returnWaitingTask() {
task, _ := engine.dequeueTask()
task.consumedHostResourceEvent <- struct{}{}
}

func (engine *DockerTaskEngine) failWaitingTask(err error) {
task, _ := engine.dequeueTask()
logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config : %s", err.Error()), logger.Fields{field.TaskARN: task.Arn})
Expand Down
29 changes: 29 additions & 0 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
"github.com/aws/amazon-ecs-agent/agent/vendor/github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"

"github.com/aws/amazon-ecs-agent/agent/api"
apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
Expand Down Expand Up @@ -204,6 +205,7 @@ func (mtask *managedTask) overseeTask() {
// - Waits until host resource manager succesfully 'consume's task resources and returns
// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
// - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
// - If an ACS StopTask arrives, host resources manager returns immediately. Host resource manager does not consume resources
// (resources are later 'release'd on Stopped task emitTaskEvent call)
mtask.waitForHostResources()

Expand Down Expand Up @@ -386,6 +388,14 @@ func (mtask *managedTask) waitEvent(stopWaiting <-chan struct{}) bool {
func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus.TaskStatus, seqnum int64) {
// Handle acs message changes this task's desired status to whatever
// acs says it should be if it is compatible

// Isolate change of desired status updates from monitorQueuedTasks processing to prevent
// unexpected updates to host resource manager when tasks are being processed by monitorQueuedTasks
// For example when ACS StopTask event updates arrives and simultaneously monitorQueuedTasks
// could be processing
mtask.engine.monitorQueuedTasksLock.Lock()
defer mtask.engine.monitorQueuedTasksLock.Unlock()

logger.Info("New acs transition", logger.Fields{
field.TaskID: mtask.GetID(),
field.DesiredStatus: desiredStatus.String(),
Expand Down Expand Up @@ -1454,6 +1464,9 @@ func (mtask *managedTask) time() ttime.Time {
}

func (mtask *managedTask) cleanupTask(taskStoppedDuration time.Duration) {
// In case waitForHostResources returns on an event other than task's consumedHostResourceEvent
// release resources from host_resource_manager
go mtask.discardConsumedHostResourceEvents()
taskExecutionCredentialsID := mtask.GetExecutionCredentialsID()
cleanupTimeDuration := mtask.GetKnownStatusTime().Add(taskStoppedDuration).Sub(ttime.Now())
cleanupTime := make(<-chan time.Time)
Expand Down Expand Up @@ -1520,6 +1533,22 @@ func (mtask *managedTask) discardEvents() {
}
}

func (mtask *managedTask) discardConsumedHostResourceEvents() {
for {
select {
case <-mtask.consumedHostResourceEvent:
logger.Info("Releasing resources in cleanup", logger.Fields{field.TaskARN: mtask.Arn})
resourcesToRelease := mtask.ToHostResources()
err := mtask.engine.hostResourceManager.release(mtask.Arn, resourcesToRelease)
if err != nil {
logger.Critical("Failed to release resources in discardConsumedHostResourceEvents", logger.Fields{field.TaskARN: task.Arn})
}
case <-mtask.ctx.Done():
return
}
}
}

// waitForStopReported will wait for the task to be reported stopped and return true, or will time-out and return false.
// Messages on the mtask.dockerMessages and mtask.acsMessages channels will be handled while this function is waiting.
func (mtask *managedTask) waitForStopReported() bool {
Expand Down

0 comments on commit 376dc60

Please sign in to comment.