Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix container metadata information missing during agent restart #1183

Merged
merged 3 commits into from
Jan 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions agent/api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,3 +1103,23 @@ func (task *Task) GetID() (string, error) {

return resourceSplit[1], nil
}

// RecordExecutionStoppedAt sets the task's executionStoppedAt timestamp to the
// current time when the given container is essential and its known status is
// ContainerStopped. For any other container it returns without side effects.
func (task *Task) RecordExecutionStoppedAt(container *Container) {
	if !container.Essential || container.GetKnownStatus() != ContainerStopped {
		return
	}
	// An essential container has stopped: treat this moment as the time the
	// task stopped executing.
	stoppedAt := time.Now()
	// SetExecutionStoppedAt reports false when the timestamp had already been
	// recorded; in that case there is nothing to log.
	if recorded := task.SetExecutionStoppedAt(stoppedAt); recorded {
		seelog.Infof("Task [%s]: recording execution stopped time. Essential container [%s] stopped at: %s",
			task.Arn, container.Name, stoppedAt.String())
	}
}
40 changes: 40 additions & 0 deletions agent/api/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"encoding/json"
"fmt"
"reflect"
"runtime"
"testing"
Expand Down Expand Up @@ -1386,3 +1387,42 @@ func TestContainerHealthConfig(t *testing.T) {
assert.Equal(t, config.Healthcheck.Timeout, 4*time.Second)
assert.Equal(t, config.Healthcheck.StartPeriod, 1*time.Minute)
}

// TestRecordExecutionStoppedAt checks that RecordExecutionStoppedAt only sets
// the task's executionStoppedAt timestamp for an essential container whose
// known status is ContainerStopped.
func TestRecordExecutionStoppedAt(t *testing.T) {
	type scenario struct {
		essential             bool
		status                ContainerStatus
		executionStoppedAtSet bool
		msg                   string
	}
	scenarios := []scenario{
		{
			essential:             true,
			status:                ContainerStopped,
			executionStoppedAtSet: true,
			msg:                   "essential container stopped should have executionStoppedAt set",
		},
		{
			essential:             false,
			status:                ContainerStopped,
			executionStoppedAtSet: false,
			msg:                   "non essential container stopped should not cause executionStoppedAt set",
		},
		{
			essential:             true,
			status:                ContainerRunning,
			executionStoppedAtSet: false,
			msg:                   "essential non-stop status change should not cause executionStoppedAt set",
		},
	}

	for _, sc := range scenarios {
		name := fmt.Sprintf("Container status: %s, essential: %v, executionStoppedAt should be set: %v", sc.status, sc.essential, sc.executionStoppedAtSet)
		t.Run(name, func(t *testing.T) {
			container := &Container{
				Essential:         sc.essential,
				KnownStatusUnsafe: sc.status,
			}
			task := &Task{}
			task.RecordExecutionStoppedAt(container)

			// A zero timestamp means executionStoppedAt was never recorded.
			stillUnset := task.GetExecutionStoppedAt().IsZero()
			assert.Equal(t, !sc.executionStoppedAtSet, stillUnset, sc.msg)
		})
	}
}
69 changes: 49 additions & 20 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,37 @@ func (engine *DockerTaskEngine) synchronizeState() {
engine.saver.Save()
}

// updateContainerMetadata copies the fields of a docker container metadata
// snapshot onto the agent's container (and, for volumes, onto the task).
// Timestamps are always written; labels and port bindings are only filled in
// when the container does not have any yet.
func updateContainerMetadata(metadata *DockerContainerMetadata, container *api.Container, task *api.Task) {
	// Timestamps are taken from the metadata unconditionally.
	container.SetCreatedAt(metadata.CreatedAt)
	container.SetStartedAt(metadata.StartedAt)
	container.SetFinishedAt(metadata.FinishedAt)

	// Labels: only populate when the container has none recorded.
	if labels := metadata.Labels; len(labels) > 0 && len(container.GetLabels()) == 0 {
		container.SetLabels(labels)
	}

	// Volumes: propagate to the task's mount points when present.
	if volumes := metadata.Volumes; volumes != nil {
		task.UpdateMountPoints(container, volumes)
	}

	// Exit code: written whenever the metadata carries one.
	if exitCode := metadata.ExitCode; exitCode != nil {
		container.SetKnownExitCode(exitCode)
	}

	// Port bindings: only populate when the container has none recorded.
	if bindings := metadata.PortBindings; len(bindings) > 0 && len(container.KnownPortBindings) == 0 {
		container.KnownPortBindings = bindings
	}

	// Health: only recorded for containers that report health status.
	if container.HealthStatusShouldBeReported() {
		container.SetHealthStatus(metadata.Health)
	}
}

