diff --git a/agent/stats/engine.go b/agent/stats/engine.go index 25663f07e49..ef19761c482 100644 --- a/agent/stats/engine.go +++ b/agent/stats/engine.go @@ -281,7 +281,7 @@ func (engine *DockerStatsEngine) addToStatsTaskMapUnsafe(task *apitask.Task, doc } containerpid := strconv.Itoa(containerInspect.State.Pid) statsTaskContainer, err = newStatsTaskContainer(task.Arn, containerpid, numberOfContainers, - engine.resolver, engine.config.PollingMetricsWaitDuration) + engine.resolver, engine.config.PollingMetricsWaitDuration, task.ENIs) if err != nil { return } diff --git a/agent/stats/task.go b/agent/stats/task.go new file mode 100644 index 00000000000..b8cdea6fcbd --- /dev/null +++ b/agent/stats/task.go @@ -0,0 +1,155 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package stats + +import ( + "context" + "fmt" + "time" + + apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" + "github.com/aws/amazon-ecs-agent/agent/config" + "github.com/aws/amazon-ecs-agent/agent/stats/resolver" + "github.com/aws/amazon-ecs-agent/agent/utils/retry" + + "github.com/cihub/seelog" + "github.com/docker/docker/api/types" + dockerstats "github.com/docker/docker/api/types" +) + +// statsTaskCommon contains the common fields in StatsTask for both Linux and Windows. +// StatsTask abstracts methods to gather and aggregate network data for a task. Used only for AWSVPC mode. +type statsTaskCommon struct { + StatsQueue *Queue + TaskMetadata *TaskMetadata + Ctx context.Context + Cancel context.CancelFunc + Resolver resolver.ContainerMetadataResolver + metricPublishInterval time.Duration +} + +func (taskStat *StatsTask) StartStatsCollection() { + queueSize := int(config.DefaultContainerMetricsPublishInterval.Seconds() * 4) + taskStat.StatsQueue = NewQueue(queueSize) + taskStat.StatsQueue.Reset() + go taskStat.collect() +} + +func (taskStat *StatsTask) StopStatsCollection() { + taskStat.Cancel() +} + +func (taskStat *StatsTask) collect() { + taskArn := taskStat.TaskMetadata.TaskArn + backoff := retry.NewExponentialBackoff(time.Second*1, time.Second*10, 0.5, 2) + + for { + err := taskStat.processStatsStream() + select { + case <-taskStat.Ctx.Done(): + seelog.Debugf("Stopping stats collection for taskStat %s", taskArn) + return + default: + if err != nil { + d := backoff.Duration() + time.Sleep(d) + seelog.Debugf("Error querying stats for task %s: %v", taskArn, err) + } + // We were disconnected from the stats stream. + // Check if the task is terminal. If it is, stop collecting metrics. + terminal, err := taskStat.terminal() + if err != nil { + // Error determining if the task is terminal. clean-up anyway. + seelog.Warnf("Error determining if the task %s is terminal, stopping stats collection: %v", + taskArn, err) + taskStat.StopStatsCollection() + } else if terminal { + seelog.Infof("Task %s is terminal, stopping stats collection", taskArn) + taskStat.StopStatsCollection() + } + } + } +} + +func (taskStat *StatsTask) processStatsStream() error { + taskArn := taskStat.TaskMetadata.TaskArn + awsvpcNetworkStats, errC := taskStat.getAWSVPCNetworkStats() + + returnError := false + for { + select { + case <-taskStat.Ctx.Done(): + seelog.Info("task context is done") + return nil + case err := <-errC: + seelog.Warnf("Error encountered processing metrics stream from host, this may affect "+ + "cloudwatch metric accuracy: %s", err) + returnError = true + case rawStat, ok := <-awsvpcNetworkStats: + if !ok { + if returnError { + return fmt.Errorf("error encountered processing metrics stream from host") + } + return nil + } + if err := taskStat.StatsQueue.Add(rawStat); err != nil { + seelog.Warnf("Task [%s]: error converting stats: %v", taskArn, err) + } + } + + } +} + +func (taskStat *StatsTask) terminal() (bool, error) { + resolvedTask, err := taskStat.Resolver.ResolveTaskByARN(taskStat.TaskMetadata.TaskArn) + if err != nil { + return false, err + } + return resolvedTask.GetKnownStatus() == apitaskstatus.TaskStopped, nil +} + +func (taskStat *StatsTask) getAWSVPCNetworkStats() (<-chan *types.StatsJSON, <-chan error) { + + errC := make(chan error, 1) + statsC := make(chan *dockerstats.StatsJSON) + if taskStat.TaskMetadata.NumberContainers > 0 { + go func() { + defer close(statsC) + statPollTicker := time.NewTicker(taskStat.metricPublishInterval) + defer statPollTicker.Stop() + for range statPollTicker.C { + + networkStats, err := taskStat.retrieveNetworkStatistics() + if err != nil { + errC <- err + return + } + + dockerStats := &types.StatsJSON{ + Networks: networkStats, + Stats: types.Stats{ + Read: time.Now(), + }, + } + select { + case <-taskStat.Ctx.Done(): + return + case statsC <- dockerStats: + } + } + }() + } + + return statsC, errC +} diff --git a/agent/stats/task_linux.go b/agent/stats/task_linux.go index b388f596641..ca07c3a7571 100644 --- a/agent/stats/task_linux.go +++ b/agent/stats/task_linux.go @@ -20,17 +20,13 @@ import ( "fmt" "time" - apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" - "github.com/aws/amazon-ecs-agent/agent/config" + "github.com/aws/amazon-ecs-agent/agent/api/task" "github.com/aws/amazon-ecs-agent/agent/ecscni" "github.com/aws/amazon-ecs-agent/agent/eni/netlinkwrapper" "github.com/aws/amazon-ecs-agent/agent/stats/resolver" "github.com/aws/amazon-ecs-agent/agent/utils/nswrapper" - "github.com/aws/amazon-ecs-agent/agent/utils/retry" - "github.com/containernetworking/plugins/pkg/ns" - "github.com/cihub/seelog" - "github.com/docker/docker/api/types" + "github.com/containernetworking/plugins/pkg/ns" dockerstats "github.com/docker/docker/api/types" netlinklib "github.com/vishvananda/netlink" ) @@ -46,119 +42,35 @@ const ( encapTypeLoopback = "loopback" ) -// StatsTask abstracts methods to gather and aggregate network data for a task. Used only for AWSVPC mode. type StatsTask struct { - StatsQueue *Queue - TaskMetadata *TaskMetadata - Ctx context.Context - Cancel context.CancelFunc - Resolver resolver.ContainerMetadataResolver - nswrapperinterface nswrapper.NS - netlinkinterface netlinkwrapper.NetLink - metricPublishInterval time.Duration + *statsTaskCommon + nswrapperinterface nswrapper.NS + netlinkinterface netlinkwrapper.NetLink } func newStatsTaskContainer(taskARN string, containerPID string, numberOfContainers int, - resolver resolver.ContainerMetadataResolver, publishInterval time.Duration) (*StatsTask, error) { + resolver resolver.ContainerMetadataResolver, publishInterval time.Duration, _ task.TaskENIs) (*StatsTask, error) { nsAgent := nswrapper.NewNS() netlinkclient := netlinkwrapper.New() ctx, cancel := context.WithCancel(context.Background()) return &StatsTask{ - TaskMetadata: &TaskMetadata{ - TaskArn: taskARN, - ContainerPID: containerPID, - NumberContainers: numberOfContainers, + statsTaskCommon: &statsTaskCommon{ + TaskMetadata: &TaskMetadata{ + TaskArn: taskARN, + ContainerPID: containerPID, + NumberContainers: numberOfContainers, + }, + Ctx: ctx, + Cancel: cancel, + Resolver: resolver, + metricPublishInterval: publishInterval, }, - Ctx: ctx, - Cancel: cancel, - Resolver: resolver, - netlinkinterface: netlinkclient, - nswrapperinterface: nsAgent, - metricPublishInterval: publishInterval, + netlinkinterface: netlinkclient, + nswrapperinterface: nsAgent, }, nil } -func (taskStat *StatsTask) StartStatsCollection() { - queueSize := int(config.DefaultContainerMetricsPublishInterval.Seconds() * 4) - taskStat.StatsQueue = NewQueue(queueSize) - taskStat.StatsQueue.Reset() - go taskStat.collect() -} - -func (taskStat *StatsTask) StopStatsCollection() { - taskStat.Cancel() -} - -func (taskStat *StatsTask) collect() { - taskArn := taskStat.TaskMetadata.TaskArn - backoff := retry.NewExponentialBackoff(time.Second*1, time.Second*10, 0.5, 2) - - for { - err := taskStat.processStatsStream() - select { - case <-taskStat.Ctx.Done(): - seelog.Debugf("Stopping stats collection for taskStat %s", taskArn) - return - default: - if err != nil { - d := backoff.Duration() - time.Sleep(d) - seelog.Debugf("Error querying stats for task %s: %v", taskArn, err) - } - // We were disconnected from the stats stream. - // Check if the task is terminal. If it is, stop collecting metrics. - terminal, err := taskStat.terminal() - if err != nil { - // Error determining if the task is terminal. clean-up anyway. - seelog.Warnf("Error determining if the task %s is terminal, stopping stats collection: %v", - taskArn, err) - taskStat.StopStatsCollection() - } else if terminal { - seelog.Infof("Task %s is terminal, stopping stats collection", taskArn) - taskStat.StopStatsCollection() - } - } - } -} - -func (taskStat *StatsTask) processStatsStream() error { - taskArn := taskStat.TaskMetadata.TaskArn - awsvpcNetworkStats, errC := taskStat.getAWSVPCNetworkStats() - - returnError := false - for { - select { - case <-taskStat.Ctx.Done(): - seelog.Info("task context is done") - return nil - case err := <-errC: - seelog.Warnf("Error encountered processing metrics stream from host, this may affect "+ - "cloudwatch metric accuracy: %s", err) - returnError = true - case rawStat, ok := <-awsvpcNetworkStats: - if !ok { - if returnError { - return fmt.Errorf("error encountered processing metrics stream from host") - } - return nil - } - if err := taskStat.StatsQueue.Add(rawStat); err != nil { - seelog.Warnf("Task [%s]: error converting stats: %v", taskArn, err) - } - } - - } -} - -func (taskStat *StatsTask) terminal() (bool, error) { - resolvedTask, err := taskStat.Resolver.ResolveTaskByARN(taskStat.TaskMetadata.TaskArn) - if err != nil { - return false, err - } - return resolvedTask.GetKnownStatus() == apitaskstatus.TaskStopped, nil -} - func getDevicesList(linkList []netlinklib.Link) []string { var deviceNames []string for _, link := range linkList { @@ -202,60 +114,35 @@ func linkStatsToDockerStats(netLinkStats *netlinklib.LinkStatistics, numberOfCon return networkStats } -func (taskStat *StatsTask) getAWSVPCNetworkStats() (<-chan *types.StatsJSON, <-chan error) { - - errC := make(chan error, 1) - statsC := make(chan *dockerstats.StatsJSON) - if taskStat.TaskMetadata.NumberContainers > 0 { - go func() { - defer close(statsC) - statPollTicker := time.NewTicker(taskStat.metricPublishInterval) - defer statPollTicker.Stop() - for range statPollTicker.C { - if len(taskStat.TaskMetadata.DeviceName) == 0 { - var err error - taskStat.TaskMetadata.DeviceName, err = taskStat.populateNIDeviceList(taskStat.TaskMetadata.ContainerPID) - if err != nil { - errC <- err - return - } - } - networkStats := make(map[string]dockerstats.NetworkStats, len(taskStat.TaskMetadata.DeviceName)) - for _, device := range taskStat.TaskMetadata.DeviceName { - var link netlinklib.Link - err := taskStat.nswrapperinterface.WithNetNSPath(fmt.Sprintf(ecscni.NetnsFormat, - taskStat.TaskMetadata.ContainerPID), - func(ns.NetNS) error { - var linkErr error - if link, linkErr = taskStat.netlinkinterface.LinkByName(device); linkErr != nil { - return linkErr - } - return nil - }) +func (taskStat *StatsTask) retrieveNetworkStatistics() (map[string]dockerstats.NetworkStats, error) { + if len(taskStat.TaskMetadata.DeviceName) == 0 { + var err error + taskStat.TaskMetadata.DeviceName, err = taskStat.populateNIDeviceList(taskStat.TaskMetadata.ContainerPID) + if err != nil { + return nil, err + } + } - if err != nil { - errC <- err - return - } - netLinkStats := link.Attrs().Statistics - networkStats[link.Attrs().Name] = linkStatsToDockerStats(netLinkStats, - uint64(taskStat.TaskMetadata.NumberContainers)) + networkStats := make(map[string]dockerstats.NetworkStats, len(taskStat.TaskMetadata.DeviceName)) + for _, device := range taskStat.TaskMetadata.DeviceName { + var link netlinklib.Link + err := taskStat.nswrapperinterface.WithNetNSPath(fmt.Sprintf(ecscni.NetnsFormat, + taskStat.TaskMetadata.ContainerPID), + func(ns.NetNS) error { + var linkErr error + if link, linkErr = taskStat.netlinkinterface.LinkByName(device); linkErr != nil { + return linkErr } + return nil + }) - dockerStats := &types.StatsJSON{ - Networks: networkStats, - Stats: types.Stats{ - Read: time.Now(), - }, - } - select { - case <-taskStat.Ctx.Done(): - return - case statsC <- dockerStats: - } - } - }() + if err != nil { + return nil, err + } + netLinkStats := link.Attrs().Statistics + networkStats[link.Attrs().Name] = linkStatsToDockerStats(netLinkStats, + uint64(taskStat.TaskMetadata.NumberContainers)) } - return statsC, errC + return networkStats, nil } diff --git a/agent/stats/task_linux_test.go b/agent/stats/task_linux_test.go index 45f701407cd..05997589126 100644 --- a/agent/stats/task_linux_test.go +++ b/agent/stats/task_linux_test.go @@ -45,18 +45,20 @@ func TestTaskStatsCollection(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) numberOfContainers := 2 taskStats := &StatsTask{ - TaskMetadata: &TaskMetadata{ - TaskArn: taskId, - ContainerPID: containerPID, - DeviceName: []string{"device1", "device2"}, - NumberContainers: numberOfContainers, + statsTaskCommon: &statsTaskCommon{ + TaskMetadata: &TaskMetadata{ + TaskArn: taskId, + ContainerPID: containerPID, + DeviceName: []string{"device1", "device2"}, + NumberContainers: numberOfContainers, + }, + Ctx: ctx, + Cancel: cancel, + Resolver: resolver, + metricPublishInterval: time.Second, }, - Ctx: ctx, - Cancel: cancel, - Resolver: resolver, - netlinkinterface: mockNetLink, - nswrapperinterface: mockNS, - metricPublishInterval: time.Second, + netlinkinterface: mockNetLink, + nswrapperinterface: mockNS, } testTask := &apitask.Task{ @@ -105,18 +107,20 @@ func TestTaskStatsCollectionError(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) taskStats := &StatsTask{ - TaskMetadata: &TaskMetadata{ - TaskArn: taskId, - ContainerPID: containerPID, - DeviceName: []string{"device1", "device2"}, - NumberContainers: 2, + statsTaskCommon: &statsTaskCommon{ + TaskMetadata: &TaskMetadata{ + TaskArn: taskId, + ContainerPID: containerPID, + DeviceName: []string{"device1", "device2"}, + NumberContainers: 2, + }, + Ctx: ctx, + Cancel: cancel, + Resolver: resolver, + metricPublishInterval: time.Second, }, - Ctx: ctx, - Cancel: cancel, - Resolver: resolver, - netlinkinterface: mockNetLink, - nswrapperinterface: mockNS, - metricPublishInterval: time.Second, + netlinkinterface: mockNetLink, + nswrapperinterface: mockNS, } testTask := &apitask.Task{ diff --git a/agent/stats/task_unspecified.go b/agent/stats/task_unspecified.go index b23fcdf996d..c7ed848c4ad 100644 --- a/agent/stats/task_unspecified.go +++ b/agent/stats/task_unspecified.go @@ -1,4 +1,4 @@ -//go:build !linux +//go:build !linux && !windows // Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. // @@ -18,26 +18,24 @@ package stats import ( "time" + "github.com/aws/amazon-ecs-agent/agent/api/task" "github.com/aws/amazon-ecs-agent/agent/stats/resolver" + + dockerstats "github.com/docker/docker/api/types" "github.com/pkg/errors" ) // Dummy StatsTasks type StatsTask struct { - // AWSVPC network stats only supported on linux - TaskMetadata *TaskMetadata - StatsQueue *Queue + // AWSVPC network stats only supported on linux and Windows + *statsTaskCommon } func newStatsTaskContainer(taskARN string, containerPID string, numberOfContainers int, - resolver resolver.ContainerMetadataResolver, publishInterval time.Duration) (*StatsTask, error) { + resolver resolver.ContainerMetadataResolver, publishInterval time.Duration, _ task.TaskENIs) (*StatsTask, error) { return nil, errors.New("Unsupported platform") } -func (task *StatsTask) StartStatsCollection() { - // AWSVPC network stats only supported on linux -} - -func (task *StatsTask) StopStatsCollection() { - // AWSVPC network stats only supported on linux +func (taskStat *StatsTask) retrieveNetworkStatistics() (map[string]dockerstats.NetworkStats, error) { + return nil, errors.New("Unsupported platform") } diff --git a/agent/stats/task_windows.go b/agent/stats/task_windows.go new file mode 100644 index 00000000000..4bbc84fbaa9 --- /dev/null +++ b/agent/stats/task_windows.go @@ -0,0 +1,175 @@ +//go:build windows + +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package stats + +import ( + "context" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/aws/amazon-ecs-agent/agent/api/task" + "github.com/aws/amazon-ecs-agent/agent/stats/resolver" + + dockerstats "github.com/docker/docker/api/types" + "github.com/pkg/errors" +) + +const ( + receivedBroadcastPackets = "ReceivedBroadcastPackets" + receivedMulticastPackets = "ReceivedMulticastPackets" + receivedUnicastPackets = "ReceivedUnicastPackets" + sentBroadcastPackets = "SentBroadcastPackets" + sentMulticastPackets = "SentMulticastPackets" + sentUnicastPackets = "SentUnicastPackets" + receivedBytes = "ReceivedBytes" + receivedPacketErrors = "ReceivedPacketErrors" + receivedDiscardedPackets = "ReceivedDiscardedPackets" + sentBytes = "SentBytes" + outboundPacketErrors = "OutboundPacketErrors" + outboundDiscardedPackets = "OutboundDiscardedPackets" +) + +var ( + // Making it visible for unit testing + execCommand = exec.Command + // Fields to be extracted from the stats returned by cmdlet. + networkStatKeys = []string{ + receivedBroadcastPackets, + receivedMulticastPackets, + receivedUnicastPackets, + sentBroadcastPackets, + sentMulticastPackets, + sentUnicastPackets, + receivedBytes, + receivedPacketErrors, + receivedDiscardedPackets, + sentBytes, + outboundDiscardedPackets, + outboundPacketErrors, + } +) + +type StatsTask struct { + *statsTaskCommon +} + +func newStatsTaskContainer(taskARN string, containerPID string, numberOfContainers int, + resolver resolver.ContainerMetadataResolver, publishInterval time.Duration, taskENIs task.TaskENIs) (*StatsTask, error) { + ctx, cancel := context.WithCancel(context.Background()) + + devices := make([]string, len(taskENIs)) + for index, device := range taskENIs { + devices[index] = device.LinkName + } + + return &StatsTask{ + statsTaskCommon: &statsTaskCommon{ + TaskMetadata: &TaskMetadata{ + TaskArn: taskARN, + ContainerPID: containerPID, + DeviceName: devices, + NumberContainers: numberOfContainers, + }, + Ctx: ctx, + Cancel: cancel, + Resolver: resolver, + metricPublishInterval: publishInterval, + }, + }, nil +} + +func (taskStat *StatsTask) retrieveNetworkStatistics() (map[string]dockerstats.NetworkStats, error) { + if len(taskStat.TaskMetadata.DeviceName) == 0 { + return nil, errors.Errorf("unable to find any device name associated with the task %s", taskStat.TaskMetadata.TaskArn) + } + + networkStats := make(map[string]dockerstats.NetworkStats, len(taskStat.TaskMetadata.DeviceName)) + for _, device := range taskStat.TaskMetadata.DeviceName { + networkAdaptorStatistics, err := taskStat.getNetworkAdaptorStatistics(device) + if err != nil { + return nil, err + } + networkStats[device] = *networkAdaptorStatistics + } + + return networkStats, nil +} + +// getNetworkAdaptorStatistics returns the network statistics per container for the given network interface. +func (taskStat *StatsTask) getNetworkAdaptorStatistics(device string) (*dockerstats.NetworkStats, error) { + // Ref: https://docs.microsoft.com/en-us/powershell/module/netadapter/get-netadapterstatistics?view=windowsserver2019-ps + // The Get-NetAdapterStatistics cmdlet gets networking statistics from a network adapter. + // The statistics include broadcast, multicast, discards, and errors. + cmd := "Get-NetAdapterStatistics -Name \"" + device + "\" | Format-List -Property *" + out, err := execCommand("powershell", "-Command", cmd).CombinedOutput() + + if err != nil { + return nil, errors.Wrapf(err, "failed to run Get-NetAdapterStatistics for %s", device) + } + str := string(out) + + // Extract rawStats from the cmdlet output. + lines := strings.Split(str, "\n") + rawStats := make(map[string]string) + for _, line := range lines { + // populate all the network metrics in a map + kv := strings.Split(line, ":") + if len(kv) != 2 { + continue + } + key := strings.TrimSpace(kv[0]) + value := strings.TrimSpace(kv[1]) + rawStats[key] = value + } + + // Parse the required fields from the generated map. + parsedStats := make(map[string]uint64) + for _, key := range networkStatKeys { + value, err := taskStat.getMapValue(rawStats, key) + if err != nil { + return nil, err + } + parsedStats[key] = value + } + + numberOfContainers := uint64(taskStat.TaskMetadata.NumberContainers) + + return &dockerstats.NetworkStats{ + RxBytes: parsedStats[receivedBytes] / numberOfContainers, + RxPackets: (parsedStats[receivedBroadcastPackets] + parsedStats[receivedMulticastPackets] + parsedStats[receivedUnicastPackets]) / numberOfContainers, + RxErrors: parsedStats[receivedPacketErrors] / numberOfContainers, + RxDropped: parsedStats[receivedDiscardedPackets] / numberOfContainers, + TxBytes: parsedStats[sentBytes] / numberOfContainers, + TxPackets: (parsedStats[sentBroadcastPackets] + parsedStats[sentMulticastPackets] + parsedStats[sentUnicastPackets]) / numberOfContainers, + TxErrors: parsedStats[outboundPacketErrors] / numberOfContainers, + TxDropped: parsedStats[outboundDiscardedPackets] / numberOfContainers, + }, nil +} + +// getMapValue retrieves the value of the key from the given map. +func (taskStat *StatsTask) getMapValue(m map[string]string, key string) (uint64, error) { + v, ok := m[key] + if !ok { + return 0, errors.Errorf("failed to find key: %s in output", key) + } + val, err := strconv.ParseUint(v, 10, 64) + if err != nil { + return 0, errors.Errorf("failed to parse network stats for %s with value: %s", key, v) + } + return val, nil +} diff --git a/agent/stats/task_windows_test.go b/agent/stats/task_windows_test.go new file mode 100644 index 00000000000..874e930efdc --- /dev/null +++ b/agent/stats/task_windows_test.go @@ -0,0 +1,154 @@ +//go:build windows + +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package stats + +import ( + "context" + "fmt" + "os" + "os/exec" + "testing" + "time" + + apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" + apitask "github.com/aws/amazon-ecs-agent/agent/api/task" + apitaskstatus "github.com/aws/amazon-ecs-agent/agent/api/task/status" + mock_resolver "github.com/aws/amazon-ecs-agent/agent/stats/resolver/mock" + + dockerstats "github.com/docker/docker/api/types" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +const ( + networkAdapterStatisticsResult = `ifAlias : Ethernet 3 +InterfaceAlias : Ethernet 3 +InterfaceDescription : Amazon Elastic Network Adapter #2 +Name : Ethernet 3 +Source : 2 +OutboundDiscardedPackets : 30 +OutboundPacketErrors : 20 +ReceivedBroadcastBytes : 247548 +ReceivedBroadcastPackets : 5894 +ReceivedBytes : 249578 +ReceivedDiscardedPackets : 10 +ReceivedMulticastBytes : 0 +ReceivedMulticastPackets : 0 +ReceivedPacketErrors : 4 +ReceivedUnicastBytes : 2030 +ReceivedUnicastPackets : 8 +SentBroadcastBytes : 2858 +SentBroadcastPackets : 26 +SentBytes : 256478 +SentMulticastBytes : 5995 +SentMulticastPackets : 65 +SentUnicastBytes : 247624 +SentUnicastPackets : 5895 +SupportedStatistics : 4163583` +) + +var expectedNetworkStats = dockerstats.NetworkStats{ + RxBytes: 249578, + RxPackets: 5902, + RxErrors: 4, + RxDropped: 10, + TxBytes: 256478, + TxPackets: 5986, + TxErrors: 20, + TxDropped: 30, + EndpointID: "", + InstanceID: "", +} + +// Supporting methods for network stats test. +func fakeExecCommandForNetworkStats(command string, args ...string) *exec.Cmd { + cs := []string{"-test.run=TestNetworkStatsProcess", "--", command} + cs = append(cs, args...) + cmd := exec.Command(os.Args[0], cs...) + cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"} + return cmd +} + +// TestNetworkStatsProcess is invoked from fakeExecCommand to return the network stats. +func TestNetworkStatsProcess(t *testing.T) { + if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { + return + } + fmt.Fprintf(os.Stdout, networkAdapterStatisticsResult) + os.Exit(0) +} + +func TestTaskStatsCollection(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.TODO()) + resolver := mock_resolver.NewMockContainerMetadataResolver(ctrl) + execCommand = fakeExecCommandForNetworkStats + + containerPID := "23" + taskId := "task1" + numberOfContainers := 2 + + taskStats := &StatsTask{ + statsTaskCommon: &statsTaskCommon{ + TaskMetadata: &TaskMetadata{ + TaskArn: taskId, + ContainerPID: containerPID, + DeviceName: []string{"Ethernet 3"}, + NumberContainers: numberOfContainers, + }, + Ctx: ctx, + Cancel: cancel, + Resolver: resolver, + metricPublishInterval: time.Second, + }, + } + + testTask := &apitask.Task{ + Containers: []*apicontainer.Container{ + {Name: "c1"}, + {Name: "c2", Type: apicontainer.ContainerCNIPause}, + }, + KnownStatusUnsafe: apitaskstatus.TaskRunning, + } + resolver.EXPECT().ResolveTaskByARN(gomock.Any()).Return(testTask, nil).AnyTimes() + + taskStats.StartStatsCollection() + time.Sleep(checkPointSleep) + taskStats.StopStatsCollection() + + networkStatsSet, err := taskStats.StatsQueue.GetNetworkStatsSet() + assert.NoError(t, err) + assert.NotNil(t, networkStatsSet) + + rxBytesSum := (*networkStatsSet.RxBytes.SampleCount * (int64(expectedNetworkStats.RxBytes))) / int64(numberOfContainers) + assert.Equal(t, rxBytesSum, *networkStatsSet.RxBytes.Sum) + txBytesSum := (*networkStatsSet.TxBytes.SampleCount * (int64(expectedNetworkStats.TxBytes))) / int64(numberOfContainers) + assert.Equal(t, txBytesSum, *networkStatsSet.TxBytes.Sum) + rxPacketsSum := (*networkStatsSet.RxPackets.SampleCount * (int64(expectedNetworkStats.RxPackets))) / int64(numberOfContainers) + assert.Equal(t, rxPacketsSum, *networkStatsSet.RxPackets.Sum) + txPacketsSum := (*networkStatsSet.TxPackets.SampleCount * (int64(expectedNetworkStats.TxPackets))) / int64(numberOfContainers) + assert.Equal(t, txPacketsSum, *networkStatsSet.TxPackets.Sum) + rxErrorSum := (*networkStatsSet.RxErrors.SampleCount * (int64(expectedNetworkStats.RxErrors))) / int64(numberOfContainers) + assert.Equal(t, rxErrorSum, *networkStatsSet.RxErrors.Sum) + txErrorSum := (*networkStatsSet.TxErrors.SampleCount * (int64(expectedNetworkStats.TxErrors))) / int64(numberOfContainers) + assert.Equal(t, txErrorSum, *networkStatsSet.TxErrors.Sum) + rxDroppedSum := (*networkStatsSet.RxDropped.SampleCount * (int64(expectedNetworkStats.RxDropped))) / int64(numberOfContainers) + assert.Equal(t, rxDroppedSum, *networkStatsSet.RxDropped.Sum) + txDroppedSum := (*networkStatsSet.TxDropped.SampleCount * (int64(expectedNetworkStats.TxDropped))) / int64(numberOfContainers) + assert.Equal(t, txDroppedSum, *networkStatsSet.TxDropped.Sum) +}