diff --git a/agent/engine/docker_container_engine.go b/agent/engine/docker_container_engine.go index e462d7db01d..7b42eab8dc5 100644 --- a/agent/engine/docker_container_engine.go +++ b/agent/engine/docker_container_engine.go @@ -61,9 +61,9 @@ const ( // output will be suppressed in debug mode pullStatusSuppressDelay = 2 * time.Second - // statsInactivityTimeout controls the amount of time we hold open a + // StatsInactivityTimeout controls the amount of time we hold open a // connection to the Docker daemon waiting for stats data - statsInactivityTimeout = 5 * time.Second + StatsInactivityTimeout = 5 * time.Second ) // DockerClient interface to make testing it easier @@ -809,7 +809,7 @@ func (dg *dockerGoClient) Stats(id string, ctx context.Context) (<-chan *docker. Stats: stats, Stream: true, Context: ctx, - InactivityTimeout: statsInactivityTimeout, + InactivityTimeout: StatsInactivityTimeout, } go func() { diff --git a/agent/stats/container.go b/agent/stats/container.go index 6ca6ba6d94f..ca8f8c26951 100644 --- a/agent/stats/container.go +++ b/agent/stats/container.go @@ -47,6 +47,7 @@ func newStatsContainer(dockerID string, client ecsengine.DockerClient, resolver func (container *StatsContainer) StartStatsCollection() { // Create the queue to store utilization data from docker stats container.statsQueue = NewQueue(ContainerStatsBufferLength) + container.statsQueue.Reset() go container.collect() } diff --git a/agent/stats/engine.go b/agent/stats/engine.go index af7d42935a6..d04feb7526c 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -16,6 +16,7 @@ package stats //go:generate go run ../../scripts/generate/mockgen.go github.com/aws/amazon-ecs-agent/agent/stats Engine mock/$GOFILE import ( + "errors" "fmt" "sync" "time" @@ -35,6 +36,7 @@ import ( const ( containerChangeHandler = "DockerStatsEngineDockerEventsHandler" listContainersTimeout = 10 * time.Minute + queueResetThreshold = 2 * ecsengine.StatsInactivityTimeout ) // DockerContainerMetadataResolver implements ContainerMetadataResolver for @@ -67,6 +69,7 @@ type DockerStatsEngine struct { // dockerStatsEngine is a singleton object of DockerStatsEngine. // TODO make dockerStatsEngine not a singleton object var dockerStatsEngine *DockerStatsEngine +var EmptyMetricsError = errors.New("No task metrics to report") // ResolveTask resolves the api task object, given container id. func (resolver *DockerContainerMetadataResolver) ResolveTask(dockerID string) (*api.Task, error) { @@ -234,7 +237,7 @@ func (engine *DockerStatsEngine) GetInstanceMetrics() (*ecstcs.MetricsMetadata, if len(taskMetrics) == 0 { // Not idle. Expect taskMetrics to be there. - return nil, nil, fmt.Errorf("No task metrics to report") + return nil, nil, EmptyMetricsError } // Reset current stats. Retaining older stats results in incorrect utilization stats @@ -398,6 +401,13 @@ func (engine *DockerStatsEngine) getContainerMetricsForTask(taskArn string) ([]* engine.doRemoveContainer(container, taskArn) continue } + + if !container.statsQueue.resetThresholdElapsed(queueResetThreshold) && + !container.statsQueue.enoughDatapointsInBuffer() { + seelog.Debug("Stats not ready for container %s", dockerID) + continue + } + // Container is not terminal. Get CPU stats set. cpuStatsSet, err := container.statsQueue.GetCPUStatsSet() if err != nil { diff --git a/agent/stats/queue.go b/agent/stats/queue.go index d02a571c64d..a6f1ee63205 100644 --- a/agent/stats/queue.go +++ b/agent/stats/queue.go @@ -17,6 +17,7 @@ import ( "fmt" "math" "sync" + "time" "github.com/aws/amazon-ecs-agent/agent/tcs/model/ecstcs" "github.com/cihub/seelog" @@ -27,11 +28,14 @@ const ( BytesInMiB = 1024 * 1024 ) +const minimumQueueDatapoints = 2 + // Queue abstracts a queue using UsageStats slice. type Queue struct { - buffer []UsageStats - maxSize int - bufferLock sync.RWMutex + buffer []UsageStats + maxSize int + lastResetTime time.Time + bufferLock sync.RWMutex } // NewQueue creates a queue. @@ -46,7 +50,7 @@ func NewQueue(maxSize int) *Queue { func (queue *Queue) Reset() { queue.bufferLock.Lock() defer queue.bufferLock.Unlock() - + queue.lastResetTime = time.Now() queue.buffer = queue.buffer[:0] } @@ -133,6 +137,19 @@ func getMemoryUsagePerc(s *UsageStats) float64 { type getUsageFunc func(*UsageStats) float64 +func (queue *Queue) resetThresholdElapsed(timeout time.Duration) bool { + queue.bufferLock.RLock() + defer queue.bufferLock.RUnlock() + duration := time.Since(queue.lastResetTime) + return duration.Seconds() > timeout.Seconds() +} + +func (queue *Queue) enoughDatapointsInBuffer() bool { + queue.bufferLock.RLock() + defer queue.bufferLock.RUnlock() + return len(queue.buffer) >= minimumQueueDatapoints +} + // getCWStatsSet gets the stats set for either CPU or Memory based on the // function pointer. func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) { diff --git a/agent/stats/queue_test.go b/agent/stats/queue_test.go index af705ab8fe1..6e9d3e3791c 100644 --- a/agent/stats/queue_test.go +++ b/agent/stats/queue_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/assert" ) const ( @@ -309,3 +310,55 @@ func TestCpuStatsSetNotSetToInfinity(t *testing.T) { t.Errorf("Computed cpuStatsSet.SampleCount (%d) != expected value (%d)", sampleCount, 1) } } + +func TestResetThresholdElapsed(t *testing.T) { + // create a queue + queueLength := 3 + queue := NewQueue(queueLength) + + queue.Reset() + + thresholdElapsed := queue.resetThresholdElapsed(2 * time.Millisecond) + assert.False(t, thresholdElapsed, "Queue is NOT expected to elapsed the threshold right after reset") + + time.Sleep(3 * time.Millisecond) + thresholdElapsed = queue.resetThresholdElapsed(2 * time.Millisecond) + + assert.True(t, thresholdElapsed, "Queue is expected to elapse the threshold") + +} +func TestEnoughDatapointsInBuffer(t *testing.T) { + // timestamps will be used to simulate +Inf CPU Usage + // timestamps[0] = timestamps[1] + timestamps := []time.Time{ + parseNanoTime("2015-02-12T21:22:05.131117533Z"), + parseNanoTime("2015-02-12T21:22:05.131117533Z"), + parseNanoTime("2015-02-12T21:22:05.333776335Z"), + } + cpuTimes := []uint64{ + 22400432, + 116499979, + 248503503, + } + memoryUtilizationInBytes := []uint64{ + 3649536, + 3649536, + 3649536, + } + // create a queue + queueLength := 3 + queue := NewQueue(queueLength) + + enoughDataPoints := queue.enoughDatapointsInBuffer() + assert.False(t, enoughDataPoints, "Queue is expected NOT having enough data points right after creation") + for i, time := range timestamps { + queue.Add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time}) + } + + enoughDataPoints = queue.enoughDatapointsInBuffer() + assert.True(t, enoughDataPoints, "Queue is expected to have enough data points when it has more than 2 msgs queued") + + queue.Reset() + enoughDataPoints = queue.enoughDatapointsInBuffer() + assert.False(t, enoughDataPoints, "Queue is expected NOT having enough data points right after RESET") +} diff --git a/agent/tcs/client/client.go b/agent/tcs/client/client.go index 1bb73aea1f6..d477e50f9ea 100644 --- a/agent/tcs/client/client.go +++ b/agent/tcs/client/client.go @@ -137,12 +137,18 @@ func (cs *clientServer) publishMetrics() { // Publish metrics immediately after we connect and wait for ticks. This makes // sure that there is no data loss when a scheduled metrics publishing fails // due to a connection reset. - cs.publishMetricsOnce() + err := cs.publishMetricsOnce() + if err != nil && err != stats.EmptyMetricsError { + seelog.Warnf("Error getting instance metrics: %v", err) + } // don't simply range over the ticker since its channel doesn't ever get closed for { select { case <-cs.publishTicker.C: - cs.publishMetricsOnce() + err := cs.publishMetricsOnce() + if err != nil { + seelog.Warnf("Error getting instance metrics: %v", err) + } case <-cs.endPublish: return } @@ -150,20 +156,21 @@ func (cs *clientServer) publishMetrics() { } // publishMetricsOnce is invoked by the ticker to periodically publish metrics to backend. -func (cs *clientServer) publishMetricsOnce() { +func (cs *clientServer) publishMetricsOnce() error { // Get the list of objects to send to backend. requests, err := cs.metricsToPublishMetricRequests() if err != nil { - seelog.Warnf("Error getting instance metrics: %v", err) + return err } // Make the publish metrics request to the backend. for _, request := range requests { err = cs.MakeRequest(request) if err != nil { - seelog.Warnf("Error publishing metrics: %v. Request: %v", err, request) + return err } } + return nil } // metricsToPublishMetricRequests gets task metrics and converts them to a list of PublishMetricRequest