Skip to content

Commit

Permalink
periodically disconnect from acs
Browse files Browse the repository at this point in the history
  • Loading branch information
singholt committed Feb 22, 2023
1 parent 7d2b6da commit 6d54c7c
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 22 deletions.
48 changes: 38 additions & 10 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"
"github.com/aws/amazon-ecs-agent/agent/version"
"github.com/aws/amazon-ecs-agent/agent/wsclient"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/cihub/seelog"
)
Expand All @@ -54,6 +55,10 @@ const (

inactiveInstanceReconnectDelay = 1 * time.Hour

// connectionTime is the maximum time after which agent closes its connection to ACS
connectionTime = 15 * time.Minute
connectionJitter = 30 * time.Minute

connectionBackoffMin = 250 * time.Millisecond
connectionBackoffMax = 2 * time.Minute
connectionBackoffJitter = 0.2
Expand Down Expand Up @@ -100,14 +105,16 @@ type session struct {
doctor *doctor.Doctor
_heartbeatTimeout time.Duration
_heartbeatJitter time.Duration
connectionTime time.Duration
connectionJitter time.Duration
_inactiveInstanceReconnectDelay time.Duration
}

// sessionResources defines the resource creator interface for starting
// a session with ACS. This interface is intended to define methods
// that create resources used to establish the connection to ACS
// It is confined to just the createACSClient() method for now. It can be
// extended to include the acsWsURL() and newDisconnectionTimer() methods
// extended to include the acsWsURL() and newHeartbeatTimer() methods
// when needed
// The goal is to make it easier to test and inject dependencies
type sessionResources interface {
Expand Down Expand Up @@ -182,6 +189,8 @@ func NewSession(
doctor: doctor,
_heartbeatTimeout: heartbeatTimeout,
_heartbeatJitter: heartbeatJitter,
connectionTime: connectionTime,
connectionJitter: connectionJitter,
_inactiveInstanceReconnectDelay: inactiveInstanceReconnectDelay,
}
}
Expand Down Expand Up @@ -224,7 +233,7 @@ func (acsSession *session) Start() error {
}
}
if shouldReconnectWithoutBackoff(acsError) {
// If ACS closed the connection, there's no need to backoff,
// If ACS or agent closed the connection, there's no need to backoff,
// reconnect immediately
seelog.Infof("ACS Websocket connection closed for a valid reason: %v", acsError)
acsSession.backoff.Reset()
Expand Down Expand Up @@ -360,11 +369,15 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
}

seelog.Info("Connected to ACS endpoint")
// Start inactivity timer for closing the connection
timer := newDisconnectionTimer(client, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
// Any message from the server resets the disconnect timeout
client.SetAnyRequestHandler(anyMessageHandler(timer, client))
defer timer.Stop()
// Start a connection timer; agent will close its ACS websocket connection after this timer expires
connectionTimer := newConnectionTimer(client, acsSession.connectionTime, acsSession.connectionJitter)
defer connectionTimer.Stop()

// Start a heartbeat timer for closing the connection
heartbeatTimer := newHeartbeatTimer(client, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
// Any message from the server resets the heartbeat timer
client.SetAnyRequestHandler(anyMessageHandler(heartbeatTimer, client))
defer heartbeatTimer.Stop()

acsSession.resources.connectedToACS()

Expand Down Expand Up @@ -393,7 +406,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
case err := <-serveErr:
// Stop receiving and sending messages from and to ACS when
// client.Serve returns an error. This can happen when the
// the connection is closed by ACS or the agent
// connection is closed by ACS or the agent
if err == nil || err == io.EOF {
seelog.Info("ACS Websocket connection closed for a valid reason")
} else {
Expand Down Expand Up @@ -478,9 +491,9 @@ func acsWsURL(endpoint, cluster, containerInstanceArn string, taskEngine engine.
return acsURL + "?" + query.Encode()
}

// newDisconnectionTimer creates a new time object, with a callback to
// newHeartbeatTimer creates a new time object, with a callback to
// disconnect from ACS on inactivity
func newDisconnectionTimer(client wsclient.ClientServer, timeout time.Duration, jitter time.Duration) ttime.Timer {
func newHeartbeatTimer(client wsclient.ClientServer, timeout time.Duration, jitter time.Duration) ttime.Timer {
timer := time.AfterFunc(retry.AddJitter(timeout, jitter), func() {
seelog.Warn("ACS Connection hasn't had any activity for too long; closing connection")
if err := client.Close(); err != nil {
Expand All @@ -492,6 +505,21 @@ func newDisconnectionTimer(client wsclient.ClientServer, timeout time.Duration,
return timer
}

// newConnectionTimer creates a new timer, after which agent closes its ACS websocket connection
func newConnectionTimer(client wsclient.ClientServer, connectionTime time.Duration, connectionJitter time.Duration) ttime.Timer {
expiresAt := retry.AddJitter(connectionTime, connectionJitter)
timer := time.AfterFunc(expiresAt, func() {
seelog.Infof("Closing ACS websocket connection after %v minutes", expiresAt.Minutes())
// WriteCloseMessage() writes a close message using websocket control messages
// Ref: https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages
err := client.WriteCloseMessage()
if err != nil {
seelog.Warnf("Error writing close message: %v", err)
}
})
return timer
}

// anyMessageHandler handles any server message. Any server message means the
// connection is active and thus the heartbeat disconnect should not occur
func anyMessageHandler(timer ttime.Timer, client wsclient.ClientServer) func(interface{}) {
Expand Down
Loading

0 comments on commit 6d54c7c

Please sign in to comment.