Skip to content

Commit

Permalink
eni attachment state change: ignore expired
Browse files Browse the repository at this point in the history
This change fixes a bug with the Agent's event submission
logic, where if the ENI attachment is ignored if the
ack time sent by the backend has expired. Not doing this
will result in the Agent forever trying to send the
attachment event to the backend as we can't reliably
distinguish between throttle errors and invalid input
errors from SubmitTaskStateChange response.
  • Loading branch information
aaithal committed Oct 7, 2017
1 parent a50aad7 commit ed1cd67
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 87 deletions.
9 changes: 9 additions & 0 deletions agent/api/eniattachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ func (eni *ENIAttachment) StopAckTimer() {
eni.ackTimer.Stop()
}

// HasExpired returns true if the ENI attachment object has exceeded the
// threshold for notifying the backend of the attachment
func (eni *ENIAttachment) HasExpired() bool {
eni.guard.RLock()
defer eni.guard.RUnlock()

return time.Now().After(eni.ExpiresAt)
}

// String returns a string representation of the ENI Attachment
func (eni *ENIAttachment) String() string {
eni.guard.RLock()
Expand Down
23 changes: 23 additions & 0 deletions agent/api/eniattachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,26 @@ func TestStartTimerErrorWhenExpiresAtIsInThePast(t *testing.T) {
}
assert.Error(t, attachment.StartTimer(func() {}))
}

func TestHasExpired(t *testing.T) {
for _, tc := range []struct {
expiresAt int64
expected bool
name string
}{
{time.Now().Unix() - 1, true, "expiresAt in past returns true"},
{time.Now().Unix() + 10, false, "expiresAt in future returns false"},
} {
t.Run(tc.name, func(t *testing.T) {
attachment := &ENIAttachment{
TaskARN: taskARN,
AttachmentARN: attachmentARN,
AttachStatusSent: attachSent,
MACAddress: mac,
Status: ENIAttachmentNone,
ExpiresAt: time.Unix(tc.expiresAt, 0),
}
assert.Equal(t, tc.expected, attachment.HasExpired())
})
}
}
2 changes: 1 addition & 1 deletion agent/ecscni/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (client *cniClient) ReleaseIPResource(cfg *Config) error {
Plugins: []*libcni.NetworkConfig{ipamConfig},
}

seelog.Debugf("Releasing the ip resource from ipam db, id: [%s], ip: [%s]", cfg.ID, cfg.IPAMV4Address)
seelog.Debugf("Releasing the ip resource from ipam db, id: [%s], ip: [%v]", cfg.ID, cfg.IPAMV4Address)
os.Setenv("ECS_CNI_LOGLEVEL", logger.GetLevel())
defer os.Unsetenv("ECS_CNI_LOGLEVEL")
return client.libcni.DelNetworkList(networkConfigList, cns)
Expand Down
60 changes: 31 additions & 29 deletions agent/eni/watcher/watcher_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,46 +139,46 @@ func (udevWatcher *UdevWatcher) reconcileOnce() error {

// Add new interfaces next
for mac, _ := range currentState {
udevWatcher.sendENIStateChange(mac)
if err := udevWatcher.sendENIStateChange(mac); err != nil {
log.Warnf("Udev watcher reconciliation: unable to send state change: %v", err)
}
}
return nil
}

// sendENIStateChange handles the eni event from udev or reconcile phase
func (udevWatcher *UdevWatcher) sendENIStateChange(mac string) {
eniAttachment, ok := udevWatcher.shouldSendENIStateChange(mac)
if ok {
go func(eni *api.ENIAttachment) {
eni.Status = api.ENIAttached
log.Infof("Emitting ENI change event for: %v", eni)
udevWatcher.eniChangeEvent <- api.TaskStateChange{
TaskARN: eni.TaskARN,
Attachment: eni,
}
}(eniAttachment)
}
}

