Skip to content

Commit

Permalink
Fix a bug where network stats is not presented in container stats.
Browse files Browse the repository at this point in the history
When switching to use Docker Go SDK, we used types.Stats to save the stats response from corresponding Docker Engine API. This struct doesn't contain all fields in the API response, notably the "networks" field is missing and therefore the field isn't presented in container stats response. This field was populated when we were using fsouza/go-dockerclient prior to v1.24.0. Fixing it by changing to use types.StatsJSON struct which contains the "networks" field along with fields in types.Stats.
  • Loading branch information
fenxiong committed Mar 11, 2019
1 parent 08d3eaf commit 06d4f50
Show file tree
Hide file tree
Showing 21 changed files with 111 additions and 64 deletions.
16 changes: 8 additions & 8 deletions agent/dockerclient/dockerapi/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ type DockerClient interface {

// Stats returns a channel of stat data for the specified container. A context should be provided so the request can
// be canceled.
Stats(context.Context, string, time.Duration) (<-chan *types.Stats, error)
Stats(context.Context, string, time.Duration) (<-chan *types.StatsJSON, error)

// Version returns the version of the Docker daemon.
Version(context.Context, time.Duration) (string, error)
Expand Down Expand Up @@ -1287,8 +1287,8 @@ func (dg *dockerGoClient) APIVersion() (dockerclient.DockerVersion, error) {
return dg.sdkClientFactory.FindClientAPIVersion(client), nil
}

// Stats returns a channel of *types.Stats entries for the container.
func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeout time.Duration) (<-chan *types.Stats, error) {
// Stats returns a channel of *types.StatsJSON entries for the container.
func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeout time.Duration) (<-chan *types.StatsJSON, error) {
subCtx, cancelRequest := context.WithCancel(ctx)

client, err := dg.sdkDockerClient()
Expand All @@ -1298,7 +1298,7 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
}

// Create channel to hold the stats
statsChnl := make(chan *types.Stats)
statsChnl := make(chan *types.StatsJSON)
var resp types.ContainerStats

