Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file watcher for Appnet agent image update #3435

Merged
merged 1 commit into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func testDoStartHappyPathWithConditions(t *testing.T, blackholed bool, warmPools
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()
mockServiceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
imageManager.EXPECT().StartImageCleanupProcess(gomock.Any()).MaxTimes(1)
dockerClient.EXPECT().ListContainers(gomock.Any(), gomock.Any(), gomock.Any()).Return(
dockerapi.ListContainersResponse{}).AnyTimes()
Expand Down
3 changes: 3 additions & 0 deletions agent/app/agent_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestDoStartTaskENIHappyPath(t *testing.T) {
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()
mockServiceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
mockUdevMonitor.EXPECT().Monitor(gomock.Any()).Return(monitoShutdownEvents).AnyTimes()

gomock.InOrder(
Expand Down Expand Up @@ -454,6 +455,7 @@ func TestDoStartCgroupInitHappyPath(t *testing.T) {
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()
mockServiceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

gomock.InOrder(
mockControl.EXPECT().Init().Return(nil),
Expand Down Expand Up @@ -615,6 +617,7 @@ func TestDoStartGPUManagerHappyPath(t *testing.T) {
mockServiceConnectManager.EXPECT().GetLoadedAppnetVersion().AnyTimes()
mockServiceConnectManager.EXPECT().GetCapabilitiesForAppnetInterfaceVersion("").AnyTimes()
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()
mockServiceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

gomock.InOrder(
mockGPUManager.EXPECT().Initialize().Return(nil),
Expand Down
1 change: 1 addition & 0 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
go engine.handleDockerEvents(derivedCtx)
engine.initialized = true
go engine.startPeriodicExecAgentsMonitoring(derivedCtx)
go engine.watchAppNetImage(derivedCtx)
return nil
}

Expand Down
81 changes: 81 additions & 0 deletions agent/engine/docker_task_engine_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
package engine

import (
"context"
"fmt"
"time"

"github.com/fsnotify/fsnotify"

apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
apitask "github.com/aws/amazon-ecs-agent/agent/api/task"
apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status"
"github.com/aws/amazon-ecs-agent/agent/logger"
)

const (
Expand All @@ -39,3 +45,78 @@ func (engine *DockerTaskEngine) updateTaskENIDependencies(task *apitask.Task) {
func (engine *DockerTaskEngine) invokePluginsForContainer(task *apitask.Task, container *apicontainer.Container) error {
return nil
}

func (engine *DockerTaskEngine) watchAppNetImage(ctx context.Context) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
logger.Error(fmt.Sprintf("failed to initialize fsnotify NewWatcher, error: %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not blocking. we can use logger fields : )

		logger.Error("Failed to initialize fsnotify NewWatcher", logger.Fields{
			field.Error:     err,
		})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops...will fix it in the next PR!! Thank you:)

}
appnetContainerTarballDir := engine.serviceconnectManager.GetAppnetContainerTarballDir()
err = watcher.Add(appnetContainerTarballDir)
if err != nil {
logger.Error(fmt.Sprintf("error adding %s to fsnotify watcher, error: %v", appnetContainerTarballDir, err))
}
defer watcher.Close()

// Start listening for events.
for {
select {
case event, ok := <-watcher.Events:
if !ok {
logger.Warn("fsnotify event watcher channel is closed")
return
}
// check if the event file operation is write or create
const writeOrCreateMask = fsnotify.Write | fsnotify.Create
if event.Op&writeOrCreateMask != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be a space between event.Op and writeOrCreateMask ? Also, can you explain what this means?

Copy link
Contributor Author

@mythri-garaga mythri-garaga Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added space but I think gofmt removes it.

Also add a comment to clarify the condition we check:)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Mythri :)

logger.Debug(fmt.Sprintf("new fsnotify watcher event: %s", event.Name))
// reload the updated Appnet Agent image
if err := engine.reloadAppNetImage(); err == nil {
// restart the internal instance relay task with
// updated Appnet Agent image
engine.restartInstanceTask()
}
}
case err, ok := <-watcher.Errors:
if !ok {
logger.Warn("fsnotify event watcher channel is closed")
return
yinyic marked this conversation as resolved.
Show resolved Hide resolved
}
logger.Error(fmt.Sprintf("fsnotify watcher error: %v", err))
case <-ctx.Done():
return
}
}
}

func (engine *DockerTaskEngine) reloadAppNetImage() error {
_, err := engine.serviceconnectManager.LoadImage(engine.ctx, engine.cfg, engine.client)
if err != nil {
logger.Error(fmt.Sprintf("engine: Failed to reload appnet Agent container, error: %v", err))
return err
}
return nil
}

func (engine *DockerTaskEngine) restartInstanceTask() {
if engine.serviceconnectRelay != nil {
serviceconnectRelayTask, err := engine.serviceconnectManager.CreateInstanceTask(engine.cfg)
if err != nil {
logger.Error(fmt.Sprintf("Unable to start relay for task in the engine: %v", err))
return
}
// clean up instance relay task
for _, container := range engine.serviceconnectRelay.Containers {
if container.Type == apicontainer.ContainerServiceConnectRelay {
engine.stopContainer(engine.serviceconnectRelay, container)
}
}
engine.serviceconnectRelay.SetDesiredStatus(apitaskstatus.TaskStopped)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q. Would we show the task is stopped/started due to AppNet Agent image reload somewhere in ECS Agent log? or Control plane will receive a message from ECS Agent, saying that task state is changed due to image is reloading?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this an internal task, we don't publish this task status to backend but just log the restart on ECS Agent.

engine.sweepTask(engine.serviceconnectRelay)
engine.deleteTask(engine.serviceconnectRelay)

engine.serviceconnectRelay = serviceconnectRelayTask
engine.AddTask(engine.serviceconnectRelay)
logger.Info("engine: Restarted AppNet Relay task")
}
}
38 changes: 33 additions & 5 deletions agent/engine/docker_task_engine_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -81,7 +82,7 @@ func init() {
func TestResourceContainerProgression(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, _, imageManager, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, client, mockTime, taskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

sleepTask := testdata.LoadTask("sleep5")
Expand All @@ -105,6 +106,7 @@ func TestResourceContainerProgression(t *testing.T) {
// events are processed
containerEventsWG := sync.WaitGroup{}
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
gomock.InOrder(
// Ensure that the resource is created first
mockControl.EXPECT().Exists(gomock.Any()).Return(false),
Expand Down Expand Up @@ -250,7 +252,7 @@ func TestDeleteTaskBranchENIEnabled(t *testing.T) {
func TestResourceContainerProgressionFailure(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, _, _, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, client, mockTime, taskEngine, _, _, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()
sleepTask := testdata.LoadTask("sleep5")
sleepContainer := sleepTask.Containers[0]
Expand All @@ -267,6 +269,7 @@ func TestResourceContainerProgressionFailure(t *testing.T) {
sleepTask.AddResource("cgroup", cgroupResource)
eventStream := make(chan dockerapi.DockerContainerChangeEvent)
client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
gomock.InOrder(
// resource creation failure
mockControl.EXPECT().Exists(gomock.Any()).Return(false),
Expand Down Expand Up @@ -307,7 +310,7 @@ func TestTaskCPULimitHappyPath(t *testing.T) {
metadataConfig.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, credentialsManager, imageManager, metadataManager, _ := mocks(
ctrl, client, mockTime, taskEngine, credentialsManager, imageManager, metadataManager, serviceConnectManager := mocks(
t, ctx, &metadataConfig)
defer ctrl.Finish()

Expand All @@ -328,6 +331,7 @@ func TestTaskCPULimitHappyPath(t *testing.T) {
containerEventsWG := sync.WaitGroup{}

client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
containerName := make(chan string)
go func() {
name := <-containerName
Expand Down Expand Up @@ -590,7 +594,7 @@ func TestBuildCNIConfigFromTaskContainer(t *testing.T) {
func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, client, mockTime, taskEngine, _, imageManager, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, client, mockTime, taskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

mockCNIClient := mock_ecscni.NewMockCNIClient(ctrl)
Expand All @@ -616,6 +620,7 @@ func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
containerEventsWG := sync.WaitGroup{}

client.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()
// We cannot rely on the order of pulls between images as they can still be downloaded in
// parallel. The dependency graph enforcement comes into effect for CREATED transitions.
// Hence, do not enforce the order of invocation of these calls
Expand Down Expand Up @@ -728,7 +733,7 @@ func TestTaskWithSteadyStateResourcesProvisioned(t *testing.T) {
func TestPauseContainerHappyPath(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, dockerClient, mockTime, taskEngine, _, imageManager, _, _ := mocks(t, ctx, &defaultConfig)
ctrl, dockerClient, mockTime, taskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

cniClient := mock_ecscni.NewMockCNIClient(ctrl)
Expand Down Expand Up @@ -758,6 +763,7 @@ func TestPauseContainerHappyPath(t *testing.T) {
})

dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

sleepContainerID1 := containerID + "1"
sleepContainerID2 := containerID + "2"
Expand Down Expand Up @@ -985,6 +991,7 @@ func TestContainersWithServiceConnect(t *testing.T) {
sleepTask.AddTaskENI(mockENI)

dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

sleepContainerID1 := containerID + "1"
sleepContainerID2 := containerID + "2"
Expand Down Expand Up @@ -1136,6 +1143,7 @@ func TestContainersWithServiceConnect_BridgeMode(t *testing.T) {
})

dockerClient.EXPECT().ContainerEvents(gomock.Any()).Return(eventStream, nil)
serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes()

sleepContainerID := containerID + "1"
scContainerID := "serviceConnectID"
Expand Down Expand Up @@ -1364,3 +1372,23 @@ func TestProvisionContainerResourcesBridgeModeWithServiceConnect(t *testing.T) {
require.Nil(t, taskEngine.(*DockerTaskEngine).provisionContainerResources(testTask, cont).Error)
}
}

func TestWatchAppNetImage(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
ctrl, _, _, taskEngine, _, _, _, serviceConnectManager := mocks(t, ctx, &defaultConfig)
defer ctrl.Finish()

tempServiceConnectAppnetAgenTarballDir := t.TempDir()

serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().Return(tempServiceConnectAppnetAgenTarballDir).AnyTimes()
serviceConnectManager.EXPECT().LoadImage(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

watcherCtx, watcherCancel := context.WithTimeout(context.Background(), time.Second)
defer watcherCancel()
go taskEngine.(*DockerTaskEngine).watchAppNetImage(watcherCtx)
_, err := os.CreateTemp(tempServiceConnectAppnetAgenTarballDir, "agent.tar")
assert.NoError(t, err)

<-watcherCtx.Done()
}
Loading