Skip to content

Commit

Permalink
Fixing task volumes from payload for EBS-backed tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
mye956 committed Oct 20, 2023
1 parent 3500ac2 commit 7ecf228
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 36 deletions.
34 changes: 34 additions & 0 deletions agent/acs/session/payload_responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
apiresource "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
Expand Down Expand Up @@ -106,6 +107,16 @@ func (pmHandler *payloadMessageHandler) addPayloadTasks(payload *ecsacs.PayloadM
allTasksOK = false
continue
}

// Note: If we receive an EBS-backed task, we'll also received an incomplete volume configuration in the list of Volumes
// To accomodate this, we'll first check if the task IS EBS-backed then we'll mark the corresponding Volume object to be
// of type "attachment". This volume object will be replaced by the newly created EBS volume configuration when we parse
// through the task attachments.
volName, ok := hasEBSAttachment(task)
if ok {
initializeAttachmentTypeVolume(task, volName)
}

apiTask, err := apitask.TaskFromACS(task, payload)
if err != nil {
pmHandler.handleInvalidTask(task, err, payload)
Expand Down Expand Up @@ -306,3 +317,26 @@ func isTaskStatusStopped(status apitaskstatus.TaskStatus) bool {
func isTaskStatusNotStopped(status apitaskstatus.TaskStatus) bool {
return status != apitaskstatus.TaskStopped
}

func hasEBSAttachment(acsTask *ecsacs.Task) (string, bool) {
// TODO: This will only work if there's one EBS volume per task. If we there is a case where we have multi-attach for a task, this needs to be modified
for _, attachment := range acsTask.Attachments {
if *attachment.AttachmentType == apiresource.EBSTaskAttach {
for _, property := range attachment.AttachmentProperties {
if *property.Name == apiresource.VolumeNameKey {
return *property.Value, true
}
}
}
}
return "", false
}

func initializeAttachmentTypeVolume(acsTask *ecsacs.Task, volName string) {
for _, volume := range acsTask.Volumes {
if *volume.Name == volName && volume.Type == nil {
newType := "attachment"
volume.Type = &newType
}
}
}
6 changes: 6 additions & 0 deletions agent/acs/session/payload_responder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,12 @@ func TestHandlePayloadMessageAddedEBSToTask(t *testing.T) {
AttachmentType: aws.String(apiresource.EBSTaskAttach),
},
},
Volumes: []*ecsacs.Volume{
{
Name: aws.String(taskresourcevolume.TestVolumeName),
Type: aws.String(apitask.AttachmentType),
},
},
},
}

Expand Down
36 changes: 0 additions & 36 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/aws/amazon-ecs-agent/ecs-agent/acs/model/ecsacs"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
apiresource "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
Expand Down Expand Up @@ -332,7 +331,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
return task, nil
}

