Skip to content

Commit

Permalink
Rebase with the dev branch in upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
chienhanlin committed Feb 25, 2023
2 parents 5b7cbda + 0fe21da commit 69495e9
Show file tree
Hide file tree
Showing 20 changed files with 496 additions and 37 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/static.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ jobs:
echo "invalid GO version"
exit 1
fi
echo "::set-output name=GO_VERSION::$go_version"
- uses: actions/setup-go@v2
echo "GO_VERSION=$go_version" >> $GITHUB_OUTPUT
- uses: actions/setup-go@v3
with:
go-version: ${{ steps.get-go-version.outputs.GO_VERSION }}
- uses: actions/checkout@v2
Expand Down Expand Up @@ -60,8 +60,8 @@ jobs:
echo "invalid GO version"
exit 1
fi
echo "::set-output name=GO_VERSION::$go_version"
- uses: actions/setup-go@v2
echo "GO_VERSION=$go_version" >> $GITHUB_OUTPUT
- uses: actions/setup-go@v3
with:
go-version: ${{ steps.get-go-version.outputs.GO_VERSION }}
- uses: actions/checkout@v2
Expand Down Expand Up @@ -96,8 +96,8 @@ jobs:
echo "invalid GO version"
exit 1
fi
echo "::set-output name=GO_VERSION::$go_version"
- uses: actions/setup-go@v2
echo "GO_VERSION=$go_version" >> $GITHUB_OUTPUT
- uses: actions/setup-go@v3
with:
go-version: ${{ steps.get-go-version.outputs.GO_VERSION }}
- uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ jobs:
echo "invalid GO version"
exit 1
}
Write-Output "::set-output name=GO_VERSION_WINDOWS::$go_version_win"
- uses: actions/setup-go@v2
Write-Output "GO_VERSION_WINDOWS=$go_version_win" >> $GITHUB_OUTPUT
- uses: actions/setup-go@v3
with:
go-version: ${{ steps.get-go-version.outputs.GO_VERSION_WINDOWS }}
- uses: actions/checkout@v2
Expand Down
98 changes: 88 additions & 10 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

acsclient "github.com/aws/amazon-ecs-agent/agent/acs/client"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/agent/wsclient"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/cihub/seelog"
)
Expand All @@ -54,6 +56,10 @@ const (

inactiveInstanceReconnectDelay = 1 * time.Hour

// connectionTime is the maximum time after which agent closes its connection to ACS
connectionTime = 15 * time.Minute
connectionJitter = 30 * time.Minute

connectionBackoffMin = 250 * time.Millisecond
connectionBackoffMax = 2 * time.Minute
connectionBackoffJitter = 0.2
Expand All @@ -70,6 +76,10 @@ const (
// 1: default protocol version
// 2: ACS will proactively close the connection when heartbeat acks are missing
acsProtocolVersion = 2
// numOfHandlersSendingAcks is the number of handlers that send acks back to ACS and that are not saved across
// sessions. We use this to send pending acks, before agent initiates a disconnect to ACS.
// they are: refreshCredentialsHandler, taskManifestHandler, payloadHandler and heartbeatHandler
numOfHandlersSendingAcks = 4
)

// Session defines an interface for handler's long-lived connection with ACS.
Expand Down Expand Up @@ -100,14 +110,16 @@ type session struct {
doctor *doctor.Doctor
_heartbeatTimeout time.Duration
_heartbeatJitter time.Duration
connectionTime time.Duration
connectionJitter time.Duration
_inactiveInstanceReconnectDelay time.Duration
}

// sessionResources defines the resource creator interface for starting
// a session with ACS. This interface is intended to define methods
// that create resources used to establish the connection to ACS
// It is confined to just the createACSClient() method for now. It can be
// extended to include the acsWsURL() and newDisconnectionTimer() methods
// extended to include the acsWsURL() and newHeartbeatTimer() methods
// when needed
// The goal is to make it easier to test and inject dependencies
type sessionResources interface {
Expand Down Expand Up @@ -182,6 +194,8 @@ func NewSession(
doctor: doctor,
_heartbeatTimeout: heartbeatTimeout,
_heartbeatJitter: heartbeatJitter,
connectionTime: connectionTime,
connectionJitter: connectionJitter,
_inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay,
}
}
Expand Down Expand Up @@ -224,7 +238,7 @@ func (acsSession *session) Start() error {
}
}
if shouldReconnectWithoutBackoff(acsError) {
// If ACS closed the connection, there's no need to backoff,
// If ACS or agent closed the connection, there's no need to backoff,
// reconnect immediately
seelog.Infof("ACS Websocket connection closed for a valid reason: %v", acsError)
acsSession.backoff.Reset()
Expand Down Expand Up @@ -360,11 +374,17 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
}