// shouldSendENIStateChange checks whether this eni is managed by ecs
// and if its status should be sent to backend
func (udevWatcher *UdevWatcher) shouldSendENIStateChange(macAddress string) (*api.ENIAttachment, bool) {
if macAddress == "" {
log.Warn("ENI state manager: device with empty mac address")
return nil, false
func (udevWatcher *UdevWatcher) sendENIStateChange(mac string) error {
if mac == "" {
return errors.New("udev watcher send ENI state change: empty mac address")
}
// check if this is an eni required by a task
eni, ok := udevWatcher.agentState.ENIByMac(macAddress)
eni, ok := udevWatcher.agentState.ENIByMac(mac)
if !ok {
log.Infof("ENI state manager: eni not managed by ecs: %s", macAddress)
return nil, false
return errors.Errorf("udev watcher send ENI state change: eni not managed by ecs: %s", mac)
}

if eni.IsSent() {
log.Infof("ENI state manager: eni attach status has already sent: %s", macAddress)
return eni, false
return errors.Errorf("udev watcher send ENI state change: eni status already sent: %s", eni.String())
}
if eni.HasExpired() {
// Agent is aware of the ENI, but we decide not to ack it
// as it's ack timeout has expired
udevWatcher.agentState.RemoveENIAttachment(eni.MACAddress)
return errors.Errorf(
"udev watcher send ENI state change: eni status expired, no longer tracking it: %s",
eni.String())
}

return eni, true
// We found an ENI, which has the expiration time set in future and
// needs to be acknowledged as having been 'attached' to the Instance
go func(eni *api.ENIAttachment) {
eni.Status = api.ENIAttached
log.Infof("Emitting ENI change event for: %s", eni.String())
udevWatcher.eniChangeEvent <- api.TaskStateChange{
TaskARN: eni.TaskARN,
Attachment: eni,
}
}(eni)
return nil
}

// buildState is used to build a state of the system for reconciliation
Expand Down Expand Up @@ -227,7 +227,9 @@ func (udevWatcher *UdevWatcher) eventHandler() {
log.Warnf("Udev watcher event-handler: error obtaining MACAddress for interface %s", netInterface)
continue
}
udevWatcher.sendENIStateChange(macAddress)
if err := udevWatcher.sendENIStateChange(macAddress); err != nil {
log.Warnf("Udev watcher event-handler: unable to send state change: %v", err)
}
case <-udevWatcher.ctx.Done():
log.Info("Stopping udev event handler")
// Send the shutdown signal and close the connection
Expand Down
93 changes: 50 additions & 43 deletions agent/eni/watcher/watcher_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package watcher
import (
"context"
"errors"
"fmt"
"net"
"sync"
"testing"
"time"

"github.com/deniswernert/udev"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -63,6 +63,7 @@ func TestWatcherInit(t *testing.T) {
taskEngineState.AddENIAttachment(&api.ENIAttachment{
MACAddress: randomMAC,
AttachStatusSent: false,
ExpiresAt: time.Unix(time.Now().Unix()+10, 0),
})
eventChannel := make(chan statechange.Event)

Expand Down Expand Up @@ -164,6 +165,7 @@ func TestReconcileENIs(t *testing.T) {
taskEngineState.AddENIAttachment(&api.ENIAttachment{
MACAddress: randomMAC,
AttachStatusSent: false,
ExpiresAt: time.Unix(time.Now().Unix()+10, 0),
})

mockNetlink.EXPECT().LinkList().Return([]netlink.Link{
Expand Down Expand Up @@ -293,7 +295,7 @@ func TestUdevAddEvent(t *testing.T) {
},
}, nil),
mockStateManager.EXPECT().ENIByMac(randomMAC).Return(
&api.ENIAttachment{}, true),
&api.ENIAttachment{ExpiresAt: time.Unix(time.Now().Unix()+10, 0)}, true),
)

// Spin off event handler
Expand Down Expand Up @@ -452,7 +454,9 @@ func TestSendENIStateChange(t *testing.T) {

watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, eventChannel)

mockStateManager.EXPECT().ENIByMac(randomMAC).Return(&api.ENIAttachment{}, true)
mockStateManager.EXPECT().ENIByMac(randomMAC).Return(&api.ENIAttachment{
ExpiresAt: time.Unix(time.Now().Unix()+10, 0),
}, true)

go watcher.sendENIStateChange(randomMAC)

Expand All @@ -462,46 +466,49 @@ func TestSendENIStateChange(t *testing.T) {
assert.Equal(t, api.ENIAttached, taskStateChange.Attachment.Status)
}

func TestShouldSendENIStateChange(t *testing.T) {
testCases := []struct {
eniAttachment *api.ENIAttachment
eniByMACExists bool
expectStateChange bool
}{
{
&api.ENIAttachment{},
true,
true,
},
{
func TestSendENIStateChangeUnmanaged(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl)
watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil)

mockStateManager.EXPECT().ENIByMac(randomMAC).Return(nil, false)
assert.Error(t, watcher.sendENIStateChange(randomMAC))
}

func TestSendENIStateChangeAlreadySent(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl)
watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil)

mockStateManager.EXPECT().ENIByMac(randomMAC).Return(&api.ENIAttachment{
AttachStatusSent: true,
ExpiresAt: time.Unix(time.Now().Unix()+10, 0),
MACAddress: randomMAC,
}, true)

assert.Error(t, watcher.sendENIStateChange(randomMAC))
}

func TestSendENIStateChangeExpired(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl)
watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil)

gomock.InOrder(
mockStateManager.EXPECT().ENIByMac(randomMAC).Return(
&api.ENIAttachment{
AttachStatusSent: true,
},
true,
false,
},
{
&api.ENIAttachment{},
false,
false,
},
}
for _, tc := range testCases {
t.Run(
fmt.Sprintf("return %t when exists is %t and sent is %s",
tc.expectStateChange, tc.eniByMACExists, tc.eniAttachment.Status.String()),
func(t *testing.T) {

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockStateManager := mock_dockerstate.NewMockTaskEngineState(mockCtrl)
watcher := newWatcher(context.TODO(), primaryMAC, nil, nil, mockStateManager, nil)

mockStateManager.EXPECT().ENIByMac(randomMAC).Return(tc.eniAttachment, tc.eniByMACExists)
_, ok := watcher.shouldSendENIStateChange(randomMAC)
assert.Equal(t, tc.expectStateChange, ok)
})
}
AttachStatusSent: false,
ExpiresAt: time.Unix(time.Now().Unix()-10, 0),
MACAddress: randomMAC,
}, true),
mockStateManager.EXPECT().RemoveENIAttachment(randomMAC),
)

