Skip to content

Commit

Permalink
Fixed aws#506
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenwu-amazon committed Jan 4, 2017
1 parent f1da6e0 commit 178a672
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 13 deletions.
6 changes: 3 additions & 3 deletions agent/engine/docker_container_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ const (
// output will be suppressed in debug mode
pullStatusSuppressDelay = 2 * time.Second

// statsInactivityTimeout controls the amount of time we hold open a
// StatsInactivityTimeout controls the amount of time we hold open a
// connection to the Docker daemon waiting for stats data
statsInactivityTimeout = 5 * time.Second
StatsInactivityTimeout = 5 * time.Second
)

// DockerClient interface to make testing it easier
Expand Down Expand Up @@ -809,7 +809,7 @@ func (dg *dockerGoClient) Stats(id string, ctx context.Context) (<-chan *docker.
Stats: stats,
Stream: true,
Context: ctx,
InactivityTimeout: statsInactivityTimeout,
InactivityTimeout: StatsInactivityTimeout,
}

go func() {
Expand Down
1 change: 1 addition & 0 deletions agent/stats/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func newStatsContainer(dockerID string, client ecsengine.DockerClient, resolver
func (container *StatsContainer) StartStatsCollection() {
// Create the queue to store utilization data from docker stats
container.statsQueue = NewQueue(ContainerStatsBufferLength)
container.statsQueue.Reset()
go container.collect()
}

Expand Down
12 changes: 11 additions & 1 deletion agent/stats/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package stats
//go:generate go run ../../scripts/generate/mockgen.go github.com/aws/amazon-ecs-agent/agent/stats Engine mock/$GOFILE

import (
"errors"
"fmt"
"sync"
"time"
Expand All @@ -35,6 +36,7 @@ import (
const (
containerChangeHandler = "DockerStatsEngineDockerEventsHandler"
listContainersTimeout = 10 * time.Minute
queueResetThreshold = 2 * ecsengine.StatsInactivityTimeout
)

// DockerContainerMetadataResolver implements ContainerMetadataResolver for
Expand Down Expand Up @@ -67,6 +69,7 @@ type DockerStatsEngine struct {
// dockerStatsEngine is a singleton object of DockerStatsEngine.
// TODO make dockerStatsEngine not a singleton object
var dockerStatsEngine *DockerStatsEngine
var EmptyMetricsError = errors.New("No task metrics to report")

// ResolveTask resolves the api task object, given container id.
func (resolver *DockerContainerMetadataResolver) ResolveTask(dockerID string) (*api.Task, error) {
Expand Down Expand Up @@ -234,7 +237,7 @@ func (engine *DockerStatsEngine) GetInstanceMetrics() (*ecstcs.MetricsMetadata,

if len(taskMetrics) == 0 {
// Not idle. Expect taskMetrics to be there.
return nil, nil, fmt.Errorf("No task metrics to report")
return nil, nil, EmptyMetricsError
}

// Reset current stats. Retaining older stats results in incorrect utilization stats
Expand Down Expand Up @@ -398,6 +401,13 @@ func (engine *DockerStatsEngine) getContainerMetricsForTask(taskArn string) ([]*
engine.doRemoveContainer(container, taskArn)
continue
}

if !container.statsQueue.resetThresholdElapsed(queueResetThreshold) &&
!container.statsQueue.enoughDatapointsInBuffer() {
seelog.Debug("Stats not ready for container %s", dockerID)
continue
}

// Container is not terminal. Get CPU stats set.
cpuStatsSet, err := container.statsQueue.GetCPUStatsSet()
if err != nil {
Expand Down
25 changes: 21 additions & 4 deletions agent/stats/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"
"sync"
"time"

"github.com/aws/amazon-ecs-agent/agent/tcs/model/ecstcs"
"github.com/cihub/seelog"
Expand All @@ -27,11 +28,14 @@ const (
BytesInMiB = 1024 * 1024
)

const minimumQueueDatapoints = 2

// Queue abstracts a queue using UsageStats slice.
type Queue struct {
buffer []UsageStats
maxSize int
bufferLock sync.RWMutex
buffer []UsageStats
maxSize int
lastResetTime time.Time
bufferLock sync.RWMutex
}

// NewQueue creates a queue.
Expand All @@ -46,7 +50,7 @@ func NewQueue(maxSize int) *Queue {
func (queue *Queue) Reset() {
queue.bufferLock.Lock()
defer queue.bufferLock.Unlock()

queue.lastResetTime = time.Now()
queue.buffer = queue.buffer[:0]
}

Expand Down Expand Up @@ -133,6 +137,19 @@ func getMemoryUsagePerc(s *UsageStats) float64 {

type getUsageFunc func(*UsageStats) float64

func (queue *Queue) resetThresholdElapsed(timeout time.Duration) bool {
queue.bufferLock.RLock()
defer queue.bufferLock.RUnlock()
duration := time.Since(queue.lastResetTime)
return duration.Seconds() > timeout.Seconds()
}

func (queue *Queue) enoughDatapointsInBuffer() bool {
queue.bufferLock.RLock()
defer queue.bufferLock.RUnlock()
return len(queue.buffer) >= minimumQueueDatapoints
}

// getCWStatsSet gets the stats set for either CPU or Memory based on the
// function pointer.
func (queue *Queue) getCWStatsSet(f getUsageFunc) (*ecstcs.CWStatsSet, error) {
Expand Down
53 changes: 53 additions & 0 deletions agent/stats/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/assert"
)

const (
Expand Down Expand Up @@ -309,3 +310,55 @@ func TestCpuStatsSetNotSetToInfinity(t *testing.T) {
t.Errorf("Computed cpuStatsSet.SampleCount (%d) != expected value (%d)", sampleCount, 1)
}
}

func TestResetThresholdElapsed(t *testing.T) {
// create a queue
queueLength := 3
queue := NewQueue(queueLength)

queue.Reset()

thresholdElapsed := queue.resetThresholdElapsed(2 * time.Millisecond)
assert.False(t, thresholdElapsed, "Queue is NOT expected to elapsed the threshold right after reset")

time.Sleep(3 * time.Millisecond)
thresholdElapsed = queue.resetThresholdElapsed(2 * time.Millisecond)

assert.True(t, thresholdElapsed, "Queue is expected to elapse the threshold")

}
func TestEnoughDatapointsInBuffer(t *testing.T) {
// timestamps will be used to simulate +Inf CPU Usage
// timestamps[0] = timestamps[1]
timestamps := []time.Time{
parseNanoTime("2015-02-12T21:22:05.131117533Z"),
parseNanoTime("2015-02-12T21:22:05.131117533Z"),
parseNanoTime("2015-02-12T21:22:05.333776335Z"),
}
cpuTimes := []uint64{
22400432,
116499979,
248503503,
}
memoryUtilizationInBytes := []uint64{
3649536,
3649536,
3649536,
}
// create a queue
queueLength := 3
queue := NewQueue(queueLength)

enoughDataPoints := queue.enoughDatapointsInBuffer()
assert.False(t, enoughDataPoints, "Queue is expected NOT having enough data points right after creation")
for i, time := range timestamps {
queue.Add(&ContainerStats{cpuUsage: cpuTimes[i], memoryUsage: memoryUtilizationInBytes[i], timestamp: time})
}

enoughDataPoints = queue.enoughDatapointsInBuffer()
assert.True(t, enoughDataPoints, "Queue is expected to have enough data points when it has more than 2 msgs queued")

queue.Reset()
enoughDataPoints = queue.enoughDatapointsInBuffer()
assert.False(t, enoughDataPoints, "Queue is expected NOT having enough data points right after RESET")
}
17 changes: 12 additions & 5 deletions agent/tcs/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,33 +137,40 @@ func (cs *clientServer) publishMetrics() {
// Publish metrics immediately after we connect and wait for ticks. This makes
// sure that there is no data loss when a scheduled metrics publishing fails
// due to a connection reset.
cs.publishMetricsOnce()
err := cs.publishMetricsOnce()
if err != nil && err != stats.EmptyMetricsError {
seelog.Warnf("Error getting instance metrics: %v", err)
}
// don't simply range over the ticker since its channel doesn't ever get closed
for {
select {
case <-cs.publishTicker.C:
cs.publishMetricsOnce()
err := cs.publishMetricsOnce()
if err != nil {
seelog.Warnf("Error getting instance metrics: %v", err)
}
case <-cs.endPublish:
return
}
}
}

// publishMetricsOnce is invoked by the ticker to periodically publish metrics to backend.
func (cs *clientServer) publishMetricsOnce() {
func (cs *clientServer) publishMetricsOnce() error {
// Get the list of objects to send to backend.
requests, err := cs.metricsToPublishMetricRequests()
if err != nil {
seelog.Warnf("Error getting instance metrics: %v", err)
return err
}

// Make the publish metrics request to the backend.
for _, request := range requests {
err = cs.MakeRequest(request)
if err != nil {
seelog.Warnf("Error publishing metrics: %v. Request: %v", err, request)
return err
}
}
return nil
}

// metricsToPublishMetricRequests gets task metrics and converts them to a list of PublishMetricRequest
Expand Down

0 comments on commit 178a672

Please sign in to comment.