Skip to content

Commit

Permalink
Merge pull request #925 from abstractmj/fea-mesos
Browse files Browse the repository at this point in the history
feature: adapt bcs-cpuset-device to kubernetes #924
  • Loading branch information
DeveloperJim authored Jun 24, 2021
2 parents e7b1aa4 + 93177e0 commit f08de42
Show file tree
Hide file tree
Showing 13 changed files with 421 additions and 70 deletions.
15 changes: 15 additions & 0 deletions bcs-mesos/bcs-scheduler/src/manager/store/zk/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func getAgentSchedInfoRootPath() string {
return "/" + bcsRootNode + "/" + agentSchedInfoNode
}

// SaveAgent save agent object
func (store *managerStore) SaveAgent(agent *types.Agent) error {

data, err := json.Marshal(agent)
Expand All @@ -48,6 +49,7 @@ func (store *managerStore) SaveAgent(agent *types.Agent) error {
return store.Db.Insert(path, string(data))
}

// FetchAgent get agent object
func (store *managerStore) FetchAgent(Key string) (*types.Agent, error) {

path := getAgentRootPath() + "/" + Key
Expand All @@ -69,6 +71,7 @@ func (store *managerStore) FetchAgent(Key string) (*types.Agent, error) {
return agent, nil
}

// ListAgentNodes list agent object nodes
func (store *managerStore) ListAgentNodes() ([]string, error) {

path := getAgentRootPath()
Expand All @@ -82,6 +85,7 @@ func (store *managerStore) ListAgentNodes() ([]string, error) {
return agentNodes, nil
}

// DeleteAgent delete agent by key
func (store *managerStore) DeleteAgent(key string) error {

path := getAgentRootPath() + "/" + key
Expand All @@ -93,6 +97,7 @@ func (store *managerStore) DeleteAgent(key string) error {
return nil
}

// SaveAgentSetting save agent setting info
func (store *managerStore) SaveAgentSetting(agent *commtypes.BcsClusterAgentSetting) error {

data, err := json.Marshal(agent)
Expand All @@ -105,6 +110,7 @@ func (store *managerStore) SaveAgentSetting(agent *commtypes.BcsClusterAgentSett
return store.Db.Insert(path, string(data))
}

// FetchAgentSetting get agent setting info
func (store *managerStore) FetchAgentSetting(InnerIP string) (*commtypes.BcsClusterAgentSetting, error) {

path := getAgentSettingRootPath() + "/" + InnerIP
Expand All @@ -129,6 +135,7 @@ func (store *managerStore) FetchAgentSetting(InnerIP string) (*commtypes.BcsClus
return agent, nil
}

// DeleteAgentSetting delete agent setting by inner IP
func (store *managerStore) DeleteAgentSetting(InnerIP string) error {

path := getAgentSettingRootPath() + "/" + InnerIP
Expand All @@ -142,6 +149,7 @@ func (store *managerStore) DeleteAgentSetting(InnerIP string) error {
return nil
}

// ListAgentSettingNodes list agent setting node names
func (store *managerStore) ListAgentSettingNodes() ([]string, error) {

path := getAgentSettingRootPath()
Expand All @@ -155,6 +163,7 @@ func (store *managerStore) ListAgentSettingNodes() ([]string, error) {
return agentNodes, nil
}

// ListAgentsettings get agent setting list
func (store *managerStore) ListAgentsettings() ([]*commtypes.BcsClusterAgentSetting, error) {
nodes, err := store.ListAgentSettingNodes()
if err != nil {
Expand All @@ -174,6 +183,7 @@ func (store *managerStore) ListAgentsettings() ([]*commtypes.BcsClusterAgentSett
return settings, nil
}

// SaveAgentSchedInfo save agent schedule info
func (store *managerStore) SaveAgentSchedInfo(agent *types.AgentSchedInfo) error {

data, err := json.Marshal(agent)
Expand All @@ -186,6 +196,7 @@ func (store *managerStore) SaveAgentSchedInfo(agent *types.AgentSchedInfo) error
return store.Db.Insert(path, string(data))
}

// FetchAgentSchedInfo get agent schedule info
func (store *managerStore) FetchAgentSchedInfo(HostName string) (*types.AgentSchedInfo, error) {

path := getAgentSchedInfoRootPath() + "/" + HostName
Expand All @@ -210,6 +221,7 @@ func (store *managerStore) FetchAgentSchedInfo(HostName string) (*types.AgentSch
return agent, nil
}

// DeleteAgentSchedInfo delete agent schedule info
func (store *managerStore) DeleteAgentSchedInfo(HostName string) error {

path := getAgentSchedInfoRootPath() + "/" + HostName
Expand All @@ -220,6 +232,7 @@ func (store *managerStore) DeleteAgentSchedInfo(HostName string) error {
return nil
}

// ListAgentSchedInfoNodes list agent schedule info node names
func (store *managerStore) ListAgentSchedInfoNodes() ([]string, error) {
path := getAgentSchedInfoRootPath()

Expand All @@ -232,6 +245,7 @@ func (store *managerStore) ListAgentSchedInfoNodes() ([]string, error) {
return agentSchedInfoNodes, nil
}

// ListAgentSchedInfo list agent schedule info
func (store *managerStore) ListAgentSchedInfo() ([]*types.AgentSchedInfo, error) {
nodes, err := store.ListAgentSchedInfoNodes()
if err != nil {
Expand All @@ -251,6 +265,7 @@ func (store *managerStore) ListAgentSchedInfo() ([]*types.AgentSchedInfo, error)
return schedinfos, nil
}

// ListAllAgents list all agents
func (store *managerStore) ListAllAgents() ([]*types.Agent, error) {
nodes, err := store.ListAgentNodes()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions bcs-services/bcs-cpuset-device/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func Run(op *options.Option) error {
func setConfig(conf *config.Config, op *options.Option) {
conf.DockerSocket = op.DockerSock
conf.PluginSocketDir = op.PluginSocketDir
conf.CgroupCpusetRoot = op.CgroupCpusetRoot
conf.BcsZk = op.BCSZk
conf.Engine = op.Engine
conf.ClusterID = op.ClusterID
Expand Down
1 change: 1 addition & 0 deletions bcs-services/bcs-cpuset-device/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Option struct {

DockerSock string `json:"docker_sock" value:"unix:///var/run/docker.sock" usage:"docker socket file"`
PluginSocketDir string `json:"plugin_socket_dir" value:"/var/lib/kubelet/device-plugins" usage:"device-plugin socket directory"`
CgroupCpusetRoot string `json:"cgroup_cpuset_root" value:"/sys/fs/cgroup/cpuset/docker" usage:"root path of cgroup cpuset"`
ClusterID string `json:"clusterid" value:"" usage:"mesos cluster id"`
Engine string `json:"engine" value:"k8s" usage:"enum: k8s、mesos; default: k8s"`
ReservedCPUSetList string `json:"reserved_cpuset_list" value:"" usage:"cpuset number list to be reserved, e.g. 6,7,8"`
Expand Down
2 changes: 2 additions & 0 deletions bcs-services/bcs-cpuset-device/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Config struct {
PluginSocketDir string
// DockerSocket docker socket
DockerSocket string
// CgroupCpusetRoot root path of cpuset cgroup
CgroupCpusetRoot string
// ClientCert client https certs
ClientCert *types.CertConfig `json:"-"`
// BcsZk cluster zk address
Expand Down
19 changes: 14 additions & 5 deletions bcs-services/bcs-cpuset-device/cpuset-device/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cpuset_device
import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

Expand All @@ -24,9 +25,6 @@ import (
docker "github.com/fsouza/go-dockerclient"
)

// CgroupCpusetRoot cgroup fs root path for cpuset
const CgroupCpusetRoot = "/sys/fs/cgroup/cpuset/docker"

func (s *CpusetDevicePlugin) loopUpdateCpusetNodes() {
for {
time.Sleep(s.checkInterval)
Expand Down Expand Up @@ -187,8 +185,19 @@ func (s *CpusetDevicePlugin) setContainerCpuset(c *docker.Container) {
if node == "" || cpusets == "" {
return
}
if c.HostConfig == nil {
blog.Warnf("container %s hostconfig is empty, do nothing", c.ID)
return
}
var cgroupParent string
if len(c.HostConfig.CgroupParent) != 0 {
cgroupParent = filepath.Join(s.conf.CgroupCpusetRoot, c.HostConfig.CgroupParent)
} else {
cgroupParent = s.conf.CgroupCpusetRoot
}

// set container cgroup cpuset.cpus、cpuset.mems
cpus := fmt.Sprintf("%s/%s/cpuset.cpus", CgroupCpusetRoot, c.ID)
cpus := fmt.Sprintf("%s/%s/cpuset.cpus", cgroupParent, c.ID)
fcpus, err := os.Create(cpus)
if err != nil {
blog.Errorf("open file %s error %s", cpus, err.Error())
Expand All @@ -201,7 +210,7 @@ func (s *CpusetDevicePlugin) setContainerCpuset(c *docker.Container) {
return
}

mems := fmt.Sprintf("%s/%s/cpuset.mems", CgroupCpusetRoot, c.ID)
mems := fmt.Sprintf("%s/%s/cpuset.mems", cgroupParent, c.ID)
fmems, err := os.Create(mems)
if err != nil {
blog.Errorf("open file %s error %s", cpus, err.Error())
Expand Down
118 changes: 96 additions & 22 deletions bcs-services/bcs-cpuset-device/cpuset-device/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/Tencent/bk-bcs/bcs-services/bcs-cpuset-device/config"
"github.com/Tencent/bk-bcs/bcs-services/bcs-cpuset-device/types"

"github.com/fsnotify/fsnotify"
docker "github.com/fsouza/go-dockerclient"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -39,7 +40,7 @@ const (
// KubeletSocketName socket file name for kubelet
KubeletSocketName = "kubelet.sock"
// CpusetSocketName socket file name for cpuset
CpusetSocketName = "cpuset.sock"
CpusetSocketName = "bcs-cpuset-device.sock"
// CpusetResourceName resource name for cpuset
CpusetResourceName = "bkbcs.tencent.com/cpuset"

Expand All @@ -63,6 +64,8 @@ type CpusetDevicePlugin struct {
// the grpc cpuset device socket
// serve: ListAndWatch、Allocate function
cpusetSocket string
// the grpc socket of kubelet
kubeletSocket string

// docker client
client *docker.Client
Expand All @@ -80,18 +83,22 @@ type CpusetDevicePlugin struct {

// interval for check running container
checkInterval time.Duration

// stop channel
stopCh chan struct{}
}

// NewCpusetDevicePlugin new cpuset device plugin
func NewCpusetDevicePlugin(conf *config.Config) *CpusetDevicePlugin {
c := &CpusetDevicePlugin{
resourceName: CpusetResourceName,
cpusetSocket: fmt.Sprintf("%s/%s", conf.PluginSocketDir, CpusetSocketName),
conf: conf,
dockerSocket: conf.DockerSocket,
devices: make([]*pluginapi.Device, 0),
nodes: make(map[string]*types.CpusetNode),
server: grpc.NewServer([]grpc.ServerOption{}...),
resourceName: CpusetResourceName,
cpusetSocket: fmt.Sprintf("%s/%s", conf.PluginSocketDir, CpusetSocketName),
kubeletSocket: fmt.Sprintf("%s/%s", conf.PluginSocketDir, KubeletSocketName),
conf: conf,
dockerSocket: conf.DockerSocket,
devices: make([]*pluginapi.Device, 0),
nodes: make(map[string]*types.CpusetNode),
stopCh: make(chan struct{}),
}

return c
Expand Down Expand Up @@ -126,23 +133,15 @@ func (c *CpusetDevicePlugin) Start() error {
// loop list containers to update cpuset node info
go c.loopUpdateCpusetNodes()

// start grpc server
err = c.serve()
if err != nil {
return err
}
blog.Infof("grpc serve on %s success", c.cpusetSocket)

// if device plugin in k8s cluster, then register device plugin info to kubelet
if c.conf.Engine == "k8s" {
err = c.register()
go c.registerLoop()
} else {
err = c.startServer()
if err != nil {
blog.Errorf("register kubelet failed: %s", err.Error())
return err
}
blog.Infof("Registered device plugin for '%s' with Kubelet", c.resourceName)
// else device plugin in mesos cluster, then report extended resources info to mesos scheduler
} else {
err = c.reportExtendedResources()
if err != nil {
return err
Expand All @@ -153,7 +152,8 @@ func (c *CpusetDevicePlugin) Start() error {
return nil
}

func (c *CpusetDevicePlugin) serve() error {
func (c *CpusetDevicePlugin) startServer() error {
c.server = grpc.NewServer([]grpc.ServerOption{}...)
os.Remove(c.cpusetSocket)
sock, err := net.Listen("unix", c.cpusetSocket)
if err != nil {
Expand All @@ -178,12 +178,75 @@ func (c *CpusetDevicePlugin) serve() error {
return err
}
conn.Close()
blog.Infof("grpc serve on %s success", c.cpusetSocket)
return nil
}

func (c *CpusetDevicePlugin) stopServer() {
if c.server != nil {
c.server.Stop()
c.server = nil
blog.Infof("stop grpc serve")
}
}

// registerLoop watch kubelet sock path and do register
func (c *CpusetDevicePlugin) registerLoop() {
c.stopServer()
if err := c.startServer(); err != nil {
blog.Warnf("start grpc server failed, err %s", err.Error())
time.Sleep(5 * time.Second)
go c.registerLoop()
return
}
blog.Infof("begin to register to kubelet")
if err := c.register(); err != nil {
blog.Warnf("register to kubelet failed, err %s, will try again,", err.Error())
time.Sleep(5 * time.Second)
go c.registerLoop()
return
}
blog.Infof("begin watch kubelet socket path %s", c.kubeletSocket)
fileWatcher, err := fsnotify.NewWatcher()
if err != nil {
blog.Warnf("create file watcher failed, err %s", err.Error())
time.Sleep(5 * time.Second)
go c.registerLoop()
return
}
defer fileWatcher.Close()
err = fileWatcher.Add(c.kubeletSocket)
if err != nil {
blog.Warnf("watch file %s failed, err %s", c.kubeletSocket, err.Error())
time.Sleep(5 * time.Second)
go c.registerLoop()
return
}
for {
select {
case we := <-fileWatcher.Events:
// kubelet socket path event and event type is create
if we.Name == c.kubeletSocket && (we.Op&fsnotify.Remove) == fsnotify.Remove {
blog.Infof("file watcher event: kubelet socket file %s removed, try to restart device-plugin",
c.kubeletSocket)
time.Sleep(2 * time.Second)
go c.registerLoop()
return
}
case err := <-fileWatcher.Errors:
blog.Warnf("file watcher errors %s", err.Error())
go c.registerLoop()
return
case <-c.stopCh:
blog.Infof("kubelet socket watcher exited")
return
}
}
}

// Register registers the device plugin for the given resourceName with Kubelet.
func (c *CpusetDevicePlugin) register() error {
conn, err := c.dial(pluginapi.KubeletSocket, 5*time.Second)
conn, err := c.dial(c.kubeletSocket, 5*time.Second)
if err != nil {
return err
}
Expand Down Expand Up @@ -329,7 +392,18 @@ func (c *CpusetDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.

// ListAndWatch lists devices and update that list according to the health status
func (c *CpusetDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
return s.Send(&pluginapi.ListAndWatchResponse{Devices: c.devices})
ticker := time.NewTicker(360 * time.Second)
defer ticker.Stop()
s.Send(&pluginapi.ListAndWatchResponse{Devices: c.devices})
for {
select {
case <-c.stopCh:
blog.Infof("list watch stop")
return nil
case <-ticker.C:
s.Send(&pluginapi.ListAndWatchResponse{Devices: c.devices})
}
}
}

// Allocate which return list of devices.
Expand Down
Loading

0 comments on commit f08de42

Please sign in to comment.