Add integ tests for task accounting #3741

Merged
1 change: 0 additions & 1 deletion agent/api/task/task_windows_test.go
@@ -109,7 +109,6 @@ func TestPostUnmarshalWindowsCanonicalPaths(t *testing.T) {
},
},
},
- StartSequenceNumber: 42,
Contributor Author:
This field has been removed as part of #3723

}

seqNum := int64(42)
2 changes: 1 addition & 1 deletion agent/engine/common_test.go
@@ -36,9 +36,9 @@ import (
"github.com/aws/amazon-ecs-agent/agent/engine/execcmd"
mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks"
"github.com/aws/amazon-ecs-agent/agent/statechange"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
mock_ttime "github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime/mocks"
"github.com/aws/amazon-ecs-agent/agent/utils"
"github.com/cihub/seelog"
dockercontainer "github.com/docker/docker/api/types/container"
"github.com/golang/mock/gomock"
15 changes: 12 additions & 3 deletions agent/engine/docker_task_engine.go
@@ -323,8 +323,17 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
return nil
}

- // Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent
+ // wakeUpTaskQueueMonitor wakes up the 'monitorQueuedTasks' goroutine. It is called when
+ // - a new task is enqueued in waitingTaskQueue
+ // - a task stops (overseeTask)
+ // as these are the events when resources on the host change or can change.
+ // A send on the buffered channel (size 1) 'monitorQueuedTaskEvent' always wakes up the
+ // monitor, but does not block if monitorQueuedTasks is already processing queued tasks.
+ // A buffer of size 1 is sufficient because we only want to go through the queue once at
+ // any point and schedule as many tasks as available resources allow. Calls to
+ // 'wakeUpTaskQueueMonitor' while 'monitorQueuedTasks' is doing work are redundant, as new
+ // tasks are enqueued at the end of the queue and are picked up by the ongoing pass if resources permit.
func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() {
select {
case engine.monitorQueuedTaskEvent <- struct{}{}:
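
The rest of the function body is collapsed in this diff view. For reference, the comment above describes a standard non-blocking send on a size-1 buffered channel; here is a minimal, self-contained Go sketch of that pattern (hypothetical names, not the agent's actual code):

package main

import "fmt"

// sketchEngine is a stand-in for DockerTaskEngine in this illustration only.
type sketchEngine struct {
	monitorQueuedTaskEvent chan struct{} // buffered with capacity 1
}

// wakeUp mirrors the shape of wakeUpTaskQueueMonitor: send a signal if the
// slot is free, otherwise drop it, since a wake-up is already pending and
// the monitor drains the whole queue on every pass.
func (e *sketchEngine) wakeUp() {
	select {
	case e.monitorQueuedTaskEvent <- struct{}{}:
	default:
	}
}

func main() {
	e := &sketchEngine{monitorQueuedTaskEvent: make(chan struct{}, 1)}
	e.wakeUp()
	e.wakeUp() // coalesced: the second call neither blocks nor adds a signal
	fmt.Println(len(e.monitorQueuedTaskEvent)) // prints 1
}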
@@ -594,8 +603,8 @@
engine.saveTaskData(task)
}

- // Before starting managedTask goroutines, pre-allocate resources for already running
- // tasks in host resource manager
+ // Before starting managedTask goroutines, pre-allocate resources for tasks which
+ // have progressed beyond the resource check (waitingTaskQueue) stage
engine.reconcileHostResources()
for _, task := range tasksToStart {
engine.startTask(task)
157 changes: 157 additions & 0 deletions agent/engine/engine_unix_integ_test.go
@@ -1131,3 +1131,160 @@ func TestDockerExecAPI(t *testing.T) {

waitFinished(t, finished, testTimeout)
}

// This integ test checks task queuing behavior in waitingTaskQueue, which depends on hostResourceManager.
// The first two tasks consume all of the memory available on the host, so the third task queued up needs to wait
// until enough resources are freed up (i.e. a running task stops and releases its resources) before it can start progressing.
func TestHostResourceManagerTrickleQueue(t *testing.T) {
testTimeout := 1 * time.Minute
taskEngine, done, _ := setupWithDefaultConfig(t)
defer done()

stateChangeEvents := taskEngine.StateChangeEvents()

tasks := []*apitask.Task{}
for i := 0; i < 3; i++ {
taskArn := fmt.Sprintf("taskArn-%d", i)
testTask := createTestTask(taskArn)

// create container
A := createTestContainerWithImageAndName(baseImageForOS, "A")
A.EntryPoint = &entryPointForOS
A.Command = []string{"sleep 10"}
A.Essential = true
testTask.Containers = []*apicontainer.Container{
A,
}

// set task memory so that only 2 such tasks can run at once - getTestHostResources() makes 1024 total memory available on the instance
testTask.Memory = int64(512)

tasks = append(tasks, testTask)
}

// goroutine to trickle tasks to enforce queueing order
go func() {
taskEngine.AddTask(tasks[0])
time.Sleep(2 * time.Second)
taskEngine.AddTask(tasks[1])
time.Sleep(2 * time.Second)
taskEngine.AddTask(tasks[2])
}()

finished := make(chan interface{})

// goroutine to verify task running order
go func() {
// Tasks go RUNNING in order
verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[0])

verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[1])

// First task should stop before 3rd task goes RUNNING
verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[0])

verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[2])

verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[1])

verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[2])
close(finished)
}()

// goroutine to verify task accounting
// After ~4s, the 3rd task should be queued up, and it will not be dequeued until ~10s,
// i.e. until the 1st task stops and frees its resources
go func() {
time.Sleep(6 * time.Second)
task, err := taskEngine.(*DockerTaskEngine).topTask()
assert.NoError(t, err, "one task should be queued up after 6s")
assert.Equal(t, task.Arn, tasks[2].Arn, "wrong task at top of queue")

time.Sleep(6 * time.Second)
_, err = taskEngine.(*DockerTaskEngine).topTask()
assert.Error(t, err, "no task should be queued up after 12s")
}()
waitFinished(t, finished, testTimeout)
}

// This test verifies that a task which is STOPPING does not block other new tasks
// from starting if resources for them are available
func TestHostResourceManagerResourceUtilization(t *testing.T) {
testTimeout := 1 * time.Minute
taskEngine, done, _ := setupWithDefaultConfig(t)
defer done()

stateChangeEvents := taskEngine.StateChangeEvents()

tasks := []*apitask.Task{}
for i := 0; i < 2; i++ {
taskArn := fmt.Sprintf("IntegTaskArn-%d", i)
testTask := createTestTask(taskArn)

// create container
A := createTestContainerWithImageAndName(baseImageForOS, "A")
A.EntryPoint = &entryPointForOS
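// the trap handler keeps the container running for ~6s after it receives SIGTERM,
// so the task lingers in STOPPING while still holding its host resources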
A.Command = []string{"trap shortsleep SIGTERM; shortsleep() { sleep 6; exit 1; }; sleep 10"}
A.Essential = true
A.StopTimeout = uint(6)
testTask.Containers = []*apicontainer.Container{
A,
}

tasks = append(tasks, testTask)
}

// Stop task payload from ACS for 1st task
stopTask := createTestTask("IntegTaskArn-0")
stopTask.DesiredStatusUnsafe = apitaskstatus.TaskStopped
stopTask.Containers = []*apicontainer.Container{}
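// adding a task with the same ARN and desired status STOPPED mimics a stop directive from ACS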

go func() {
taskEngine.AddTask(tasks[0])
time.Sleep(2 * time.Second)

// single managedTask which should have started
assert.Equal(t, 1, len(taskEngine.(*DockerTaskEngine).managedTasks), "exactly one task should be running")

// stopTask
taskEngine.AddTask(stopTask)
time.Sleep(2 * time.Second)

taskEngine.AddTask(tasks[1])
}()

finished := make(chan interface{})

// goroutine to verify task running order
go func() {
// Tasks go RUNNING in order, 2nd task doesn't wait for 1st task
// to transition to STOPPED as resources are available
verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[0])

verifyContainerRunningStateChange(t, taskEngine)
verifyTaskIsRunning(stateChangeEvents, tasks[1])

// At this point, the stopTask for tasks[0] has been received and SIGTERM sent to the
// task, but tasks[0] is still RUNNING due to the trap handler
assert.Equal(t, apitaskstatus.TaskRunning, tasks[0].GetKnownStatus(), "task 0 known status should be RUNNING")
assert.Equal(t, apitaskstatus.TaskStopped, tasks[0].GetDesiredStatus(), "task 0 desired status should be STOPPED")

// task[0] stops after SIGTERM trap handler finishes
verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[0])

// task[1] stops after normal execution
verifyContainerStoppedStateChange(t, taskEngine)
verifyTaskIsStopped(stateChangeEvents, tasks[1])

close(finished)
}()

waitFinished(t, finished, testTimeout)
}
2 changes: 1 addition & 1 deletion agent/statemanager/state_manager_win_test.go
@@ -36,7 +36,7 @@ func TestLoadsDataForGMSATask(t *testing.T) {
defer cleanup()

cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v26", "gmsa")}
- taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil)
+ taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil, nil, nil)
Contributor Author:
The signature here changed as part of #3684

var containerInstanceArn, cluster, savedInstanceID string
var sequenceNumber int64