seelog.Info("Connected to ACS endpoint")
// Start inactivity timer for closing the connection
timer := newDisconnectionTimer(client, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
// Any message from the server resets the disconnect timeout
client.SetAnyRequestHandler(anyMessageHandler(timer, client))
defer timer.Stop()
// Start a connection timer; agent will send pending acks and close its ACS websocket connection
// after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter,
&refreshCredsHandler, &taskManifestHandler, &payloadHandler, &heartbeatHandler)
defer connectionTimer.Stop()

// Start a heartbeat timer for closing the connection
heartbeatTimer := newHeartbeatTimer(client, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
// Any message from the server resets the heartbeat timer
client.SetAnyRequestHandler(anyMessageHandler(heartbeatTimer, client))
defer heartbeatTimer.Stop()

acsSession.resources.connectedToACS()

Expand Down Expand Up @@ -393,7 +413,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
case err := <-serveErr:
// Stop receiving and sending messages from and to ACS when
// client.Serve returns an error. This can happen when the
// the connection is closed by ACS or the agent
// connection is closed by ACS or the agent
if err == nil || err == io.EOF {
seelog.Info("ACS Websocket connection closed for a valid reason")
} else {
Expand Down Expand Up @@ -478,9 +498,9 @@ func acsWsURL(endpoint, cluster, containerInstanceArn string, taskEngine engine.
return acsURL + "?" + query.Encode()
}

// newDisconnectionTimer creates a new time object, with a callback to
// newHeartbeatTimer creates a new time object, with a callback to
// disconnect from ACS on inactivity
func newDisconnectionTimer(client wsclient.ClientServer, timeout time.Duration, jitter time.Duration) ttime.Timer {
func newHeartbeatTimer(client wsclient.ClientServer, timeout time.Duration, jitter time.Duration) ttime.Timer {
timer := time.AfterFunc(retry.AddJitter(timeout, jitter), func() {
seelog.Warn("ACS Connection hasn't had any activity for too long; closing connection")
if err := client.Close(); err != nil {
Expand All @@ -492,6 +512,64 @@ func newDisconnectionTimer(client wsclient.ClientServer, timeout time.Duration,
return timer
}

// newConnectionTimer creates a new timer, after which agent sends any pending acks to ACS and closes
// its websocket connection
func newConnectionTimer(
client wsclient.ClientServer,
connectionTime time.Duration,
connectionJitter time.Duration,
refreshCredsHandler *refreshCredentialsHandler,
taskManifestHandler *taskManifestHandler,
payloadHandler *payloadRequestHandler,
heartbeatHandler *heartbeatHandler,
) ttime.Timer {
expiresAt := retry.AddJitter(connectionTime, connectionJitter)
timer := time.AfterFunc(expiresAt, func() {
seelog.Debugf("Sending pending acks to ACS before closing the connection")

wg := sync.WaitGroup{}
wg.Add(numOfHandlersSendingAcks)

// send pending creds refresh acks to ACS
go func() {
refreshCredsHandler.sendPendingAcks()
wg.Done()
}()

// send pending task manifest acks and task stop verification acks to ACS
go func() {
taskManifestHandler.sendPendingTaskManifestMessageAck()
taskManifestHandler.handlePendingTaskStopVerificationAck()
wg.Done()
}()

// send pending payload acks to ACS
go func() {
payloadHandler.sendPendingAcks()
wg.Done()
}()

// send pending heartbeat acks to ACS
go func() {
heartbeatHandler.sendPendingHeartbeatAck()
wg.Done()
}()

// wait for acks from all the handlers above to be sent to ACS before closing the websocket connection.
// the methods used to read pending acks are non-blocking, so it is safe to wait here.
wg.Wait()

seelog.Infof("Closing ACS websocket connection after %v minutes", expiresAt.Minutes())
// WriteCloseMessage() writes a close message using websocket control messages
// Ref: https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages
err := client.WriteCloseMessage()
if err != nil {
seelog.Warnf("Error writing close message: %v", err)
}
})
return timer
}

// anyMessageHandler handles any server message. Any server message means the
// connection is active and thus the heartbeat disconnect should not occur
func anyMessageHandler(timer ttime.Timer, client wsclient.ClientServer) func(interface{}) {
Expand Down
Loading

0 comments on commit 69495e9

Please sign in to comment.