// synchronizeContainerStatus checks and updates the container status with docker
func (engine *DockerTaskEngine) synchronizeContainerStatus(container *api.DockerContainer, task *api.Task) {
if container.DockerID == "" {
Expand All @@ -301,51 +332,49 @@ func (engine *DockerTaskEngine) synchronizeContainerStatus(container *api.Docker
seelog.Warnf("Task engine [%s]: could not find matching container for expected name [%s]: %v",
task.Arn, container.DockerName, err)
} else {
// update the container metadata in case the container was created during agent restart
metadata := metadataFromContainer(describedContainer)
updateContainerMetadata(&metadata, container.Container, task)
container.DockerID = describedContainer.ID

container.Container.SetKnownStatus(dockerStateToState(describedContainer.State))
container.Container.SetCreatedAt(describedContainer.Created)
container.Container.SetStartedAt(describedContainer.State.StartedAt)
container.Container.SetFinishedAt(describedContainer.State.FinishedAt)
// update mappings that need dockerid
engine.state.AddContainer(container, task)
engine.imageManager.RecordContainerReference(container.Container)
container.Container.SetLabels(describedContainer.Config.Labels)
// update the container health information
if container.Container.HealthStatusShouldBeReported() {
dockerContainerMetadata := metadataFromContainer(describedContainer)
container.Container.SetHealthStatus(dockerContainerMetadata.Health)
}
}
return
}

currentState, metadata := engine.client.DescribeContainer(container.DockerID)
if metadata.Error != nil {
currentState = api.ContainerStopped
if !container.Container.KnownTerminal() {
container.Container.ApplyingError = api.NewNamedError(&ContainerVanishedError{})
// If this is a Docker API error
if metadata.Error.ErrorName() == cannotDescribeContainerError {
seelog.Warnf("Task engine [%s]: could not describe previously known container [id=%s; name=%s]; assuming dead: %v",
task.Arn, container.DockerID, container.DockerName, metadata.Error)
engine.imageManager.RemoveContainerReferenceFromImageState(container.Container)
if !container.Container.KnownTerminal() {
container.Container.ApplyingError = api.NewNamedError(&ContainerVanishedError{})
engine.imageManager.RemoveContainerReferenceFromImageState(container.Container)
}
} else {
// If this is a container state error
updateContainerMetadata(&metadata, container.Container, task)
container.Container.ApplyingError = api.NewNamedError(metadata.Error)
}
} else {
container.Container.SetLabels(metadata.Labels)
container.Container.SetCreatedAt(metadata.CreatedAt)
container.Container.SetStartedAt(metadata.StartedAt)
container.Container.SetFinishedAt(metadata.FinishedAt)
// update the container metadata in case the container status/metadata changed during agent restart
updateContainerMetadata(&metadata, container.Container, task)
engine.imageManager.RecordContainerReference(container.Container)
if engine.cfg.ContainerMetadataEnabled && !container.Container.IsMetadataFileUpdated() {
go engine.updateMetadataFile(task, container)
}
// update the container health information
if container.Container.HealthStatusShouldBeReported() {
container.Container.SetHealthStatus(metadata.Health)
}
}
if currentState > container.Container.GetKnownStatus() {
// update the container known status
container.Container.SetKnownStatus(currentState)
}
// Update task ExecutionStoppedAt timestamp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rebase this from dev once #1217 is merged. This is now moved into its own method. Maybe it's better to review this after that PR is merged.

task.RecordExecutionStoppedAt(container.Container)
}

// CheckTaskState inspects the state of all containers within a task and writes
Expand Down
123 changes: 123 additions & 0 deletions agent/engine/docker_task_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,3 +1720,126 @@ func TestHandleDockerHealthEvent(t *testing.T) {
})
assert.Equal(t, testContainer.Health.Status, api.ContainerHealthy)
}