if !dg.config.PollMetrics {
Expand All @@ -1322,7 +1322,7 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou

// Returns a *Decoder and takes in a readCloser
decoder := json.NewDecoder(resp.Body)
data := new(types.Stats)
data := new(types.StatsJSON)
for err := decoder.Decode(data); err != io.EOF; err = decoder.Decode(data) {
if err != nil {
seelog.Warnf("DockerGoClient: Unable to decode stats for container %s: %v", id, err)
Expand All @@ -1334,7 +1334,7 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou
}

statsChnl <- data
data = new(types.Stats)
data = new(types.StatsJSON)
}
}()
} else {
Expand All @@ -1359,15 +1359,15 @@ func (dg *dockerGoClient) Stats(ctx context.Context, id string, inactivityTimeou

// Returns a *Decoder and takes in a readCloser
decoder := json.NewDecoder(resp.Body)
data := new(types.Stats)
data := new(types.StatsJSON)
err := decoder.Decode(data)
if err != nil {
seelog.Warnf("DockerGoClient: Unable to decode stats for container %s: %v", id, err)
return
}

statsChnl <- data
data = new(types.Stats)
data = new(types.StatsJSON)
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion agent/dockerclient/dockerapi/docker_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ func (ms mockStream) Read(data []byte) (n int, err error) {
func (ms mockStream) Close() error {
return nil
}
func waitForStats(t *testing.T, stat *types.Stats) {
func waitForStats(t *testing.T, stat *types.StatsJSON) {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
for {
Expand Down
4 changes: 2 additions & 2 deletions agent/dockerclient/dockerapi/mocks/dockerapi_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 12 additions & 8 deletions agent/handlers/task_server_setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,8 @@ func TestV2ContainerStats(t *testing.T) {
statsEngine := mock_stats.NewMockEngine(ctrl)
ecsClient := mock_api.NewMockECSClient(ctrl)

dockerStats := &types.Stats{NumProcs: 2}
dockerStats := &types.StatsJSON{}
dockerStats.NumProcs = 2
gomock.InOrder(
state.EXPECT().GetTaskByIPAddress(remoteIP).Return(taskARN, true),
statsEngine.EXPECT().ContainerDockerStats(taskARN, containerID).Return(dockerStats, nil),
Expand All @@ -684,7 +685,7 @@ func TestV2ContainerStats(t *testing.T) {
res, err := ioutil.ReadAll(recorder.Body)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, recorder.Code)
var statsFromResult *types.Stats
var statsFromResult *types.StatsJSON
err = json.Unmarshal(res, &statsFromResult)
assert.NoError(t, err)
assert.Equal(t, dockerStats.NumProcs, statsFromResult.NumProcs)
Expand Down Expand Up @@ -712,7 +713,8 @@ func TestV2TaskStats(t *testing.T) {
statsEngine := mock_stats.NewMockEngine(ctrl)
ecsClient := mock_api.NewMockECSClient(ctrl)

dockerStats := &types.Stats{NumProcs: 2}
dockerStats := &types.StatsJSON{}
dockerStats.NumProcs = 2
containerMap := map[string]*apicontainer.DockerContainer{
containerName: {
DockerID: containerID,
Expand All @@ -732,7 +734,7 @@ func TestV2TaskStats(t *testing.T) {
res, err := ioutil.ReadAll(recorder.Body)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, recorder.Code)
var statsFromResult map[string]*types.Stats
var statsFromResult map[string]*types.StatsJSON
err = json.Unmarshal(res, &statsFromResult)
assert.NoError(t, err)
containerStats, ok := statsFromResult[containerID]
Expand Down Expand Up @@ -938,7 +940,8 @@ func TestV3TaskStats(t *testing.T) {
statsEngine := mock_stats.NewMockEngine(ctrl)
ecsClient := mock_api.NewMockECSClient(ctrl)

dockerStats := &types.Stats{NumProcs: 2}
dockerStats := &types.StatsJSON{}
dockerStats.NumProcs = 2

containerMap := map[string]*apicontainer.DockerContainer{
containerName: {
Expand All @@ -959,7 +962,7 @@ func TestV3TaskStats(t *testing.T) {
res, err := ioutil.ReadAll(recorder.Body)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, recorder.Code)
var statsFromResult map[string]*types.Stats
var statsFromResult map[string]*types.StatsJSON
err = json.Unmarshal(res, &statsFromResult)
assert.NoError(t, err)
containerStats, ok := statsFromResult[containerID]
Expand All @@ -976,7 +979,8 @@ func TestV3ContainerStats(t *testing.T) {
statsEngine := mock_stats.NewMockEngine(ctrl)
ecsClient := mock_api.NewMockECSClient(ctrl)

dockerStats := &types.Stats{NumProcs: 2}
dockerStats := &types.StatsJSON{}
dockerStats.NumProcs = 2

gomock.InOrder(
state.EXPECT().TaskARNByV3EndpointID(v3EndpointID).Return(taskARN, true),
Expand All @@ -991,7 +995,7 @@ func TestV3ContainerStats(t *testing.T) {
res, err := ioutil.ReadAll(recorder.Body)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, recorder.Code)
var statsFromResult *types.Stats
var statsFromResult *types.StatsJSON
err = json.Unmarshal(res, &statsFromResult)
assert.NoError(t, err)
assert.Equal(t, dockerStats.NumProcs, statsFromResult.NumProcs)
Expand Down
4 changes: 2 additions & 2 deletions agent/handlers/v2/stats_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// NewTaskStatsResponse returns a new task stats response object
func NewTaskStatsResponse(taskARN string,
state dockerstate.TaskEngineState,
statsEngine stats.Engine) (map[string]*types.Stats, error) {
statsEngine stats.Engine) (map[string]*types.StatsJSON, error) {

containerMap, ok := state.ContainerMapByArn(taskARN)
if !ok {
Expand All @@ -33,7 +33,7 @@ func NewTaskStatsResponse(taskARN string,
taskARN)
}

resp := make(map[string]*types.Stats)
resp := make(map[string]*types.StatsJSON)
for _, dockerContainer := range containerMap {
containerID := dockerContainer.DockerID
dockerStats, err := statsEngine.ContainerDockerStats(taskARN, containerID)
Expand Down
3 changes: 2 additions & 1 deletion agent/handlers/v2/stats_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestTaskStatsResponseSuccess(t *testing.T) {
state := mock_dockerstate.NewMockTaskEngineState(ctrl)
statsEngine := mock_stats.NewMockEngine(ctrl)

dockerStats := &types.Stats{NumProcs: 2}
dockerStats := &types.StatsJSON{}
dockerStats.NumProcs = 2
containerMap := map[string]*apicontainer.DockerContainer{
containerName: {
DockerID: containerID,
Expand Down
12 changes: 6 additions & 6 deletions agent/stats/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func TestContainerStatsCollection(t *testing.T) {

dockerID := "container1"
ctx, cancel := context.WithCancel(context.TODO())
statChan := make(chan *types.Stats)
statChan := make(chan *types.StatsJSON)
mockDockerClient.EXPECT().Stats(ctx, dockerID, dockerclient.StatsInactivityTimeout).Return(statChan, nil)
go func() {
for _, stat := range statsData {
// doing this with json makes me sad, but is the easiest way to
// deal with the types.Stats.MemoryStats inner struct
// deal with the types.StatsJSON.MemoryStats inner struct
jsonStat := fmt.Sprintf(`
{
"memory_stats": {"usage":%d, "privateworkingset":%d},
Expand All @@ -72,7 +72,7 @@ func TestContainerStatsCollection(t *testing.T) {
}
}
}`, stat.memBytes, stat.memBytes, stat.cpuTime, stat.cpuTime)
dockerStat := &types.Stats{}
dockerStat := &types.StatsJSON{}
json.Unmarshal([]byte(jsonStat), dockerStat)
dockerStat.Read = stat.timestamp
statChan <- dockerStat
Expand Down Expand Up @@ -134,9 +134,9 @@ func TestContainerStatsCollectionReconnection(t *testing.T) {
dockerID := "container1"
ctx, cancel := context.WithCancel(context.TODO())

statChan := make(chan *types.Stats)
statChan := make(chan *types.StatsJSON)
statErr := fmt.Errorf("test error")
closedChan := make(chan *types.Stats)
closedChan := make(chan *types.StatsJSON)
close(closedChan)

mockContainer := &apicontainer.DockerContainer{
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestContainerStatsCollectionStopsIfContainerIsTerminal(t *testing.T) {
dockerID := "container1"
ctx, cancel := context.WithCancel(context.TODO())

closedChan := make(chan *types.Stats)
closedChan := make(chan *types.StatsJSON)
close(closedChan)

statsErr := fmt.Errorf("test error")
Expand Down
4 changes: 2 additions & 2 deletions agent/stats/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type DockerContainerMetadataResolver struct {
// defined to make testing easier.
type Engine interface {
GetInstanceMetrics() (*ecstcs.MetricsMetadata, []*ecstcs.TaskMetric, error)
ContainerDockerStats(taskARN string, containerID string) (*types.Stats, error)
ContainerDockerStats(taskARN string, containerID string) (*types.StatsJSON, error)
GetTaskHealthMetrics() (*ecstcs.HealthMetadata, []*ecstcs.TaskHealth, error)
}

Expand Down Expand Up @@ -648,7 +648,7 @@ func (engine *DockerStatsEngine) resetStatsUnsafe() {
}

// ContainerDockerStats returns the last stored raw docker stats object for a container
func (engine *DockerStatsEngine) ContainerDockerStats(taskARN string, containerID string) (*types.Stats, error) {
func (engine *DockerStatsEngine) ContainerDockerStats(taskARN string, containerID string) (*types.StatsJSON, error) {
engine.lock.RLock()
defer engine.lock.RUnlock()

Expand Down
15 changes: 5 additions & 10 deletions agent/stats/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestStatsEngineAddRemoveContainers(t *testing.T) {
resolver.EXPECT().ResolveContainer(gomock.Any()).AnyTimes().Return(&apicontainer.DockerContainer{
Container: &apicontainer.Container{},
}, nil)
mockStatsChannel := make(chan *types.Stats)
mockStatsChannel := make(chan *types.StatsJSON)
defer close(mockStatsChannel)
mockDockerClient.EXPECT().Stats(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStatsChannel, nil).AnyTimes()

Expand Down Expand Up @@ -194,14 +194,9 @@ func TestStatsEngineMetadataInStatsSets(t *testing.T) {
{22400432, 1839104, ts1},
{116499979, 3649536, ts2},
}
dockerStats := []*types.Stats{
{
Read: ts1,
},
{
Read: ts2,
},
}
dockerStats := []*types.StatsJSON{{}, {},}
dockerStats[0].Read = ts1
dockerStats[1].Read = ts2
containers, _ := engine.tasksToContainers["t1"]
for _, statsContainer := range containers {
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -398,7 +393,7 @@ func TestSynchronizeOnRestart(t *testing.T) {
defer ctrl.Finish()

containerID := "containerID"
statsChan := make(chan *types.Stats)
statsChan := make(chan *types.StatsJSON)
statsStarted := make(chan struct{})
client := mock_dockerapi.NewMockDockerClient(ctrl)
resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl)
Expand Down
4 changes: 2 additions & 2 deletions agent/stats/mock/engine.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions agent/stats/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Queue struct {
buffer []UsageStats
maxSize int
lastResetTime time.Time
lastStat *types.Stats
lastStat *types.StatsJSON
lock sync.RWMutex
}

Expand All @@ -57,7 +57,7 @@ func (queue *Queue) Reset() {
}

// Add adds a new set of container stats to the queue.
func (queue *Queue) Add(dockerStat *types.Stats) error {
func (queue *Queue) Add(dockerStat *types.StatsJSON) error {
queue.setLastStat(dockerStat)
stat, err := dockerStatsToContainerStats(dockerStat)
if err != nil {
Expand All @@ -67,7 +67,7 @@ func (queue *Queue) Add(dockerStat *types.Stats) error {
return nil
}

func (queue *Queue) setLastStat(stat *types.Stats) {
func (queue *Queue) setLastStat(stat *types.StatsJSON) {
queue.lock.Lock()
defer queue.lock.Unlock()

Expand Down Expand Up @@ -108,7 +108,7 @@ func (queue *Queue) add(rawStat *ContainerStats) {
}

// GetLastStat returns the last recorded raw statistics object from docker
func (queue *Queue) GetLastStat() *types.Stats {
func (queue *Queue) GetLastStat() *types.StatsJSON {
queue.lock.RLock()
defer queue.lock.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion agent/stats/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestDockerStatsToContainerStatsMemUsage(t *testing.T) {
"privateworkingset": %d
}
}`, 1, 2, 3, 4, 100, 30, 100, 20, 10, 10)
dockerStat := &types.Stats{}
dockerStat := &types.StatsJSON{}
json.Unmarshal([]byte(jsonStat), dockerStat)
containerStats, err := dockerStatsToContainerStats(dockerStat)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion agent/stats/utils_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// dockerStatsToContainerStats returns a new object of the ContainerStats object from docker stats.
func dockerStatsToContainerStats(dockerStats *types.Stats) (*ContainerStats, error) {
func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, error) {
// The length of PercpuUsage represents the number of cores in an instance.
if len(dockerStats.CPUStats.CPUUsage.PercpuUsage) == 0 || numCores == uint64(0) {
seelog.Debug("Invalid container statistics reported, no cpu core usage reported")
Expand Down
4 changes: 2 additions & 2 deletions agent/stats/utils_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestDockerStatsToContainerStatsZeroCoresGeneratesError(t *testing.T) {
}
}
}`, 100)
dockerStat := &types.Stats{}
dockerStat := &types.StatsJSON{}
json.Unmarshal([]byte(jsonStat), dockerStat)
_, err := dockerStatsToContainerStats(dockerStat)
assert.Error(t, err, "expected error converting container stats with empty PercpuUsage")
Expand All @@ -57,7 +57,7 @@ func TestDockerStatsToContainerStatsCpuUsage(t *testing.T) {
}
}
}`, 1, 2, 3, 4, 100)
dockerStat := &types.Stats{}
dockerStat := &types.StatsJSON{}
json.Unmarshal([]byte(jsonStat), dockerStat)
containerStats, err := dockerStatsToContainerStats(dockerStat)
assert.NoError(t, err, "converting container stats failed")
Expand Down
2 changes: 1 addition & 1 deletion agent/stats/utils_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// dockerStatsToContainerStats returns a new object of the ContainerStats object from docker stats.
func dockerStatsToContainerStats(dockerStats *types.Stats) (*ContainerStats, error) {
func dockerStatsToContainerStats(dockerStats *types.StatsJSON) (*ContainerStats, error) {
if numCores == uint64(0) {
seelog.Error("Invalid number of cpu cores acquired from the system")
return nil, fmt.Errorf("invalid number of cpu cores acquired from the system")
Expand Down
4 changes: 2 additions & 2 deletions agent/stats/utils_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestDockerStatsToContainerStatsZeroCoresGeneratesError(t *testing.T) {
}
}
}`, 100)
dockerStat := &types.Stats{}
dockerStat := &types.StatsJSON{}
json.Unmarshal([]byte(jsonStat), dockerStat)
_, err := dockerStatsToContainerStats(dockerStat)
assert.Error(t, err, "expected error converting container stats with zero cpu cores")
Expand All @@ -55,7 +55,7 @@ func TestDockerStatsToContainerStatsCpuUsage(t *testing.T) {
}
}
}`, 100)
dockerStat := &types.Stats{}
dockerStat := &types.StatsJSON{}
json.Unmarshal([]byte(jsonStat), dockerStat)
containerStats, err := dockerStatsToContainerStats(dockerStat)
assert.NoError(t, err, "converting container stats failed")
Expand Down
Loading

0 comments on commit 06d4f50

Please sign in to comment.