From 2c370496a869c520ff3f0f1fb277d9c32acf0ae3 Mon Sep 17 00:00:00 2001 From: Ray Allan Date: Wed, 11 Oct 2023 23:30:26 +0000 Subject: [PATCH] add missing changes from feature branch --- agent/app/agent.go | 2 +- agent/ebs/watcher.go | 21 +++++++++++++++---- agent/engine/docker_task_engine.go | 11 ++++++++++ .../dockerstate/docker_task_engine_state.go | 4 ++-- 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/agent/app/agent.go b/agent/app/agent.go index a232fc7088..11ef156952 100644 --- a/agent/app/agent.go +++ b/agent/app/agent.go @@ -1041,7 +1041,7 @@ func (agent *ecsAgent) startACSSession( taskComparer, sequenceNumberAccessor, taskStopper, - nil, + agent.ebsWatcher, updater.NewUpdater(agent.cfg, state, agent.dataClient, taskEngine).AddAgentUpdateHandlers, ) logger.Info("Beginning Polling for updates") diff --git a/agent/ebs/watcher.go b/agent/ebs/watcher.go index 8fd2a0d9ca..d2229ce064 100644 --- a/agent/ebs/watcher.go +++ b/agent/ebs/watcher.go @@ -17,7 +17,9 @@ import ( "context" "errors" "fmt" + "path/filepath" "strconv" + "strings" "time" ecsapi "github.com/aws/amazon-ecs-agent/agent/api" @@ -84,7 +86,6 @@ func (w *EBSWatcher) Start() { w.overrideDeviceName(foundVolumes) if err := w.StageAll(foundVolumes); err != nil { log.Errorf("stage error: %s", err) - continue } // TODO only notify attached for volumes that are successfully staged w.NotifyAttached(foundVolumes) @@ -193,7 +194,10 @@ func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error { log.Debugf("EBS status is already attached, skipping: %v.", ebsAttachment.EBSToString()) continue } - hostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey) + + preHostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey) + hostPath := filepath.Join("/mnt/ecs/ebs", preHostPath[strings.LastIndex(preHostPath, "/")+1:]) + filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName) // CSI NodeStage stub required fields @@ -244,6 +248,10 @@ func (w *EBSWatcher) notifyAttachedEBS(volumeId string) { log.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId) return } + if !ebs.IsAttached() { + log.Debugf("EBS status is not attached, skipping: %v.", ebs.EBSToString()) + return + } if ebs.HasExpired() { log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebs.EBSToString()) @@ -314,10 +322,15 @@ func (w *EBSWatcher) emitEBSAttachedEvent(ebsvol *apiebs.ResourceAttachment) { ClusterARN: ebsvol.GetClusterARN(), ContainerInstanceARN: ebsvol.GetContainerInstanceARN(), } + eniWrapper := apieni.ENIAttachment{AttachmentInfo: attachmentInfo} + // TODO update separate out ENI and EBS attachment types in attachment + // handler. For now we use fake task ENI with dummy fields + eniWrapper.AttachmentType = apieni.ENIAttachmentTypeTaskENI + eniWrapper.MACAddress = "ebs1" + eniWrapper.StartTimer(func() {}) attachmentChange := ecsapi.AttachmentStateChange{ - Attachment: &apieni.ENIAttachment{AttachmentInfo: attachmentInfo}, + Attachment: &eniWrapper, } - log.Debugf("Emitting EBS volume attached event for: %v", ebsvol) w.taskEngine.StateChangeEvents() <- attachmentChange } diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 92a7fcd230..8216dd42c2 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -1215,6 +1215,11 @@ func (engine *DockerTaskEngine) GetTaskByArn(arn string) (*apitask.Task, bool) { func (engine *DockerTaskEngine) GetDaemonTask(daemonName string) *apitask.Task { engine.daemonTasksLock.RLock() defer engine.daemonTasksLock.RUnlock() + + logger.Info("getting daemon task", logger.Fields{ + field.Container: daemonName, + }) + if daemon, ok := engine.daemonTasks[daemonName]; ok { return daemon } @@ -1223,6 +1228,12 @@ func (engine *DockerTaskEngine) GetDaemonTask(daemonName string) *apitask.Task { func (engine *DockerTaskEngine) SetDaemonTask(daemonName string, task *apitask.Task) { engine.daemonTasksLock.Lock() + + logger.Info("setting daemon task", logger.Fields{ + field.Container: daemonName, + field.TaskID: task.GetID(), + }) + defer engine.daemonTasksLock.Unlock() engine.daemonTasks[daemonName] = task } diff --git a/agent/engine/dockerstate/docker_task_engine_state.go b/agent/engine/dockerstate/docker_task_engine_state.go index c64f090707..bdbaa5f99a 100644 --- a/agent/engine/dockerstate/docker_task_engine_state.go +++ b/agent/engine/dockerstate/docker_task_engine_state.go @@ -300,7 +300,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachments() []*apiresource func (state *DockerTaskEngineState) allPendingEBSAttachmentsUnsafe() []*apiresource.ResourceAttachment { var pendingEBSAttachments []*apiresource.ResourceAttachment for _, v := range state.ebsAttachments { - if !v.IsAttached() && !v.IsSent() { + if !v.IsAttached() || !v.IsSent() { pendingEBSAttachments = append(pendingEBSAttachments, v) } } @@ -319,7 +319,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachmentsWithKey() map[str func (state *DockerTaskEngineState) allPendingEBSAttachmentsWithKeyUnsafe() map[string]*apiresource.ResourceAttachment { pendingEBSAttachments := make(map[string]*apiresource.ResourceAttachment) for k, v := range state.ebsAttachments { - if !v.IsAttached() && !v.IsSent() { + if !v.IsAttached() || !v.IsSent() { pendingEBSAttachments[k] = v } }