Skip to content

Commit

Permalink
send pending acks before closing ACS connection
Browse files Browse the repository at this point in the history
  • Loading branch information
singholt committed Feb 23, 2023
1 parent 6d54c7c commit 5ca6dc2
Showing 1 changed file with 50 additions and 3 deletions.
53 changes: 50 additions & 3 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

acsclient "github.com/aws/amazon-ecs-agent/agent/acs/client"
Expand Down Expand Up @@ -75,6 +76,10 @@ const (
// 1: default protocol version
// 2: ACS will proactively close the connection when heartbeat acks are missing
acsProtocolVersion = 2
// numOfHandlersSendingAcks is the number of handlers that send acks back to ACS and that are not saved across
// sessions. We use this to send pending acks, before agent initiates a disconnect to ACS.
// they are: refreshCredentialsHandler, taskManifestHandler, payloadHandler and heartbeatHandler
numOfHandlersSendingAcks = 4
)

// Session defines an interface for handler's long-lived connection with ACS.
Expand Down Expand Up @@ -369,8 +374,10 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
}

seelog.Info("Connected to ACS endpoint")
// Start a connection timer; agent will close its ACS websocket connection after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter)
// Start a connection timer; agent will send pending acks and close its ACS websocket connection
// after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter,
&refreshCredsHandler, &taskManifestHandler, &payloadHandler, &heartbeatHandler)
defer connectionTimer.Stop()

// Start a heartbeat timer for closing the connection
Expand Down Expand Up @@ -506,9 +513,49 @@ func newHeartbeatTimer(client wsclient.ClientServer, timeout time.Duration, jitt
}

// newConnectionTimer creates a new timer, after which agent closes its ACS websocket connection
func newConnectionTimer(client wsclient.ClientServer, connectionTime time.Duration, connectionJitter time.Duration) ttime.Timer {
func newConnectionTimer(
client wsclient.ClientServer,
connectionTime time.Duration,
connectionJitter time.Duration,
refreshCredsHandler *refreshCredentialsHandler,
taskManifestHandler *taskManifestHandler,
payloadHandler *payloadRequestHandler,
heartbeatHandler *heartbeatHandler,
) ttime.Timer {
expiresAt := retry.AddJitter(connectionTime, connectionJitter)
timer := time.AfterFunc(expiresAt, func() {
seelog.Debugf("Sending pending acks to ACS before closing the connection")
var wg sync.WaitGroup
wg.Add(numOfHandlersSendingAcks)

// send pending creds refresh acks
go func() {
refreshCredsHandler.sendAcks()
wg.Done()
}()

// send pending task manifest acks and task stop verification acks
go func() {
taskManifestHandler.sendTaskManifestMessageAck()
taskManifestHandler.handleTaskStopVerificationAck()
wg.Done()
}()

// send pending payload acks
go func() {
payloadHandler.sendAcks()
wg.Done()
}()

// send pending heartbeat acks
go func() {
heartbeatHandler.sendHeartbeatAck()
wg.Done()
}()

// wait for acks from all handlers above to be sent before closing the websocket connection
wg.Wait()

seelog.Infof("Closing ACS websocket connection after %v minutes", expiresAt.Minutes())
// WriteCloseMessage() writes a close message using websocket control messages
// Ref: https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages
Expand Down

0 comments on commit 5ca6dc2

Please sign in to comment.