Skip to content

Commit

Permalink
feature: 支持容器扩展资源调度机制 TencentBlueKing#424
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg committed Apr 21, 2020
1 parent 82d8341 commit b385e92
Show file tree
Hide file tree
Showing 18 changed files with 340 additions and 446 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pre:
@echo "git tag: ${GITTAG}"
mkdir -p ${PACKAGEPATH}
mkdir -p ${EXPORTPATH}
go fmt ./...
cd ./scripts && chmod +x vet.sh && ./vet.sh

api:pre
mkdir -p ${PACKAGEPATH}/bcs-services
Expand Down
6 changes: 3 additions & 3 deletions bcs-common/pkg/mesosdriver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewMesosPlatform(conf *Config) (*MesosDriverClient, error) {
//init http client
m.cli = httpclient.NewHttpClient()
//if https
if m.conf.ClientCert!=nil && m.conf.ClientCert.IsSSL {
if m.conf.ClientCert != nil && m.conf.ClientCert.IsSSL {
blog.Infof("NetworkDetection http client cert ssl")
m.cli.SetTlsVerity(m.conf.ClientCert.CAFile, m.conf.ClientCert.CertFile, m.conf.ClientCert.KeyFile,
m.conf.ClientCert.CertPasswd)
Expand Down Expand Up @@ -84,7 +84,7 @@ func (m *MesosDriverClient) getModuleAddr(clusterid string) (string, error) {

//UpdateAgentExtendedResources updates an agent's extended resources
func (m *MesosDriverClient) UpdateAgentExtendedResources(er *commtypes.ExtendedResource) error {
by,_ := json.Marshal(er)
by, _ := json.Marshal(er)
_, err := m.requestMesosApiserver(m.conf.ClusterId, http.MethodPut, "agentsettings/extendedresource", by)
if err != nil {
blog.Errorf("update agent %s external resources error %s", er.InnerIP, err.Error())
Expand All @@ -107,7 +107,7 @@ func (m *MesosDriverClient) requestMesosApiserver(clusterid, method, url string,
}
uri := fmt.Sprintf("%s/mesosdriver/v4/%s", addr, url)
m.cli.SetHeader("BCS-ClusterID", clusterid)
blog.V(3).Infof("request %s body(%s)",uri, string(payload))
blog.V(3).Infof("request %s body(%s)", uri, string(payload))

var by []byte
switch method {
Expand Down
30 changes: 14 additions & 16 deletions bcs-mesos/bcs-container-executor/container/cni/cni_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NewPod(operator container.Container, tasks []*container.BcsContainerTask,
conTasks: taskMap,
networkTaskId: tasks[0].TaskId,
runningContainer: make(map[string]*container.BcsContainerInfo),
pluginManager: device_plugin_manager.NewDevicePluginManager(),
pluginManager: device_plugin_manager.NewDevicePluginManager(),
}
if len(tasks[0].NetworkIPAddr) != 0 {
//ip injected by executor
Expand Down Expand Up @@ -138,7 +138,7 @@ type CNIPod struct {
networkTaskId string
netImage string
//device plugin manager
pluginManager *device_plugin_manager.DevicePluginManager
pluginManager *device_plugin_manager.DevicePluginManager
}

//IsHealthy check pod is healthy
Expand Down Expand Up @@ -254,7 +254,7 @@ func (p *CNIPod) Init() error {
HostName: p.cniHostName,
}
p.netTask.Resource = &bcstypes.Resource{
Cpus: 1,
Cpus: 1,
}

netflag := container.BcsKV{
Expand Down Expand Up @@ -390,32 +390,29 @@ func (p *CNIPod) Start() error {
task.HostName = ""
var extendedErr error
//if the task contains extended resources, connect to the device plugin to allocate them
for _,ex :=range task.ExtendedResources {
for _, ex := range task.ExtendedResources {
logs.Infof("task %s contains extended resource %s, then allocate it", task.TaskId, ex.Name)
deviceIds,err := p.pluginManager.ListAndWatch(ex)
if err!=nil {
logs.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
deviceIds, err := p.pluginManager.ListAndWatch(ex)
if err != nil {
extendedErr = fmt.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
task.TaskId, ex.Name, err.Error())
extendedErr = err
break
}

//allocate device
if len(deviceIds)<int(ex.Value) {
if len(deviceIds) < int(ex.Value) {
extendedErr = fmt.Errorf("extended resources %s Capacity %d, not enough", ex.Name, len(deviceIds))
logs.Errorf(extendedErr.Error())
break
}
envs,err := p.pluginManager.Allocate(ex, deviceIds[:int(ex.Value)])
if err!=nil {
logs.Errorf("task %s extended resources %s Allocate deviceIds(%v) failed, err: %s\n",
envs, err := p.pluginManager.Allocate(ex, deviceIds[:int(ex.Value)])
if err != nil {
extendedErr = fmt.Errorf("task %s extended resources %s Allocate deviceIds(%v) failed, err: %s\n",
task.TaskId, ex.Name, deviceIds[:int(ex.Value)], err.Error())
extendedErr = err
break
}

//append response docker envs to task.envs
for k,v :=range envs {
for k, v := range envs {
kv := container.BcsKV{
Key: k,
Value: v,
Expand All @@ -425,7 +422,8 @@ func (p *CNIPod) Start() error {
}

//if allocating an extended resource failed, then return and exit
if extendedErr!=nil {
if extendedErr != nil {
logs.Errorf(extendedErr.Error())
task.RuntimeConf.Status = container.ContainerStatus_EXITED
task.RuntimeConf.Message = extendedErr.Error()
p.startFailedStop(extendedErr)
Expand Down
54 changes: 28 additions & 26 deletions bcs-mesos/bcs-container-executor/container/cnm/cnm_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type DockerPod struct {
conTasks map[string]*container.BcsContainerTask //task for running containers, key is taskID
runningContainer map[string]*container.BcsContainerInfo //running container Name list for monitor
//device plugin manager
pluginManager *device_plugin_manager.DevicePluginManager
pluginManager *device_plugin_manager.DevicePluginManager
}

//IsHealthy check pod is healthy
Expand Down Expand Up @@ -195,39 +195,43 @@ func (p *DockerPod) Init() error {
container.EnvOperCopy(p.netTask)
var extendedErr error
//if the task contains extended resources, connect to the device plugin to allocate them
for _,ex :=range p.netTask.ExtendedResources {
for _, ex := range p.netTask.ExtendedResources {
logs.Infof("task %s contains extended resource %s, then allocate it", p.netTask.TaskId, ex.Name)
deviceIds,err := p.pluginManager.ListAndWatch(ex)
if err!=nil {
logs.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
deviceIds, err := p.pluginManager.ListAndWatch(ex)
if err != nil {
extendedErr = fmt.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
p.netTask.TaskId, ex.Name, err.Error())
extendedErr = err
break
}

//allocate device
if len(deviceIds)<int(ex.Value) {
if len(deviceIds) < int(ex.Value) {
extendedErr = fmt.Errorf("extended resources %s Capacity %d, not enough", ex.Name, len(deviceIds))
logs.Errorf(extendedErr.Error())
break
}
envs,err := p.pluginManager.Allocate(ex, deviceIds[:int(ex.Value)])
if err!=nil {
logs.Errorf("task %s extended resources %s Allocate deviceIds(%v) failed, err: %s\n",
envs, err := p.pluginManager.Allocate(ex, deviceIds[:int(ex.Value)])
if err != nil {
extendedErr = fmt.Errorf("task %s extended resources %s Allocate deviceIds(%v) failed, err: %s\n",
p.netTask.TaskId, ex.Name, deviceIds[:int(ex.Value)], err.Error())
extendedErr = err
break
}

//append response docker envs to task.envs
for k,v :=range envs {
for k, v := range envs {
kv := container.BcsKV{
Key: k,
Value: v,
}
p.netTask.Env = append(p.netTask.Env, kv)
}
}
//if allocating an extended resource failed, then return and exit
if extendedErr != nil {
logs.Errorf(extendedErr.Error())
p.status = container.PodStatus_FAILED
p.message = extendedErr.Error()
return extendedErr
}

//fix(developerJim): all containers in pod can not create PortMappings separately,
// so we need to copy all PortMappings from other containers to Network
Expand Down Expand Up @@ -322,32 +326,29 @@ func (p *DockerPod) Start() error {
container.EnvOperCopy(task)
var extendedErr error
//if the task contains extended resources, connect to the device plugin to allocate them
for _,ex :=range task.ExtendedResources {
for _, ex := range task.ExtendedResources {
blog.Infof("task %s contains extended resource %s, then allocate it", task.TaskId, ex.Name)
deviceIds,err := p.pluginManager.ListAndWatch(ex)
if err!=nil {
logs.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
deviceIds, err := p.pluginManager.ListAndWatch(ex)
if err != nil {
extendedErr = fmt.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
task.TaskId, ex.Name, err.Error())
extendedErr = err
break
}

//allocate device
if len(deviceIds)<int(ex.Value) {
if len(deviceIds) < int(ex.Value) {
extendedErr = fmt.Errorf("extended resources %s Capacity %d, not enough", ex.Name, len(deviceIds))
logs.Errorf(extendedErr.Error())
break
}
envs,err := p.pluginManager.Allocate(ex, deviceIds[:int(ex.Value)])
if err!=nil {
logs.Errorf("task %s extended resources %s Allocate deviceIds(%v) failed, err: %s\n",
envs, err := p.pluginManager.Allocate(ex, deviceIds[:int(ex.Value)])
if err != nil {
extendedErr = fmt.Errorf("task %s extended resources %s Allocate deviceIds(%v) failed, err: %s\n",
task.TaskId, ex.Name, deviceIds[:int(ex.Value)], err.Error())
extendedErr = err
break
}

//append response docker envs to task.envs
for k,v :=range envs {
for k, v := range envs {
kv := container.BcsKV{
Key: k,
Value: v,
Expand All @@ -357,7 +358,8 @@ func (p *DockerPod) Start() error {
}

//if allocating an extended resource failed, then return and exit
if extendedErr!=nil {
if extendedErr != nil {
logs.Errorf(extendedErr.Error())
task.RuntimeConf.Status = container.ContainerStatus_EXITED
task.RuntimeConf.Message = extendedErr.Error()
p.startFailedStop(extendedErr)
Expand Down
50 changes: 25 additions & 25 deletions bcs-mesos/bcs-container-executor/container/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,32 @@ type BcsVolume struct {

//BcsContainerTask task info for running container
type BcsContainerTask struct {
Name string //container name
Image string //container image
HostName string //container hostname
Hosts []string //host:ip pair for /etc/hosts in container
Command string //container command
Args []string //args for command
Env []BcsKV //environments
Volums []BcsVolume //host path mount
NetworkName string //container network name
NetworkIPAddr string //container ip address request
ForcePullImage bool //pull image every time
OOMKillDisabled bool //OOM kill feature, default is true
AutoRemove bool //remove container when exit, default false
Ulimits []BcsKV //ulimit for docker parameter
ShmSize int64 //docker hostconfig shm size, 1 = 1B
Privileged bool //setting container privileged
PublishAllPorts bool //publish all ports in container
PortBindings map[string]BcsPort //port for container reflection, only useful for docker bridge
Labels []BcsKV //label for container
Resource *bcstypes.Resource //container resource request
LimitResource *bcstypes.Resource // container resource limit
Name string //container name
Image string //container image
HostName string //container hostname
Hosts []string //host:ip pair for /etc/hosts in container
Command string //container command
Args []string //args for command
Env []BcsKV //environments
Volums []BcsVolume //host path mount
NetworkName string //container network name
NetworkIPAddr string //container ip address request
ForcePullImage bool //pull image every time
OOMKillDisabled bool //OOM kill feature, default is true
AutoRemove bool //remove container when exit, default false
Ulimits []BcsKV //ulimit for docker parameter
ShmSize int64 //docker hostconfig shm size, 1 = 1B
Privileged bool //setting container privileged
PublishAllPorts bool //publish all ports in container
PortBindings map[string]BcsPort //port for container reflection, only useful for docker bridge
Labels []BcsKV //label for container
Resource *bcstypes.Resource //container resource request
LimitResource *bcstypes.Resource // container resource limit
ExtendedResources []*comtypes.ExtendedResource //extended resources
BcsMessages []*bcstypes.BcsMessage //bcs define message
RuntimeConf *BcsContainerInfo //container runtime info
HealthCheck healthcheck.Checker //for health check
KillPolicy int //kill policy timeout, unit is seconds
BcsMessages []*bcstypes.BcsMessage //bcs define message
RuntimeConf *BcsContainerInfo //container runtime info
HealthCheck healthcheck.Checker //for health check
KillPolicy int //kill policy timeout, unit is seconds
//container network flow limit args
NetLimit *comtypes.NetLimit
TaskId string
Expand Down
Loading

0 comments on commit b385e92

Please sign in to comment.