diff --git a/agent/acs/session/payload_responder.go b/agent/acs/session/payload_responder.go index 4656c5f3af4..39a432cc241 100644 --- a/agent/acs/session/payload_responder.go +++ b/agent/acs/session/payload_responder.go @@ -22,6 +22,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/engine" "github.com/aws/amazon-ecs-agent/agent/eventhandler" "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" + apiresource "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource" apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" "github.com/aws/amazon-ecs-agent/ecs-agent/logger" @@ -106,6 +107,16 @@ func (pmHandler *payloadMessageHandler) addPayloadTasks(payload *ecsacs.PayloadM allTasksOK = false continue } + + // Note: If we receive an EBS-backed task, we'll also received an incomplete volume configuration in the list of Volumes + // To accomodate this, we'll first check if the task IS EBS-backed then we'll mark the corresponding Volume object to be + // of type "attachment". This volume object will be replaced by the newly created EBS volume configuration when we parse + // through the task attachments. + volName, ok := hasEBSAttachment(task) + if ok { + initializeAttachmentTypeVolume(task, volName) + } + apiTask, err := apitask.TaskFromACS(task, payload) if err != nil { pmHandler.handleInvalidTask(task, err, payload) @@ -306,3 +317,26 @@ func isTaskStatusStopped(status apitaskstatus.TaskStatus) bool { func isTaskStatusNotStopped(status apitaskstatus.TaskStatus) bool { return status != apitaskstatus.TaskStopped } + +func hasEBSAttachment(acsTask *ecsacs.Task) (string, bool) { + // TODO: This will only work if there's one EBS volume per task. If we there is a case where we have multi-attach for a task, this needs to be modified + for _, attachment := range acsTask.Attachments { + if *attachment.AttachmentType == apiresource.EBSTaskAttach { + for _, property := range attachment.AttachmentProperties { + if *property.Name == apiresource.VolumeNameKey { + return *property.Value, true + } + } + } + } + return "", false +} + +func initializeAttachmentTypeVolume(acsTask *ecsacs.Task, volName string) { + for _, volume := range acsTask.Volumes { + if *volume.Name == volName && volume.Type == nil { + newType := "attachment" + volume.Type = &newType + } + } +} diff --git a/agent/acs/session/payload_responder_test.go b/agent/acs/session/payload_responder_test.go index 5e9b149315f..cd89e52853f 100644 --- a/agent/acs/session/payload_responder_test.go +++ b/agent/acs/session/payload_responder_test.go @@ -725,6 +725,12 @@ func TestHandlePayloadMessageAddedEBSToTask(t *testing.T) { AttachmentType: aws.String(apiresource.EBSTaskAttach), }, }, + Volumes: []*ecsacs.Volume{ + { + Name: aws.String(taskresourcevolume.TestVolumeName), + Type: aws.String(apitask.AttachmentType), + }, + }, }, } diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 689f1c20282..17cfb13a10d 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -43,7 +43,6 @@ import ( "github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs" apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status" apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors" - apiresource "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource" apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status" "github.com/aws/amazon-ecs-agent/ecs-agent/credentials" "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" @@ -332,7 +331,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task, return task, nil } -// TODO: Add unit test func (task *Task) RemoveVolume(index int) { task.lock.Lock() defer task.lock.Unlock() @@ -343,7 +341,6 @@ func (task *Task) removeVolumeUnsafe(index int) { if index < 0 || index >= len(task.Volumes) { return } - // temp := task.Volumes[:1] out := make([]TaskVolume, 0) out = append(out, task.Volumes[:index]...) out = append(out, task.Volumes[index+1:]...) @@ -352,17 +349,6 @@ func (task *Task) removeVolumeUnsafe(index int) { func (task *Task) initializeVolumes(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error { // TODO: Have EBS volumes use the DockerVolumeConfig to create the mountpoint - if task.IsEBSTaskAttachEnabled() { - ebsVolumes := task.GetEBSVolumeNames() - for index, tv := range task.Volumes { - volumeName := tv.Name - volumeType := tv.Type - if ebsVolumes[volumeName] && volumeType != apiresource.EBSTaskAttach { - task.RemoveVolume(index) - } - } - } - err := task.initializeDockerLocalVolumes(dockerClient, ctx) if err != nil { return apierrors.NewResourceInitError(task.Arn, err) @@ -3485,28 +3471,6 @@ func (task *Task) isEBSTaskAttachEnabledUnsafe() bool { return false } -// TODO: Add unit tests -func (task *Task) GetEBSVolumeNames() map[string]bool { - task.lock.RLock() - defer task.lock.RUnlock() - return task.getEBSVolumeNamesUnsafe() -} - -func (task *Task) getEBSVolumeNamesUnsafe() map[string]bool { - volNames := map[string]bool{} - for _, tv := range task.Volumes { - switch tv.Volume.(type) { - case *taskresourcevolume.EBSTaskVolumeConfig: - logger.Debug("found ebs volume config") - ebsCfg := tv.Volume.(*taskresourcevolume.EBSTaskVolumeConfig) - volNames[ebsCfg.VolumeName] = true - default: - continue - } - } - return volNames -} - func (task *Task) IsServiceConnectBridgeModeApplicationContainer(container *apicontainer.Container) bool { return container.GetNetworkModeFromHostConfig() == "container" && task.IsServiceConnectEnabled() } diff --git a/agent/api/task/task_attachment_handler.go b/agent/api/task/task_attachment_handler.go index bcaef0bfbeb..4d48ad840ef 100644 --- a/agent/api/task/task_attachment_handler.go +++ b/agent/api/task/task_attachment_handler.go @@ -110,6 +110,7 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error { task.ServiceConnectConfig = scHandler.(*ServiceConnectAttachmentHandler).scConfig } if len(ebsVolumeAttachments) > 0 { + ebsVolumes := make(map[string]bool) for _, attachment := range ebsVolumeAttachments { ebs, err := taskresourcevolume.ParseEBSTaskVolumeAttachment(attachment) if err != nil { @@ -120,8 +121,17 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error { Type: apiresource.EBSTaskAttach, Volume: ebs, } + ebsVolumes[ebs.VolumeName] = true task.Volumes = append(task.Volumes, taskVolume) } + // We're removing all incorrect volume configuration that were intially passed over from ACS + for index, tv := range task.Volumes { + volumeName := tv.Name + volumeType := tv.Type + if ebsVolumes[volumeName] && volumeType != apiresource.EBSTaskAttach { + task.RemoveVolume(index) + } + } } } return nil diff --git a/agent/api/task/task_attachment_handler_test.go b/agent/api/task/task_attachment_handler_test.go index 98e007ad267..0ab39ef24be 100644 --- a/agent/api/task/task_attachment_handler_test.go +++ b/agent/api/task/task_attachment_handler_test.go @@ -274,6 +274,12 @@ func TestHandleTaskAttachmentWithEBSVolumeAttachment(t *testing.T) { AttachmentType: stringToPointer(apiresource.EBSTaskAttach), }, }, + Volumes: []*ecsacs.Volume{ + { + Name: strptr("test-volume"), + Type: strptr(AttachmentType), + }, + }, } testTask := &Task{} err := handleTaskAttachments(testAcsTask, testTask) diff --git a/agent/api/task/task_test.go b/agent/api/task/task_test.go index 7bddfb78e31..f2c91348204 100644 --- a/agent/api/task/task_test.go +++ b/agent/api/task/task_test.go @@ -4691,6 +4691,15 @@ func TestTaskWithEBSVolumeAttachment(t *testing.T) { AttachmentType: strptr(apiresource.EBSTaskAttach), }, }, + Volumes: []*ecsacs.Volume{ + { + Name: strptr("test-volume"), + Type: strptr(AttachmentType), + Host: &ecsacs.HostVolumeProperties{ + SourcePath: strptr("/host/path"), + }, + }, + }, } testExpectedEBSCfg := &taskresourcevolume.EBSTaskVolumeConfig{ @@ -5234,3 +5243,38 @@ func TestToHostResources(t *testing.T) { assert.Equal(t, len(tc.expectedResources["PORTS_UDP"].StringSetValue), len(calcResources["PORTS_UDP"].StringSetValue), "Error converting task UDP port resources") } } + +func TestRemoveVolumes(t *testing.T) { + task := &Task{ + Volumes: []TaskVolume{ + { + Name: "volName", + Type: "host", + Volume: &taskresourcevolume.FSHostVolume{ + FSSourcePath: "/host/path", + }, + }, + }, + } + task.RemoveVolume(0) + assert.Equal(t, len(task.Volumes), 0) +} + +func TestRemoveVolumeIndexOutOfBounds(t *testing.T) { + task := &Task{ + Volumes: []TaskVolume{ + { + Name: "volName", + Type: "host", + Volume: &taskresourcevolume.FSHostVolume{ + FSSourcePath: "/host/path", + }, + }, + }, + } + task.RemoveVolume(1) + assert.Equal(t, len(task.Volumes), 1) + + task.RemoveVolume(-1) + assert.Equal(t, len(task.Volumes), 1) +} diff --git a/agent/api/task/taskvolume.go b/agent/api/task/taskvolume.go index e230fe1d96f..0b08555f3f7 100644 --- a/agent/api/task/taskvolume.go +++ b/agent/api/task/taskvolume.go @@ -32,6 +32,7 @@ const ( DockerVolumeType = "docker" EFSVolumeType = "efs" FSxWindowsFileServerVolumeType = "fsxWindowsFileServer" + AttachmentType = "attachment" ) // TaskVolume is a definition of all the volumes available for containers to @@ -78,6 +79,9 @@ func (tv *TaskVolume) UnmarshalJSON(b []byte) error { return tv.unmarshalFSxWindowsFileServerVolume(intermediate["fsxWindowsFileServerVolumeConfiguration"]) case apiresource.EBSTaskAttach: return tv.unmarshalEBSVolume(intermediate["ebsVolumeConfiguration"]) + case AttachmentType: + seelog.Warn("Obtaining the volume configuration from task attachments.") + return nil default: return errors.Errorf("unrecognized volume type: %q", tv.Type) }