Skip to content

Commit

Permalink
feature: 支持容器扩展资源调度机制 TencentBlueKing#424
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg committed Apr 20, 2020
1 parent c2b571d commit 82d8341
Show file tree
Hide file tree
Showing 21 changed files with 265 additions and 29 deletions.
4 changes: 3 additions & 1 deletion bcs-common/pkg/mesosdriver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (m *MesosDriverClient) getModuleAddr(clusterid string) (string, error) {

// UpdateAgentExtendedResources updates the extended resources of an agent.
func (m *MesosDriverClient) UpdateAgentExtendedResources(er *commtypes.ExtendedResource) error {
_, err := m.requestMesosApiserver(m.conf.ClusterId, http.MethodPut, "agentsettings/extendedresources", nil)
by,_ := json.Marshal(er)
_, err := m.requestMesosApiserver(m.conf.ClusterId, http.MethodPut, "agentsettings/extendedresource", by)
if err != nil {
blog.Errorf("update agent %s external resources error %s", er.InnerIP, err.Error())
return err
Expand All @@ -106,6 +107,7 @@ func (m *MesosDriverClient) requestMesosApiserver(clusterid, method, url string,
}
uri := fmt.Sprintf("%s/mesosdriver/v4/%s", addr, url)
m.cli.SetHeader("BCS-ClusterID", clusterid)
blog.V(3).Infof("request %s body(%s)",uri, string(payload))

var by []byte
switch method {
Expand Down
4 changes: 2 additions & 2 deletions bcs-mesos/bcs-container-executor/container/cni/cni_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package cni

import (
"bk-bcs/bcs-common/common/blog"
comtypes "bk-bcs/bcs-common/common/types"
"bk-bcs/bcs-mesos/bcs-container-executor/container"
device_plugin_manager "bk-bcs/bcs-mesos/bcs-container-executor/device-plugin-manager"
Expand Down Expand Up @@ -275,6 +274,7 @@ func (p *CNIPod) Init() error {
for _, task := range p.conTasks {
p.netTask.Labels = append(p.netTask.Labels, task.Labels...)
}

//step 2: starting network container
var createErr error
if p.networkName == "host" {
Expand Down Expand Up @@ -391,7 +391,7 @@ func (p *CNIPod) Start() error {
var extendedErr error
//if task contains extended resources, need connect device plugin to allocate resources
for _,ex :=range task.ExtendedResources {
blog.Infof("task %s contains extended resource %s, then allocate it", task.TaskId, ex.Name)
logs.Infof("task %s contains extended resource %s, then allocate it", task.TaskId, ex.Name)
deviceIds,err := p.pluginManager.ListAndWatch(ex)
if err!=nil {
logs.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
Expand Down
35 changes: 35 additions & 0 deletions bcs-mesos/bcs-container-executor/container/cnm/cnm_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,41 @@ func (p *DockerPod) Init() error {
p.netTask.Env = append(p.netTask.Env, envHost)
//assignment for environments
container.EnvOperCopy(p.netTask)
var extendedErr error
//if task contains extended resources, need connect device plugin to allocate resources
for _,ex :=range p.netTask.ExtendedResources {
logs.Infof("task %s contains extended resource %s, then allocate it", p.netTask.TaskId, ex.Name)
deviceIds,err := p.pluginManager.ListAndWatch(ex)
if err!=nil {
logs.Errorf("task %s ListAndWatch extended resources %s failed, err: %s\n",
p.netTask.TaskId, ex.Name, err.Error())
extendedErr = err
break
}

//allocate device
if len(deviceIds)<int(ex.Value) {
extendedErr = fmt.Errorf("extended resources %s Capacity %d, not enough", ex.Name, len(deviceIds))
logs.Errorf(extendedErr.Error())
break
}
envs,err := p.pluginManager.Allocate(ex, deviceIds[:int(ex.Value)])
if err!=nil {
logs.Errorf("task %s extended resources %s Allocate deviceIds(%v) failed, err: %s\n",
p.netTask.TaskId, ex.Name, deviceIds[:int(ex.Value)], err.Error())
extendedErr = err
break
}

//append response docker envs to task.envs
for k,v :=range envs {
kv := container.BcsKV{
Key: k,
Value: v,
}
p.netTask.Env = append(p.netTask.Env, kv)
}
}

//fix(developerJim): all containers in pod can not create PortMappings separately,
// so we need to copy all PortMappings from other containers to Network
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,15 @@ func (s *Scheduler) setVersionWithPodSpec(version *types.Version, spec *bcstype.
container.Resources.Cpus, _ = strconv.ParseFloat(c.Resources.Requests.Cpu, 64)
container.Resources.Mem, _ = strconv.ParseFloat(c.Resources.Requests.Mem, 64)
container.Resources.Disk, _ = strconv.ParseFloat(c.Resources.Requests.Storage, 64)
//extended resources
container.ExtendedResources = c.Resources.ExtendedResources

container.DataClass = &types.DataClass{
Resources: new(types.Resource),
Msgs: []*types.BcsMessage{},
}

//extended resources
container.DataClass.ExtendedResources = c.Resources.ExtendedResources
//request resources
container.DataClass.Resources = container.Resources
//limit resources
container.DataClass.LimitResources = container.LimitResoures

//set network flow limit parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (s *Scheduler) initActions() {
httpserver.NewAction("POST", "/agentsettings/enable", nil, s.enableAgentListHandler),
httpserver.NewAction("POST", "/agentsettings/disable", nil, s.disableAgentListHandler),
httpserver.NewAction("PUT", "/agentsettings/taint", nil, s.taintAgentsHandler),
httpserver.NewAction("PUT", "/agentsettings/extendedresource", nil, s.updateExtendedResourcesHandler),
/*================= agentsetting ====================*/

/*-------------- custom resource deprecated from 1.15.x -----------------*/
Expand Down Expand Up @@ -568,6 +569,29 @@ func (s *Scheduler) taintAgentsHandler(req *restful.Request, resp *restful.Respo
resp.Write([]byte(reply))
}

// updateExtendedResourcesHandler forwards a PUT request that updates an
// agent's extended resources to the currently connected scheduler.
//
// The request body is passed through unchanged to the scheduler endpoint
// "/v1/agentsettings/extendedresource", and the scheduler's reply is written
// back to the caller as-is. When no scheduler host is known, or the forwarded
// request fails, an internal-error payload built by bhttp.InternalError is
// written to the response instead.
func (s *Scheduler) updateExtendedResourcesHandler(req *restful.Request, resp *restful.Response) {
	// Read the host once so the emptiness check and the URL built below
	// are guaranteed to use the same value.
	host := s.GetHost()
	if host == "" {
		blog.Error("no scheduler is connected by driver")
		err := bhttp.InternalError(common.BcsErrCommHttpDo, common.BcsErrCommHttpDoStr+"scheduler not exist")
		resp.Write([]byte(err.Error()))
		return
	}

	// NOTE(review): the second return value of getRequestInfo is discarded here,
	// as in the original code; presumably it is an error — confirm and handle it
	// if a failed body read should not be forwarded as an empty payload.
	body, _ := s.getRequestInfo(req)
	url := host + "/v1/agentsettings/extendedresource"
	blog.Infof("put url(%s) body(%s)", url, string(body))

	reply, err := s.client.PUT(url, nil, body)
	if err != nil {
		// Errorf (not Error) is required so the format verbs are expanded.
		blog.Errorf("request to url(%s) failed! err(%s)", url, err.Error())
		err = bhttp.InternalError(common.BcsErrCommHttpDo, common.BcsErrCommHttpDoStr+err.Error())
		resp.Write([]byte(err.Error()))
		return
	}

	resp.Write([]byte(reply))
}

func (s *Scheduler) GetClusterResourcesHandler(req *restful.Request, resp *restful.Response) {

blog.V(3).Infof("get cluster resources")
Expand Down
2 changes: 1 addition & 1 deletion bcs-mesos/bcs-scheduler/src/manager/sched/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (r *Router) initRoutes() {
r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings/enable", nil, r.enableAgentList))
r.actions = append(r.actions, httpserver.NewAction("POST", "/agentsettings/disable", nil, r.disableAgentList))
r.actions = append(r.actions, httpserver.NewAction("PUT", "/agentsettings/taint", nil, r.taintAgents))
r.actions = append(r.actions, httpserver.NewAction("PUT", "/agentsettings/extendedresource", nil, r.taintAgents))
r.actions = append(r.actions, httpserver.NewAction("PUT", "/agentsettings/extendedresource", nil, r.updateExtendedResource))
/*-------------- agent setting ---------------*/

/*-------------- custom resource -----------------*/
Expand Down
4 changes: 4 additions & 0 deletions bcs-mesos/bcs-scheduler/src/manager/sched/offer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package offer
import (
typesplugin "bk-bcs/bcs-common/common/plugin"
commtype "bk-bcs/bcs-common/common/types"
"bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store"
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/mesos"
"bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"container/list"
Expand Down Expand Up @@ -116,4 +117,7 @@ type OfferPara struct {
//DefaultLostSlaveGracePeriod
//if you don't specify, it will the const DefaultOfferLifePeriod
OfferlifePeriod int

//store
Store store.Store
}
25 changes: 23 additions & 2 deletions bcs-mesos/bcs-scheduler/src/manager/sched/offer/offerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bk-bcs/bcs-common/common/blog"
typesplugin "bk-bcs/bcs-common/common/plugin"
commtype "bk-bcs/bcs-common/common/types"
"bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store"
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/mesos"
"bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"container/list"
Expand Down Expand Up @@ -57,7 +58,10 @@ type offerPool struct {

offerEvents chan []*mesos.Offer

//scheduler manager
scheduler SchedManager
//store
store store.Store

lostSlaveGracePeriod int
offerLifePeriod int
Expand All @@ -77,6 +81,7 @@ func NewOfferPool(para *OfferPara) OfferPool {
scheduler: para.Sched,
lostSlaves: make(map[string]int64, 0),
offerEvents: make(chan []*mesos.Offer, DefaultOfferEventLength),
store: para.Store,
}

if para.LostSlaveGracePeriod > 0 {
Expand Down Expand Up @@ -567,7 +572,14 @@ func (p *offerPool) setInnerOffersAttributes(offers []*mesos.Offer) {
}

if setting == nil {
blog.V(3).Infof("FetchAgentSetting ip %s is nil", ip)
blog.Infof("Fetch AgentSetting %s is nil, then create it", ip)
setting = &commtype.BcsClusterAgentSetting{
InnerIP: ip,
}
err = p.store.SaveAgentSetting(setting)
if err!=nil {
blog.Errorf("save agentsetting %s error %s", ip, err.Error())
}
continue
}

Expand Down Expand Up @@ -751,6 +763,9 @@ func (p *offerPool) addOfferAttributes(offer *mesos.Offer, agentSetting *commtyp
blog.Errorf("FetchTaskGroup %s failed: %s", id, err.Error())
continue
}
if pod.Status!=types.TASKGROUP_STATUS_RUNNING {
blog.V(3).Infof("taskgroup %s status %s ")
}

pods = append(pods, pod)
}
Expand All @@ -769,21 +784,27 @@ func (p *offerPool) addOfferAttributes(offer *mesos.Offer, agentSetting *commtyp
}
}
}

by,_ := json.Marshal(allocatedResources)
blog.Infof("extended resources %s", string(by))
//extended resources, agentsetting have total extended resources
for _, ex := range agentSetting.ExtendedResources {
//if the extended resources have allocated, then minus it
allocated := allocatedResources[ex.Name]
var value float64
if allocated != nil {
value = ex.Capacity - allocated.Value
} else {
value = ex.Capacity
}
//the current device plugin socket is set in the mesos.Resource Role parameter
socket := ex.Socket
r := &mesos.Resource{
Name: &ex.Name,
Type: mesos.Value_SCALAR.Enum(),
Scalar: &mesos.Value_Scalar{
Value: &value,
},
Role: &socket,
}
offer.Resources = append(offer.Resources, r)
blog.Infof("offer(%s:%s) add Extended Resources(%s:%f) from agentsetting",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewScheduler(config util.Scheduler, store store.Store) *Scheduler {
lostSlave: make(map[string]int64),
}

para := &offer.OfferPara{Sched: s}
para := &offer.OfferPara{Sched: s, Store: store}
s.offerPool = offer.NewOfferPool(para)

//if config.ClientCertDir != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/mesos"
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/sched"
"bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"bk-bcs/bcs-mesos/bcs-scheduler/src/util"
"fmt"
"github.com/golang/protobuf/proto"
"net/http"
Expand Down Expand Up @@ -200,6 +201,7 @@ func (s *Scheduler) DeleteTaskGroup(app *types.Application, taskGroup *types.Tas
s.UpdateAgentSchedInfo(taskGroup.HostName, taskGroup.ID, nil)
//}

//update app taskgroup index info
if app != nil {
delete := -1
for index, currPod := range app.Pods {
Expand All @@ -214,6 +216,42 @@ func (s *Scheduler) DeleteTaskGroup(app *types.Application, taskGroup *types.Tas
app.UpdateTime = time.Now().Unix()
app.Pods = append(app.Pods[:delete], app.Pods[delete+1:]...)
}

//update agentsetting taskgroup index info
nodeIp := taskGroup.GetAgentIp()
if nodeIp=="" {
blog.Errorf("taskgroup %s don't have nodeIp", taskGroup.ID)
return nil
}
//lock agentsetting
util.Lock.Lock(bcstype.BcsClusterAgentSetting{}, nodeIp)
defer util.Lock.UnLock(bcstype.BcsClusterAgentSetting{}, nodeIp)

agentsetting,err := s.store.FetchAgentSetting(nodeIp)
if err!=nil {
blog.Errorf("fetch agentsetting %s failed: %s", nodeIp, err.Error())
return nil
}
if agentsetting==nil {
blog.Errorf("fetch agentsetting %s Not found", nodeIp)
return nil
}
delete := -1
for index, currPod := range agentsetting.Pods {
if currPod == taskGroup.ID {
delete = index
break
}
}
if delete == -1 {
return nil
}
agentsetting.Pods = append(agentsetting.Pods[:delete], agentsetting.Pods[delete+1:]...)
err = s.store.SaveAgentSetting(agentsetting)
if err!=nil {
blog.Errorf("save agentsetting %s failed: %s", nodeIp, err.Error())
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
package scheduler

import (
"net/http"
"time"

"bk-bcs/bcs-common/common/blog"
commtypes "bk-bcs/bcs-common/common/types"
"bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/offer"
"bk-bcs/bcs-mesos/bcs-scheduler/src/manager/sched/task"
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/mesos"
"bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"net/http"
"time"
"bk-bcs/bcs-mesos/bcs-scheduler/src/util"
)

// The goroutine function for launch application transaction
Expand Down Expand Up @@ -187,9 +190,23 @@ func (s *Scheduler) doLaunchTrans(trans *Transaction, outOffer *offer.Offer, sta
s.DeclineResource(offer.Id.Value)
return
}

opData.LaunchedNum++
taskGroupInfos = append(taskGroupInfos, taskGroupInfo)

//lock agentsetting
util.Lock.Lock(commtypes.BcsClusterAgentSetting{}, taskGroup.GetAgentIp())
//update agentsettings taskgroup index info
agentsetting,_ := s.store.FetchAgentSetting(taskGroup.GetAgentIp())
if agentsetting!=nil {
agentsetting.Pods = append(agentsetting.Pods, taskGroup.ID)
err := s.store.SaveAgentSetting(agentsetting)
if err!=nil {
blog.Errorf("save agentsetting %s pods error %s", agentsetting.InnerIP, err.Error())
}
} else {
blog.Errorf("fetch agentsetting %s Not Found", taskGroup.GetAgentIp())
}
util.Lock.UnLock(commtypes.BcsClusterAgentSetting{}, taskGroup.GetAgentIp())
}

if len(taskGroupInfos) <= 0 {
Expand Down
Loading

0 comments on commit 82d8341

Please sign in to comment.