// TestContainerMetadataUpdatedOnRestart verifies that synchronizeContainerStatus
// refreshes the container metadata (labels, timestamps, volumes, port bindings
// and exit code) from docker for containers found at different lifecycle
// stages, and that a stopped essential container records the task's
// ExecutionStoppedAt timestamp.
func TestContainerMetadataUpdatedOnRestart(t *testing.T) {
	dockerID := "dockerID_created"
	labels := map[string]string{
		"name": "metadata",
	}
	testCases := []struct {
		stage        string
		status       api.ContainerStatus
		created      time.Time
		started      time.Time
		finished     time.Time
		portBindings []api.PortBinding
		exitCode     *int
		// NOTE(review): if DockerStateError is a concrete (non-interface) type,
		// its zero value still becomes a non-nil value once assigned to the
		// metadata Error field below — confirm the cases without an error
		// exercise the intended branch.
		err DockerStateError
	}{
		{
			stage:   "created",
			status:  api.ContainerCreated,
			created: time.Now(),
		},
		{
			stage:   "started",
			status:  api.ContainerRunning,
			started: time.Now(),
			portBindings: []api.PortBinding{
				{
					ContainerPort: 80,
					HostPort:      80,
					BindIP:        "0.0.0.0/0",
					Protocol:      api.TransportProtocolTCP,
				},
			},
		},
		{
			stage: "stopped",
			// The stopped status is required for ExecutionStoppedAt to be
			// recorded for this container.
			status:   api.ContainerStopped,
			finished: time.Now(),
			exitCode: aws.Int(1),
		},
		{
			stage:    "failed",
			status:   api.ContainerStopped,
			err:      NewDockerStateError("error"),
			exitCode: aws.Int(1),
		},
	}

	for _, tc := range testCases {
		t.Run(fmt.Sprintf("Agent restarted during container: %s", tc.stage), func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.TODO())
			defer cancel()

			ctrl, client, _, taskEngine, _, imageManager, _ := mocks(t, ctx, &defaultConfig)
			defer ctrl.Finish()
			dockerContainer := &api.DockerContainer{
				DockerID:   dockerID,
				DockerName: fmt.Sprintf("docker%s", tc.stage),
				// Essential so that a stopped container records the task's
				// ExecutionStoppedAt timestamp.
				Container: &api.Container{Essential: true},
			}
			task := &api.Task{}

			imageManager.EXPECT().RecordContainerReference(dockerContainer.Container).AnyTimes()
			if tc.stage == "created" {
				// No docker ID is known yet: the engine has to look the
				// container up by name via InspectContainer.
				dockerContainer.Container.MountPoints = []api.MountPoint{
					{
						SourceVolume:  "empty",
						ContainerPath: "container",
					},
				}
				dockerContainer.DockerID = ""
				task.Volumes = []api.TaskVolume{
					{
						Name:   "empty",
						Volume: &api.EmptyHostVolume{},
					},
				}
				client.EXPECT().InspectContainer(dockerContainer.DockerName, gomock.Any()).Return(&docker.Container{
					ID: dockerID,
					Config: &docker.Config{
						Labels: labels,
					},
					Created: tc.created,
					Volumes: map[string]string{
						"container": "tmp",
					},
				}, nil)
			} else {
				client.EXPECT().DescribeContainer(dockerID).Return(tc.status, DockerContainerMetadata{
					Labels:       labels,
					DockerID:     dockerID,
					CreatedAt:    tc.created,
					StartedAt:    tc.started,
					FinishedAt:   tc.finished,
					PortBindings: tc.portBindings,
					ExitCode:     tc.exitCode,
					Error:        tc.err,
				})
			}

			taskEngine.(*DockerTaskEngine).synchronizeContainerStatus(dockerContainer, task)
			assert.Equal(t, labels, dockerContainer.Container.GetLabels())
			assert.Equal(t, tc.created, dockerContainer.Container.GetCreatedAt())
			assert.Equal(t, tc.started, dockerContainer.Container.GetStartedAt())
			assert.Equal(t, tc.finished, dockerContainer.Container.GetFinishedAt())

			switch tc.stage {
			case "created":
				assert.Equal(t, "tmp", task.Volumes[0].Volume.SourcePath())
			case "started":
				assert.Equal(t, uint16(80), dockerContainer.Container.KnownPortBindings[0].ContainerPort)
			case "stopped":
				// This branch previously compared against "finished", a stage
				// name no test case uses, so these assertions never ran.
				assert.False(t, task.GetExecutionStoppedAt().IsZero())
				assert.Equal(t, tc.exitCode, dockerContainer.Container.GetKnownExitCode())
			case "failed":
				assert.Equal(t, tc.exitCode, dockerContainer.Container.GetKnownExitCode())
				assert.NotNil(t, dockerContainer.Container.ApplyingError)
			}
		})
	}
}
3 changes: 2 additions & 1 deletion agent/engine/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// String identifiers for error names.
const (
dockerTimeoutErrorName = "DockerTimeoutError"
cannotInspectContainerErrorName = "CannotInspectContainerError"
// cannotDescribeContainerError is the name returned by
// CannotDescribeContainerError.ErrorName.
cannotDescribeContainerError = "CannotDescribeContainerError"
)

