Skip to content

Commit

Permalink
Add functionality in Agent to confirm Trunk ENI attachment.
Browse files Browse the repository at this point in the history
A high level workflow to for this is as follow:

1. An ACS handler, AttachInstanceENIHandler receives an AttachNetworkInterfacesMessage from ACS which contains the information of the trunk ENI. upon receiving this message the handler adds the trunk eni attachment to the attachment table in Agent's state;
2. Udev Watcher is periodically scanning through attached network devices on the instances and also listens to network device's attached event. Once it found that the trunk ENI is attached (it knows a device is the trunk ENI from information in attachment table added in step 1), it sends an AttachmentStateChange event to the global state change event channel;
3. An event handler, AttachmentEventHandler handles the AttachmentStateChange event from the event channel by sending it to backend.
  • Loading branch information
fenxiong committed Feb 27, 2019
1 parent e63f93a commit e0b6a1b
Show file tree
Hide file tree
Showing 37 changed files with 2,168 additions and 775 deletions.
73 changes: 72 additions & 1 deletion agent/acs/client/acs_client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build unit

// Copyright 2014-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Copyright 2014-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
Expand Down Expand Up @@ -75,6 +75,27 @@ const (
}]
}
}
`
sampleAttachInstanceENIMessage = `
{
"type": "AttachInstanceNetworkInterfacesMessage",
"message": {
"messageId": "123",
"clusterArn": "default",
"elasticNetworkInterfaces":[{
"attachmentArn": "attach_arn",
"ec2Id": "eni_id",
"ipv4Addresses":[{
"primary": true,
"privateAddress": "ipv4"
}],
"ipv6Addresses":[{
"address": "ipv6"
}],
"macAddress": "mac"
}]
}
}
`
)

Expand Down Expand Up @@ -410,3 +431,53 @@ func TestAttachENIHandlerCalled(t *testing.T) {

assert.Equal(t, <-messageChannel, expectedMessage)
}

func TestAttachInstanceENIHandlerCalled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

conn := mock_wsconn.NewMockWebsocketConn(ctrl)
cs := testCS(conn)
defer cs.Close()

// Messages should be read from the connection at least once
conn.EXPECT().SetReadDeadline(gomock.Any()).Return(nil).MinTimes(1)
conn.EXPECT().ReadMessage().Return(websocket.TextMessage,
[]byte(sampleAttachInstanceENIMessage), nil).MinTimes(1)
// Invoked when closing the connection
conn.EXPECT().SetWriteDeadline(gomock.Any()).Return(nil)
conn.EXPECT().Close()

messageChannel := make(chan *ecsacs.AttachInstanceNetworkInterfacesMessage)
reqHandler := func(message *ecsacs.AttachInstanceNetworkInterfacesMessage) {
messageChannel <- message
}

cs.AddRequestHandler(reqHandler)

go cs.Serve()

expectedMessage := &ecsacs.AttachInstanceNetworkInterfacesMessage{
MessageId: aws.String("123"),
ClusterArn: aws.String("default"),
ElasticNetworkInterfaces: []*ecsacs.ElasticNetworkInterface{
{AttachmentArn: aws.String("attach_arn"),
Ec2Id: aws.String("eni_id"),
Ipv4Addresses: []*ecsacs.IPv4AddressAssignment{
{
Primary: aws.Bool(true),
PrivateAddress: aws.String("ipv4"),
},
},
Ipv6Addresses: []*ecsacs.IPv6AddressAssignment{
{
Address: aws.String("ipv6"),
},
},
MacAddress: aws.String("mac"),
},
},
}

assert.Equal(t, <-messageChannel, expectedMessage)
}
3 changes: 2 additions & 1 deletion agent/acs/client/acs_client_types.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Copyright 2014-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
Expand Down Expand Up @@ -46,6 +46,7 @@ func init() {
ecsacs.InactiveInstanceException{},
ecsacs.ErrorMessage{},
ecsacs.AttachTaskNetworkInterfacesMessage{},
ecsacs.AttachInstanceNetworkInterfacesMessage{},
}
}

Expand Down
8 changes: 4 additions & 4 deletions agent/acs/client/acs_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ func TestServerException(t *testing.T) {
func TestGenericErrorConversion(t *testing.T) {
err := acsErr.NewError(errors.New("generic error"))

require.True(t, err.Retry(),"Should default to retriable")
require.EqualError(t, err, "ACSError: generic error")
require.True(t, err.Retry(), "Should default to retriable")
require.EqualError(t, err, "ACSError: generic error")
}

func TestSomeRandomTypeConversion(t *testing.T) {
// This is really just an 'it doesn't panic' check.
err := acsErr.NewError(t)
require.True(t, err.Retry(),"Should default to retriable")
require.True(t, err.Retry(), "Should default to retriable")
require.True(t, strings.HasPrefix(err.Error(), "ACSError: Unknown error"))
}

func TestBadlyTypedMessage(t *testing.T) {
// Another 'does not panic' check
err := acsErr.NewError(struct{ Message int }{1})
require.True(t, err.Retry(),"Should default to retriable")
require.True(t, err.Retry(), "Should default to retriable")
require.NotNil(t, err.Error())
}
18 changes: 16 additions & 2 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {

client.AddRequestHandler(refreshCredsHandler.handlerFunc())

// Add handler to ack ENI attach message
eniAttachHandler := newAttachENIHandler(
// Add handler to ack task ENI attach message
eniAttachHandler := newAttachTaskENIHandler(
acsSession.ctx,
cfg.Cluster,
acsSession.containerInstanceARN,
Expand All @@ -279,6 +279,20 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {

client.AddRequestHandler(eniAttachHandler.handlerFunc())

// Add handler to ack instance ENI attach message
instanceENIAttachHandler := newAttachInstanceENIHandler(
acsSession.ctx,
cfg.Cluster,
acsSession.containerInstanceARN,
client,
acsSession.state,
acsSession.stateManager,
)
instanceENIAttachHandler.start()
defer instanceENIAttachHandler.stop()

client.AddRequestHandler(instanceENIAttachHandler.handlerFunc())

// Add request handler for handling payload messages from ACS
payloadHandler := newPayloadRequestHandler(
acsSession.ctx,
Expand Down
112 changes: 112 additions & 0 deletions agent/acs/handler/attach_eni_handler_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package handler

import (
"fmt"
"time"

"github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs"
apieni "github.com/aws/amazon-ecs-agent/agent/api/eni"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
"github.com/aws/amazon-ecs-agent/agent/wsclient"
"github.com/aws/aws-sdk-go/aws"

"github.com/cihub/seelog"
"github.com/pkg/errors"
)

// ackTimeoutHandler remove ENI attachment from agent state after the ENI ack timeout
type ackTimeoutHandler struct {
mac string
state dockerstate.TaskEngineState
}

func (handler *ackTimeoutHandler) handle() {
eniAttachment, ok := handler.state.ENIByMac(handler.mac)
if !ok {
seelog.Warnf("Ignoring unmanaged ENI attachment with MAC address: %s", handler.mac)
return
}
if !eniAttachment.IsSent() {
seelog.Warnf("Timed out waiting for ENI ack; removing ENI attachment record with MAC address: %s", handler.mac)
handler.state.RemoveENIAttachment(handler.mac)
}
}

// sendAck sends ack for a certain ACS message
func sendAck(acsClient wsclient.ClientServer, clusterArn *string, containerInstanceArn *string, messageId *string) {
if err := acsClient.MakeRequest(&ecsacs.AckRequest{
Cluster: clusterArn,
ContainerInstance: containerInstanceArn,
MessageId: messageId,
}); err != nil {
seelog.Warnf("Failed to ack request with messageId: %s, error: %v", aws.StringValue(messageId), err)
}
}

// handleENIAttachment handles an ENI attachment via the following:
// 1. Check whether we already have this attachment in state, if so, start its ack timer and return
// 2. Otherwise add the attachment to state, start its ack timer, and save the state
// These are common tasks for handling a task ENI attachment and an instance ENI attachment, so they are put
// into this function to be shared by both attachment handlers
func handleENIAttachment(attachmentType, attachmentARN, taskARN, mac string,
expiresAt time.Time,
state dockerstate.TaskEngineState,
saver statemanager.Saver) error {
seelog.Infof("Handling ENI attachment: %s", attachmentARN)

if eniAttachment, ok := state.ENIByMac(mac); ok {
seelog.Infof("Duplicate %s attachment message for ENI with MAC address: %s", attachmentType, mac)
eniAckTimeoutHandler := ackTimeoutHandler{mac: mac, state: state}
return eniAttachment.StartTimer(eniAckTimeoutHandler.handle)
}
if err := addENIAttachmentToState(attachmentType, attachmentARN, taskARN, mac, expiresAt, state); err != nil {
return errors.Wrapf(err, fmt.Sprintf("attach %s message handler: unable to add eni attachment to engine state", attachmentType))
}
if err := saver.Save(); err != nil {
return errors.Wrapf(err, fmt.Sprintf("attach %s message handler: unable to save agent state", attachmentType))
}
return nil
}

// addENIAttachmentToState adds an ENI attachment to state, and start its ack timer
func addENIAttachmentToState(attachmentType, attachmentARN, taskARN, mac string, expiresAt time.Time, state dockerstate.TaskEngineState) error {
eniAttachment := &apieni.ENIAttachment{
TaskARN: taskARN,
AttachmentType: attachmentType,
AttachmentARN: attachmentARN,
AttachStatusSent: false,
MACAddress: mac,
ExpiresAt: expiresAt, // Stop tracking the eni attachment after timeout
}
eniAckTimeoutHandler := ackTimeoutHandler{mac: mac, state: state}
if err := eniAttachment.StartTimer(eniAckTimeoutHandler.handle); err != nil {
return err
}

switch attachmentType {
case apieni.ENIAttachmentTypeTaskENI:
seelog.Infof("Adding task eni attachment info for task '%s' to state, attachment=%s mac=%s",
taskARN, attachmentARN, mac)
case apieni.ENIAttachmentTypeInstanceENI:
seelog.Infof("Adding instance eni attachment info to state, attachment=%s mac=%s", attachmentARN, mac)
default:
return fmt.Errorf("unrecognized eni attachment type: %s", attachmentType)
}

state.AddENIAttachment(eniAttachment)
return nil
}
116 changes: 116 additions & 0 deletions agent/acs/handler/attach_eni_handler_common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// +build unit

// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package handler

import (
"testing"
"time"

apieni "github.com/aws/amazon-ecs-agent/agent/api/eni"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)

const (
attachmentArn = "attachmentarn"
)

// TestTaskENIAckTimeout tests acknowledge timeout for a task eni before submit the state change
func TestTaskENIAckTimeout(t *testing.T) {
testENIAckTimeout(t, apieni.ENIAttachmentTypeTaskENI)
}

// TestInstanceENIAckTimeout tests acknowledge timeout for an instance level eni before submit the state change
func TestInstanceENIAckTimeout(t *testing.T) {
testENIAckTimeout(t, apieni.ENIAttachmentTypeInstanceENI)
}

func testENIAckTimeout(t *testing.T, attachmentType string) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

taskEngineState := dockerstate.NewTaskEngineState()

expiresAt := time.Now().Add(time.Duration(waitTimeoutMillis) * time.Millisecond)
err := addENIAttachmentToState(attachmentType, attachmentArn, taskArn, randomMAC, expiresAt, taskEngineState)
assert.NoError(t, err)
assert.Len(t, taskEngineState.(*dockerstate.DockerTaskEngineState).AllENIAttachments(), 1)
for {
time.Sleep(time.Millisecond * waitTimeoutMillis)
if len(taskEngineState.(*dockerstate.DockerTaskEngineState).AllENIAttachments()) == 0 {
break
}
}
}

// TestTaskENIAckWithinTimeout tests the eni state change was reported before the timeout, for a task eni
func TestTaskENIAckWithinTimeout(t *testing.T) {
testENIAckWithinTimeout(t, apieni.ENIAttachmentTypeTaskENI)
}

// TestInstanceENIAckWithinTimeout tests the eni state change was reported before the timeout, for an instance eni
func TestInstanceENIAckWithinTimeout(t *testing.T) {
testENIAckWithinTimeout(t, apieni.ENIAttachmentTypeInstanceENI)
}

func testENIAckWithinTimeout(t *testing.T, attachmentType string) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

taskEngineState := dockerstate.NewTaskEngineState()
expiresAt := time.Now().Add(time.Duration(waitTimeoutMillis) * time.Millisecond)
err := addENIAttachmentToState(attachmentType, attachmentArn, taskArn, randomMAC, expiresAt, taskEngineState)
assert.NoError(t, err)
assert.Len(t, taskEngineState.(*dockerstate.DockerTaskEngineState).AllENIAttachments(), 1)
eniAttachment, ok := taskEngineState.(*dockerstate.DockerTaskEngineState).ENIByMac(randomMAC)
assert.True(t, ok)
eniAttachment.SetSentStatus()

time.Sleep(time.Millisecond * waitTimeoutMillis)

assert.Len(t, taskEngineState.(*dockerstate.DockerTaskEngineState).AllENIAttachments(), 1)
}

// TestHandleENIAttachmentTaskENI tests handling a new task eni
func TestHandleENIAttachmentTaskENI(t *testing.T) {
testHandleENIAttachment(t, apieni.ENIAttachmentTypeTaskENI, taskArn)
}

// TestHandleENIAttachmentInstanceENI tests handling a new instance eni
func TestHandleENIAttachmentInstanceENI(t *testing.T) {
testHandleENIAttachment(t, apieni.ENIAttachmentTypeInstanceENI, "")
}

func testHandleENIAttachment(t *testing.T, attachmentType, taskArn string) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

taskEngineState := dockerstate.NewTaskEngineState()
expiresAt := time.Now().Add(time.Duration(waitTimeoutMillis) * time.Millisecond)
stateManager := statemanager.NewNoopStateManager()
err := handleENIAttachment(attachmentType, attachmentArn, taskArn, randomMAC, expiresAt, taskEngineState, stateManager)
assert.NoError(t, err)
assert.Len(t, taskEngineState.(*dockerstate.DockerTaskEngineState).AllENIAttachments(), 1)
eniAttachment, ok := taskEngineState.(*dockerstate.DockerTaskEngineState).ENIByMac(randomMAC)
assert.True(t, ok)
eniAttachment.SetSentStatus()

time.Sleep(time.Millisecond * waitTimeoutMillis)

assert.Len(t, taskEngineState.(*dockerstate.DockerTaskEngineState).AllENIAttachments(), 1)
}
Loading

0 comments on commit e0b6a1b

Please sign in to comment.