diff --git a/agent/acs/client/acs_client_types.go b/agent/acs/client/acs_client_types.go index 6a80531408e..471989eaf65 100644 --- a/agent/acs/client/acs_client_types.go +++ b/agent/acs/client/acs_client_types.go @@ -47,6 +47,9 @@ func init() { ecsacs.ErrorMessage{}, ecsacs.AttachTaskNetworkInterfacesMessage{}, ecsacs.AttachInstanceNetworkInterfacesMessage{}, + ecsacs.TaskManifestMessage{}, + ecsacs.TaskStopVerificationAck{}, + ecsacs.TaskStopVerificationMessage{}, } } diff --git a/agent/acs/handler/acs_handler.go b/agent/acs/handler/acs_handler.go index 7ae0d7b6373..65bb759edaf 100644 --- a/agent/acs/handler/acs_handler.go +++ b/agent/acs/handler/acs_handler.go @@ -90,6 +90,7 @@ type session struct { cancel context.CancelFunc backoff retry.Backoff resources sessionResources + latestSeqNumTaskManifest *int64 _heartbeatTimeout time.Duration _heartbeatJitter time.Duration _inactiveInstanceReconnectDelay time.Duration @@ -143,7 +144,7 @@ func NewSession(ctx context.Context, stateManager statemanager.StateManager, taskEngine engine.TaskEngine, credentialsManager rolecredentials.Manager, - taskHandler *eventhandler.TaskHandler) Session { + taskHandler *eventhandler.TaskHandler, latestSeqNumTaskManifest *int64) Session { resources := newSessionResources(credentialsProvider) backoff := retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier) @@ -164,6 +165,7 @@ func NewSession(ctx context.Context, cancel: cancel, backoff: backoff, resources: resources, + latestSeqNumTaskManifest: latestSeqNumTaskManifest, _heartbeatTimeout: heartbeatTimeout, _heartbeatJitter: heartbeatJitter, _inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay, @@ -294,6 +296,17 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error { client.AddRequestHandler(instanceENIAttachHandler.handlerFunc()) + // Add TaskManifestHandler + taskManifestHandler := newTaskManifestHandler(acsSession.ctx, cfg.Cluster, acsSession.containerInstanceARN, + client, acsSession.stateManager, acsSession.taskEngine, acsSession.latestSeqNumTaskManifest) + + defer taskManifestHandler.clearAcks() + taskManifestHandler.start() + defer taskManifestHandler.stop() + + client.AddRequestHandler(taskManifestHandler.handlerFuncTaskManifestMessage()) + client.AddRequestHandler(taskManifestHandler.handlerFuncTaskStopVerificationMessage()) + // Add request handler for handling payload messages from ACS payloadHandler := newPayloadRequestHandler( acsSession.ctx, @@ -305,7 +318,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error { acsSession.stateManager, refreshCredsHandler, acsSession.credentialsManager, - acsSession.taskHandler) + acsSession.taskHandler, acsSession.latestSeqNumTaskManifest) // Clear the acks channel on return because acks of messageids don't have any value across sessions defer payloadHandler.clearAcks() payloadHandler.start() diff --git a/agent/acs/handler/acs_handler_test.go b/agent/acs/handler/acs_handler_test.go index 7f5f7456972..239c3bbc922 100644 --- a/agent/acs/handler/acs_handler_test.go +++ b/agent/acs/handler/acs_handler_test.go @@ -47,6 +47,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/version" "github.com/aws/amazon-ecs-agent/agent/wsclient" mock_wsclient "github.com/aws/amazon-ecs-agent/agent/wsclient/mock" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/gorilla/websocket" "github.com/pkg/errors" @@ -365,6 +366,7 @@ func TestHandlerReconnectsWithoutBackoffOnEOFError(t *testing.T) { ctx: ctx, cancel: cancel, resources: &mockSessionResources{mockWsClient}, + latestSeqNumTaskManifest: aws.Int64(10), _heartbeatTimeout: 20 * time.Millisecond, _heartbeatJitter: 10 * time.Millisecond, _inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay, @@ -833,18 +835,19 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) { go func() { acsSession := session{ - containerInstanceARN: "myArn", - credentialsProvider: testCreds, - agentConfig: testConfig, - taskEngine: taskEngine, - ecsClient: ecsClient, - stateManager: stateManager, - taskHandler: taskHandler, - ctx: ctx, - _heartbeatTimeout: 1 * time.Second, - backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier), - resources: newSessionResources(testCreds), - credentialsManager: rolecredentials.NewManager(), + containerInstanceARN: "myArn", + credentialsProvider: testCreds, + agentConfig: testConfig, + taskEngine: taskEngine, + ecsClient: ecsClient, + stateManager: stateManager, + taskHandler: taskHandler, + ctx: ctx, + _heartbeatTimeout: 1 * time.Second, + backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier), + resources: newSessionResources(testCreds), + credentialsManager: rolecredentials.NewManager(), + latestSeqNumTaskManifest: aws.Int64(12), } acsSession.Start() ended <- true @@ -914,6 +917,7 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) { credentialsManager := mock_credentials.NewMockManager(ctrl) + latestSeqNumberTaskManifest := int64(10) ended := make(chan bool, 1) go func() { acsSession := NewSession(ctx, @@ -926,7 +930,7 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) { stateManager, taskEngine, credentialsManager, - taskHandler, + taskHandler, &latestSeqNumberTaskManifest, ) acsSession.Start() // StartSession should never return unless the context is canceled diff --git a/agent/acs/handler/payload_handler.go b/agent/acs/handler/payload_handler.go index c1abc4d5189..9a600527f98 100644 --- a/agent/acs/handler/payload_handler.go +++ b/agent/acs/handler/payload_handler.go @@ -43,12 +43,13 @@ type payloadRequestHandler struct { saver statemanager.Saver taskHandler *eventhandler.TaskHandler // cancel is used to stop go routines started by start() method - cancel context.CancelFunc - cluster string - containerInstanceArn string - acsClient wsclient.ClientServer - refreshHandler refreshCredentialsHandler - credentialsManager credentials.Manager + cancel context.CancelFunc + cluster string + containerInstanceArn string + acsClient wsclient.ClientServer + refreshHandler refreshCredentialsHandler + credentialsManager credentials.Manager + latestSeqNumberTaskManifest *int64 } // newPayloadRequestHandler returns a new payloadRequestHandler object @@ -62,23 +63,24 @@ func newPayloadRequestHandler( saver statemanager.Saver, refreshHandler refreshCredentialsHandler, credentialsManager credentials.Manager, - taskHandler *eventhandler.TaskHandler) payloadRequestHandler { + taskHandler *eventhandler.TaskHandler, seqNumTaskManifest *int64) payloadRequestHandler { // Create a cancelable context from the parent context derivedContext, cancel := context.WithCancel(ctx) return payloadRequestHandler{ - messageBuffer: make(chan *ecsacs.PayloadMessage, payloadMessageBufferSize), - ackRequest: make(chan string, payloadMessageBufferSize), - taskEngine: taskEngine, - ecsClient: ecsClient, - saver: saver, - taskHandler: taskHandler, - ctx: derivedContext, - cancel: cancel, - cluster: cluster, - containerInstanceArn: containerInstanceArn, - acsClient: acsClient, - refreshHandler: refreshHandler, - credentialsManager: credentialsManager, + messageBuffer: make(chan *ecsacs.PayloadMessage, payloadMessageBufferSize), + ackRequest: make(chan string, payloadMessageBufferSize), + taskEngine: taskEngine, + ecsClient: ecsClient, + saver: saver, + taskHandler: taskHandler, + ctx: derivedContext, + cancel: cancel, + cluster: cluster, + containerInstanceArn: containerInstanceArn, + acsClient: acsClient, + refreshHandler: refreshHandler, + credentialsManager: credentialsManager, + latestSeqNumberTaskManifest: seqNumTaskManifest, } } @@ -151,10 +153,19 @@ func (payloadHandler *payloadRequestHandler) handleSingleMessage(payload *ecsacs } seelog.Debugf("Received payload message, message id: %s", aws.StringValue(payload.MessageId)) credentialsAcks, allTasksHandled := payloadHandler.addPayloadTasks(payload) + + // Update latestSeqNumberTaskManifest for it to get updated in state file + if payloadHandler.latestSeqNumberTaskManifest != nil && payload.SeqNum != nil && + *payloadHandler.latestSeqNumberTaskManifest < *payload.SeqNum { + + *payloadHandler.latestSeqNumberTaskManifest = *payload.SeqNum + } + // save the state of tasks we know about after passing them to the task engine err := payloadHandler.saver.Save() if err != nil { - seelog.Errorf("Error saving state for payload message! err: %v, messageId: %s", err, aws.StringValue(payload.MessageId)) + seelog.Errorf("Error saving state for payload message! err: %v, messageId: %s", err, + aws.StringValue(payload.MessageId)) // Don't ack; maybe we can save it in the future. return fmt.Errorf("error saving state for payload message, with messageId: %s", aws.StringValue(payload.MessageId)) } diff --git a/agent/acs/handler/payload_handler_test.go b/agent/acs/handler/payload_handler_test.go index f15f056626d..f33fdecf89d 100644 --- a/agent/acs/handler/payload_handler_test.go +++ b/agent/acs/handler/payload_handler_test.go @@ -69,6 +69,7 @@ func setup(t *testing.T) *testHelper { credentialsManager := credentials.NewManager() ctx, cancel := context.WithCancel(context.Background()) taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil) + latestSeqNumberTaskManifest := int64(10) handler := newPayloadRequestHandler( ctx, @@ -80,7 +81,7 @@ func setup(t *testing.T) *testHelper { stateManager, refreshCredentialsHandler{}, credentialsManager, - taskHandler) + taskHandler, &latestSeqNumberTaskManifest) return &testHelper{ ctrl: ctrl, diff --git a/agent/acs/handler/task_manifest_handler.go b/agent/acs/handler/task_manifest_handler.go new file mode 100644 index 00000000000..6b65927e47d --- /dev/null +++ b/agent/acs/handler/task_manifest_handler.go @@ -0,0 +1,281 @@ +// Copyright 2014-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. +package handler + +import ( + "context" + "sync" + + "github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs" + apitask "github.com/aws/amazon-ecs-agent/agent/api/task" + apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" + "github.com/aws/amazon-ecs-agent/agent/engine" + "github.com/aws/amazon-ecs-agent/agent/statemanager" + "github.com/aws/amazon-ecs-agent/agent/wsclient" + "github.com/aws/aws-sdk-go/aws" + "github.com/cihub/seelog" +) + +// taskManifestHandler handles task manifest message for the ACS client +type taskManifestHandler struct { + messageBufferTaskManifest chan *ecsacs.TaskManifestMessage + messageBufferTaskManifestAck chan string + messageBufferTaskStopVerificationMessage chan *ecsacs.TaskStopVerificationMessage + messageBufferTaskStopVerificationAck chan *ecsacs.TaskStopVerificationAck + ctx context.Context + taskEngine engine.TaskEngine + cancel context.CancelFunc + saver statemanager.Saver + cluster string + containerInstanceArn string + acsClient wsclient.ClientServer + latestSeqNumberTaskManifest *int64 + messageId string + lock sync.RWMutex +} + +// newTaskManifestHandler returns an instance of the taskManifestHandler struct +func newTaskManifestHandler(ctx context.Context, + cluster string, containerInstanceArn string, acsClient wsclient.ClientServer, + saver statemanager.Saver, taskEngine engine.TaskEngine, latestSeqNumberTaskManifest *int64) taskManifestHandler { + + // Create a cancelable context from the parent context + derivedContext, cancel := context.WithCancel(ctx) + + return taskManifestHandler{ + messageBufferTaskManifest: make(chan *ecsacs.TaskManifestMessage), + messageBufferTaskManifestAck: make(chan string), + messageBufferTaskStopVerificationMessage: make(chan *ecsacs.TaskStopVerificationMessage), + messageBufferTaskStopVerificationAck: make(chan *ecsacs.TaskStopVerificationAck), + ctx: derivedContext, + cancel: cancel, + cluster: cluster, + containerInstanceArn: containerInstanceArn, + acsClient: acsClient, + taskEngine: taskEngine, + saver: saver, + latestSeqNumberTaskManifest: latestSeqNumberTaskManifest, + } +} + +func (taskManifestHandler *taskManifestHandler) handlerFuncTaskManifestMessage() func( + message *ecsacs.TaskManifestMessage) { + return func(message *ecsacs.TaskManifestMessage) { + taskManifestHandler.messageBufferTaskManifest <- message + } +} + +func (taskManifestHandler *taskManifestHandler) handlerFuncTaskStopVerificationMessage() func( + message *ecsacs.TaskStopVerificationAck) { + return func(message *ecsacs.TaskStopVerificationAck) { + taskManifestHandler.messageBufferTaskStopVerificationAck <- message + } +} + +func (taskManifestHandler *taskManifestHandler) start() { + // Task manifest and it's ack + go taskManifestHandler.handleTaskManifestMessage() + go taskManifestHandler.sendTaskManifestMessageAck() + + // Task stop verification message and it's ack + go taskManifestHandler.sendTaskStopVerificationMessage() + go taskManifestHandler.handleTaskStopVerificationAck() + +} + +func (taskManifestHandler *taskManifestHandler) getMessageId() string { + taskManifestHandler.lock.RLock() + defer taskManifestHandler.lock.RUnlock() + return taskManifestHandler.messageId +} + +func (taskManifestHandler *taskManifestHandler) setMessageId(messageId string) { + taskManifestHandler.lock.Lock() + defer taskManifestHandler.lock.Unlock() + taskManifestHandler.messageId = messageId +} + +func (taskManifestHandler *taskManifestHandler) sendTaskManifestMessageAck() { + for { + select { + case messageBufferTaskManifestAck := <-taskManifestHandler.messageBufferTaskManifestAck: + taskManifestHandler.ackTaskManifestMessage(messageBufferTaskManifestAck) + case <-taskManifestHandler.ctx.Done(): + return + } + } +} + +func (taskManifestHandler *taskManifestHandler) handleTaskStopVerificationAck() { + for { + select { + case messageBufferTaskStopVerificationAck := <-taskManifestHandler.messageBufferTaskStopVerificationAck: + if err := taskManifestHandler.handleSingleMessageVerificationAck(messageBufferTaskStopVerificationAck); err != nil { + seelog.Warnf("Error handling Verification ack with messageID: %s, error: %v", + messageBufferTaskStopVerificationAck.MessageId, err) + } + case <-taskManifestHandler.ctx.Done(): + return + } + } +} + +func (taskManifestHandler *taskManifestHandler) clearAcks() { + for { + select { + case <-taskManifestHandler.messageBufferTaskManifestAck: + case <-taskManifestHandler.messageBufferTaskStopVerificationAck: + default: + return + } + } +} + +func (taskManifestHandler *taskManifestHandler) ackTaskManifestMessage(messageID string) { + seelog.Debugf("Acking task manifest message id: %s", messageID) + err := taskManifestHandler.acsClient.MakeRequest(&ecsacs.AckRequest{ + Cluster: aws.String(taskManifestHandler.cluster), + ContainerInstance: aws.String(taskManifestHandler.containerInstanceArn), + MessageId: aws.String(messageID), + }) + if err != nil { + seelog.Warnf("Error 'ack'ing TaskManifestMessage with messageID: %s, error: %v", messageID, err) + } +} + +// stop is used to invoke a cancellation function +func (taskManifestHandler *taskManifestHandler) stop() { + taskManifestHandler.cancel() +} + +func (taskManifestHandler *taskManifestHandler) handleTaskManifestMessage() { + for { + select { + case <-taskManifestHandler.ctx.Done(): + return + case message := <-taskManifestHandler.messageBufferTaskManifest: + if err := taskManifestHandler.handleTaskManifestSingleMessage(message); err != nil { + seelog.Warnf("Unable to handle taskManifest message [%s]: %v", message.String(), err) + } + } + } +} + +func (taskManifestHandler *taskManifestHandler) sendTaskStopVerificationMessage() { + for { + select { + case message := <-taskManifestHandler.messageBufferTaskStopVerificationMessage: + if err := taskManifestHandler.acsClient.MakeRequest(message); err != nil { + seelog.Warnf("Unable to send taskStopVerification message [%s]: %v", message.String(), err) + } + case <-taskManifestHandler.ctx.Done(): + return + } + } +} + +// compares the list of tasks received in the task manifest message and tasks running on the the instance +// It returns all the task that are running on the instance but not present in task manifest message task list +func compareTasks(receivedTaskList []*ecsacs.TaskIdentifier, runningTaskList []*apitask.Task) []*ecsacs.TaskIdentifier { + tasksToBeKilled := make([]*ecsacs.TaskIdentifier, 0) + for _, runningTask := range runningTaskList { + // For every task running on the instance check if the task is present in receivedTaskList with the DesiredState + // of running, if not add them to the list of task that needs to be stopped + if runningTask.GetDesiredStatus() == apitaskstatus.TaskRunning { + taskPresent := false + for _, receivedTask := range receivedTaskList { + if *receivedTask.TaskArn == runningTask.Arn && *receivedTask. + DesiredStatus == apitaskstatus.TaskRunningString { + // Task present, does not need to be stopped + taskPresent = true + break + } + } + if !taskPresent { + tasksToBeKilled = append(tasksToBeKilled, &ecsacs.TaskIdentifier{ + DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), + TaskArn: aws.String(runningTask.Arn), + }) + } + } + } + return tasksToBeKilled +} + +func (taskManifestHandler *taskManifestHandler) handleSingleMessageVerificationAck( + message *ecsacs.TaskStopVerificationAck) error { + // Ensure that we have received a corresponding task manifest message before + taskManifestMessageId := taskManifestHandler.getMessageId() + if taskManifestMessageId != "" && *message.MessageId == taskManifestMessageId { + // Reset the message id so that the message with same message id is not processed twice + taskManifestHandler.setMessageId("") + for _, taskToKill := range message.StopTasks { + if *taskToKill.DesiredStatus == apitaskstatus.TaskStoppedString { + task, isPresent := taskManifestHandler.taskEngine.GetTaskByArn(*taskToKill.TaskArn) + if isPresent { + seelog.Infof("Stopping task from task manifest handler: %s", task.Arn) + task.SetDesiredStatus(apitaskstatus.TaskStopped) + taskManifestHandler.taskEngine.AddTask(task) + } else { + seelog.Debugf("Task not found on the instance: %s", *taskToKill.TaskArn) + } + } + } + } + return nil +} + +func (taskManifestHandler *taskManifestHandler) handleTaskManifestSingleMessage( + message *ecsacs.TaskManifestMessage) error { + taskListManifestHandler := message.Tasks + seqNumberFromMessage := *message.Timeline + agentLatestSequenceNumber := *taskManifestHandler.latestSeqNumberTaskManifest + + // Check if the sequence number of message received is more than the one stored in Agent + if agentLatestSequenceNumber < seqNumberFromMessage { + runningTasksOnInstance, err := taskManifestHandler.taskEngine.ListTasks() + if err != nil { + return err + } + // Update state file + *taskManifestHandler.latestSeqNumberTaskManifest = *message.Timeline + err = taskManifestHandler.saver.Save() + if err != nil { + return err + } + + tasksToKill := compareTasks(taskListManifestHandler, runningTasksOnInstance) + + // Update messageId so that it can be compared to the messageId in TaskStopVerificationAck message + taskManifestHandler.setMessageId(*message.MessageId) + + // Throw the task manifest ack and task verification message in async so that it does not block the current + // thread. + go func() { + taskManifestHandler.messageBufferTaskManifestAck <- *message.MessageId + if len(tasksToKill) > 0 { + taskStopVerificationMessage := ecsacs.TaskStopVerificationMessage{ + MessageId: message.MessageId, + StopCandidates: tasksToKill, + } + + taskManifestHandler.messageBufferTaskStopVerificationMessage <- &taskStopVerificationMessage + } + }() + } else { + seelog.Debugf("Skipping the task manifest message. sequence number from task manifest: %d. sequence number "+ + " from Agent: %d", seqNumberFromMessage, agentLatestSequenceNumber) + } + + return nil +} diff --git a/agent/acs/handler/task_manifest_handler_test.go b/agent/acs/handler/task_manifest_handler_test.go new file mode 100644 index 00000000000..3bb00cc6a20 --- /dev/null +++ b/agent/acs/handler/task_manifest_handler_test.go @@ -0,0 +1,486 @@ +// +build unit + +// Copyright 2014-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. +package handler + +import ( + "context" + "testing" + + "github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs" + "github.com/aws/amazon-ecs-agent/agent/api/task" + apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" + mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks" + mock_statemanager "github.com/aws/amazon-ecs-agent/agent/statemanager/mocks" + mock_wsclient "github.com/aws/amazon-ecs-agent/agent/wsclient/mock" + "github.com/aws/aws-sdk-go/aws" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +// Tests the case when all the tasks running on the instance needs to be killed +func TestManifestHandlerKillAllTasks(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + manager := mock_statemanager.NewMockStateManager(ctrl) + taskEngine := mock_engine.NewMockTaskEngine(ctrl) + cluster := "mock-cluster" + containerInstanceArn := "mock-container-instance" + messageId := "mock-message-id" + + ctx := context.TODO() + mockWSClient := mock_wsclient.NewMockClientServer(ctrl) + + newTaskManifest := newTaskManifestHandler(ctx, cluster, containerInstanceArn, mockWSClient, manager, taskEngine, + aws.Int64(11)) + + ackRequested := &ecsacs.AckRequest{ + Cluster: aws.String(cluster), + ContainerInstance: aws.String(containerInstanceArn), + MessageId: aws.String(messageId), + } + + task2 := &task.Task{Arn: "arn2", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + task1 := &task.Task{Arn: "arn1", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + + taskList := []*task.Task{task1, task2} + + //Task that needs to be stopped, sent back by agent + taskIdentifierFinal := []*ecsacs.TaskIdentifier{ + {DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), TaskArn: aws.String("arn1")}, + {DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), TaskArn: aws.String("arn2")}, + } + + taskStopVerificationMessage := &ecsacs.TaskStopVerificationMessage{ + MessageId: aws.String(messageId), + StopCandidates: taskIdentifierFinal, + } + + messageTaskStopVerificationAck := &ecsacs.TaskStopVerificationAck{ + GeneratedAt: aws.Int64(123), + MessageId: aws.String(messageId), + StopTasks: taskIdentifierFinal, + } + + gomock.InOrder( + taskEngine.EXPECT().ListTasks().Return(taskList, nil).Times(1), + manager.EXPECT().Save().Return(nil).Times(1), + // AddTask function needs to be called twice for both the tasks getting stopped + taskEngine.EXPECT().AddTask(gomock.Any()), + taskEngine.EXPECT().AddTask(gomock.Any()).Do(func(task1 *task.Task) { + newTaskManifest.stop() + }), + ) + + mockWSClient.EXPECT().MakeRequest(ackRequested).Times(1) + + mockWSClient.EXPECT().MakeRequest(taskStopVerificationMessage).Times(1).Do(func(message *ecsacs.TaskStopVerificationMessage) { + // Agent receives the ack message when taskStopVerificationMessage is processed by ACS + newTaskManifest.messageBufferTaskStopVerificationAck <- messageTaskStopVerificationAck + }) + + taskEngine.EXPECT().GetTaskByArn("arn1").Return(task1, true) + taskEngine.EXPECT().GetTaskByArn("arn2").Return(task2, true) + + message := &ecsacs.TaskManifestMessage{ + MessageId: aws.String(messageId), + ClusterArn: aws.String(cluster), + ContainerInstanceArn: aws.String(containerInstanceArn), + Tasks: []*ecsacs.TaskIdentifier{ + {DesiredStatus: aws.String("STOPPED"), TaskArn: aws.String("arn-long")}, + }, + Timeline: aws.Int64(12), + } + + go newTaskManifest.start() + + newTaskManifest.messageBufferTaskManifest <- message + + select { + case <-newTaskManifest.ctx.Done(): + } +} + +// Tests the case when two of three tasks running on the instance needs to be killed +func TestManifestHandlerKillFewTasks(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + manager := mock_statemanager.NewMockStateManager(ctrl) + taskEngine := mock_engine.NewMockTaskEngine(ctrl) + cluster := "mock-cluster" + containerInstanceArn := "mock-container-instance" + messageId := "mock-message-id" + + ctx := context.TODO() + mockWSClient := mock_wsclient.NewMockClientServer(ctrl) + + newTaskManifest := newTaskManifestHandler(ctx, cluster, containerInstanceArn, mockWSClient, manager, taskEngine, + aws.Int64(11)) + + ackRequested := &ecsacs.AckRequest{ + Cluster: aws.String(cluster), + ContainerInstance: aws.String(containerInstanceArn), + MessageId: aws.String(messageId), + } + + task2 := &task.Task{Arn: "arn2", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + task1 := &task.Task{Arn: "arn1", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + task3 := &task.Task{Arn: "arn3", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + + taskList := []*task.Task{task1, task2, task3} + + //Task that needs to be stopped, sent back by agent + taskIdentifierFinal := []*ecsacs.TaskIdentifier{ + {DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), TaskArn: aws.String("arn2")}, + {DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), TaskArn: aws.String("arn3")}, + } + + taskStopVerificationMessage := &ecsacs.TaskStopVerificationMessage{ + MessageId: aws.String(messageId), + StopCandidates: taskIdentifierFinal, + } + + messageTaskStopVerificationAck := &ecsacs.TaskStopVerificationAck{ + GeneratedAt: aws.Int64(123), + MessageId: aws.String(messageId), + StopTasks: taskIdentifierFinal, + } + + gomock.InOrder( + taskEngine.EXPECT().ListTasks().Return(taskList, nil).Times(1), + manager.EXPECT().Save().Return(nil).Times(1), + taskEngine.EXPECT().AddTask(gomock.Any()), + taskEngine.EXPECT().AddTask(gomock.Any()).Do(func(task1 *task.Task) { + newTaskManifest.stop() + }), + ) + + mockWSClient.EXPECT().MakeRequest(ackRequested).Times(1) + + mockWSClient.EXPECT().MakeRequest(taskStopVerificationMessage).Times(1).Do(func(message *ecsacs.TaskStopVerificationMessage) { + newTaskManifest.messageBufferTaskStopVerificationAck <- messageTaskStopVerificationAck + }) + + taskEngine.EXPECT().GetTaskByArn("arn3").Return(task1, true) + taskEngine.EXPECT().GetTaskByArn("arn2").Return(task2, true) + + message := &ecsacs.TaskManifestMessage{ + MessageId: aws.String(messageId), + ClusterArn: aws.String(cluster), + ContainerInstanceArn: aws.String(containerInstanceArn), + Tasks: []*ecsacs.TaskIdentifier{ + { + DesiredStatus: aws.String(apitaskstatus.TaskRunningString), + TaskArn: aws.String("arn1"), + }, + { + DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), + TaskArn: aws.String("arn2"), + }, + }, + Timeline: aws.Int64(12), + } + + go newTaskManifest.start() + + newTaskManifest.messageBufferTaskManifest <- message + + select { + case <-newTaskManifest.ctx.Done(): + } +} + +// Tests the case when their is no difference in task running on the instance and tasks received in task manifest. No +// tasks on the instance needs to be killed +func TestManifestHandlerKillNoTasks(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + manager := mock_statemanager.NewMockStateManager(ctrl) + taskEngine := mock_engine.NewMockTaskEngine(ctrl) + cluster := "mock-cluster" + containerInstanceArn := "mock-container-instance" + messageId := "mock-message-id" + + ctx := context.TODO() + mockWSClient := mock_wsclient.NewMockClientServer(ctrl) + + newTaskManifest := newTaskManifestHandler(ctx, cluster, containerInstanceArn, mockWSClient, manager, taskEngine, + aws.Int64(11)) + + ackRequested := &ecsacs.AckRequest{ + Cluster: aws.String(cluster), + ContainerInstance: aws.String(containerInstanceArn), + MessageId: aws.String(messageId), + } + + task2 := &task.Task{Arn: "arn2", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + task1 := &task.Task{Arn: "arn1", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + task3 := &task.Task{Arn: "arn3", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + + taskList := []*task.Task{task1, task2, task3} + + //Task that needs to be stopped, sent back by agent + taskIdentifierFinal := []*ecsacs.TaskIdentifier{ + {DesiredStatus: aws.String("STOPPED"), TaskArn: aws.String("arn2")}, + {DesiredStatus: aws.String("STOPPED"), TaskArn: aws.String("arn3")}, + } + + taskStopVerificationMessage := &ecsacs.TaskStopVerificationMessage{ + MessageId: aws.String(messageId), + StopCandidates: taskIdentifierFinal, + } + + gomock.InOrder( + taskEngine.EXPECT().ListTasks().Return(taskList, nil).Times(1), + manager.EXPECT().Save().Return(nil).Times(1), + ) + + mockWSClient.EXPECT().MakeRequest(taskStopVerificationMessage).Times(0) + mockWSClient.EXPECT().MakeRequest(ackRequested).Times(1).Do(func(message *ecsacs.AckRequest) { + newTaskManifest.stop() + }) + + message := &ecsacs.TaskManifestMessage{ + MessageId: aws.String(messageId), + ClusterArn: aws.String(cluster), + ContainerInstanceArn: aws.String(containerInstanceArn), + Tasks: []*ecsacs.TaskIdentifier{ + { + DesiredStatus: aws.String(apitaskstatus.TaskRunningString), + TaskArn: aws.String("arn1"), + }, + { + DesiredStatus: aws.String(apitaskstatus.TaskRunningString), + TaskArn: aws.String("arn2"), + }, + { + DesiredStatus: aws.String(apitaskstatus.TaskRunningString), + TaskArn: aws.String("arn3"), + }, + }, + Timeline: aws.Int64(12), + } + + go newTaskManifest.start() + + newTaskManifest.messageBufferTaskManifest <- message + + select { + case <-newTaskManifest.ctx.Done(): + } +} + +// Tests the case when the task list received in TaskManifest message is different than the one received in +// TaskStopVerificationMessage +func TestManifestHandlerDifferentTaskLists(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + manager := mock_statemanager.NewMockStateManager(ctrl) + taskEngine := mock_engine.NewMockTaskEngine(ctrl) + cluster := "mock-cluster" + containerInstanceArn := "mock-container-instance" + messageId := "mock-message-id" + + ctx := context.TODO() + mockWSClient := mock_wsclient.NewMockClientServer(ctrl) + + newTaskManifest := newTaskManifestHandler(ctx, cluster, containerInstanceArn, mockWSClient, manager, taskEngine, + aws.Int64(11)) + + ackRequested := &ecsacs.AckRequest{ + Cluster: aws.String(cluster), + ContainerInstance: aws.String(containerInstanceArn), + MessageId: aws.String(messageId), + } + + task2 := &task.Task{Arn: "arn2", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + task1 := &task.Task{Arn: "arn1", DesiredStatusUnsafe: apitaskstatus.TaskRunning} + + taskList := []*task.Task{task1, task2} + + // tasks that suppose to be running + taskIdentifierInitial := ecsacs.TaskIdentifier{ + DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), + TaskArn: aws.String("arn1"), + } + + //Task that needs to be stopped, sent back by agent + taskIdentifierAckFinal := []*ecsacs.TaskIdentifier{ + {DesiredStatus: aws.String(apitaskstatus.TaskRunningString), TaskArn: aws.String("arn1")}, + {DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), TaskArn: aws.String("arn2")}, + } + + //Task that needs to be stopped, sent back by agent + taskIdentifierMessage := []*ecsacs.TaskIdentifier{ + {DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), TaskArn: aws.String("arn1")}, + {DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), TaskArn: aws.String("arn2")}, + } + + taskStopVerificationMessage := &ecsacs.TaskStopVerificationMessage{ + MessageId: aws.String(messageId), + StopCandidates: taskIdentifierMessage, + } + + messageTaskStopVerificationAck := &ecsacs.TaskStopVerificationAck{ + GeneratedAt: aws.Int64(123), + MessageId: aws.String(messageId), + StopTasks: taskIdentifierAckFinal, + } + + gomock.InOrder( + taskEngine.EXPECT().ListTasks().Return(taskList, nil).Times(1), + manager.EXPECT().Save().Return(nil).Times(1), + taskEngine.EXPECT().AddTask(gomock.Any()).Times(1).Do(func(task1 *task.Task) { + newTaskManifest.stop() + }), + ) + + mockWSClient.EXPECT().MakeRequest(ackRequested).Times(1) + + mockWSClient.EXPECT().MakeRequest(taskStopVerificationMessage).Times(1).Do(func( + message *ecsacs.TaskStopVerificationMessage) { + newTaskManifest.messageBufferTaskStopVerificationAck <- messageTaskStopVerificationAck + }) + + taskEngine.EXPECT().GetTaskByArn("arn1").Times(0) + taskEngine.EXPECT().GetTaskByArn("arn2").Return(task2, true) + + message := &ecsacs.TaskManifestMessage{ + MessageId: aws.String(messageId), + ClusterArn: aws.String(cluster), + ContainerInstanceArn: aws.String(containerInstanceArn), + Tasks: []*ecsacs.TaskIdentifier{ + &taskIdentifierInitial, + }, + Timeline: aws.Int64(12), + } + + go newTaskManifest.start() + + newTaskManifest.messageBufferTaskManifest <- message + + select { + case <-newTaskManifest.ctx.Done(): + } +} + +func TestManifestHandlerSequenceNumbers(t *testing.T) { + testcases := []struct { + name string + inputSequenceNumber int64 + }{ + { + name: "Tests the case when sequence number received is older than the one stored in agent", + inputSequenceNumber: 13, + }, + { + name: "Tests the case when sequence number received is equal to one stored in agent", + inputSequenceNumber: 12, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + manager := mock_statemanager.NewMockStateManager(ctrl) + taskEngine := mock_engine.NewMockTaskEngine(ctrl) + + ctx := context.TODO() + mockWSClient := mock_wsclient.NewMockClientServer(ctrl) + newTaskManifest := newTaskManifestHandler(ctx, cluster, containerInstanceArn, mockWSClient, manager, + taskEngine, aws.Int64(tc.inputSequenceNumber)) + + taskList := []*task.Task{ + {Arn: "arn2", DesiredStatusUnsafe: apitaskstatus.TaskRunning}, + {Arn: "arn1", DesiredStatusUnsafe: apitaskstatus.TaskRunning}, + } + + gomock.InOrder( + taskEngine.EXPECT().ListTasks().Return(taskList, nil).Times(0), + taskEngine.EXPECT().AddTask(gomock.Any()).Times(0), + manager.EXPECT().Save().Return(nil).Times(0), + ) + + message := &ecsacs.TaskManifestMessage{ + MessageId: aws.String(eniMessageId), + ClusterArn: aws.String(clusterName), + ContainerInstanceArn: aws.String(containerInstanceArn), + Tasks: []*ecsacs.TaskIdentifier{ + { + DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), + TaskArn: aws.String("arn-long"), + }, + { + DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), + TaskArn: aws.String("arn-long-1"), + }, + }, + Timeline: aws.Int64(12), + } + err := newTaskManifest.handleTaskManifestSingleMessage(message) + assert.NoError(t, err) + + }) + } +} + +func TestCompareTasksDifferentTasks(t *testing.T) { + receivedTaskList := []*ecsacs.TaskIdentifier{ + { + DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), + TaskArn: aws.String("arn-long"), + }, + { + DesiredStatus: aws.String(apitaskstatus.TaskStoppedString), + TaskArn: aws.String("arn-long-1"), + }, + } + + taskList := []*task.Task{ + {Arn: "arn2", DesiredStatusUnsafe: apitaskstatus.TaskRunning}, + {Arn: "arn1", DesiredStatusUnsafe: apitaskstatus.TaskRunning}, + } + + compareTaskList := compareTasks(receivedTaskList, taskList) + + assert.Equal(t, 2, len(compareTaskList)) +} + +func TestCompareTasksSameTasks(t *testing.T) { + receivedTaskList := []*ecsacs.TaskIdentifier{ + { + DesiredStatus: aws.String(apitaskstatus.TaskRunningString), + TaskArn: aws.String("arn1"), + }, + { + DesiredStatus: aws.String(apitaskstatus.TaskRunningString), + TaskArn: aws.String("arn2"), + }, + } + + taskList := []*task.Task{ + {Arn: "arn2", DesiredStatusUnsafe: apitaskstatus.TaskRunning}, + {Arn: "arn1", DesiredStatusUnsafe: apitaskstatus.TaskRunning}, + } + + compareTaskList := compareTasks(receivedTaskList, taskList) + + assert.Equal(t, 0, len(compareTaskList)) +} diff --git a/agent/acs/model/api/api-2.json b/agent/acs/model/api/api-2.json index b551c9ee5ff..7bff225c698 100644 --- a/agent/acs/model/api/api-2.json +++ b/agent/acs/model/api/api-2.json @@ -693,6 +693,44 @@ "host", "docker" ] + }, + "TaskIdentifier": { + "type": "structure", + "members": { + "taskArn": {"shape":"String"}, + "taskClusterArn": {"shape": "String"}, + "desiredStatus": {"shape":"String"} + } + }, + "TaskIdentifierList": { + "type": "list", + "member": {"shape": "TaskIdentifier"} + }, + "TaskManifestMessage": { + "type": "structure", + "members": { + "containerInstanceArn": {"shape":"String"}, + "clusterArn": {"shape":"String"}, + "tasks": {"shape": "TaskIdentifierList"}, + "generatedAt": {"shape": "Long"}, + "messageId": {"shape": "String"}, + "timeline": {"shape":"Long"} + } + }, + "TaskStopVerificationAck": { + "type": "structure", + "members": { + "stopTasks": {"shape": "TaskIdentifierList"}, + "generatedAt": {"shape": "Long"}, + "messageId": {"shape": "String"} + } + }, + "TaskStopVerificationMessage": { + "type": "structure", + "members": { + "stopCandidates": {"shape": "TaskIdentifierList"}, + "messageId": {"shape": "String"} + } } } } diff --git a/agent/acs/model/ecsacs/api.go b/agent/acs/model/ecsacs/api.go index 5d4f8578b63..a7f4372c2c7 100644 --- a/agent/acs/model/ecsacs/api.go +++ b/agent/acs/model/ecsacs/api.go @@ -1263,6 +1263,90 @@ func (s Task) GoString() string { return s.String() } +type TaskIdentifier struct { + _ struct{} `type:"structure"` + + DesiredStatus *string `locationName:"desiredStatus" type:"string"` + + TaskArn *string `locationName:"taskArn" type:"string"` + + TaskClusterArn *string `locationName:"taskClusterArn" type:"string"` +} + +// String returns the string representation +func (s TaskIdentifier) String() string { + return awsutil.Prettify(s) +} + +// GoString returns the string representation +func (s TaskIdentifier) GoString() string { + return s.String() +} + +type TaskManifestMessage struct { + _ struct{} `type:"structure"` + + ClusterArn *string `locationName:"clusterArn" type:"string"` + + ContainerInstanceArn *string `locationName:"containerInstanceArn" type:"string"` + + GeneratedAt *int64 `locationName:"generatedAt" type:"long"` + + MessageId *string `locationName:"messageId" type:"string"` + + Tasks []*TaskIdentifier `locationName:"tasks" type:"list"` + + Timeline *int64 `locationName:"timeline" type:"long"` +} + +// String returns the string representation +func (s TaskManifestMessage) String() string { + return awsutil.Prettify(s) +} + +// GoString returns the string representation +func (s TaskManifestMessage) GoString() string { + return s.String() +} + +type TaskStopVerificationAck struct { + _ struct{} `type:"structure"` + + GeneratedAt *int64 `locationName:"generatedAt" type:"long"` + + MessageId *string `locationName:"messageId" type:"string"` + + StopTasks []*TaskIdentifier `locationName:"stopTasks" type:"list"` +} + +// String returns the string representation +func (s TaskStopVerificationAck) String() string { + return awsutil.Prettify(s) +} + +// GoString returns the string representation +func (s TaskStopVerificationAck) GoString() string { + return s.String() +} + +type TaskStopVerificationMessage struct { + _ struct{} `type:"structure"` + + MessageId *string `locationName:"messageId" type:"string"` + + StopCandidates []*TaskIdentifier `locationName:"stopCandidates" type:"list"` +} + +// String returns the string representation +func (s TaskStopVerificationMessage) String() string { + return awsutil.Prettify(s) +} + +// GoString returns the string representation +func (s TaskStopVerificationMessage) GoString() string { + return s.String() +} + type UpdateFailureInput struct { _ struct{} `type:"structure"` diff --git a/agent/api/task/status/taskstatus.go b/agent/api/task/status/taskstatus.go index c0389a14918..3497511c4f8 100644 --- a/agent/api/task/status/taskstatus.go +++ b/agent/api/task/status/taskstatus.go @@ -31,16 +31,24 @@ const ( TaskStopped // TaskZombie is an "impossible" state that is used as the maximum TaskZombie + // TaskStoppedString represents task stopped status string + TaskStoppedString = "STOPPED" + // TaskRunningString represents task running status string + TaskRunningString = "RUNNING" + //TaskCreatedString represents task created status string + TaskCreatedString = "CREATED" + // TaskNoneString represents task none status string + TaskNoneString = "NONE" ) // TaskStatus is an enumeration of valid states in the task lifecycle type TaskStatus int32 var taskStatusMap = map[string]TaskStatus{ - "NONE": TaskStatusNone, - "CREATED": TaskCreated, - "RUNNING": TaskRunning, - "STOPPED": TaskStopped, + TaskNoneString: TaskStatusNone, + TaskCreatedString: TaskCreated, + TaskRunningString: TaskRunning, + TaskStoppedString: TaskStopped, } // String returns a human readable string representation of this object diff --git a/agent/app/agent.go b/agent/app/agent.go index aa083abda25..29f63c10fd0 100644 --- a/agent/app/agent.go +++ b/agent/app/agent.go @@ -93,26 +93,27 @@ type agent interface { // after creating it via // the newAgent() method type ecsAgent struct { - ctx context.Context - ec2MetadataClient ec2.EC2MetadataClient - ec2Client ec2.Client - cfg *config.Config - dockerClient dockerapi.DockerClient - containerInstanceARN string - credentialProvider *aws_credentials.Credentials - stateManagerFactory factory.StateManager - saveableOptionFactory factory.SaveableOption - pauseLoader pause.Loader - cniClient ecscni.CNIClient - os oswrapper.OS - vpc string - subnet string - mac string - metadataManager containermetadata.Manager - terminationHandler sighandlers.TerminationHandler - mobyPlugins mobypkgwrapper.Plugins - resourceFields *taskresource.ResourceFields - availabilityZone string + ctx context.Context + ec2MetadataClient ec2.EC2MetadataClient + ec2Client ec2.Client + cfg *config.Config + dockerClient dockerapi.DockerClient + containerInstanceARN string + credentialProvider *aws_credentials.Credentials + stateManagerFactory factory.StateManager + saveableOptionFactory factory.SaveableOption + pauseLoader pause.Loader + cniClient ecscni.CNIClient + os oswrapper.OS + vpc string + subnet string + mac string + metadataManager containermetadata.Manager + terminationHandler sighandlers.TerminationHandler + mobyPlugins mobypkgwrapper.Plugins + resourceFields *taskresource.ResourceFields + availabilityZone string + latestSeqNumberTaskManifest *int64 } // newAgent returns a new ecsAgent object, but does not start anything @@ -158,6 +159,7 @@ func newAgent( metadataManager = containermetadata.NewManager(dockerClient, cfg) } + initialSeqNumber := int64(-1) return &ecsAgent{ ctx: ctx, ec2MetadataClient: ec2MetadataClient, @@ -167,15 +169,16 @@ func newAgent( // We instantiate our own credentialProvider for use in acs/tcs. This tries // to mimic roughly the way it's instantiated by the SDK for a default // session. - credentialProvider: defaults.CredChain(defaults.Config(), defaults.Handlers()), - stateManagerFactory: factory.NewStateManager(), - saveableOptionFactory: factory.NewSaveableOption(), - pauseLoader: pause.New(), - cniClient: ecscni.NewClient(cfg.CNIPluginsPath), - os: oswrapper.New(), - metadataManager: metadataManager, - terminationHandler: sighandlers.StartDefaultTerminationHandler, - mobyPlugins: mobypkgwrapper.NewPlugins(), + credentialProvider: defaults.CredChain(defaults.Config(), defaults.Handlers()), + stateManagerFactory: factory.NewStateManager(), + saveableOptionFactory: factory.NewSaveableOption(), + pauseLoader: pause.New(), + cniClient: ecscni.NewClient(cfg.CNIPluginsPath), + os: oswrapper.New(), + metadataManager: metadataManager, + terminationHandler: sighandlers.StartDefaultTerminationHandler, + mobyPlugins: mobypkgwrapper.NewPlugins(), + latestSeqNumberTaskManifest: &initialSeqNumber, }, nil } @@ -250,8 +253,8 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre agent.initMetricsEngine() // Initialize the state manager - stateManager, err := agent.newStateManager(taskEngine, - &agent.cfg.Cluster, &agent.containerInstanceARN, ¤tEC2InstanceID, &agent.availabilityZone) + stateManager, err := agent.newStateManager(taskEngine, &agent.cfg.Cluster, &agent.containerInstanceARN, + ¤tEC2InstanceID, &agent.availabilityZone, agent.latestSeqNumberTaskManifest) if err != nil { seelog.Criticalf("Error creating state manager: %v", err) return exitcodes.ExitTerminal @@ -344,7 +347,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve // previousStateManager is used to verify that our current runtime configuration is // compatible with our past configuration as reflected by our state-file previousStateManager, err := agent.newStateManager(previousTaskEngine, &previousCluster, - &previousContainerInstanceArn, &previousEC2InstanceID, &previousAZ) + &previousContainerInstanceArn, &previousEC2InstanceID, &previousAZ, agent.latestSeqNumberTaskManifest) if err != nil { seelog.Criticalf("Error creating state manager: %v", err) return nil, "", err @@ -454,7 +457,7 @@ func (agent *ecsAgent) newStateManager( cluster *string, containerInstanceArn *string, savedInstanceID *string, - availabilityZone *string) (statemanager.StateManager, error) { + availabilityZone *string, latestSeqNumberTaskManifest *int64) (statemanager.StateManager, error) { if !agent.cfg.Checkpoint { return statemanager.NewNoopStateManager(), nil @@ -469,6 +472,7 @@ func (agent *ecsAgent) newStateManager( // This is for making testing easier as we can mock this agent.saveableOptionFactory.AddSaveable("EC2InstanceID", savedInstanceID), agent.saveableOptionFactory.AddSaveable("availabilityZone", availabilityZone), + agent.saveableOptionFactory.AddSaveable("latestSeqNumberTaskManifest", latestSeqNumberTaskManifest), ) } @@ -699,6 +703,7 @@ func (agent *ecsAgent) startACSSession( taskEngine, credentialsManager, taskHandler, + agent.latestSeqNumberTaskManifest, ) seelog.Info("Beginning Polling for updates") err := acsSession.Start() diff --git a/agent/app/agent_capability.go b/agent/app/agent_capability.go index 140ff31ae7c..81faaddcd27 100644 --- a/agent/app/agent_capability.go +++ b/agent/app/agent_capability.go @@ -55,6 +55,7 @@ const ( capabilityFirelensLoggingDriver = "logging-driver.awsfirelens" capabilityFirelensConfigFile = "firelens.options.config.file" capabilityFirelensConfigS3 = "firelens.options.config.s3" + capabilityFullTaskSync = "full-sync" ) // capabilities returns the supported capabilities of this agent / docker-client pair. @@ -98,6 +99,7 @@ const ( // com.amazonaws.ecs.capability.logging-driver.awsfirelens // ecs.capability.firelens.options.config.file // ecs.capability.firelens.options.config.s3 +// ecs.capability.full-sync func (agent *ecsAgent) capabilities() ([]*ecs.Attribute, error) { var capabilities []*ecs.Attribute @@ -166,6 +168,9 @@ func (agent *ecsAgent) capabilities() ([]*ecs.Attribute, error) { // support container ordering in agent capabilities = appendNameOnlyAttribute(capabilities, attributePrefix+capabilityContainerOrdering) + // support full task sync + capabilities = appendNameOnlyAttribute(capabilities, attributePrefix+capabilityFullTaskSync) + // ecs agent version 1.22.0 supports sharing PID namespaces and IPC resource namespaces // with host EC2 instance and among containers within the task capabilities = agent.appendPIDAndIPCNamespaceSharingCapabilities(capabilities) diff --git a/agent/app/agent_capability_test.go b/agent/app/agent_capability_test.go index ccf9df9a6ab..4e553a7d867 100644 --- a/agent/app/agent_capability_test.go +++ b/agent/app/agent_capability_test.go @@ -129,6 +129,9 @@ func TestCapabilities(t *testing.T) { { Name: aws.String(attributePrefix + capabilityContainerOrdering), }, + { + Name: aws.String(attributePrefix + capabilityFullTaskSync), + }, }...) ctx, cancel := context.WithCancel(context.TODO()) diff --git a/agent/app/agent_capability_unix_test.go b/agent/app/agent_capability_unix_test.go index 99f4d7b9b80..1e7740b83ca 100644 --- a/agent/app/agent_capability_unix_test.go +++ b/agent/app/agent_capability_unix_test.go @@ -518,6 +518,9 @@ func TestPIDAndIPCNamespaceSharingCapabilitiesUnix(t *testing.T) { { Name: aws.String(attributePrefix + capabilityContainerOrdering), }, + { + Name: aws.String(attributePrefix + capabilityFullTaskSync), + }, { Name: aws.String(attributePrefix + capabiltyPIDAndIPCNamespaceSharing), }, @@ -600,6 +603,9 @@ func TestAppMeshCapabilitiesUnix(t *testing.T) { { Name: aws.String(attributePrefix + capabilityContainerOrdering), }, + { + Name: aws.String(attributePrefix + capabilityFullTaskSync), + }, { Name: aws.String(attributePrefix + capabiltyPIDAndIPCNamespaceSharing), }, diff --git a/agent/app/agent_capability_windows_test.go b/agent/app/agent_capability_windows_test.go index bc50b4b1734..e51415bcadf 100644 --- a/agent/app/agent_capability_windows_test.go +++ b/agent/app/agent_capability_windows_test.go @@ -210,6 +210,9 @@ func TestSupportedCapabilitiesWindows(t *testing.T) { { Name: aws.String(attributePrefix + capabilityContainerOrdering), }, + { + Name: aws.String(attributePrefix + capabilityFullTaskSync), + }, }...) ctx, cancel := context.WithCancel(context.TODO()) diff --git a/agent/app/agent_compatibility_linux_test.go b/agent/app/agent_compatibility_linux_test.go index ec1979d2bc4..adb3e791ed5 100644 --- a/agent/app/agent_compatibility_linux_test.go +++ b/agent/app/agent_compatibility_linux_test.go @@ -47,7 +47,8 @@ func TestCompatibilityEnabledSuccess(t *testing.T) { gomock.InOrder( saveableOptionFactory.EXPECT().AddSaveable(gomock.Any(), gomock.Any()).AnyTimes(), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil), + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil), stateManager.EXPECT().Load().AnyTimes(), state.EXPECT().AllTasks().Return([]*apitask.Task{}), ) @@ -79,7 +80,8 @@ func TestCompatibilityDefaultEnabledFail(t *testing.T) { } gomock.InOrder( saveableOptionFactory.EXPECT().AddSaveable(gomock.Any(), gomock.Any()).AnyTimes(), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil), + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil), stateManager.EXPECT().Load().AnyTimes(), state.EXPECT().AllTasks().Return(getTaskListWithOneBadTask()), ) @@ -110,7 +112,8 @@ func TestCompatibilityExplicitlyEnabledFail(t *testing.T) { } gomock.InOrder( saveableOptionFactory.EXPECT().AddSaveable(gomock.Any(), gomock.Any()).AnyTimes(), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil), + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil), stateManager.EXPECT().Load().AnyTimes(), state.EXPECT().AllTasks().Return(getTaskListWithOneBadTask()), ) diff --git a/agent/app/agent_test.go b/agent/app/agent_test.go index 1f8028bf9f4..f2abcbdc14a 100644 --- a/agent/app/agent_test.go +++ b/agent/app/agent_test.go @@ -153,10 +153,11 @@ func TestDoStartNewTaskEngineError(t *testing.T) { saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), // An error in creating the state manager should result in an // error from newTaskEngine as well - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return( nil, errors.New("error")), ) @@ -192,7 +193,9 @@ func TestDoStartNewStateManagerError(t *testing.T) { saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), + + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return( statemanager.NewNoopStateManager(), nil), state.EXPECT().AllTasks().AnyTimes(), @@ -201,7 +204,9 @@ func TestDoStartNewStateManagerError(t *testing.T) { saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), + + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return( nil, errors.New("error")), ) @@ -407,7 +412,9 @@ func TestNewTaskEngineRestoreFromCheckpointNoEC2InstanceIDToLoadHappyPath(t *tes saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), + + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return( statemanager.NewNoopStateManager(), nil), state.EXPECT().AllTasks().AnyTimes(), @@ -463,7 +470,8 @@ func TestNewTaskEngineRestoreFromCheckpointPreviousEC2InstanceIDLoadedHappyPath( assert.True(t, ok) *previousAZ = "us-west-2b" }).Return(nil), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return( statemanager.NewNoopStateManager(), nil), state.EXPECT().AllTasks().AnyTimes(), @@ -517,8 +525,9 @@ func TestNewTaskEngineRestoreFromCheckpointClusterIDMismatch(t *testing.T) { }).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return( statemanager.NewNoopStateManager(), nil), state.EXPECT().AllTasks().AnyTimes(), @@ -555,7 +564,9 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) { saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), - stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), + + stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return( nil, errors.New("error")), ) @@ -590,8 +601,9 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) { saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), - gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), ).Return(stateManager, nil), stateManager.EXPECT().Load().Return(errors.New("error")), ) @@ -627,8 +639,9 @@ func TestNewTaskEngineRestoreFromCheckpoint(t *testing.T) { saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("EC2InstanceID", gomock.Any()).Return(nil), saveableOptionFactory.EXPECT().AddSaveable("availabilityZone", gomock.Any()).Return(nil), + saveableOptionFactory.EXPECT().AddSaveable("latestSeqNumberTaskManifest", gomock.Any()).Return(nil), stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), - gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), ).Return(statemanager.NewNoopStateManager(), nil), state.EXPECT().AllTasks().AnyTimes(), ec2MetadataClient.EXPECT().InstanceID().Return(expectedInstanceID, nil), diff --git a/agent/statemanager/state_manager.go b/agent/statemanager/state_manager.go index a7b32873bba..72b959336f5 100644 --- a/agent/statemanager/state_manager.go +++ b/agent/statemanager/state_manager.go @@ -95,8 +95,9 @@ const ( // a) Add 'imageDigest' field to 'apicontainer.Container' // b) Add 'Region', 'ExecutionCredentialsID', 'ExternalConfigType', 'ExternalConfigValue' and 'NetworkMode' to // firelens task resource. + // 25) Add `seqNumTaskManifest` int field - ECSDataVersion = 24 + ECSDataVersion = 25 // ecsDataFile specifies the filename in the ECS_DATADIR ecsDataFile = "ecs_agent_data.json" diff --git a/agent/statemanager/state_manager_test.go b/agent/statemanager/state_manager_test.go index 5a49df382fe..db5e7de3be8 100644 --- a/agent/statemanager/state_manager_test.go +++ b/agent/statemanager/state_manager_test.go @@ -459,3 +459,30 @@ func TestLoadsDataForContainerImageDigest(t *testing.T) { assert.Equal(t, 1, len(task.Containers)) assert.Equal(t, "sha256:9f1003c480699be56815db0f8146ad2e22efea85129b5b5983d0e0fb52d9ab70", task.Containers[0].ImageDigest) } + +func TestLoadsDataSeqTaskManifest(t *testing.T) { + cleanup, err := setupWindowsTest(filepath.Join(".", "testdata", "v25", "seqNumTaskManifest", "ecs_agent_data.json")) + require.Nil(t, err, "Failed to set up test") + defer cleanup() + cfg := &config.Config{DataDir: filepath.Join(".", "testdata", "v25", "seqNumTaskManifest")} + taskEngine := engine.NewTaskEngine(&config.Config{}, nil, nil, nil, nil, dockerstate.NewTaskEngineState(), nil, nil) + var containerInstanceArn, cluster, savedInstanceID string + var sequenceNumber, seqNumTaskManifest int64 + stateManager, err := statemanager.NewStateManager(cfg, + statemanager.AddSaveable("TaskEngine", taskEngine), + statemanager.AddSaveable("ContainerInstanceArn", &containerInstanceArn), + statemanager.AddSaveable("Cluster", &cluster), + statemanager.AddSaveable("EC2InstanceID", &savedInstanceID), + statemanager.AddSaveable("SeqNum", &sequenceNumber), + statemanager.AddSaveable("seqNumTaskManifest", &seqNumTaskManifest), + ) + assert.NoError(t, err) + err = stateManager.Load() + assert.NoError(t, err) + tasks, err := taskEngine.ListTasks() + assert.NoError(t, err) + assert.Equal(t, 1, len(tasks)) + assert.Equal(t, "state-file", cluster) + assert.EqualValues(t, 0, sequenceNumber) + assert.EqualValues(t, 7, seqNumTaskManifest) +} diff --git a/agent/statemanager/testdata/v25/seqNumTaskManifest/ecs_agent_data.json b/agent/statemanager/testdata/v25/seqNumTaskManifest/ecs_agent_data.json new file mode 100644 index 00000000000..e490f2a58c9 --- /dev/null +++ b/agent/statemanager/testdata/v25/seqNumTaskManifest/ecs_agent_data.json @@ -0,0 +1,157 @@ +{ + "Data": { + "Cluster": "state-file", + "ContainerInstanceArn": "arn:aws:ecs:us-west-2:984736093387:container-instance/ea27e41b-c6e4-45a9-a7a0-484c95abece7", + "EC2InstanceID": "i-0e38e94fed89f598f", + "TaskEngine": { + "Tasks": [ + { + "Arn": "arn:aws:ecs:us-west-2:984736093387:task/70947c96-f64e-483a-a612-3fd4303546e7", + "Family": "sleep360", + "Version": "6", + "Containers": [ + { + "Name": "sleep", + "RuntimeID": "c6b1ea1004c6de778bdc4c00bc15085ef16b4b259f7dfc198a0a36b6629a7f90", + "V3EndpointID": "6d4b6283-452e-42ef-bafc-7e7f5a6dac99", + "Image": "busybox", + "ImageID": "sha256:db8ee88ad75f6bdc74663f4992a185e2722fa29573abcc1a19186cc5ec09dceb", + "Command": [ + "sleep", + "360" + ], + "Cpu": 10, + "GPUIDs": null, + "Memory": 100, + "Links": null, + "volumesFrom": [], + "mountPoints": [], + "portMappings": [], + "secrets": null, + "Essential": true, + "EntryPoint": null, + "environment": { + "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI": "/v2/credentials/16105516-51c7-41a2-ad8a-ba7411f309a0", + "AWS_EXECUTION_ENV": "AWS_ECS_EC2", + "ECS_CONTAINER_METADATA_URI": "http://169.254.170.2/v3/6d4b6283-452e-42ef-bafc-7e7f5a6dac99" + }, + "overrides": { + "command": null + }, + "dockerConfig": { + "config": "{}", + "hostConfig": "{\"CapAdd\":[],\"CapDrop\":[]}", + "version": "1.17" + }, + "registryAuthentication": null, + "LogsAuthStrategy": "", + "StartTimeout": 0, + "StopTimeout": 0, + "desiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "RunDependencies": null, + "IsInternal": "NORMAL", + "ApplyingError": null, + "SentStatus": "RUNNING", + "metadataFileUpdated": false, + "KnownExitCode": null, + "KnownPortBindings": null + } + ], + "associations": [], + "volumes": [], + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "KnownTime": "2019-08-06T21:59:04.217198554Z", + "PullStartedAt": "2019-08-06T21:59:01.88671907Z", + "PullStoppedAt": "2019-08-06T21:59:03.799514307Z", + "ExecutionStoppedAt": "0001-01-01T00:00:00Z", + "SentStatus": "RUNNING", + "StartSequenceNumber": 2, + "StopSequenceNumber": 0, + "executionCredentialsID": "", + "ENI": null, + "AppMesh": null, + "MemoryCPULimitsEnabled": true, + "PlatformFields": {} + } + ], + "IdToContainer": { + "c6b1ea1004c6de778bdc4c00bc15085ef16b4b259f7dfc198a0a36b6629a7f90": { + "DockerId": "c6b1ea1004c6de778bdc4c00bc15085ef16b4b259f7dfc198a0a36b6629a7f90", + "DockerName": "ecs-sleep360-6-sleep-a2b4d9d6ef938afc6f00", + "Container": { + "Name": "sleep", + "RuntimeID": "c6b1ea1004c6de778bdc4c00bc15085ef16b4b259f7dfc198a0a36b6629a7f90", + "V3EndpointID": "6d4b6283-452e-42ef-bafc-7e7f5a6dac99", + "Image": "busybox", + "ImageID": "sha256:db8ee88ad75f6bdc74663f4992a185e2722fa29573abcc1a19186cc5ec09dceb", + "Command": [ + "sleep", + "360" + ], + "Cpu": 10, + "GPUIDs": null, + "Memory": 100, + "Links": null, + "volumesFrom": [], + "mountPoints": [], + "portMappings": [], + "secrets": null, + "Essential": true, + "EntryPoint": null, + "environment": { + "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI": "/v2/credentials/16105516-51c7-41a2-ad8a-ba7411f309a0", + "AWS_EXECUTION_ENV": "AWS_ECS_EC2", + "ECS_CONTAINER_METADATA_URI": "http://169.254.170.2/v3/6d4b6283-452e-42ef-bafc-7e7f5a6dac99" + }, + "overrides": { + "command": null + }, + "dockerConfig": { + "config": "{}", + "hostConfig": "{\"CapAdd\":[],\"CapDrop\":[]}", + "version": "1.17" + }, + "registryAuthentication": null, + "LogsAuthStrategy": "", + "StartTimeout": 0, + "StopTimeout": 0, + "desiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "RunDependencies": null, + "IsInternal": "NORMAL", + "ApplyingError": null, + "SentStatus": "RUNNING", + "metadataFileUpdated": false, + "KnownExitCode": null, + "KnownPortBindings": null + } + } + }, + "IdToTask": { + "c6b1ea1004c6de778bdc4c00bc15085ef16b4b259f7dfc198a0a36b6629a7f90": "arn:aws:ecs:us-west-2:984736093387:task/70947c96-f64e-483a-a612-3fd4303546e7" + }, + "ImageStates": [ + { + "Image": { + "ImageID": "sha256:db8ee88ad75f6bdc74663f4992a185e2722fa29573abcc1a19186cc5ec09dceb", + "Names": [ + "busybox" + ], + "Size": 1223894 + }, + "PulledAt": "2019-08-06T21:59:03.797764725Z", + "LastUsedAt": "2019-08-06T21:59:03.797764824Z", + "PullSucceeded": true + } + ], + "ENIAttachments": null, + "IPToTask": {} + }, + "availabilityZone": "us-west-2b", + "seqNumTaskManifest": 7 + + }, + "Version": 24 +} diff --git a/agent/wsclient/client.go b/agent/wsclient/client.go index 2ff7f8f75f0..33f576c859c 100644 --- a/agent/wsclient/client.go +++ b/agent/wsclient/client.go @@ -442,7 +442,7 @@ func (cs *ClientServerImpl) handleMessage(data []byte) { if handler, ok := cs.RequestHandlers[typeStr]; ok { reflect.ValueOf(handler).Call([]reflect.Value{reflect.ValueOf(typedMessage)}) } else { - seelog.Infof("No handler for message type: %s", typeStr) + seelog.Infof("No handler for message type: %s %s", typeStr, typedMessage) } }