V1.59.0 stage #3124

Merged · 4 commits · Feb 9, 2022
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog

## 1.59.0
* Feature - prevent instances in EC2 Autoscaling warm pool from being registered with cluster [#3123](https://github.com/aws/amazon-ecs-agent/pull/3123)
* Enhancement - DiscoverPollEndpoint: lengthen cache ttl and improve resiliency [#3109](https://github.com/aws/amazon-ecs-agent/pull/3109)

## 1.58.0
* Enhancement - Update agent build go version to 1.17.5 [#3105](https://github.com/aws/amazon-ecs-agent/pull/3105)
* Enhancement - bumped pause container gcc build version [#3108](https://github.com/aws/amazon-ecs-agent/pull/3108)
3 changes: 2 additions & 1 deletion README.md
@@ -195,7 +195,8 @@ additional details on each available environment variable.
| `ECS_FSX_WINDOWS_FILE_SERVER_SUPPORTED` | `true` | Whether FSx for Windows File Server volume type is supported on the container instance. This variable is only supported on agent versions 1.47.0 and later. | `false` | `true` |
| `ECS_ENABLE_RUNTIME_STATS` | `true` | Determines if [pprof](https://pkg.go.dev/net/http/pprof) is enabled for the agent. If enabled, the different profiles can be accessed through the agent's introspection port (e.g. `curl http://localhost:51678/debug/pprof/heap > heap.pprof`). In addition, agent's [runtime stats](https://pkg.go.dev/runtime#ReadMemStats) are logged to `/var/log/ecs/runtime-stats.log` file. | `false` | `false` |
| `ECS_EXCLUDE_IPV6_PORTBINDING` | `true` | Determines if agent should exclude IPv6 port binding using default network mode. If enabled, IPv6 port binding will be filtered out, and the response of DescribeTasks API call will not show tasks' IPv6 port bindings, but it is still included in Task metadata endpoint. | `true` | `true` |

| `ECS_WARM_POOLS_CHECK` | `true` | Whether to ensure instances going into an [EC2 Auto Scaling group warm pool](https://docs.aws.amazon.com/autoscaling/ec2/userguide/ec2-auto-scaling-warm-pools.html) are prevented from being registered with the cluster. Set to true only if using EC2 Auto Scaling. | `false` | `false` |
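
As a usage sketch (assuming the agent picks up its configuration from the standard `/etc/ecs/ecs.config` file, as on the ECS-optimized AMIs), the new flag would be enabled on warm-pool instances like so; the cluster name here is a placeholder:

```
# /etc/ecs/ecs.config  (sketch; cluster name is hypothetical)
ECS_CLUSTER=my-cluster
ECS_WARM_POOLS_CHECK=true
```

With the flag set, the agent waits for the instance's target lifecycle state to become `InService` before registering with the cluster (see the `agent/app/agent.go` changes below).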

### Persistence

When you run the Amazon ECS Container Agent in production, its `datadir` should be persisted between runs of the Docker
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.58.0
1.59.0
29 changes: 19 additions & 10 deletions agent/api/ecsclient/client.go
@@ -40,8 +40,7 @@ const (
ecsMaxImageDigestLength = 255
ecsMaxReasonLength = 255
ecsMaxRuntimeIDLength = 255
pollEndpointCacheSize = 1
pollEndpointCacheTTL = 20 * time.Minute
pollEndpointCacheTTL = 12 * time.Hour
roundtripTimeout = 5 * time.Second
azAttrName = "ecs.availability-zone"
cpuArchAttrName = "ecs.cpu-architecture"
@@ -56,7 +55,7 @@ type APIECSClient struct {
standardClient api.ECSSDK
submitStateChangeClient api.ECSSubmitStateSDK
ec2metadata ec2.EC2MetadataClient
pollEndpoinCache async.Cache
pollEndpointCache async.TTLCache
}

// NewECSClient creates a new ECSClient interface object
@@ -74,14 +73,13 @@ func NewECSClient(
}
standardClient := ecs.New(session.New(&ecsConfig))
submitStateChangeClient := newSubmitStateChangeClient(&ecsConfig)
pollEndpoinCache := async.NewLRUCache(pollEndpointCacheSize, pollEndpointCacheTTL)
return &APIECSClient{
credentialProvider: credentialProvider,
config: config,
standardClient: standardClient,
submitStateChangeClient: submitStateChangeClient,
ec2metadata: ec2MetadataClient,
pollEndpoinCache: pollEndpoinCache,
pollEndpointCache: async.NewTTLCache(pollEndpointCacheTTL),
}
}

@@ -585,26 +583,37 @@ func (client *APIECSClient) DiscoverTelemetryEndpoint(containerInstanceArn strin

func (client *APIECSClient) discoverPollEndpoint(containerInstanceArn string) (*ecs.DiscoverPollEndpointOutput, error) {
// Try getting an entry from the cache
cachedEndpoint, found := client.pollEndpoinCache.Get(containerInstanceArn)
if found {
// Cache hit. Return the output.
cachedEndpoint, expired, found := client.pollEndpointCache.Get(containerInstanceArn)
if !expired && found {
// Cache hit and not expired. Return the output.
if output, ok := cachedEndpoint.(*ecs.DiscoverPollEndpointOutput); ok {
seelog.Infof("Using cached DiscoverPollEndpoint. endpoint=%s telemetryEndpoint=%s containerInstanceARN=%s",
aws.StringValue(output.Endpoint), aws.StringValue(output.TelemetryEndpoint), containerInstanceArn)
return output, nil
}
}

// Cache miss, invoke the ECS DiscoverPollEndpoint API.
// Cache miss or expired, invoke the ECS DiscoverPollEndpoint API.
seelog.Debugf("Invoking DiscoverPollEndpoint for '%s'", containerInstanceArn)
output, err := client.standardClient.DiscoverPollEndpoint(&ecs.DiscoverPollEndpointInput{
ContainerInstance: &containerInstanceArn,
Cluster: &client.config.Cluster,
})
if err != nil {
// if we got an error calling the API, fallback to an expired cached endpoint if
// we have it.
if expired {
if output, ok := cachedEndpoint.(*ecs.DiscoverPollEndpointOutput); ok {
seelog.Infof("Error calling DiscoverPollEndpoint. Using cached but expired endpoint as a fallback. error=%s endpoint=%s telemetryEndpoint=%s containerInstanceARN=%s",
err, aws.StringValue(output.Endpoint), aws.StringValue(output.TelemetryEndpoint), containerInstanceArn)
return output, nil
}
}
return nil, err
}

// Cache the response from ECS.
client.pollEndpoinCache.Set(containerInstanceArn, output)
client.pollEndpointCache.Set(containerInstanceArn, output)
return output, nil
}
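
The refactor above swaps the single-entry LRU cache for a TTL cache whose `Get` reports both expiry and presence, so a stale endpoint can still be served when the `DiscoverPollEndpoint` call fails. Below is a minimal sketch of a cache with those semantics; it assumes a simple map-plus-mutex design and is not the agent's actual `async.TTLCache` implementation:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry pairs a cached value with the time it was stored.
type entry struct {
	value    interface{}
	storedAt time.Time
}

// ttlCache is a sketch of a TTL cache with the Get semantics the code above
// relies on: Get returns (value, expired, found) so callers can fall back to
// a stale value when a refresh fails.
type ttlCache struct {
	mu      sync.Mutex
	ttl     time.Duration
	entries map[string]entry
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{ttl: ttl, entries: make(map[string]entry)}
}

// Set stores a value and resets its expiry clock.
func (c *ttlCache) Set(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = entry{value: value, storedAt: time.Now()}
}

// Get returns the cached value, whether it has outlived the TTL, and whether
// it exists at all. Expired entries are returned rather than evicted.
func (c *ttlCache) Get(key string) (value interface{}, expired bool, found bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.entries[key]
	if !ok {
		return nil, false, false
	}
	return e.value, time.Since(e.storedAt) > c.ttl, true
}

func main() {
	cache := newTTLCache(50 * time.Millisecond)
	cache.Set("containerInstanceArn", "https://ecs-a-1.us-east-1.amazonaws.com")

	v, expired, found := cache.Get("containerInstanceArn")
	fmt.Println(v, expired, found) // fresh hit: value, false, true

	time.Sleep(100 * time.Millisecond)
	v, expired, found = cache.Get("containerInstanceArn")
	fmt.Println(v, expired, found) // stale hit: value, true, true — still usable as a fallback
}
```

The key point is that `Get` hands back expired entries instead of dropping them, which is what makes the expired-endpoint fallback in `discoverPollEndpoint` possible.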

67 changes: 51 additions & 16 deletions agent/api/ecsclient/client_test.go
@@ -838,23 +838,23 @@ func TestDiscoverPollEndpointCacheHit(t *testing.T) {
defer mockCtrl.Finish()

mockSDK := mock_api.NewMockECSSDK(mockCtrl)
pollEndpoinCache := mock_async.NewMockCache(mockCtrl)
pollEndpointCache := mock_async.NewMockTTLCache(mockCtrl)
client := &APIECSClient{
credentialProvider: credentials.AnonymousCredentials,
config: &config.Config{
Cluster: configuredCluster,
AWSRegion: "us-east-1",
},
standardClient: mockSDK,
ec2metadata: ec2.NewBlackholeEC2MetadataClient(),
pollEndpoinCache: pollEndpoinCache,
standardClient: mockSDK,
ec2metadata: ec2.NewBlackholeEC2MetadataClient(),
pollEndpointCache: pollEndpointCache,
}

pollEndpoint := "http://127.0.0.1"
pollEndpoinCache.EXPECT().Get("containerInstance").Return(
pollEndpointCache.EXPECT().Get("containerInstance").Return(
&ecs.DiscoverPollEndpointOutput{
Endpoint: aws.String(pollEndpoint),
}, true)
}, false, true)
output, err := client.discoverPollEndpoint("containerInstance")
if err != nil {
t.Fatalf("Error in discoverPollEndpoint: %v", err)
@@ -869,26 +869,61 @@ func TestDiscoverPollEndpointCacheMiss(t *testing.T) {
defer mockCtrl.Finish()

mockSDK := mock_api.NewMockECSSDK(mockCtrl)
pollEndpoinCache := mock_async.NewMockCache(mockCtrl)
pollEndpointCache := mock_async.NewMockTTLCache(mockCtrl)
client := &APIECSClient{
credentialProvider: credentials.AnonymousCredentials,
config: &config.Config{
Cluster: configuredCluster,
AWSRegion: "us-east-1",
},
standardClient: mockSDK,
ec2metadata: ec2.NewBlackholeEC2MetadataClient(),
pollEndpoinCache: pollEndpoinCache,
standardClient: mockSDK,
ec2metadata: ec2.NewBlackholeEC2MetadataClient(),
pollEndpointCache: pollEndpointCache,
}
pollEndpoint := "http://127.0.0.1"
pollEndpointOutput := &ecs.DiscoverPollEndpointOutput{
Endpoint: &pollEndpoint,
}

gomock.InOrder(
pollEndpoinCache.EXPECT().Get("containerInstance").Return(nil, false),
pollEndpointCache.EXPECT().Get("containerInstance").Return(nil, false, false),
mockSDK.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(pollEndpointOutput, nil),
pollEndpoinCache.EXPECT().Set("containerInstance", pollEndpointOutput),
pollEndpointCache.EXPECT().Set("containerInstance", pollEndpointOutput),
)

output, err := client.discoverPollEndpoint("containerInstance")
if err != nil {
t.Fatalf("Error in discoverPollEndpoint: %v", err)
}
if aws.StringValue(output.Endpoint) != pollEndpoint {
t.Errorf("Mismatch in poll endpoint: %s != %s", aws.StringValue(output.Endpoint), pollEndpoint)
}
}

func TestDiscoverPollEndpointExpiredButDPEFailed(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

mockSDK := mock_api.NewMockECSSDK(mockCtrl)
pollEndpointCache := mock_async.NewMockTTLCache(mockCtrl)
client := &APIECSClient{
credentialProvider: credentials.AnonymousCredentials,
config: &config.Config{
Cluster: configuredCluster,
AWSRegion: "us-east-1",
},
standardClient: mockSDK,
ec2metadata: ec2.NewBlackholeEC2MetadataClient(),
pollEndpointCache: pollEndpointCache,
}
pollEndpoint := "http://127.0.0.1"
pollEndpointOutput := &ecs.DiscoverPollEndpointOutput{
Endpoint: &pollEndpoint,
}

gomock.InOrder(
pollEndpointCache.EXPECT().Get("containerInstance").Return(pollEndpointOutput, true, false),
mockSDK.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(nil, fmt.Errorf("error!")),
)

output, err := client.discoverPollEndpoint("containerInstance")
@@ -905,16 +940,16 @@ func TestDiscoverTelemetryEndpointAfterPollEndpointCacheHit(t *testing.T) {
defer mockCtrl.Finish()

mockSDK := mock_api.NewMockECSSDK(mockCtrl)
pollEndpoinCache := async.NewLRUCache(1, 10*time.Minute)
pollEndpointCache := async.NewTTLCache(10 * time.Minute)
client := &APIECSClient{
credentialProvider: credentials.AnonymousCredentials,
config: &config.Config{
Cluster: configuredCluster,
AWSRegion: "us-east-1",
},
standardClient: mockSDK,
ec2metadata: ec2.NewBlackholeEC2MetadataClient(),
pollEndpoinCache: pollEndpoinCache,
standardClient: mockSDK,
ec2metadata: ec2.NewBlackholeEC2MetadataClient(),
pollEndpointCache: pollEndpointCache,
}

pollEndpoint := "http://127.0.0.1"
88 changes: 86 additions & 2 deletions agent/app/agent.go
@@ -83,6 +83,15 @@ const (
instanceIdBackoffJitter = 0.2
instanceIdBackoffMultiple = 1.3
instanceIdMaxRetryCount = 3

targetLifecycleBackoffMin = time.Second
targetLifecycleBackoffMax = time.Second * 5
targetLifecycleBackoffJitter = 0.2
targetLifecycleBackoffMultiple = 1.3
targetLifecycleMaxRetryCount = 3
inServiceState = "InService"
asgLifecyclePollWait = time.Minute
asgLifecyclePollMax = 120 // given each poll cycle waits for about a minute, this gives 2-3 hours before timing out
)
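
The new `targetLifecycle*` constants configure an exponential backoff with jitter for the IMDS calls made in `getTargetLifecycle` further down. A rough sketch of how such a backoff could space out the three retries, using the same constants, is below; it is an illustration only, not the agent's `retry.ExponentialBackoff` implementation:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextBackoff sketches exponential backoff with jitter: it returns a sleep
// duration randomized within ±jitter of the current interval, and the grown
// interval (capped at max) to use for the next attempt.
func nextBackoff(current, max time.Duration, jitter, multiple float64) (sleep, next time.Duration) {
	delta := jitter * float64(current)
	sleep = time.Duration(float64(current) - delta + rand.Float64()*2*delta)

	next = time.Duration(float64(current) * multiple)
	if next > max {
		next = max
	}
	return sleep, next
}

func main() {
	interval := time.Second // targetLifecycleBackoffMin
	for attempt := 1; attempt <= 3; attempt++ { // targetLifecycleMaxRetryCount
		var sleep time.Duration
		sleep, interval = nextBackoff(interval, 5*time.Second, 0.2, 1.3)
		fmt.Printf("attempt %d: sleep ~%v before retrying IMDS\n", attempt, sleep)
	}
}
```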

var (
@@ -291,6 +300,19 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
seelog.Criticalf("Unable to initialize new task engine: %v", err)
return exitcodes.ExitTerminal
}

// Start termination handler in goroutine
go agent.terminationHandler(state, agent.dataClient, taskEngine, agent.cancel)

// If part of ASG, wait until instance is being set up to go in service before registering with cluster
if agent.cfg.WarmPoolsSupport.Enabled() {
err := agent.waitUntilInstanceInService(asgLifecyclePollWait, asgLifecyclePollMax, targetLifecycleMaxRetryCount)
if err != nil && err.Error() != blackholed {
seelog.Criticalf("Could not determine target lifecycle of instance: %v", err)
return exitcodes.ExitTerminal
}
}

agent.initMetricsEngine()

loadPauseErr := agent.loadPauseContainer()
@@ -387,6 +409,70 @@ func (agent *ecsAgent) doStart(containerChangeEventStre
deregisterInstanceEventStream, client, state, taskHandler, doctor)
}

// waitUntilInstanceInService Polls IMDS until the target lifecycle state indicates that the instance is going in
// service. This is to avoid instances going to a warm pool being registered as container instances with the cluster
func (agent *ecsAgent) waitUntilInstanceInService(pollWaitDuration time.Duration, pollMaxTimes int, maxRetries int) error {
seelog.Info("Waiting for instance to go InService")
var err error
var targetState string
// Poll until a target lifecycle state is obtained from IMDS, or an unexpected error occurs
targetState, err = agent.pollUntilTargetLifecyclePresent(pollWaitDuration, pollMaxTimes, maxRetries)
if err != nil {
return err
}
// Poll while the instance is in a warmed state until it is going to go into service
for targetState != inServiceState {
time.Sleep(pollWaitDuration)
targetState, err = agent.getTargetLifecycle(maxRetries)
if err != nil {
// Do not exit if error is due to throttling or temporary server errors
// These are likely transient, as at this point IMDS has been successfully queried for state
switch utils.GetRequestFailureStatusCode(err) {
case 429, 500, 502, 503, 504:
seelog.Warnf("Encountered error while waiting for warmed instance to go in service: %v", err)
default:
return err
}
}
}
return err
}

// pollUntilTargetLifecyclePresent polls until it obtains a target state or receives an unexpected error
func (agent *ecsAgent) pollUntilTargetLifecyclePresent(pollWaitDuration time.Duration, pollMaxTimes int, maxRetries int) (string, error) {
var err error
var targetState string
for i := 0; i < pollMaxTimes; i++ {
targetState, err = agent.getTargetLifecycle(maxRetries)
if targetState != "" ||
(err != nil && utils.GetRequestFailureStatusCode(err) != 404) {
break
}
time.Sleep(pollWaitDuration)
}
return targetState, err
}

// getTargetLifecycle obtains the target lifecycle state for the instance from IMDS. This is populated for instances
// associated with an ASG
func (agent *ecsAgent) getTargetLifecycle(maxRetries int) (string, error) {
var targetState string
var err error
backoff := retry.NewExponentialBackoff(targetLifecycleBackoffMin, targetLifecycleBackoffMax, targetLifecycleBackoffJitter, targetLifecycleBackoffMultiple)
for i := 0; i < maxRetries; i++ {
targetState, err = agent.ec2MetadataClient.TargetLifecycleState()
if err == nil {
break
}
seelog.Debugf("Error when getting intended lifecycle state: %v", err)
if i < maxRetries {
time.Sleep(backoff.Duration())
}
}
seelog.Debugf("Target lifecycle state of instance: %v", targetState)
return targetState, err
}
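
`getTargetLifecycle` relies on `agent.ec2MetadataClient.TargetLifecycleState()`, whose implementation is not part of this diff. A hedged sketch of what such a call might look like is below, assuming the documented IMDS path for Auto Scaling warm pools (`autoscaling/target-lifecycle-state`) and plain IMDSv1-style access without token handling:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

// targetLifecycleState is a rough sketch of reading the instance's target
// lifecycle state from IMDS. The agent's real EC2 metadata client (including
// any IMDSv2 token handling) is not shown in this PR.
func targetLifecycleState() (string, error) {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get("http://169.254.169.254/latest/meta-data/autoscaling/target-lifecycle-state")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// A 404 here typically means the instance is not in an Auto Scaling
		// group, or the field is not populated yet.
		return "", fmt.Errorf("IMDS returned status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil // e.g. "InService" or "Warmed:Stopped"
}

func main() {
	state, err := targetLifecycleState()
	if err != nil {
		fmt.Println("could not read target lifecycle state:", err)
		return
	}
	fmt.Println("target lifecycle state:", state)
}
```

A 404 on this path is expected for instances outside an Auto Scaling group, which matches why `pollUntilTargetLifecyclePresent` above keeps polling on 404 instead of treating it as a fatal error.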

// newTaskEngine creates a new docker task engine object. It tries to load the
// local state if needed, else initializes a new one
func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.EventStream,
@@ -687,8 +773,6 @@ func (agent *ecsAgent) startAsyncRoutines(
go agent.startSpotInstanceDrainingPoller(agent.ctx, client)
}

go agent.terminationHandler(state, agent.dataClient, taskEngine, agent.cancel)

// Agent introspection api
go handlers.ServeIntrospectionHTTPEndpoint(agent.ctx, &agent.containerInstanceARN, taskEngine, agent.cfg)
