Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to get host resources reserved for a task #3706

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/aws/amazon-ecs-agent/agent/acs/model/ecsacs"
apiappmesh "github.com/aws/amazon-ecs-agent/agent/api/appmesh"
"github.com/aws/amazon-ecs-agent/agent/api/container"
apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
apicontainerstatus "github.com/aws/amazon-ecs-agent/agent/api/container/status"
apieni "github.com/aws/amazon-ecs-agent/agent/api/eni"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/credentials"
"github.com/aws/amazon-ecs-agent/agent/dockerclient"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
"github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs"
"github.com/aws/amazon-ecs-agent/agent/logger"
"github.com/aws/amazon-ecs-agent/agent/logger/field"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
Expand Down Expand Up @@ -3502,3 +3504,138 @@ func (task *Task) IsServiceConnectConnectionDraining() bool {
defer task.lock.RUnlock()
return task.ServiceConnectConnectionDrainingUnsafe
}

// ToHostResources will convert a task to a map of resources which ECS takes into account when scheduling tasks on instances
// * CPU
// - If task level CPU is set, use that
// - Else add up container CPUs
//
// * Memory
// - If task level memory is set, use that
// - Else add up container level
// - If memoryReservation field is set, use that
// - Else use memory field
//
// * Ports (TCP/UDP)
// - Only account for hostPort
// - Don't need to account for awsvpc mode, each task gets its own namespace
//
// * GPU
// - Return num of gpus requested (len of GPUIDs field)
//
// TODO remove this once ToHostResources is used
//
//lint:file-ignore U1000 Ignore all unused code
func (task *Task) ToHostResources() map[string]*ecs.Resource {
resources := make(map[string]*ecs.Resource)
// CPU
if task.CPU > 0 {
// cpu unit is vcpu at task level
// convert to cpushares
taskCPUint64 := int64(task.CPU * 1024)
resources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &taskCPUint64,
}
} else {
// cpu unit is cpushares at container level
containerCPUint64 := int64(0)
for _, container := range task.Containers {
containerCPUint64 += int64(container.CPU)
}
resources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &containerCPUint64,
}
}

// Memory
if task.Memory > 0 {
// memory unit is MiB at task level
taskMEMint64 := task.Memory
resources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &taskMEMint64,
}
} else {
containerMEMint64 := int64(0)
// To parse memory reservation / soft limit
hostConfig := &dockercontainer.HostConfig{}

for _, c := range task.Containers {
if c.DockerConfig.HostConfig != nil {
err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig)
if err != nil || hostConfig.MemoryReservation <= 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curious, is there a case that MemoryReservation actually less than 0? How did we know it's unit with unmarshal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a scenario where it will be less than 0. Units are mentioned in comments here, also verified by manual runs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int64 implies that this is a signed integer (ie could be negative). You might also add another check to validate that the HostConfig is an int.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MemoryReservation is the soft limit we are concerned with here, which is part of the HostConfig. I believe we are already checking if it is zero or negative by the check here hostConfig.MemoryReservation <= 0, so should be good imo.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And that MemoryReservation field in the HostConfig is zero-initialized by default so we'll always have it if we don't have a null HostConfig. Resolving.

Copy link
Contributor Author

@prateekchaudhry prateekchaudhry May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's right, thanks!

// container memory unit is MiB, keeping as is
containerMEMint64 += int64(c.Memory)
} else {
// Soft limit is specified in MiB units but translated to bytes while being transferred to Agent
// Converting back to MiB
containerMEMint64 += hostConfig.MemoryReservation / (1024 * 1024)
}
} else {
// container memory unit is MiB, keeping as is
containerMEMint64 += int64(c.Memory)
}
}
resources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &containerMEMint64,
}
}

// PORTS_TCP and PORTS_UDP
var tcpPortSet []uint16
var udpPortSet []uint16

// AWSVPC tasks have 'host' ports mapped to task ENI, not to host
// So don't need to keep an 'account' of awsvpc tasks with host ports fields assigned
if !task.IsNetworkModeAWSVPC() {
for _, c := range task.Containers {
for _, port := range c.Ports {
hostPort := port.HostPort
protocol := port.Protocol
if hostPort > 0 && protocol == container.TransportProtocolTCP {
tcpPortSet = append(tcpPortSet, hostPort)
} else if hostPort > 0 && protocol == container.TransportProtocolUDP {
udpPortSet = append(udpPortSet, hostPort)
}
}
}
}
resources["PORTS_TCP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_TCP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: utils.Uint16SliceToStringSlice(tcpPortSet),
}
resources["PORTS_UDP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_UDP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: utils.Uint16SliceToStringSlice(udpPortSet),
}

// GPU
var num_gpus int64
num_gpus = 0
for _, c := range task.Containers {
num_gpus += int64(len(c.GPUIDs))
}
resources["GPU"] = &ecs.Resource{
Name: utils.Strptr("GPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &num_gpus,
}
logger.Debug("Task host resources to account for", logger.Fields{
"taskArn": task.Arn,
"CPU": *resources["CPU"].IntegerValue,
"MEMORY": *resources["MEMORY"].IntegerValue,
"PORTS_TCP": resources["PORTS_TCP"].StringSetValue,
"PORTS_UDP": resources["PORTS_UDP"].StringSetValue,
"GPU": *resources["GPU"].IntegerValue,
})
return resources
}
194 changes: 194 additions & 0 deletions agent/api/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/dockerclient"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
mock_dockerapi "github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi/mocks"
"github.com/aws/amazon-ecs-agent/agent/ecs_client/model/ecs"
mock_s3_factory "github.com/aws/amazon-ecs-agent/agent/s3/factory/mocks"
mock_ssm_factory "github.com/aws/amazon-ecs-agent/agent/ssm/factory/mocks"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
Expand Down Expand Up @@ -4802,3 +4803,196 @@ func TestInitializeAndGetCredentialSpecResource(t *testing.T) {
_, ok := task.GetCredentialSpecResource()
assert.True(t, ok)
}

