diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 43710a26fe7..9b46e297340 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -26,6 +26,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs" apiappmesh "github.com/aws/amazon-ecs-agent/agent/api/appmesh" + "github.com/aws/amazon-ecs-agent/agent/api/container" apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" apicontainerstatus "github.com/aws/amazon-ecs-agent/agent/api/container/status" apieni "github.com/aws/amazon-ecs-agent/agent/api/eni" @@ -36,6 +37,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/credentials" "github.com/aws/amazon-ecs-agent/agent/dockerclient" "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" + "github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs" "github.com/aws/amazon-ecs-agent/agent/logger" "github.com/aws/amazon-ecs-agent/agent/logger/field" "github.com/aws/amazon-ecs-agent/agent/taskresource" @@ -3502,3 +3504,138 @@ func (task *Task) IsServiceConnectConnectionDraining() bool { defer task.lock.RUnlock() return task.ServiceConnectConnectionDrainingUnsafe } + +// ToHostResources will convert a task to a map of resources which ECS takes into account when scheduling tasks on instances +// * CPU +// - If task level CPU is set, use that +// - Else add up container CPUs +// +// * Memory +// - If task level memory is set, use that +// - Else add up container level +// - If memoryReservation field is set, use that +// - Else use memory field +// +// * Ports (TCP/UDP) +// - Only account for hostPort +// - Don't need to account for awsvpc mode, each task gets its own namespace +// +// * GPU +// - Return num of gpus requested (len of GPUIDs field) +// +// TODO remove this once ToHostResources is used +// +//lint:file-ignore U1000 Ignore all unused code +func (task *Task) ToHostResources() map[string]*ecs.Resource { + resources := make(map[string]*ecs.Resource) + // CPU + if task.CPU > 0 { + // cpu unit is vcpu at task level + // convert to cpushares + taskCPUint64 := int64(task.CPU * 1024) + resources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &taskCPUint64, + } + } else { + // cpu unit is cpushares at container level + containerCPUint64 := int64(0) + for _, container := range task.Containers { + containerCPUint64 += int64(container.CPU) + } + resources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &containerCPUint64, + } + } + + // Memory + if task.Memory > 0 { + // memory unit is MiB at task level + taskMEMint64 := task.Memory + resources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &taskMEMint64, + } + } else { + containerMEMint64 := int64(0) + // To parse memory reservation / soft limit + hostConfig := &dockercontainer.HostConfig{} + + for _, c := range task.Containers { + if c.DockerConfig.HostConfig != nil { + err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig) + if err != nil || hostConfig.MemoryReservation <= 0 { + // container memory unit is MiB, keeping as is + containerMEMint64 += int64(c.Memory) + } else { + // Soft limit is specified in MiB units but translated to bytes while being transferred to Agent + // Converting back to MiB + containerMEMint64 += hostConfig.MemoryReservation / (1024 * 1024) + } + } else { + // container memory unit is MiB, keeping as is + containerMEMint64 += int64(c.Memory) + } + } + resources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &containerMEMint64, + } + } + + // PORTS_TCP and PORTS_UDP + var tcpPortSet []uint16 + var udpPortSet []uint16 + + // AWSVPC tasks have 'host' ports mapped to task ENI, not to host + // So don't need to keep an 'account' of awsvpc tasks with host ports fields assigned + if !task.IsNetworkModeAWSVPC() { + for _, c := range task.Containers { + for _, port := range c.Ports { + hostPort := port.HostPort + protocol := port.Protocol + if hostPort > 0 && protocol == container.TransportProtocolTCP { + tcpPortSet = append(tcpPortSet, hostPort) + } else if hostPort > 0 && protocol == container.TransportProtocolUDP { + udpPortSet = append(udpPortSet, hostPort) + } + } + } + } + resources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: utils.Uint16SliceToStringSlice(tcpPortSet), + } + resources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: utils.Uint16SliceToStringSlice(udpPortSet), + } + + // GPU + var num_gpus int64 + num_gpus = 0 + for _, c := range task.Containers { + num_gpus += int64(len(c.GPUIDs)) + } + resources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &num_gpus, + } + logger.Debug("Task host resources to account for", logger.Fields{ + "taskArn": task.Arn, + "CPU": *resources["CPU"].IntegerValue, + "MEMORY": *resources["MEMORY"].IntegerValue, + "PORTS_TCP": resources["PORTS_TCP"].StringSetValue, + "PORTS_UDP": resources["PORTS_UDP"].StringSetValue, + "GPU": *resources["GPU"].IntegerValue, + }) + return resources +} diff --git a/agent/api/task/task_test.go b/agent/api/task/task_test.go index 8f3a2f8fb03..30fbf8d1824 100644 --- a/agent/api/task/task_test.go +++ b/agent/api/task/task_test.go @@ -45,6 +45,7 @@ import ( "github.com/aws/amazon-ecs-agent/agent/dockerclient" "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi" mock_dockerapi "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi/mocks" + "github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs" mock_s3_factory "github.com/aws/amazon-ecs-agent/agent/s3/factory/mocks" mock_ssm_factory "github.com/aws/amazon-ecs-agent/agent/ssm/factory/mocks" "github.com/aws/amazon-ecs-agent/agent/taskresource" @@ -4802,3 +4803,196 @@ func TestInitializeAndGetCredentialSpecResource(t *testing.T) { _, ok := task.GetCredentialSpecResource() assert.True(t, ok) } + +func getTestTaskResourceMap(cpu int64, mem int64, ports []*string, portsUdp []*string, numGPUs int64) map[string]*ecs.Resource { + taskResources := make(map[string]*ecs.Resource) + taskResources["CPU"] = &ecs.Resource{ + Name: utils.Strptr("CPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &cpu, + } + + taskResources["MEMORY"] = &ecs.Resource{ + Name: utils.Strptr("MEMORY"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &mem, + } + + taskResources["PORTS_TCP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_TCP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: ports, + } + + taskResources["PORTS_UDP"] = &ecs.Resource{ + Name: utils.Strptr("PORTS_UDP"), + Type: utils.Strptr("STRINGSET"), + StringSetValue: portsUdp, + } + + taskResources["GPU"] = &ecs.Resource{ + Name: utils.Strptr("GPU"), + Type: utils.Strptr("INTEGER"), + IntegerValue: &numGPUs, + } + + return taskResources +} + +func TestToHostResources(t *testing.T) { + //Prepare a simple hostConfig with memory reservation field for test cases + hostConfig := dockercontainer.HostConfig{ + // 400 MiB + Resources: dockercontainer.Resources{ + MemoryReservation: int64(419430400), + }, + } + + rawHostConfig, err := json.Marshal(&hostConfig) + if err != nil { + t.Fatal(err) + } + + // Prefer task level, and check gpu assignment + testTask1 := &Task{ + CPU: 1.0, + Memory: int64(512), + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: uint(1200), + DockerConfig: apicontainer.DockerConfig{ + HostConfig: strptr(string(rawHostConfig)), + }, + GPUIDs: []string{"gpu1", "gpu2"}, + }, + }, + } + + // If task not set, use container level (MemoryReservation pref) + testTask2 := &Task{ + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: uint(1200), + DockerConfig: apicontainer.DockerConfig{ + HostConfig: strptr(string(rawHostConfig)), + }, + }, + }, + } + + // If task not set, if MemoryReservation not set, use container level hard limit (c.Memory) + testTask3 := &Task{ + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: uint(1200), + DockerConfig: apicontainer.DockerConfig{}, + }, + }, + } + + // Check ports + testTask4 := &Task{ + CPU: 1.0, + Memory: int64(512), + Containers: []*apicontainer.Container{ + { + CPU: uint(1200), + Memory: uint(1200), + DockerConfig: apicontainer.DockerConfig{ + HostConfig: strptr(string(rawHostConfig)), + }, + Ports: []apicontainer.PortBinding{ + { + ContainerPort: 10, + HostPort: 10, + BindIP: "", + Protocol: apicontainer.TransportProtocolTCP, + }, + { + ContainerPort: 20, + HostPort: 20, + BindIP: "", + Protocol: apicontainer.TransportProtocolUDP, + }, + { + ContainerPortRange: "99-999", + BindIP: "", + Protocol: apicontainer.TransportProtocolTCP, + }, + { + ContainerPortRange: "121-221", + BindIP: "", + Protocol: apicontainer.TransportProtocolUDP, + }, + }, + }, + }, + } + + portsTCP := []uint16{10} + portsUDP := []uint16{20} + + testCases := []struct { + task *Task + expectedResources map[string]*ecs.Resource + }{ + { + task: testTask1, + expectedResources: getTestTaskResourceMap(int64(1024), int64(512), []*string{}, []*string{}, int64(2)), + }, + { + task: testTask2, + expectedResources: getTestTaskResourceMap(int64(1200), int64(400), []*string{}, []*string{}, int64(0)), + }, + { + task: testTask3, + expectedResources: getTestTaskResourceMap(int64(1200), int64(1200), []*string{}, []*string{}, int64(0)), + }, + { + task: testTask4, + expectedResources: getTestTaskResourceMap(int64(1024), int64(512), utils.Uint16SliceToStringSlice(portsTCP), utils.Uint16SliceToStringSlice(portsUDP), int64(0)), + }, + } + + for _, tc := range testCases { + calcResources := tc.task.ToHostResources() + + //CPU + assert.Equal(t, tc.expectedResources["CPU"].IntegerValue, calcResources["CPU"].IntegerValue, "Error converting task CPU tesources") + + //MEMORY + assert.Equal(t, tc.expectedResources["MEMORY"].IntegerValue, calcResources["MEMORY"].IntegerValue, "Error converting task Memory tesources") + + //GPU + assert.Equal(t, tc.expectedResources["GPU"].IntegerValue, calcResources["GPU"].IntegerValue, "Error converting task GPU tesources") + + //PORTS + for _, expectedPort := range tc.expectedResources["PORTS_TCP"].StringSetValue { + found := false + for _, calcPort := range calcResources["PORTS_TCP"].StringSetValue { + if *expectedPort == *calcPort { + found = true + break + } + } + assert.True(t, found, "Could not convert TCP port resources") + } + assert.Equal(t, len(tc.expectedResources["PORTS_TCP"].StringSetValue), len(calcResources["PORTS_TCP"].StringSetValue), "Error converting task TCP port tesources") + + //PORTS_UDP + for _, expectedPort := range tc.expectedResources["PORTS_UDP"].StringSetValue { + found := false + for _, calcPort := range calcResources["PORTS_UDP"].StringSetValue { + if *expectedPort == *calcPort { + found = true + break + } + } + assert.True(t, found, "Could not convert UDP port resources") + } + assert.Equal(t, len(tc.expectedResources["PORTS_UDP"].StringSetValue), len(calcResources["PORTS_UDP"].StringSetValue), "Error converting task UDP port tesources") + } +}