diff --git a/agent/api/task.go b/agent/api/task.go
index 27faf39d23..36ef2e2851 100644
--- a/agent/api/task.go
+++ b/agent/api/task.go
@@ -1103,3 +1103,25 @@ func (task *Task) GetID() (string, error) {
 
 	return resourceSplit[1], nil
 }
+
+// RecordExecutionStoppedAt sets the task's ExecutionStoppedAt timestamp to the
+// current time when the given container is essential and its known status is
+// ContainerStopped; for any other container it does nothing. The timestamp is
+// logged only when SetExecutionStoppedAt returns true.
+func (task *Task) RecordExecutionStoppedAt(container *Container) {
+	if !container.Essential {
+		return
+	}
+	if container.GetKnownStatus() != ContainerStopped {
+		return
+	}
+	// If the essential container is stopped, set the ExecutionStoppedAt timestamp
+	now := time.Now()
+	ok := task.SetExecutionStoppedAt(now)
+	if !ok {
+		// ExecutionStoppedAt was already recorded. Nothing left to do here
+		return
+	}
+	seelog.Infof("Task [%s]: recording execution stopped time. Essential container [%s] stopped at: %s",
+		task.Arn, container.Name, now.String())
+}
diff --git a/agent/api/task_test.go b/agent/api/task_test.go
index ad54126843..ef2c7321c1 100644
--- a/agent/api/task_test.go
+++ b/agent/api/task_test.go
@@ -15,6 +15,7 @@ package api
 
 import (
 	"encoding/json"
+	"fmt"
 	"reflect"
 	"runtime"
 	"testing"
@@ -1386,3 +1387,42 @@ func TestContainerHealthConfig(t *testing.T) {
 	assert.Equal(t, config.Healthcheck.Timeout, 4*time.Second)
 	assert.Equal(t, config.Healthcheck.StartPeriod, 1*time.Minute)
 }
+
+func TestRecordExecutionStoppedAt(t *testing.T) {
+	testCases := []struct {
+		essential             bool
+		status                ContainerStatus
+		executionStoppedAtSet bool
+		msg                   string
+	}{
+		{
+			essential:             true,
+			status:                ContainerStopped,
+			executionStoppedAtSet: true,
+			msg:                   "essential container stopped should have executionStoppedAt set",
+		},
+		{
+			essential:             false,
+			status:                ContainerStopped,
+			executionStoppedAtSet: false,
+			msg:                   "non essential container stopped should not cause executionStoppedAt set",
+		},
+		{
+			essential:             true,
+			status:                ContainerRunning,
+			executionStoppedAtSet: false,
+			msg:                   "essential non-stop status change should not cause executionStoppedAt set",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("Container status: %s, essential: %v, executionStoppedAt should be set: %v", tc.status, tc.essential, tc.executionStoppedAtSet), func(t *testing.T) {
+			task := &Task{}
+			task.RecordExecutionStoppedAt(&Container{
+				Essential:         tc.essential,
+				KnownStatusUnsafe: tc.status,
+			})
+			assert.Equal(t, !tc.executionStoppedAtSet, task.GetExecutionStoppedAt().IsZero(), tc.msg)
+		})
+	}
+}
diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go
index 1e1495ff6d..40e4cda9ff 100644
--- a/agent/engine/docker_task_engine.go
+++ b/agent/engine/docker_task_engine.go
@@ -290,6 +290,42 @@ func (engine *DockerTaskEngine) synchronizeState() {
 	engine.saver.Save()
 }
 
+// updateContainerMetadata applies the metadata docker reported for a container
+// to the agent's view of it: the created/started/finished timestamps and any
+// reported exit code are always passed to the container's setters, labels and
+// port bindings are only filled in when the container has none yet, volumes are
+// handed to the task to update its mount points, and the health status is set
+// only when container.HealthStatusShouldBeReported returns true.
+func updateContainerMetadata(metadata *DockerContainerMetadata, container *api.Container, task *api.Task) {
+	container.SetCreatedAt(metadata.CreatedAt)
+	container.SetStartedAt(metadata.StartedAt)
+	container.SetFinishedAt(metadata.FinishedAt)
+
+	// Set the labels only if the container has none yet
+	if len(metadata.Labels) != 0 && len(container.GetLabels()) == 0 {
+		container.SetLabels(metadata.Labels)
+	}
+
+	// Update the task's mount points from the reported volumes
+	if metadata.Volumes != nil {
+		task.UpdateMountPoints(container, metadata.Volumes)
+	}
+
+	// Record the exit code whenever docker reports one
+	if metadata.ExitCode != nil {
+		container.SetKnownExitCode(metadata.ExitCode)
+	}
+
+	// Set the port bindings only if none are known yet
+	if len(metadata.PortBindings) != 0 && len(container.KnownPortBindings) == 0 {
+		container.KnownPortBindings = metadata.PortBindings
+	}
+	// update the container health information
+	if container.HealthStatusShouldBeReported() {
+		container.SetHealthStatus(metadata.Health)
+	}
+}
+
 // synchronizeContainerStatus checks and updates the container status with docker
 func (engine *DockerTaskEngine) synchronizeContainerStatus(container *api.DockerContainer, task *api.Task) {
 	if container.DockerID == "" {
@@ -301,20 +337,15 @@ func (engine *DockerTaskEngine) synchronizeContainerStatus(container *api.Docker
 			seelog.Warnf("Task engine [%s]: could not find matching container for expected name [%s]: %v",
 				task.Arn, container.DockerName, err)
 		} else {
+			// update the container metadata in case the container was created during agent restart
+			metadata := metadataFromContainer(describedContainer)
+			updateContainerMetadata(&metadata, container.Container, task)
 			container.DockerID = describedContainer.ID
+
 			container.Container.SetKnownStatus(dockerStateToState(describedContainer.State))
-			container.Container.SetCreatedAt(describedContainer.Created)
-			container.Container.SetStartedAt(describedContainer.State.StartedAt)
-			container.Container.SetFinishedAt(describedContainer.State.FinishedAt)
 			// update mappings that need dockerid
 			engine.state.AddContainer(container, task)
 			engine.imageManager.RecordContainerReference(container.Container)
-			container.Container.SetLabels(describedContainer.Config.Labels)
-			// update the container health information
-			if container.Container.HealthStatusShouldBeReported() {
-				dockerContainerMetadata := metadataFromContainer(describedContainer)
-				container.Container.SetHealthStatus(dockerContainerMetadata.Health)
-			}
 		}
 		return
 	}
@@ -322,30 +353,33 @@ func (engine *DockerTaskEngine) synchronizeContainerStatus(container *api.Docker
 	currentState, metadata := engine.client.DescribeContainer(container.DockerID)
 	if metadata.Error != nil {
 		currentState = api.ContainerStopped
-		if !container.Container.KnownTerminal() {
-			container.Container.ApplyingError = api.NewNamedError(&ContainerVanishedError{})
+		// If this is a Docker API error
+		if metadata.Error.ErrorName() == cannotDescribeContainerError {
 			seelog.Warnf("Task engine [%s]: could not describe previously known container [id=%s; name=%s]; assuming dead: %v",
 				task.Arn, container.DockerID, container.DockerName, metadata.Error)
-			engine.imageManager.RemoveContainerReferenceFromImageState(container.Container)
+			if !container.Container.KnownTerminal() {
+				container.Container.ApplyingError = api.NewNamedError(&ContainerVanishedError{})
+				engine.imageManager.RemoveContainerReferenceFromImageState(container.Container)
+			}
+		} else {
+			// If this is a container state error
+			updateContainerMetadata(&metadata, container.Container, task)
+			container.Container.ApplyingError = api.NewNamedError(metadata.Error)
 		}
 	} else {
-		container.Container.SetLabels(metadata.Labels)
-		container.Container.SetCreatedAt(metadata.CreatedAt)
-		container.Container.SetStartedAt(metadata.StartedAt)
-		container.Container.SetFinishedAt(metadata.FinishedAt)
+		// update the container metadata in case the container status/metadata changed during agent restart
+		updateContainerMetadata(&metadata, container.Container, task)
 		engine.imageManager.RecordContainerReference(container.Container)
 		if engine.cfg.ContainerMetadataEnabled && !container.Container.IsMetadataFileUpdated() {
 			go engine.updateMetadataFile(task, container)
 		}
-		// update the container health information
-		if container.Container.HealthStatusShouldBeReported() {
-			container.Container.SetHealthStatus(metadata.Health)
-		}
 	}
 	if currentState > container.Container.GetKnownStatus() {
 		// update the container known status
 		container.Container.SetKnownStatus(currentState)
 	}
+	// Update task ExecutionStoppedAt timestamp
+	task.RecordExecutionStoppedAt(container.Container)
 }
 
 // CheckTaskState inspects the state of all containers within a task and writes
diff --git a/agent/engine/docker_task_engine_test.go b/agent/engine/docker_task_engine_test.go
index 1f7a8268d5..a91ff9ea13 100644
--- a/agent/engine/docker_task_engine_test.go
+++ b/agent/engine/docker_task_engine_test.go
@@ -1720,3 +1720,135 @@ func TestHandleDockerHealthEvent(t *testing.T) {
 	})
 	assert.Equal(t, testContainer.Health.Status, api.ContainerHealthy)
 }
+
+// TestContainerMetadataUpdatedOnRestart checks that synchronizeContainerStatus
+// copies the metadata reported by docker onto the container, for each stage the
+// container may be in when its state is synchronized.
+func TestContainerMetadataUpdatedOnRestart(t *testing.T) {
+
+	dockerID := "dockerID_created"
+	labels := map[string]string{
+		"name": "metadata",
+	}
+	// NOTE(review): err is declared as DockerStateError; if that is a concrete
+	// type and the metadata Error field is an interface, the zero value still
+	// yields a non-nil Error for the cases that leave err unset — confirm.
+	testCases := []struct {
+		stage        string
+		status       api.ContainerStatus
+		created      time.Time
+		started      time.Time
+		finished     time.Time
+		portBindings []api.PortBinding
+		exitCode     *int
+		err          DockerStateError
+	}{
+		{
+			stage:   "created",
+			status:  api.ContainerCreated,
+			created: time.Now(),
+		},
+		{
+			stage:   "started",
+			status:  api.ContainerRunning,
+			started: time.Now(),
+			portBindings: []api.PortBinding{
+				{
+					ContainerPort: 80,
+					HostPort:      80,
+					BindIP:        "0.0.0.0/0",
+					Protocol:      api.TransportProtocolTCP,
+				},
+			},
+		},
+		{
+			stage:    "stopped",
+			status:   api.ContainerStopped,
+			finished: time.Now(),
+			exitCode: aws.Int(1),
+		},
+		{
+			stage:    "failed",
+			status:   api.ContainerStopped,
+			err:      NewDockerStateError("error"),
+			exitCode: aws.Int(1),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(fmt.Sprintf("Agent restarted during container: %s", tc.stage), func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.TODO())
+			defer cancel()
+
+			ctrl, client, _, taskEngine, _, imageManager, _ := mocks(t, ctx, &defaultConfig)
+			defer ctrl.Finish()
+			// The container is essential so that the "stopped" stage records
+			// the task's ExecutionStoppedAt timestamp.
+			dockerContainer := &api.DockerContainer{
+				DockerID:   dockerID,
+				DockerName: fmt.Sprintf("docker%s", tc.stage),
+				Container:  &api.Container{Essential: true},
+			}
+			task := &api.Task{}
+
+			if tc.stage == "created" {
+				dockerContainer.Container.MountPoints = []api.MountPoint{
+					{
+						SourceVolume:  "empty",
+						ContainerPath: "container",
+					},
+				}
+				dockerContainer.DockerID = ""
+				task.Volumes = []api.TaskVolume{
+					{
+						Name:   "empty",
+						Volume: &api.EmptyHostVolume{},
+					},
+				}
+				client.EXPECT().InspectContainer(dockerContainer.DockerName, gomock.Any()).Return(&docker.Container{
+					ID: dockerID,
+					Config: &docker.Config{
+						Labels: labels,
+					},
+					Created: tc.created,
+					Volumes: map[string]string{
+						"container": "tmp",
+					},
+				}, nil)
+				imageManager.EXPECT().RecordContainerReference(dockerContainer.Container).AnyTimes()
+			} else {
+				client.EXPECT().DescribeContainer(dockerID).Return(tc.status, DockerContainerMetadata{
+					Labels:       labels,
+					DockerID:     dockerID,
+					CreatedAt:    tc.created,
+					StartedAt:    tc.started,
+					FinishedAt:   tc.finished,
+					PortBindings: tc.portBindings,
+					ExitCode:     tc.exitCode,
+					Error:        tc.err,
+				})
+				imageManager.EXPECT().RecordContainerReference(dockerContainer.Container).AnyTimes()
+			}
+
+			taskEngine.(*DockerTaskEngine).synchronizeContainerStatus(dockerContainer, task)
+			assert.Equal(t, labels, dockerContainer.Container.GetLabels())
+			assert.Equal(t, tc.created, dockerContainer.Container.GetCreatedAt())
+			assert.Equal(t, tc.started, dockerContainer.Container.GetStartedAt())
+			assert.Equal(t, tc.finished, dockerContainer.Container.GetFinishedAt())
+			if tc.stage == "created" {
+				assert.Equal(t, "tmp", task.Volumes[0].Volume.SourcePath())
+			}
+			if tc.stage == "started" {
+				assert.Equal(t, uint16(80), dockerContainer.Container.KnownPortBindings[0].ContainerPort)
+			}
+			if tc.stage == "stopped" {
+				assert.False(t, task.GetExecutionStoppedAt().IsZero())
+				assert.Equal(t, tc.exitCode, dockerContainer.Container.GetKnownExitCode())
+			}
+			if tc.stage == "failed" {
+				assert.Equal(t, tc.exitCode, dockerContainer.Container.GetKnownExitCode())
+				assert.NotNil(t, dockerContainer.Container.ApplyingError)
+			}
+		})
+	}
+}
diff --git a/agent/engine/errors.go b/agent/engine/errors.go
index 2f71b90298..620fd2fb7e 100644
--- a/agent/engine/errors.go
+++ b/agent/engine/errors.go
@@ -24,6 +24,7 @@ import (
 const (
 	dockerTimeoutErrorName          = "DockerTimeoutError"
 	cannotInspectContainerErrorName = "CannotInspectContainerError"
+	cannotDescribeContainerError    = "CannotDescribeContainerError"
 )
 
 // engineError wraps the error interface with an identifier method that
@@ -291,7 +292,7 @@ func (err CannotDescribeContainerError) Error() string {
 }
 
 func (err CannotDescribeContainerError) ErrorName() string {
-	return "CannotDescribeContainerError"
+	return cannotDescribeContainerError
 }
 
 // CannotListContainersError indicates any error when trying to list containers
diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go
index 6d90654512..23069c9a31 100644
--- a/agent/engine/task_manager.go
+++ b/agent/engine/task_manager.go
@@ -29,6 +29,7 @@ import (
 	"github.com/aws/amazon-ecs-agent/agent/statemanager"
 	utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync"
 	"github.com/aws/amazon-ecs-agent/agent/utils/ttime"
+
 	"github.com/cihub/seelog"
 )
 
@@ -365,9 +366,7 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
 	// Update the container to be known
 	currentKnownStatus := containerKnownStatus
 	container.SetKnownStatus(event.Status)
-	container.SetCreatedAt(event.CreatedAt)
-	container.SetStartedAt(event.StartedAt)
-	container.SetFinishedAt(event.FinishedAt)
+	updateContainerMetadata(&event.DockerContainerMetadata, container, mtask.Task)
 
 	if event.Error != nil {
 		proceedAnyway := mtask.handleEventError(containerChange, currentKnownStatus)
@@ -381,7 +380,7 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
 		container.SetHealthStatus(event.Health)
 	}
 
-	mtask.recordExecutionStoppedAt(container)
+	mtask.RecordExecutionStoppedAt(container)
 	seelog.Debugf("Managed task [%s]: sending container change event to tcs, container: [%s(%s)], status: %s",
 		mtask.Arn, container.Name, event.DockerID, event.Status.String())
 	err := mtask.containerChangeEventStream.WriteToEventStream(event)
@@ -390,18 +389,6 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
 			mtask.Arn, container.Name, err)
 	}
 
