From 61ad01020178254bedf49cd26e80a5b2fd359784 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 30 Jun 2023 14:48:00 -0700 Subject: [PATCH] Revert "add integ tests for task accounting (#3741)" This reverts commit 2d30365f50a1d8dc1f704f5c88f66268476ade49. --- agent/api/task/task_windows_test.go | 1 + agent/engine/docker_task_engine.go | 15 +- agent/engine/engine_unix_integ_test.go | 157 ------------------- agent/statemanager/state_manager_win_test.go | 2 +- 4 files changed, 5 insertions(+), 170 deletions(-) diff --git a/agent/api/task/task_windows_test.go b/agent/api/task/task_windows_test.go index 883d9ecd3f4..ce55a5251ea 100644 --- a/agent/api/task/task_windows_test.go +++ b/agent/api/task/task_windows_test.go @@ -109,6 +109,7 @@ func TestPostUnmarshalWindowsCanonicalPaths(t *testing.T) { }, }, }, + StartSequenceNumber: 42, } seqNum := int64(42) diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 69ca5f6ab54..4585c2d9df5 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -323,17 +323,8 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error { return nil } -// Method to wake up 'monitorQueuedTasks' goroutine, called when -// - a new task enqueues in waitingTaskQueue -// - a task stops (overseeTask) -// as these are the events when resources change/can change on the host -// Always wakes up when at least one event arrives on buffered channel (size 1) 'monitorQueuedTaskEvent' +// Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent // but does not block if monitorQueuedTasks is already processing queued tasks -// Buffered channel of size 1 is sufficient because we only want to go through the queue -// once at any point and schedule as many tasks as possible (as many resources are available) -// Calls on 'wakeUpTaskQueueMonitor' when 'monitorQueuedTasks' is doing work are redundant -// as new tasks are enqueued at the end and will be taken into account in the continued loop -// if permitted by design func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() { select { case engine.monitorQueuedTaskEvent <- struct{}{}: @@ -603,8 +594,8 @@ func (engine *DockerTaskEngine) synchronizeState() { engine.saveTaskData(task) } - // Before starting managedTask goroutines, pre-allocate resources for tasks which - // which have progressed beyond resource check (waitingTaskQueue) stage + // Before starting managedTask goroutines, pre-allocate resources for already running + // tasks in host resource manager engine.reconcileHostResources() for _, task := range tasksToStart { engine.startTask(task) diff --git a/agent/engine/engine_unix_integ_test.go b/agent/engine/engine_unix_integ_test.go index 6ac1db00c3f..b68e510b82c 100644 --- a/agent/engine/engine_unix_integ_test.go +++ b/agent/engine/engine_unix_integ_test.go @@ -1131,160 +1131,3 @@ func TestDockerExecAPI(t *testing.T) { waitFinished(t, finished, testTimeout) } - -// This integ test checks for task queuing behavior in waitingTaskQueue which is dependent on hostResourceManager. -// First two tasks totally consume the available memory resource on the host. So the third task queued up needs to wait -// until resources gets freed up (i.e. any running tasks stops and frees enough resources) before it can start progressing. -func TestHostResourceManagerTrickleQueue(t *testing.T) { - testTimeout := 1 * time.Minute - taskEngine, done, _ := setupWithDefaultConfig(t) - defer done() - - stateChangeEvents := taskEngine.StateChangeEvents() - - tasks := []*apitask.Task{} - for i := 0; i < 3; i++ { - taskArn := fmt.Sprintf("taskArn-%d", i) - testTask := createTestTask(taskArn) - - // create container - A := createTestContainerWithImageAndName(baseImageForOS, "A") - A.EntryPoint = &entryPointForOS - A.Command = []string{"sleep 10"} - A.Essential = true - testTask.Containers = []*apicontainer.Container{ - A, - } - - // task memory so that only 2 such tasks can run - 1024 total memory available on instance by getTestHostResources() - testTask.Memory = int64(512) - - tasks = append(tasks, testTask) - } - - // goroutine to trickle tasks to enforce queueing order - go func() { - taskEngine.AddTask(tasks[0]) - time.Sleep(2 * time.Second) - taskEngine.AddTask(tasks[1]) - time.Sleep(2 * time.Second) - taskEngine.AddTask(tasks[2]) - }() - - finished := make(chan interface{}) - - // goroutine to verify task running order - go func() { - // Tasks go RUNNING in order - verifyContainerRunningStateChange(t, taskEngine) - verifyTaskIsRunning(stateChangeEvents, tasks[0]) - - verifyContainerRunningStateChange(t, taskEngine) - verifyTaskIsRunning(stateChangeEvents, tasks[1]) - - // First task should stop before 3rd task goes RUNNING - verifyContainerStoppedStateChange(t, taskEngine) - verifyTaskIsStopped(stateChangeEvents, tasks[0]) - - verifyContainerRunningStateChange(t, taskEngine) - verifyTaskIsRunning(stateChangeEvents, tasks[2]) - - verifyContainerStoppedStateChange(t, taskEngine) - verifyTaskIsStopped(stateChangeEvents, tasks[1]) - - verifyContainerStoppedStateChange(t, taskEngine) - verifyTaskIsStopped(stateChangeEvents, tasks[2]) - close(finished) - }() - - // goroutine to verify task accounting - // After ~4s, 3rd task should be queued up and will not be dequeued until ~10s, i.e. until 1st task stops and gets dequeued - go func() { - time.Sleep(6 * time.Second) - task, err := taskEngine.(*DockerTaskEngine).topTask() - assert.NoError(t, err, "one task should be queued up after 6s") - assert.Equal(t, task.Arn, tasks[2].Arn, "wrong task at top of queue") - - time.Sleep(6 * time.Second) - _, err = taskEngine.(*DockerTaskEngine).topTask() - assert.Error(t, err, "no task should be queued up after 12s") - }() - waitFinished(t, finished, testTimeout) -} - -// This test verifies if a task which is STOPPING does not block other new tasks -// from starting if resources for them are available -func TestHostResourceManagerResourceUtilization(t *testing.T) { - testTimeout := 1 * time.Minute - taskEngine, done, _ := setupWithDefaultConfig(t) - defer done() - - stateChangeEvents := taskEngine.StateChangeEvents() - - tasks := []*apitask.Task{} - for i := 0; i < 2; i++ { - taskArn := fmt.Sprintf("IntegTaskArn-%d", i) - testTask := createTestTask(taskArn) - - // create container - A := createTestContainerWithImageAndName(baseImageForOS, "A") - A.EntryPoint = &entryPointForOS - A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"} - A.Essential = true - A.StopTimeout = uint(6) - testTask.Containers = []*apicontainer.Container{ - A, - } - - tasks = append(tasks, testTask) - } - - // Stop task payload from ACS for 1st task - stopTask := createTestTask("IntegTaskArn-0") - stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped - stopTask.Containers = []*apicontainer.Container{} - - go func() { - taskEngine.AddTask(tasks[0]) - time.Sleep(2 * time.Second) - - // single managedTask which should have started - assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running") - - // stopTask - taskEngine.AddTask(stopTask) - time.Sleep(2 * time.Second) - - taskEngine.AddTask(tasks[1]) - }() - - finished := make(chan interface{}) - - // goroutine to verify task running order - go func() { - // Tasks go RUNNING in order, 2nd task doesn't wait for 1st task - // to transition to STOPPED as resources are available - verifyContainerRunningStateChange(t, taskEngine) - verifyTaskIsRunning(stateChangeEvents, tasks[0]) - - verifyContainerRunningStateChange(t, taskEngine) - verifyTaskIsRunning(stateChangeEvents, tasks[1]) - - // At this time, task[0] stopTask is received, and SIGTERM sent to task - // but the task[0] is still RUNNING due to trap handler - assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING") - assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 status should be STOPPED") - - // task[0] stops after SIGTERM trap handler finishes - verifyContainerStoppedStateChange(t, taskEngine) - verifyTaskIsStopped(stateChangeEvents, tasks[0]) - - // task[1] stops after normal execution - verifyContainerStoppedStateChange(t, taskEngine) - verifyTaskIsStopped(stateChangeEvents, tasks[1]) - - close(finished) - }() - - waitFinished(t, finished, testTimeout) -} diff --git a/agent/statemanager/state_manager_win_test.go b/agent/statemanager/state_manager_win_test.go index 999c6267e6c..125e6a55ac8 100644 --- a/agent/statemanager/state_manager_win_test.go +++ b/agent/statemanager/state_manager_win_test.go @@ -36,7 +36,7 @@ func TestLoadsDataForGMSATask(t *testing.T) { defer cleanup() cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v26", "gmsa")} - taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil) var containerInstanceArn, cluster, savedInstanceID string var sequenceNumber int64