Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk: remove shell usage #989

Merged
merged 19 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deploy/ecs/csi-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ spec:
name: efc-metrics-dir
- mountPath: /host/run/csi-tool
name: run-csi
- mountPath: /host/sys/fs/cgroup/blkio/kubepods.slice
name: cgroup-blkio
- mountPath: /etc/csi-plugin/config
name: csi-plugin-cm
- name: host-mnt
Expand Down Expand Up @@ -194,6 +196,10 @@ spec:
hostPath:
path: /var/lib/kubelet
type: Directory
- name: cgroup-blkio
hostPath:
path: /sys/fs/cgroup/blkio/kubepods.slice
type: Directory
- name: host-dev
hostPath:
path: /dev
Expand Down
6 changes: 6 additions & 0 deletions deploy/nonecs/csi-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ spec:
name: efc-metrics-dir
- mountPath: /host/run/csi-tool
name: run-csi
- mountPath: /host/sys/fs/cgroup/blkio/kubepods.slice
name: cgroup-blkio
- mountPath: /etc/csi-plugin/config
name: csi-plugin-cm
- name: host-mnt
Expand Down Expand Up @@ -178,6 +180,10 @@ spec:
hostPath:
path: /var/lib/kubelet
type: Directory
- name: cgroup-blkio
hostPath:
path: /sys/fs/cgroup/blkio/kubepods.slice
type: Directory
- name: host-dev
hostPath:
path: /dev
Expand Down
3 changes: 1 addition & 2 deletions pkg/disk/bdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,7 @@ func checkVfhpOnlineReconcile() {
}