// engineError wraps the error interface with an identifier method that
Expand Down Expand Up @@ -291,7 +292,7 @@ func (err CannotDescribeContainerError) Error() string {
}

// ErrorName returns the name identifying this error type. The value comes from
// the shared cannotDescribeContainerError constant so that callers comparing
// against the constant and this method always agree.
func (err CannotDescribeContainerError) ErrorName() string {
	return cannotDescribeContainerError
}

// CannotListContainersError indicates any error when trying to list containers
Expand Down
38 changes: 3 additions & 35 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/statemanager"
utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync"
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"

"github.com/cihub/seelog"
)

Expand Down Expand Up @@ -365,9 +366,7 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
// Update the container to be known
currentKnownStatus := containerKnownStatus
container.SetKnownStatus(event.Status)
container.SetCreatedAt(event.CreatedAt)
container.SetStartedAt(event.StartedAt)
container.SetFinishedAt(event.FinishedAt)
updateContainerMetadata(&event.DockerContainerMetadata, container, mtask.Task)

if event.Error != nil {
proceedAnyway := mtask.handleEventError(containerChange, currentKnownStatus)
Expand All @@ -381,7 +380,7 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
container.SetHealthStatus(event.Health)
}

mtask.recordExecutionStoppedAt(container)
mtask.RecordExecutionStoppedAt(container)
seelog.Debugf("Managed task [%s]: sending container change event to tcs, container: [%s(%s)], status: %s",
mtask.Arn, container.Name, event.DockerID, event.Status.String())
err := mtask.containerChangeEventStream.WriteToEventStream(event)
Expand All @@ -390,18 +389,6 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
mtask.Arn, container.Name, err)
}

if event.ExitCode != nil && event.ExitCode != container.GetKnownExitCode() {
container.SetKnownExitCode(event.ExitCode)
}
if event.PortBindings != nil {
container.KnownPortBindings = event.PortBindings
}
if event.Volumes != nil {
seelog.Warnf("Managed task [%s]: updating mounts for container [%s]: %v",
mtask.Arn, container.Name, event.Volumes)
mtask.UpdateMountPoints(container, event.Volumes)
}

mtask.emitContainerEvent(mtask.Task, container, "")
if mtask.UpdateStatus() {
seelog.Debugf("Managed task [%s]: container change also resulted in task change [%s]: [%s]",
Expand All @@ -413,25 +400,6 @@ func (mtask *managedTask) handleContainerChange(containerChange dockerContainerC
mtask.Arn, container.Name, mtask.GetDesiredStatus().String())
}

// recordExecutionStoppedAt sets the managed task's ExecutionStoppedAt timestamp,
// using the managed task's time source, when the given container is essential
// and its known status is ContainerStopped. Otherwise it does nothing.
func (mtask *managedTask) recordExecutionStoppedAt(container *api.Container) {
	if !container.Essential || container.GetKnownStatus() != api.ContainerStopped {
		return
	}
	// An essential container has stopped: treat this moment as the time the
	// task stopped executing.
	stoppedAt := mtask.time().Now()
	// SetExecutionStoppedAt reports false when the timestamp had already been
	// recorded; in that case there is nothing to log.
	if recorded := mtask.Task.SetExecutionStoppedAt(stoppedAt); !recorded {
		return
	}
	seelog.Infof("Managed task [%s]: recording execution stopped time. Essential container [%s] stopped at: %s",
		mtask.Arn, container.Name, stoppedAt.String())
}

func (mtask *managedTask) emitTaskEvent(task *api.Task, reason string) {
event, err := api.NewTaskStateChangeEvent(task, reason)
if err != nil {
Expand Down
Loading