func getTestTaskResourceMap(cpu int64, mem int64, ports []*string, portsUdp []*string, numGPUs int64) map[string]*ecs.Resource {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a well-written test case 🥇

taskResources := make(map[string]*ecs.Resource)
taskResources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &cpu,
}

taskResources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &mem,
}

taskResources["PORTS_TCP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_TCP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: ports,
}

taskResources["PORTS_UDP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_UDP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: portsUdp,
}

taskResources["GPU"] = &ecs.Resource{
Name: utils.Strptr("GPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &numGPUs,
}

return taskResources
}

func TestToHostResources(t *testing.T) {
//Prepare a simple hostConfig with memory reservation field for test cases
hostConfig := dockercontainer.HostConfig{
// 400 MiB
Resources: dockercontainer.Resources{
MemoryReservation: int64(419430400),
},
}

rawHostConfig, err := json.Marshal(&hostConfig)
if err != nil {
t.Fatal(err)
}

// Prefer task level, and check gpu assignment
testTask1 := &Task{
CPU: 1.0,
Memory: int64(512),
Containers: []*apicontainer.Container{
{
CPU: uint(1200),
Memory: uint(1200),
DockerConfig: apicontainer.DockerConfig{
HostConfig: strptr(string(rawHostConfig)),
},
GPUIDs: []string{"gpu1", "gpu2"},
},
},
}

// If task not set, use container level (MemoryReservation pref)
testTask2 := &Task{
Containers: []*apicontainer.Container{
{
CPU: uint(1200),
Memory: uint(1200),
DockerConfig: apicontainer.DockerConfig{
HostConfig: strptr(string(rawHostConfig)),
},
},
},
}

// If task not set, if MemoryReservation not set, use container level hard limit (c.Memory)
testTask3 := &Task{
Containers: []*apicontainer.Container{
{
CPU: uint(1200),
Memory: uint(1200),
DockerConfig: apicontainer.DockerConfig{},
},
},
}

// Check ports
testTask4 := &Task{
CPU: 1.0,
Memory: int64(512),
Containers: []*apicontainer.Container{
{
CPU: uint(1200),
Memory: uint(1200),
DockerConfig: apicontainer.DockerConfig{
HostConfig: strptr(string(rawHostConfig)),
},
Ports: []apicontainer.PortBinding{
{
ContainerPort: 10,
HostPort: 10,
BindIP: "",
Protocol: apicontainer.TransportProtocolTCP,
},
{
ContainerPort: 20,
HostPort: 20,
BindIP: "",
Protocol: apicontainer.TransportProtocolUDP,
},
{
ContainerPortRange: "99-999",
BindIP: "",
Protocol: apicontainer.TransportProtocolTCP,
},
{
ContainerPortRange: "121-221",
BindIP: "",
Protocol: apicontainer.TransportProtocolUDP,
},
},
},
},
}

portsTCP := []uint16{10}
portsUDP := []uint16{20}

testCases := []struct {
task *Task
expectedResources map[string]*ecs.Resource
}{
{
task: testTask1,
expectedResources: getTestTaskResourceMap(int64(1024), int64(512), []*string{}, []*string{}, int64(2)),
},
{
task: testTask2,
expectedResources: getTestTaskResourceMap(int64(1200), int64(400), []*string{}, []*string{}, int64(0)),
},
{
task: testTask3,
expectedResources: getTestTaskResourceMap(int64(1200), int64(1200), []*string{}, []*string{}, int64(0)),
},
{
task: testTask4,
expectedResources: getTestTaskResourceMap(int64(1024), int64(512), utils.Uint16SliceToStringSlice(portsTCP), utils.Uint16SliceToStringSlice(portsUDP), int64(0)),
},
}

for _, tc := range testCases {
calcResources := tc.task.ToHostResources()

//CPU
assert.Equal(t, tc.expectedResources["CPU"].IntegerValue, calcResources["CPU"].IntegerValue, "Error converting task CPU tesources")

//MEMORY
assert.Equal(t, tc.expectedResources["MEMORY"].IntegerValue, calcResources["MEMORY"].IntegerValue, "Error converting task Memory tesources")

//GPU
assert.Equal(t, tc.expectedResources["GPU"].IntegerValue, calcResources["GPU"].IntegerValue, "Error converting task GPU tesources")

//PORTS
for _, expectedPort := range tc.expectedResources["PORTS_TCP"].StringSetValue {
found := false
for _, calcPort := range calcResources["PORTS_TCP"].StringSetValue {
if *expectedPort == *calcPort {
found = true
break
}
}
assert.True(t, found, "Could not convert TCP port resources")
}
assert.Equal(t, len(tc.expectedResources["PORTS_TCP"].StringSetValue), len(calcResources["PORTS_TCP"].StringSetValue), "Error converting task TCP port tesources")

//PORTS_UDP
for _, expectedPort := range tc.expectedResources["PORTS_UDP"].StringSetValue {
found := false
for _, calcPort := range calcResources["PORTS_UDP"].StringSetValue {
if *expectedPort == *calcPort {
found = true
break
}
}
assert.True(t, found, "Could not convert UDP port resources")
}
assert.Equal(t, len(tc.expectedResources["PORTS_UDP"].StringSetValue), len(calcResources["PORTS_UDP"].StringSetValue), "Error converting task UDP port tesources")
}
}