func checkVfhpOnline() {
cmd := fmt.Sprintf("%s iohub-vfhp-helper -s", NsenterCmd)
_, err := utils.Run(cmd)
err := utils.CommandOnNode("iohub-vfhp-helper", "-s").Run()
if err == nil {
isVF = false
return
Expand Down
245 changes: 93 additions & 152 deletions pkg/disk/bdfcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package disk
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
Expand All @@ -15,8 +14,9 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/mount-utils"
)

// ObjReference reference for bdf volume
Expand Down Expand Up @@ -60,30 +60,44 @@ func BdfHealthCheck() {
log.Infof("Bdf Health Check Starting, with Interval %d minutes...", interTime)

// running in loop
// if bdf hang exist, unused not be checked.
// if bdf hang, unused is not checked.
checkDone := make(chan struct{})
isHang := false
for {
if !IsVFNode() {
break
}
isHang := checkBdfHang(recorder)
if doUnusedCheck && !isHang {
if !isHang {
go checkBdfHangCmd(checkDone)
timer := time.NewTimer(10 * time.Second)
select {
case <-checkDone:
timer.Stop()
case <-timer.C:
isHang = true
}
} else {
select {
case <-checkDone:
isHang = false
default:
}
}
if isHang {
notifyBdfHang(recorder)
} else if doUnusedCheck {
checkDiskUnused(recorder)
}
time.Sleep(time.Duration(interTime) * time.Minute)
}
}

// check if bdf hang in host
// record events if bdf hang;
func checkBdfHang(recorder record.EventRecorder) bool {
if isHang, err := isBdfHang(); isHang {
errMsg := fmt.Sprintf("Find BDF Hang in Node %s, with message: %v", GlobalConfigVar.NodeID, err)
log.Errorf(errMsg)
utils.CreateEvent(recorder, ObjReference, v1.EventTypeWarning, BdfVolumeHang, errMsg)
DingTalk(errMsg)
return true
}
return false
func notifyBdfHang(recorder record.EventRecorder) {
errMsg := fmt.Sprintf("Find BDF Hang in Node %s", GlobalConfigVar.NodeID)
log.Errorf(errMsg)
utils.CreateEvent(recorder, ObjReference, v1.EventTypeWarning, BdfVolumeHang, errMsg)
DingTalk(errMsg)
}

// check disk attached but not used in host;
Expand All @@ -104,177 +118,104 @@ func checkDiskUnused(recorder record.EventRecorder) {
}

// checkBdfHangCmd reads every /sys/block/*/serial file and then signals
// completion on finished. If a BDF device is hung, one of the reads blocks
// forever and the signal never arrives; the caller detects the hang by
// waiting on finished with a timeout. Intended to be run in its own goroutine.
func checkBdfHangCmd(finished chan<- struct{}) {
	files, err := filepath.Glob("/sys/block/*/serial")
	if err != nil {
		// the only error Glob can return is ErrBadPattern, which cannot
		// happen with this constant pattern
		panic(err)
	}
	for _, file := range files {
		// Only the act of reading matters here (it hangs on a broken BDF
		// device); the content and any read error are deliberately ignored.
		_, _ = os.ReadFile(file)
	}
	finished <- struct{}{}
}
// devicePathIgnorePartition strips any trailing partition number from a
// device path, e.g. "/dev/vda1" -> "/dev/vda".
func devicePathIgnorePartition(devicePath string) string {
	return strings.TrimRight(devicePath, "0123456789")
}

// get disk unused
func getDiskUnUsedAndAddTag() ([]string, error) {
files, err := ioutil.ReadDir("/dev/")
files, err := filepath.Glob("/dev/vd*")
if err != nil {
return nil, err
panic(err) // the only error Glob can return is ErrBadPattern, which should not happen
}

DeviceMap := map[string]bool{}
for _, f := range files {
if !strings.HasPrefix(f.Name(), "vd") {
continue
}
if strings.TrimSpace(f.Name()) == "" {
continue
}
if f.Name() == "vda" || f.Name() == "vda1" || f.Name() == "vdb1" {
continue
}
if f.Name() == "vdb" && utils.IsFileExisting("/dev/vdb1") {
continue
}
if f.Name() == "vdb" {
cmd := "mount | grep \"vdb on /var/lib/kubelet type\" | wc -l"
if out, err := utils.Run(cmd); err == nil && strings.TrimSpace(out) != "0" {
continue
}
}
devPath := filepath.Join("/dev/", f.Name())
DeviceMap[devPath] = true
}

FileSystemDeviceMap, BlockMntMap, err := getDeviceMounted()
inUseDevices, err := getDeviceInUse(mountInfoPath)
if err != nil {
return nil, err
}

// Delete Filesystem device
for fsDev := range FileSystemDeviceMap {
if _, ok := DeviceMap[fsDev]; ok {
delete(DeviceMap, fsDev)
} else {
log.Warnf("BdfCheck: Device %s is not find under /dev, but is mounted by path", fsDev)
}
}
// Delete Block device
for blockDev := range BlockMntMap {
if blockDev == "" {
unusedDevices := []string{}
for _, f := range files {
f = devicePathIgnorePartition(f)
fname := filepath.Base(f)
if fname == "vda" {
// this is likely the system disk, skip
continue
}
if _, ok := DeviceMap[blockDev]; ok {
delete(DeviceMap, blockDev)
} else {
log.Warnf("BdfCheck: Device %s is not find under /dev, but is mounted by block", blockDev)
if _, ok := inUseDevices[f]; !ok {
unusedDevices = append(unusedDevices, f)
}
}

// check Device unused;
if len(DeviceMap) != 0 {
// wait for mount finished
time.Sleep(time.Duration(2) * time.Second)
unUsedDevices := []string{}
FileSystemDeviceMap, BlockMntMap, err := getDeviceMounted()
if err != nil {
return nil, err
}
for key := range DeviceMap {
if utils.IsFileExisting(key) && doubleCheckDeviceUnUsed(FileSystemDeviceMap, BlockMntMap, key) {
unUsedDevices = append(unUsedDevices, key)
}
}
if len(unUsedDevices) == 0 {
return nil, nil
}
if len(unusedDevices) == 0 {
return nil, nil
}

// there are unUsedDevices in host;
diskIDList, err := addDiskBdfTag(unUsedDevices)
return unUsedDevices, fmt.Errorf("UnUsedDisks: %v, Udpate Tags: %v", diskIDList, err)
// wait for possible ongoing mount/detach to finished
time.Sleep(2 * time.Second)
inUseDevices, err = getDeviceInUse(mountInfoPath)
if err != nil {
return nil, err
}
return nil, nil
}

// filesystem not mounted, block volume not mounted
func doubleCheckDeviceUnUsed(FileSystemDeviceMap map[string]bool, BlockMntMap map[string]bool, deviceName string) bool {
if _, ok := FileSystemDeviceMap[deviceName]; ok {
return false
stillUnusedDevices := []string{}
for _, dev := range unusedDevices {
if _, ok := inUseDevices[dev]; !ok && utils.IsFileExisting(dev) {
stillUnusedDevices = append(stillUnusedDevices, dev)
}
}
if _, ok := BlockMntMap[deviceName]; ok {
return false
if len(stillUnusedDevices) == 0 {
return nil, nil
}
return true

// there are unUsedDevices in host;
diskIDList, err := addDiskBdfTag(stillUnusedDevices)
return stillUnusedDevices, fmt.Errorf("UnUsedDisks: %v, Udpate Tags: %v", diskIDList, err)
}

// get device mounted as filesystem or block volume
func getDeviceMounted() (map[string]bool, map[string]bool, error) {
// Get all mounted device by filesystem
FileSystemDeviceMap := map[string]bool{}
fsCheckCmd := "mount | grep /var/lib/kubelet/plugins/kubernetes.io/csi/pv/ | awk '{print $1}'"
out, err := utils.Run(fsCheckCmd)
func getDeviceInUse(mountinfoPath string) (map[string]struct{}, error) {
mnts, err := mount.ParseMountInfo(mountinfoPath)
if err != nil {
return nil, nil, err
}
for _, deviceStr := range strings.Split(out, "\n") {
if strings.TrimSpace(deviceStr) == "" {
continue
}
FileSystemDeviceMap[deviceStr] = true
return nil, err
}

// Get all mounted device by block
BlockMntMap := map[string]bool{}
blockCheckCmd := "findmnt -o TARGET,SOURCE | grep \"devtmpfs\\[\" | grep /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/staging | awk '{print $NF}' | awk -F[ '{print $2}'"
blockOut, err := utils.Run(blockCheckCmd)
if err != nil {
return nil, nil, err
inUseDevices := map[string]struct{}{}
addDevice := func(device string) {
inUseDevices[devicePathIgnorePartition(device)] = struct{}{}
}
for _, deviceStr := range strings.Split(blockOut, "\n") {
if strings.HasSuffix(deviceStr, "]") {
strLen := len(deviceStr)
devPath := filepath.Join("/dev/", deviceStr[0:strLen-1])
BlockMntMap[devPath] = true

for _, mnt := range mnts {
if mnt.Source == "devtmpfs" {
// volumeDevices case
if strings.HasPrefix(mnt.MountPoint, utils.KubeletRootDir+"/plugins/kubernetes.io/csi/volumeDevices/staging") {
// devtmpfs is usually mounted on /dev
addDevice("/dev" + mnt.Root)
}
} else if strings.HasPrefix(mnt.Source, "/dev/") {
// normal mount case
if strings.HasPrefix(mnt.MountPoint, utils.KubeletRootDir+"/plugins/kubernetes.io/csi/pv/") ||
mnt.MountPoint == utils.KubeletRootDir {

addDevice(mnt.Source)
}
}
}

return FileSystemDeviceMap, BlockMntMap, nil
return inUseDevices, nil
}

func addDiskBdfTag(devices []string) ([]string, error) {
Expand Down
Loading