-	if event.ExitCode != nil && event.ExitCode != container.GetKnownExitCode() {
-		container.SetKnownExitCode(event.ExitCode)
-	}
-	if event.PortBindings != nil {
-		container.KnownPortBindings = event.PortBindings
-	}
-	if event.Volumes != nil {
-		seelog.Warnf("Managed task [%s]: updating mounts for container [%s]: %v",
-			mtask.Arn, container.Name, event.Volumes)
-		mtask.UpdateMountPoints(container, event.Volumes)
-	}
-
 	mtask.emitContainerEvent(mtask.Task, container, "")
 	if mtask.UpdateStatus() {
 		seelog.Debugf("Managed task [%s]: container change also resulted in task change [%s]: [%s]",
@@ -413,25 +400,6 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
 		mtask.Arn, container.Name, mtask.GetDesiredStatus().String())
 }
 
-func (mtask *managedTask) recordExecutionStoppedAt(container *api.Container) {
-	if !container.Essential {
-		return
-	}
-	if container.GetKnownStatus() != api.ContainerStopped {
-		return
-	}
-	// If the essential container is stopped, set the ExecutionStoppedAt timestamp
-	now := mtask.time().Now()
-	ok := mtask.Task.SetExecutionStoppedAt(now)
-	if !ok {
-		// ExecutionStoppedAt was already recorded. Nothing to left to do here
-		return
-	}
-	seelog.Infof("Managed task [%s]: recording execution stopped time. Essential container [%s] stopped at: %s",
-		mtask.Arn, container.Name, now.String())
-
-}
-
 func (mtask *managedTask) emitTaskEvent(task *api.Task, reason string) {
 	event, err := api.NewTaskStateChangeEvent(task, reason)
 	if err != nil {
diff --git a/agent/functional_tests/tests/functionaltests_test.go b/agent/functional_tests/tests/functionaltests_test.go
index 3b714eb08f..7d55ef7c71 100644
--- a/agent/functional_tests/tests/functionaltests_test.go
+++ b/agent/functional_tests/tests/functionaltests_test.go
@@ -27,6 +27,7 @@ import (
 
 	ecsapi "github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs"
 	. "github.com/aws/amazon-ecs-agent/agent/functional_tests/util"
+	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
 	"github.com/stretchr/testify/assert"