From ed1cd673f0640bad262076b2a47062c28da4d4df Mon Sep 17 00:00:00 2001 From: Aithal Date: Wed, 4 Oct 2017 16:39:43 -0700 Subject: [PATCH] eni attachment state change: ignore expired This change fixes a bug in the Agent's event submission logic: the ENI attachment event is now ignored if the ack timeout sent by the backend has expired. Not doing this results in the Agent forever trying to send the attachment event to the backend, as we can't reliably distinguish between throttle errors and invalid input errors in the SubmitTaskStateChange response. --- agent/api/eniattachment.go | 9 ++ agent/api/eniattachment_test.go | 23 +++ agent/ecscni/plugin.go | 2 +- agent/eni/watcher/watcher_linux.go | 60 +++---- agent/eni/watcher/watcher_linux_test.go | 93 ++++++----- agent/eventhandler/handler_test.go | 14 -- agent/eventhandler/task_handler_types.go | 1 + agent/eventhandler/task_handler_types_test.go | 153 ++++++++++++++++++ 8 files changed, 268 insertions(+), 87 deletions(-) create mode 100644 agent/eventhandler/task_handler_types_test.go diff --git a/agent/api/eniattachment.go b/agent/api/eniattachment.go index e0de3257467..548124c5c17 100644 --- a/agent/api/eniattachment.go +++ b/agent/api/eniattachment.go @@ -90,6 +90,15 @@ func (eni *ENIAttachment) StopAckTimer() { eni.ackTimer.Stop() } +// HasExpired returns true if the ENI attachment object has exceeded the +// threshold for notifying the backend of the attachment +func (eni *ENIAttachment) HasExpired() bool { + eni.guard.RLock() + defer eni.guard.RUnlock() + + return time.Now().After(eni.ExpiresAt) +} + // String returns a string representation of the ENI Attachment func (eni *ENIAttachment) String() string { eni.guard.RLock() diff --git a/agent/api/eniattachment_test.go b/agent/api/eniattachment_test.go index 19df2ed15e5..28527bc100f 100644 --- a/agent/api/eniattachment_test.go +++ b/agent/api/eniattachment_test.go @@ -68,3 +68,26 @@ func TestStartTimerErrorWhenExpiresAtIsInThePast(t *testing.T) { } assert.Error(t, 
attachment.StartTimer(func() {})) } + +func TestHasExpired(t *testing.T) { + for _, tc := range []struct { + expiresAt int64 + expected bool + name string + }{ + {time.Now().Unix() - 1, true, "expiresAt in past returns true"}, + {time.Now().Unix() + 10, false, "expiresAt in future returns false"}, + } { + t.Run(tc.name, func(t *testing.T) { + attachment := &ENIAttachment{ + TaskARN: taskARN, + AttachmentARN: attachmentARN, + AttachStatusSent: attachSent, + MACAddress: mac, + Status: ENIAttachmentNone, + ExpiresAt: time.Unix(tc.expiresAt, 0), + } + assert.Equal(t, tc.expected, attachment.HasExpired()) + }) + } +} diff --git a/agent/ecscni/plugin.go b/agent/ecscni/plugin.go index 21f65a3e4ea..5bacc3705d9 100644 --- a/agent/ecscni/plugin.go +++ b/agent/ecscni/plugin.go @@ -124,7 +124,7 @@ func (client *cniClient) ReleaseIPResource(cfg *Config) error { Plugins: []*libcni.NetworkConfig{ipamConfig}, } - seelog.Debugf("Releasing the ip resource from ipam db, id: [%s], ip: [%s]", cfg.ID, cfg.IPAMV4Address) + seelog.Debugf("Releasing the ip resource from ipam db, id: [%s], ip: [%v]", cfg.ID, cfg.IPAMV4Address) os.Setenv("ECS_CNI_LOGLEVEL", logger.GetLevel()) defer os.Unsetenv("ECS_CNI_LOGLEVEL") return client.libcni.DelNetworkList(networkConfigList, cns) diff --git a/agent/eni/watcher/watcher_linux.go b/agent/eni/watcher/watcher_linux.go index 06f242a0a75..41eaa2fda3c 100644 --- a/agent/eni/watcher/watcher_linux.go +++ b/agent/eni/watcher/watcher_linux.go @@ -139,46 +139,46 @@ func (udevWatcher *UdevWatcher) reconcileOnce() error { // Add new interfaces next for mac, _ := range currentState { - udevWatcher.sendENIStateChange(mac) + if err := udevWatcher.sendENIStateChange(mac); err != nil { + log.Warnf("Udev watcher reconciliation: unable to send state change: %v", err) + } } return nil } // sendENIStateChange handles the eni event from udev or reconcile phase -func (udevWatcher *UdevWatcher) sendENIStateChange(mac string) { - eniAttachment, ok := 
udevWatcher.shouldSendENIStateChange(mac) - if ok { - go func(eni *api.ENIAttachment) { - eni.Status = api.ENIAttached - log.Infof("Emitting ENI change event for: %v", eni) - udevWatcher.eniChangeEvent <- api.TaskStateChange{ - TaskARN: eni.TaskARN, - Attachment: eni, - } - }(eniAttachment) - } -} - -// shouldSendENIStateChange checks whether this eni is managed by ecs -// and if its status should be sent to backend -func (udevWatcher *UdevWatcher) shouldSendENIStateChange(macAddress string) (*api.ENIAttachment, bool) { - if macAddress == "" { - log.Warn("ENI state manager: device with empty mac address") - return nil, false +func (udevWatcher *UdevWatcher) sendENIStateChange(mac string) error { + if mac == "" { + return errors.New("udev watcher send ENI state change: empty mac address") } // check if this is an eni required by a task - eni, ok := udevWatcher.agentState.ENIByMac(macAddress) + eni, ok := udevWatcher.agentState.ENIByMac(mac) if !ok { - log.Infof("ENI state manager: eni not managed by ecs: %s", macAddress) - return nil, false + return errors.Errorf("udev watcher send ENI state change: eni not managed by ecs: %s", mac) } - if eni.IsSent() { - log.Infof("ENI state manager: eni attach status has already sent: %s", macAddress) - return eni, false + return errors.Errorf("udev watcher send ENI state change: eni status already sent: %s", eni.String()) + } + if eni.HasExpired() { + // Agent is aware of the ENI, but we decide not to ack it + // as it's ack timeout has expired + udevWatcher.agentState.RemoveENIAttachment(eni.MACAddress) + return errors.Errorf( + "udev watcher send ENI state change: eni status expired, no longer tracking it: %s", + eni.String()) } - return eni, true + // We found an ENI, which has the expiration time set in future and + // needs to be acknowledged as having been 'attached' to the Instance + go func(eni *api.ENIAttachment) { + eni.Status = api.ENIAttached + log.Infof("Emitting ENI change event for: %s", eni.String()) + 
udevWatcher.eniChangeEvent <- api.TaskStateChange{ + TaskARN: eni.TaskARN, + Attachment: eni, + } + }(eni) + return nil } // buildState is used to build a state of the system for reconciliation @@ -227,7 +227,9 @@ func (udevWatcher *UdevWatcher) eventHandler() { log.Warnf("Udev watcher event-handler: error obtaining MACAddress for interface %s", netInterface) continue } - udevWatcher.sendENIStateChange(macAddress) + if err := udevWatcher.sendENIStateChange(macAddress); err != nil { + log.Warnf("Udev watcher event-handler: unable to send state change: %v", err) + } case <-udevWatcher.ctx.Done(): log.Info("Stopping udev event handler") // Send the shutdown signal and close the connection diff --git a/agent/eni/watcher/watcher_linux_test.go b/agent/eni/watcher/watcher_linux_test.go index f29694ed9e0..328f03f25d7 100644 --- a/agent/eni/watcher/watcher_linux_test.go +++ b/agent/eni/watcher/watcher_linux_test.go @@ -18,10 +18,10 @@ package watcher import ( "context" "errors" - "fmt" "net" "sync" "testing" + "time" "github.com/deniswernert/udev" "github.com/golang/mock/gomock" @@ -63,6 +63,7 @@ func TestWatcherInit(t *testing.T) { taskEngineState.AddENIAttachment(&api.ENIAttachment{ MACAddress: randomMAC, AttachStatusSent: false, + ExpiresAt: time.Unix(time.Now().Unix()+10, 0), }) eventChannel := make(chan statechange.Event) @@ -164,6 +165,7 @@ func TestReconcileENIs(t *testing.T) { taskEngineState.AddENIAttachment(&api.ENIAttachment{ MACAddress: randomMAC, AttachStatusSent: false, + ExpiresAt: time.Unix(time.Now().Unix()+10, 0), }) mockNetlink.EXPECT().LinkList().Return([]netlink.Link{ @@ -293,7 +295,7 @@ func TestUdevAddEvent(t *testing.T) { }, }, nil), mockStateManager.EXPECT().ENIByMac(randomMAC).Return( - &api.ENIAttachment{}, true), + &api.ENIAttachment{ExpiresAt: time.Unix(time.Now().Unix()+10, 0)}, true), ) // Spin off event handler @@ -452,7 +454,9 @@ func TestSendENIStateChange(t *testing.T) { watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, 
mockStateManager, eventChannel) - mockStateManager.EXPECT().ENIByMac(randomMAC).Return(&api.ENIAttachment{}, true) + mockStateManager.EXPECT().ENIByMac(randomMAC).Return(&api.ENIAttachment{ + ExpiresAt: time.Unix(time.Now().Unix()+10, 0), + }, true) go watcher.sendENIStateChange(randomMAC) @@ -462,46 +466,49 @@ func TestSendENIStateChange(t *testing.T) { assert.Equal(t, api.ENIAttached, taskStateChange.Attachment.Status) } -func TestShouldSendENIStateChange(t *testing.T) { - testCases := []struct { - eniAttachment *api.ENIAttachment - eniByMACExists bool - expectStateChange bool - }{ - { - &api.ENIAttachment{}, - true, - true, - }, - { +func TestSendENIStateChangeUnmanaged(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl) + watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil) + + mockStateManager.EXPECT().ENIByMac(randomMAC).Return(nil, false) + assert.Error(t, watcher.sendENIStateChange(randomMAC)) +} + +func TestSendENIStateChangeAlreadySent(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl) + watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil) + + mockStateManager.EXPECT().ENIByMac(randomMAC).Return(&api.ENIAttachment{ + AttachStatusSent: true, + ExpiresAt: time.Unix(time.Now().Unix()+10, 0), + MACAddress: randomMAC, + }, true) + + assert.Error(t, watcher.sendENIStateChange(randomMAC)) +} + +func TestSendENIStateChangeExpired(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl) + watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil) + + gomock.InOrder( + mockStateManager.EXPECT().ENIByMac(randomMAC).Return( &api.ENIAttachment{ - AttachStatusSent: true, - }, - true, - 
false, - }, - { - &api.ENIAttachment{}, - false, - false, - }, - } - for _, tc := range testCases { - t.Run( - fmt.Sprintf("return %t when exists is %t and sent is %s", - tc.expectStateChange, tc.eniByMACExists, tc.eniAttachment.Status.String()), - func(t *testing.T) { - - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl) - watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil) - - mockStateManager.EXPECT().ENIByMac(randomMAC).Return(tc.eniAttachment, tc.eniByMACExists) - _, ok := watcher.shouldSendENIStateChange(randomMAC) - assert.Equal(t, tc.expectStateChange, ok) - }) - } + AttachStatusSent: false, + ExpiresAt: time.Unix(time.Now().Unix()-10, 0), + MACAddress: randomMAC, + }, true), + mockStateManager.EXPECT().RemoveENIAttachment(randomMAC), + ) + assert.Error(t, watcher.sendENIStateChange(randomMAC)) } diff --git a/agent/eventhandler/handler_test.go b/agent/eventhandler/handler_test.go index 4beb69e3041..15b3460f1cc 100644 --- a/agent/eventhandler/handler_test.go +++ b/agent/eventhandler/handler_test.go @@ -277,20 +277,6 @@ func TestCleanupTaskEventAfterSubmit(t *testing.T) { assert.Len(t, handler.tasksToEvents, 0) } -func TestShouldBeSent(t *testing.T) { - sendableEvent := newSendableContainerEvent(api.ContainerStateChange{ - Status: api.ContainerStopped, - }) - - if sendableEvent.taskShouldBeSent() { - t.Error("Container event should not be sent as a task") - } - - if !sendableEvent.containerShouldBeSent() { - t.Error("Container should be sent if it's the first try") - } -} - func containerEvent(arn string) statechange.Event { return api.ContainerStateChange{TaskArn: arn, ContainerName: "containerName", Status: api.ContainerRunning, Container: &api.Container{}} } diff --git a/agent/eventhandler/task_handler_types.go b/agent/eventhandler/task_handler_types.go index e7bbefc0548..0f36bc681d0 100644 --- a/agent/eventhandler/task_handler_types.go +++ 
b/agent/eventhandler/task_handler_types.go @@ -93,6 +93,7 @@ func (event *sendableEvent) taskAttachmentShouldBeSent() bool { tevent := event.taskChange return tevent.Status == api.TaskStatusNone && // Task Status is not set for attachments as task record has yet to be streamed down tevent.Attachment != nil && // Task has attachment records + !tevent.Attachment.HasExpired() && // ENI attachment ack timestamp hasn't expired !tevent.Attachment.IsSent() // Task status hasn't already been sent } diff --git a/agent/eventhandler/task_handler_types_test.go b/agent/eventhandler/task_handler_types_test.go new file mode 100644 index 00000000000..3cb257a4999 --- /dev/null +++ b/agent/eventhandler/task_handler_types_test.go @@ -0,0 +1,153 @@ +// Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package eventhandler + +import ( + "fmt" + "testing" + "time" + + "github.com/aws/amazon-ecs-agent/agent/api" + "github.com/stretchr/testify/assert" +) + +func TestShouldContainerEventBeSent(t *testing.T) { + event := newSendableContainerEvent(api.ContainerStateChange{ + Status: api.ContainerStopped, + }) + assert.Equal(t, true, event.containerShouldBeSent()) + assert.Equal(t, false, event.taskShouldBeSent()) +} + +func TestShouldTaskEventBeSent(t *testing.T) { + for _, tc := range []struct { + event *sendableEvent + shouldBeSent bool + }{ + { + // We don't send a task event to backend if task status == NONE + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskStatusNone, + }), + shouldBeSent: false, + }, + { + // task status == RUNNING should be sent to backend + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskRunning, + }), + shouldBeSent: true, + }, + { + // task event will not be sent if sent status >= task status + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskRunning, + Task: &api.Task{ + SentStatusUnsafe: api.TaskRunning, + }, + }), + shouldBeSent: false, + }, + { + // this is a valid event as task status >= sent status + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskStopped, + Task: &api.Task{ + SentStatusUnsafe: api.TaskRunning, + }, + }), + shouldBeSent: true, + }, + } { + t.Run(fmt.Sprintf("Event[%s] should be sent[%t]", tc.event.String(), tc.shouldBeSent), func(t *testing.T) { + assert.Equal(t, tc.shouldBeSent, tc.event.taskShouldBeSent()) + assert.Equal(t, false, tc.event.containerShouldBeSent()) + assert.Equal(t, false, tc.event.taskAttachmentShouldBeSent()) + }) + } +} + +func TestShouldTaskAttachmentEventBeSent(t *testing.T) { + for _, tc := range []struct { + event *sendableEvent + attachmentShouldBeSent bool + taskShouldBeSent bool + }{ + { + // ENI Attachment is only sent if task status == NONE + event: newSendableTaskEvent(api.TaskStateChange{ + Status: 
api.TaskStopped, + }), + attachmentShouldBeSent: false, + taskShouldBeSent: true, + }, + { + // ENI Attachment is only sent if task status == NONE and if + // the event has a non nil attachment object + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskStatusNone, + }), + attachmentShouldBeSent: false, + taskShouldBeSent: false, + }, + { + // ENI Attachment is only sent if task status == NONE and if + // the event has a non nil attachment object and if expiration + // ack timeout is set for future + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskStatusNone, + Attachment: &api.ENIAttachment{ + ExpiresAt: time.Unix(time.Now().Unix()-1, 0), + AttachStatusSent: false, + }, + }), + attachmentShouldBeSent: false, + taskShouldBeSent: false, + }, + { + // ENI Attachment is only sent if task status == NONE and if + // the event has a non nil attachment object and if expiration + // ack timeout is set for future and if attachment status hasn't + // already been sent + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskStatusNone, + Attachment: &api.ENIAttachment{ + ExpiresAt: time.Unix(time.Now().Unix()+10, 0), + AttachStatusSent: true, + }, + }), + attachmentShouldBeSent: false, + taskShouldBeSent: false, + }, + { + // Valid attachment event, ensure that its sent + event: newSendableTaskEvent(api.TaskStateChange{ + Status: api.TaskStatusNone, + Attachment: &api.ENIAttachment{ + ExpiresAt: time.Unix(time.Now().Unix()+10, 0), + AttachStatusSent: false, + }, + }), + attachmentShouldBeSent: true, + taskShouldBeSent: false, + }, + } { + t.Run(fmt.Sprintf("Event[%s] should be sent[attachment=%t;task=%t]", + tc.event.String(), tc.attachmentShouldBeSent, tc.taskShouldBeSent), func(t *testing.T) { + assert.Equal(t, tc.attachmentShouldBeSent, tc.event.taskAttachmentShouldBeSent()) + assert.Equal(t, tc.taskShouldBeSent, tc.event.taskShouldBeSent()) + assert.Equal(t, false, tc.event.containerShouldBeSent()) + }) + } +}