assert.Error(t, watcher.sendENIStateChange(randomMAC))
}
14 changes: 0 additions & 14 deletions agent/eventhandler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,20 +277,6 @@ func TestCleanupTaskEventAfterSubmit(t *testing.T) {
assert.Len(t, handler.tasksToEvents, 0)
}

func TestShouldBeSent(t *testing.T) {
sendableEvent := newSendableContainerEvent(api.ContainerStateChange{
Status: api.ContainerStopped,
})

if sendableEvent.taskShouldBeSent() {
t.Error("Container event should not be sent as a task")
}

if !sendableEvent.containerShouldBeSent() {
t.Error("Container should be sent if it's the first try")
}
}

func containerEvent(arn string) statechange.Event {
return api.ContainerStateChange{TaskArn: arn, ContainerName: "containerName", Status: api.ContainerRunning, Container: &api.Container{}}
}
Expand Down
1 change: 1 addition & 0 deletions agent/eventhandler/task_handler_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (event *sendableEvent) taskAttachmentShouldBeSent() bool {
tevent := event.taskChange
return tevent.Status == api.TaskStatusNone && // Task Status is not set for attachments as task record has yet to be streamed down
tevent.Attachment != nil && // Task has attachment records
!tevent.Attachment.HasExpired() && // ENI attachment ack timestamp hasn't expired
!tevent.Attachment.IsSent() // Task status hasn't already been sent
}

Expand Down
Loading

0 comments on commit ed1cd67

Please sign in to comment.