Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full task sync #2191

Merged
merged 1 commit into from
Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agent/acs/client/acs_client_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func init() {
ecsacs.ErrorMessage{},
ecsacs.AttachTaskNetworkInterfacesMessage{},
ecsacs.AttachInstanceNetworkInterfacesMessage{},
ecsacs.TaskManifestMessage{},
ecsacs.TaskStopVerificationAck{},
ecsacs.TaskStopVerificationMessage{},
}
}

Expand Down
17 changes: 15 additions & 2 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type session struct {
cancel context.CancelFunc
backoff retry.Backoff
resources sessionResources
latestSeqNumTaskManifest *int64
_heartbeatTimeout time.Duration
_heartbeatJitter time.Duration
_inactiveInstanceReconnectDelay time.Duration
Expand Down Expand Up @@ -143,7 +144,7 @@ func NewSession(ctx context.Context,
stateManager statemanager.StateManager,
taskEngine engine.TaskEngine,
credentialsManager rolecredentials.Manager,
taskHandler *eventhandler.TaskHandler) Session {
taskHandler *eventhandler.TaskHandler, latestSeqNumTaskManifest *int64) Session {
resources := newSessionResources(credentialsProvider)
backoff := retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax,
connectionBackoffJitter, connectionBackoffMultiplier)
Expand All @@ -164,6 +165,7 @@ func NewSession(ctx context.Context,
cancel: cancel,
backoff: backoff,
resources: resources,
latestSeqNumTaskManifest: latestSeqNumTaskManifest,
_heartbeatTimeout: heartbeatTimeout,
_heartbeatJitter: heartbeatJitter,
_inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay,
Expand Down Expand Up @@ -294,6 +296,17 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {

client.AddRequestHandler(instanceENIAttachHandler.handlerFunc())

// Add TaskManifestHandler
taskManifestHandler := newTaskManifestHandler(acsSession.ctx, cfg.Cluster, acsSession.containerInstanceARN,
client, acsSession.stateManager, acsSession.taskEngine, acsSession.latestSeqNumTaskManifest)

defer taskManifestHandler.clearAcks()
taskManifestHandler.start()
defer taskManifestHandler.stop()

client.AddRequestHandler(taskManifestHandler.handlerFuncTaskManifestMessage())
client.AddRequestHandler(taskManifestHandler.handlerFuncTaskStopVerificationMessage())

// Add request handler for handling payload messages from ACS
payloadHandler := newPayloadRequestHandler(
acsSession.ctx,
Expand All @@ -305,7 +318,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
acsSession.stateManager,
refreshCredsHandler,
acsSession.credentialsManager,
acsSession.taskHandler)
acsSession.taskHandler, acsSession.latestSeqNumTaskManifest)
// Clear the acks channel on return because acks of messageids don't have any value across sessions
defer payloadHandler.clearAcks()
payloadHandler.start()
Expand Down
30 changes: 17 additions & 13 deletions agent/acs/handler/acs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/agent/wsclient"
mock_wsclient "github.com/aws/amazon-ecs-agent/agent/wsclient/mock"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
Expand Down Expand Up @@ -365,6 +366,7 @@ func TestHandlerReconnectsWithoutBackoffOnEOFError(t *testing.T) {
ctx: ctx,
cancel: cancel,
resources: &mockSessionResources{mockWsClient},
latestSeqNumTaskManifest: aws.Int64(10),
_heartbeatTimeout: 20 * time.Millisecond,
_heartbeatJitter: 10 * time.Millisecond,
_inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay,
Expand Down Expand Up @@ -833,18 +835,19 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) {
go func() {

acsSession := session{
containerInstanceARN: "myArn",
credentialsProvider: testCreds,
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
taskHandler: taskHandler,
ctx: ctx,
_heartbeatTimeout: 1 * time.Second,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
resources: newSessionResources(testCreds),
credentialsManager: rolecredentials.NewManager(),
containerInstanceARN: "myArn",
credentialsProvider: testCreds,
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
taskHandler: taskHandler,
ctx: ctx,
_heartbeatTimeout: 1 * time.Second,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
resources: newSessionResources(testCreds),
credentialsManager: rolecredentials.NewManager(),
latestSeqNumTaskManifest: aws.Int64(12),
}
acsSession.Start()
ended <- true
Expand Down Expand Up @@ -914,6 +917,7 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) {

credentialsManager := mock_credentials.NewMockManager(ctrl)

latestSeqNumberTaskManifest := int64(10)
ended := make(chan bool, 1)
go func() {
acsSession := NewSession(ctx,
Expand All @@ -926,7 +930,7 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) {
stateManager,
taskEngine,
credentialsManager,
taskHandler,
taskHandler, &latestSeqNumberTaskManifest,
)
acsSession.Start()
// StartSession should never return unless the context is canceled
Expand Down
53 changes: 32 additions & 21 deletions agent/acs/handler/payload_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ type payloadRequestHandler struct {
saver statemanager.Saver
taskHandler *eventhandler.TaskHandler
// cancel is used to stop go routines started by start() method
cancel context.CancelFunc
cluster string
containerInstanceArn string
acsClient wsclient.ClientServer
refreshHandler refreshCredentialsHandler
credentialsManager credentials.Manager
cancel context.CancelFunc
cluster string
containerInstanceArn string
acsClient wsclient.ClientServer
refreshHandler refreshCredentialsHandler
credentialsManager credentials.Manager
latestSeqNumberTaskManifest *int64
}

// newPayloadRequestHandler returns a new payloadRequestHandler object
Expand All @@ -62,23 +63,24 @@ func newPayloadRequestHandler(
saver statemanager.Saver,
refreshHandler refreshCredentialsHandler,
credentialsManager credentials.Manager,
taskHandler *eventhandler.TaskHandler) payloadRequestHandler {
taskHandler *eventhandler.TaskHandler, seqNumTaskManifest *int64) payloadRequestHandler {
// Create a cancelable context from the parent context
derivedContext, cancel := context.WithCancel(ctx)
return payloadRequestHandler{
messageBuffer: make(chan *ecsacs.PayloadMessage, payloadMessageBufferSize),
ackRequest: make(chan string, payloadMessageBufferSize),
taskEngine: taskEngine,
ecsClient: ecsClient,
saver: saver,
taskHandler: taskHandler,
ctx: derivedContext,
cancel: cancel,
cluster: cluster,
containerInstanceArn: containerInstanceArn,
acsClient: acsClient,
refreshHandler: refreshHandler,
credentialsManager: credentialsManager,
messageBuffer: make(chan *ecsacs.PayloadMessage, payloadMessageBufferSize),
ackRequest: make(chan string, payloadMessageBufferSize),
taskEngine: taskEngine,
ecsClient: ecsClient,
saver: saver,
taskHandler: taskHandler,
ctx: derivedContext,
cancel: cancel,
cluster: cluster,
containerInstanceArn: containerInstanceArn,
acsClient: acsClient,
refreshHandler: refreshHandler,
credentialsManager: credentialsManager,
latestSeqNumberTaskManifest: seqNumTaskManifest,
}
}

Expand Down Expand Up @@ -151,10 +153,19 @@ func (payloadHandler *payloadRequestHandler) handleSingleMessage(payload *ecsacs
}
seelog.Debugf("Received payload message, message id: %s", aws.StringValue(payload.MessageId))
credentialsAcks, allTasksHandled := payloadHandler.addPayloadTasks(payload)

// Update latestSeqNumberTaskManifest for it to get updated in state file
if payloadHandler.latestSeqNumberTaskManifest != nil && payload.SeqNum != nil &&
*payloadHandler.latestSeqNumberTaskManifest < *payload.SeqNum {

*payloadHandler.latestSeqNumberTaskManifest = *payload.SeqNum
}

// save the state of tasks we know about after passing them to the task engine
err := payloadHandler.saver.Save()
if err != nil {
seelog.Errorf("Error saving state for payload message! err: %v, messageId: %s", err, aws.StringValue(payload.MessageId))
seelog.Errorf("Error saving state for payload message! err: %v, messageId: %s", err,
aws.StringValue(payload.MessageId))
// Don't ack; maybe we can save it in the future.
return fmt.Errorf("error saving state for payload message, with messageId: %s", aws.StringValue(payload.MessageId))
}
Expand Down
3 changes: 2 additions & 1 deletion agent/acs/handler/payload_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func setup(t *testing.T) *testHelper {
credentialsManager := credentials.NewManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
latestSeqNumberTaskManifest := int64(10)

handler := newPayloadRequestHandler(
ctx,
Expand All @@ -80,7 +81,7 @@ func setup(t *testing.T) *testHelper {
stateManager,
refreshCredentialsHandler{},
credentialsManager,
taskHandler)
taskHandler, &latestSeqNumberTaskManifest)

return &testHelper{
ctrl: ctrl,
Expand Down
Loading