diff --git a/agent/api/task/task_windows_test.go b/agent/api/task/task_windows_test.go
index ce55a5251ea..883d9ecd3f4 100644
--- a/agent/api/task/task_windows_test.go
+++ b/agent/api/task/task_windows_test.go
@@ -109,7 +109,6 @@ func TestPostUnmarshalWindowsCanonicalPaths(t *testing.T) {
 				},
 			},
 		},
-		StartSequenceNumber: 42,
 	}
 
 	seqNum := int64(42)
diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go
index 4585c2d9df5..69ca5f6ab54 100644
--- a/agent/engine/docker_task_engine.go
+++ b/agent/engine/docker_task_engine.go
@@ -323,8 +323,17 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
	return nil
 }
 
-// Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent
+// Method to wake up the 'monitorQueuedTasks' goroutine, called when
+// - a new task is enqueued in waitingTaskQueue
+// - a task stops (overseeTask)
+// as these are the events when resources on the host change or can change.
+// Always wakes up when at least one event arrives on the buffered channel (size 1) 'monitorQueuedTaskEvent',
 // but does not block if monitorQueuedTasks is already processing queued tasks
+// A buffered channel of size 1 is sufficient because we only want to go through the queue
+// once at any point and schedule as many tasks as the available resources allow.
+// Calls to 'wakeUpTaskQueueMonitor' while 'monitorQueuedTasks' is doing work are redundant
+// by design, as new tasks are enqueued at the end of the queue and will be taken into
+// account, if resources permit, as the loop continues.
 func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() {
 	select {
 	case engine.monitorQueuedTaskEvent <- struct{}{}:
@@ -594,8 +603,8 @@ func (engine *DockerTaskEngine) synchronizeState() {
 		engine.saveTaskData(task)
 	}
 
-	// Before starting managedTask goroutines, pre-allocate resources for already running
-	// tasks in host resource manager
+	// Before starting managedTask goroutines, pre-allocate resources for tasks which
+	// have progressed beyond the resource check (waitingTaskQueue) stage
 	engine.reconcileHostResources()
 	for _, task := range tasksToStart {
 		engine.startTask(task)
diff --git a/agent/engine/engine_unix_integ_test.go b/agent/engine/engine_unix_integ_test.go
index b68e510b82c..6ac1db00c3f 100644
--- a/agent/engine/engine_unix_integ_test.go
+++ b/agent/engine/engine_unix_integ_test.go
@@ -1131,3 +1131,160 @@ func TestDockerExecAPI(t *testing.T) {
 	}
 	waitFinished(t, finished, testTimeout)
 }
+
+// This integ test checks the task queuing behavior of waitingTaskQueue, which depends on hostResourceManager.
+// The first two tasks consume all of the memory available on the host, so the third, queued task needs to wait
+// until enough resources are freed up (i.e. a running task stops and frees enough memory) before it can start progressing.
+func TestHostResourceManagerTrickleQueue(t *testing.T) {
+	testTimeout := 1 * time.Minute
+	taskEngine, done, _ := setupWithDefaultConfig(t)
+	defer done()
+
+	stateChangeEvents := taskEngine.StateChangeEvents()
+
+	tasks := []*apitask.Task{}
+	for i := 0; i < 3; i++ {
+		taskArn := fmt.Sprintf("taskArn-%d", i)
+		testTask := createTestTask(taskArn)
+
+		// create container
+		A := createTestContainerWithImageAndName(baseImageForOS, "A")
+		A.EntryPoint = &entryPointForOS
+		A.Command = []string{"sleep 10"}
+		A.Essential = true
+		testTask.Containers = []*apicontainer.Container{
+			A,
+		}
+
+		// task memory set so that only 2 such tasks can run: 1024 total memory is made available on the instance by getTestHostResources()
+		testTask.Memory = int64(512)
+
+		tasks = append(tasks, testTask)
+	}
+
+	// goroutine to trickle tasks to enforce queueing order
+	go func() {
+		taskEngine.AddTask(tasks[0])
+		time.Sleep(2 * time.Second)
+		taskEngine.AddTask(tasks[1])
+		time.Sleep(2 * time.Second)
+		taskEngine.AddTask(tasks[2])
+	}()
+
+	finished := make(chan interface{})
+
+	// goroutine to verify task running order
+	go func() {
+		// Tasks go RUNNING in order
+		verifyContainerRunningStateChange(t, taskEngine)
+		verifyTaskIsRunning(stateChangeEvents, tasks[0])
+
+		verifyContainerRunningStateChange(t, taskEngine)
+		verifyTaskIsRunning(stateChangeEvents, tasks[1])
+
+		// First task should stop before 3rd task goes RUNNING
+		verifyContainerStoppedStateChange(t, taskEngine)
+		verifyTaskIsStopped(stateChangeEvents, tasks[0])
+
+		verifyContainerRunningStateChange(t, taskEngine)
+		verifyTaskIsRunning(stateChangeEvents, tasks[2])
+
+		verifyContainerStoppedStateChange(t, taskEngine)
+		verifyTaskIsStopped(stateChangeEvents, tasks[1])
+
+		verifyContainerStoppedStateChange(t, taskEngine)
+		verifyTaskIsStopped(stateChangeEvents, tasks[2])
+		close(finished)
+	}()
+
+	// goroutine to verify task accounting
+	// After ~4s, the 3rd task should be queued up and will not be dequeued until ~10s, i.e. until the 1st task stops and frees its resources
+	go func() {
+		time.Sleep(6 * time.Second)
+		task, err := taskEngine.(*DockerTaskEngine).topTask()
+		assert.NoError(t, err, "one task should be queued up after 6s")
+		assert.Equal(t, task.Arn, tasks[2].Arn, "wrong task at top of queue")
+
+		time.Sleep(6 * time.Second)
+		_, err = taskEngine.(*DockerTaskEngine).topTask()
+		assert.Error(t, err, "no task should be queued up after 12s")
+	}()
+	waitFinished(t, finished, testTimeout)
+}
+
+// This test verifies that a task which is STOPPING does not block other new tasks
+// from starting if resources for them are available
+func TestHostResourceManagerResourceUtilization(t *testing.T) {
+	testTimeout := 1 * time.Minute
+	taskEngine, done, _ := setupWithDefaultConfig(t)
+	defer done()
+
+	stateChangeEvents := taskEngine.StateChangeEvents()
+
+	tasks := []*apitask.Task{}
+	for i := 0; i < 2; i++ {
+		taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
+		testTask := createTestTask(taskArn)
+
+		// create container
+		A := createTestContainerWithImageAndName(baseImageForOS, "A")
+		A.EntryPoint = &entryPointForOS
+		A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
+		A.Essential = true
+		A.StopTimeout = uint(6)
+		testTask.Containers = []*apicontainer.Container{
+			A,
+		}
+
+		tasks = append(tasks, testTask)
+	}
+
+	// Stop task payload from ACS for 1st task
+	stopTask := createTestTask("IntegTaskArn-0")
+	stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
+	stopTask.Containers = []*apicontainer.Container{}
+
+	go func() {
+		taskEngine.AddTask(tasks[0])
+		time.Sleep(2 * time.Second)
+
+		// a single managedTask should have started by now
+		assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")
+
+		// stopTask
+		taskEngine.AddTask(stopTask)
+		time.Sleep(2 * time.Second)
+
+		taskEngine.AddTask(tasks[1])
+	}()
+
+	finished := make(chan interface{})
+
+	// goroutine to verify task running order
+	go func() {
+		// Tasks go RUNNING in order; the 2nd task doesn't wait for the 1st task
+		// to transition to STOPPED because resources are available
+		verifyContainerRunningStateChange(t, taskEngine)
+		verifyTaskIsRunning(stateChangeEvents, tasks[0])
+
+		verifyContainerRunningStateChange(t, taskEngine)
+		verifyTaskIsRunning(stateChangeEvents, tasks[1])
+
+		// At this point the stopTask for tasks[0] has been received and SIGTERM sent to its container,
+		// but tasks[0] is still RUNNING because of the trap handler
+		assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
+		assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 desired status should be STOPPED")
+
+		// task[0] stops after the SIGTERM trap handler finishes
+		verifyContainerStoppedStateChange(t, taskEngine)
+		verifyTaskIsStopped(stateChangeEvents, tasks[0])
+
+		// task[1] stops after normal execution
+		verifyContainerStoppedStateChange(t, taskEngine)
+		verifyTaskIsStopped(stateChangeEvents, tasks[1])
+
+		close(finished)
+	}()
+
+	waitFinished(t, finished, testTimeout)
+}
diff --git a/agent/statemanager/state_manager_win_test.go b/agent/statemanager/state_manager_win_test.go
index 125e6a55ac8..999c6267e6c 100644
--- a/agent/statemanager/state_manager_win_test.go
+++ b/agent/statemanager/state_manager_win_test.go
@@ -36,7 +36,7 @@ func TestLoadsDataForGMSATask(t *testing.T) {
 	defer cleanup()
 
 	cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v26", "gmsa")}
-	taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil)
+	taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil)
 	var containerInstanceArn, cluster, savedInstanceID string
 	var sequenceNumber int64
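
For context on the scheduling change above, here is a minimal, standalone Go sketch (not the agent's code; the names wakeEvent, wakeUp, and monitor are illustrative) of the non-blocking wake-up pattern that the new wakeUpTaskQueueMonitor comments describe: a buffered channel of size 1 coalesces redundant wake-ups, and a select with a default case (assumed here, since the hunk is truncated) keeps callers from ever blocking while the monitor goroutine is draining the queue.

// Standalone illustration only; not part of the diff above.
package main

import (
	"fmt"
	"time"
)

// wakeEvent is buffered with size 1: a single pending signal is enough,
// because the monitor drains the whole queue every time it wakes up.
var wakeEvent = make(chan struct{}, 1)

// wakeUp signals the monitor without blocking the caller; if a signal is
// already pending, the extra wake-up is dropped as redundant.
func wakeUp() {
	select {
	case wakeEvent <- struct{}{}:
	default:
	}
}

// monitor wakes whenever at least one event has arrived, then processes the
// queue in one pass (simulated here with a short sleep).
func monitor() {
	for range wakeEvent {
		fmt.Println("draining waiting task queue")
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	go monitor()
	// Many wake-ups while the monitor is busy collapse into one pending signal.
	for i := 0; i < 5; i++ {
		wakeUp()
	}
	time.Sleep(500 * time.Millisecond)
}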