// TODO: Add unit test
func (task *Task) RemoveVolume(index int) {
task.lock.Lock()
defer task.lock.Unlock()
Expand All @@ -343,7 +341,6 @@ func (task *Task) removeVolumeUnsafe(index int) {
if index < 0 || index >= len(task.Volumes) {
return
}
// temp := task.Volumes[:1]
out := make([]TaskVolume, 0)
out = append(out, task.Volumes[:index]...)
out = append(out, task.Volumes[index+1:]...)
Expand All @@ -352,17 +349,6 @@ func (task *Task) removeVolumeUnsafe(index int) {

func (task *Task) initializeVolumes(cfg *config.Config, dockerClient dockerapi.DockerClient, ctx context.Context) error {
// TODO: Have EBS volumes use the DockerVolumeConfig to create the mountpoint
if task.IsEBSTaskAttachEnabled() {
ebsVolumes := task.GetEBSVolumeNames()
for index, tv := range task.Volumes {
volumeName := tv.Name
volumeType := tv.Type
if ebsVolumes[volumeName] && volumeType != apiresource.EBSTaskAttach {
task.RemoveVolume(index)
}
}
}

err := task.initializeDockerLocalVolumes(dockerClient, ctx)
if err != nil {
return apierrors.NewResourceInitError(task.Arn, err)
Expand Down Expand Up @@ -3485,28 +3471,6 @@ func (task *Task) isEBSTaskAttachEnabledUnsafe() bool {
return false
}

// TODO: Add unit tests
func (task *Task) GetEBSVolumeNames() map[string]bool {
task.lock.RLock()
defer task.lock.RUnlock()
return task.getEBSVolumeNamesUnsafe()
}

func (task *Task) getEBSVolumeNamesUnsafe() map[string]bool {
volNames := map[string]bool{}
for _, tv := range task.Volumes {
switch tv.Volume.(type) {
case *taskresourcevolume.EBSTaskVolumeConfig:
logger.Debug("found ebs volume config")
ebsCfg := tv.Volume.(*taskresourcevolume.EBSTaskVolumeConfig)
volNames[ebsCfg.VolumeName] = true
default:
continue
}
}
return volNames
}

func (task *Task) IsServiceConnectBridgeModeApplicationContainer(container *apicontainer.Container) bool {
return container.GetNetworkModeFromHostConfig() == "container" && task.IsServiceConnectEnabled()
}
Expand Down
10 changes: 10 additions & 0 deletions agent/api/task/task_attachment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
task.ServiceConnectConfig = scHandler.(*ServiceConnectAttachmentHandler).scConfig
}
if len(ebsVolumeAttachments) > 0 {
ebsVolumes := make(map[string]bool)
for _, attachment := range ebsVolumeAttachments {
ebs, err := taskresourcevolume.ParseEBSTaskVolumeAttachment(attachment)
if err != nil {
Expand All @@ -120,8 +121,17 @@ func handleTaskAttachments(acsTask *ecsacs.Task, task *Task) error {
Type: apiresource.EBSTaskAttach,
Volume: ebs,
}
ebsVolumes[ebs.VolumeName] = true
task.Volumes = append(task.Volumes, taskVolume)
}
// We're removing all incorrect volume configuration that were intially passed over from ACS
for index, tv := range task.Volumes {
volumeName := tv.Name
volumeType := tv.Type
if ebsVolumes[volumeName] && volumeType != apiresource.EBSTaskAttach {
task.RemoveVolume(index)
}
}
}
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions agent/api/task/task_attachment_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ func TestHandleTaskAttachmentWithEBSVolumeAttachment(t *testing.T) {
AttachmentType: stringToPointer(apiresource.EBSTaskAttach),
},
},
Volumes: []*ecsacs.Volume{
{
Name: strptr("test-volume"),
Type: strptr(AttachmentType),
},
},
}
testTask := &Task{}
err := handleTaskAttachments(testAcsTask, testTask)
Expand Down
44 changes: 44 additions & 0 deletions agent/api/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4691,6 +4691,15 @@ func TestTaskWithEBSVolumeAttachment(t *testing.T) {
AttachmentType: strptr(apiresource.EBSTaskAttach),
},
},
Volumes: []*ecsacs.Volume{
{
Name: strptr("test-volume"),
Type: strptr(AttachmentType),
Host: &ecsacs.HostVolumeProperties{
SourcePath: strptr("/host/path"),
},
},
},
}

testExpectedEBSCfg := &taskresourcevolume.EBSTaskVolumeConfig{
Expand Down Expand Up @@ -5234,3 +5243,38 @@ func TestToHostResources(t *testing.T) {
assert.Equal(t, len(tc.expectedResources["PORTS_UDP"].StringSetValue), len(calcResources["PORTS_UDP"].StringSetValue), "Error converting task UDP port resources")
}
}

func TestRemoveVolumes(t *testing.T) {
task := &Task{
Volumes: []TaskVolume{
{
Name: "volName",
Type: "host",
Volume: &taskresourcevolume.FSHostVolume{
FSSourcePath: "/host/path",
},
},
},
}
task.RemoveVolume(0)
assert.Equal(t, len(task.Volumes), 0)
}

func TestRemoveVolumeIndexOutOfBounds(t *testing.T) {
task := &Task{
Volumes: []TaskVolume{
{
Name: "volName",
Type: "host",
Volume: &taskresourcevolume.FSHostVolume{
FSSourcePath: "/host/path",
},
},
},
}
task.RemoveVolume(1)
assert.Equal(t, len(task.Volumes), 1)

task.RemoveVolume(-1)
assert.Equal(t, len(task.Volumes), 1)
}
4 changes: 4 additions & 0 deletions agent/api/task/taskvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
DockerVolumeType = "docker"
EFSVolumeType = "efs"
FSxWindowsFileServerVolumeType = "fsxWindowsFileServer"
AttachmentType = "attachment"
)

// TaskVolume is a definition of all the volumes available for containers to
Expand Down Expand Up @@ -78,6 +79,9 @@ func (tv *TaskVolume) UnmarshalJSON(b []byte) error {
return tv.unmarshalFSxWindowsFileServerVolume(intermediate["fsxWindowsFileServerVolumeConfiguration"])
case apiresource.EBSTaskAttach:
return tv.unmarshalEBSVolume(intermediate["ebsVolumeConfiguration"])
case AttachmentType:
seelog.Warn("Obtaining the volume configuration from task attachments.")
return nil
default:
return errors.Errorf("unrecognized volume type: %q", tv.Type)
}
Expand Down

0 comments on commit 7ecf228

Please sign in to comment.