Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remount old mount point when csi plugin unexpect exit #282

Merged
merged 10 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/cephfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
nodeID = flag.String("nodeid", "", "node id")
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir")
huaizong marked this conversation as resolved.
Show resolved Hide resolved
)

func init() {
Expand All @@ -49,6 +50,7 @@ func main() {
}
//update plugin name
cephfs.PluginFolder = cephfs.PluginFolder + *driverName
cephfs.MountCacheDir = *mountCacheDir

cp, err := util.CreatePersistanceStorage(cephfs.PluginFolder, *metadataStorage, *driverName)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
klog.Fatalf("failed to write ceph configuration file: %v", err)
}

if err := remountHisMountedPath(driverName, version, nodeID, cachePersister); err != nil {
gman0 marked this conversation as resolved.
Show resolved Hide resolved
huaizong marked this conversation as resolved.
Show resolved Hide resolved
klog.Warningf("failed to remounted history mounted path: %v", err)
//ignore remount fail
}
// Initialize default library driver

fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
Expand Down
333 changes: 333 additions & 0 deletions pkg/cephfs/mountcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
package cephfs

import (
"encoding/base64"
"os"
"sync"
"syscall"
"time"

"github.com/ceph/ceph-csi/pkg/util"
"github.com/pkg/errors"
"k8s.io/klog"
)

type volumeMountEntry struct {
huaizong marked this conversation as resolved.
Show resolved Hide resolved
NodeID string `json:"nodeID"`
DriverName string `json:"driverName"`
DriverVersion string `json:"driverVersion"`

Namespace string `json:"namespace"`
huaizong marked this conversation as resolved.
Show resolved Hide resolved

VolumeID string `json:"volumeID"`
Secrets map[string]string `json:"secrets"`
StagingPath string `json:"stagingPath"`
TargetPaths map[string]bool `json:"targetPaths"`
CreateTime time.Time `json:"createTime"`
LastMountTime time.Time `json:"lastMountTime"`
LoadCount uint64 `json:"loadCount"`
}

type volumeMountCacheMap struct {
DriverName string
huaizong marked this conversation as resolved.
Show resolved Hide resolved
DriverVersion string
NodeID string
MountFailNum int64
MountSuccNum int64
Volumes map[string]volumeMountEntry
NodeCacheStore util.NodeCache
MetadataStore util.CachePersister
}

var (
MountCacheDir = ""
volumeMountCachePrefix = "cephfs-mount-cache-"
volumeMountCache volumeMountCacheMap
volumeMountCacheMtx sync.Mutex
)

func remountHisMountedPath(name string, v string, nodeID string, cachePersister util.CachePersister) error {
huaizong marked this conversation as resolved.
Show resolved Hide resolved
volumeMountCache.Volumes = make(map[string]volumeMountEntry)
volumeMountCache.NodeID = nodeID
volumeMountCache.DriverName = name
volumeMountCache.DriverVersion = v
volumeMountCache.MountSuccNum = 0
volumeMountCache.MountFailNum = 0

volumeMountCache.MetadataStore = cachePersister

volumeMountCache.NodeCacheStore.BasePath = MountCacheDir
volumeMountCache.NodeCacheStore.CacheDir = ""

if len(MountCacheDir) == 0 {
//if mount cache dir unset, disable remount
klog.Infof("mount-cache: mountcachedir no define disalbe mount cache.")
return nil
}

klog.Infof("mount-cache: MountCacheDir: %s", MountCacheDir)
if err := os.MkdirAll(volumeMountCache.NodeCacheStore.BasePath, 0755); err != nil {
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.NodeCacheStore.BasePath, err)
return err
}
me := &volumeMountEntry{}
ce := &controllerCacheEntry{}
err := volumeMountCache.NodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
volID := me.VolumeID
klog.Infof("mount-cache: load %v", me)
if err := volumeMountCache.MetadataStore.Get(volID, ce); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.Infof("cephfs: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err)
if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID)
}
}
} else {
if err := mountOneCacheEntry(ce, me); err == nil {
volumeMountCache.MountSuccNum++
volumeMountCache.Volumes[me.VolumeID] = *me
} else {
volumeMountCache.MountFailNum++
}
}
return nil
})
if err != nil {
klog.Infof("mount-cache: metastore list cache fail %v", err)
return err
}
if volumeMountCache.MountFailNum > volumeMountCache.MountSuccNum {
return errors.New("mount-cache: too many volumes mount fail")
}
klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", volumeMountCache.MountSuccNum, volumeMountCache.MountFailNum)
return nil
}

func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountEntry) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

var err error
volID := ce.VolumeID
volOptions := ce.VolOptions

adminCr, err := getAdminCredentials(decodeCredentials(me.Secrets))
huaizong marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
entity, err := getCephUser(&volOptions, adminCr, volID)
if err != nil {
klog.Infof("mount-cache: failed to get ceph user: %s %v", volID, me.StagingPath)
}
cr := entity.toCredentials()

if volOptions.ProvisionVolume {
volOptions.RootPath = getVolumeRootPathCeph(volID)
}

err = cleanupMountPoint(me.StagingPath)
if err != nil {
klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err)
return err
}

isMnt, err := isMountPoint(me.StagingPath)
if err != nil {
isMnt = false
klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
}

if !isMnt {
m, err := newMounter(&volOptions)
if err != nil {
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
return err
}
if err := m.mount(me.StagingPath, cr, &volOptions); err != nil {
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
return err
}
}
for targetPath, readOnly := range me.TargetPaths {
if err := cleanupMountPoint(targetPath); err == nil {
if err := bindMount(me.StagingPath, targetPath, readOnly); err != nil {
klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
volID, me.StagingPath, targetPath, readOnly, err)
} else {
klog.Infof("mount-cache: succ bind-mount volume %s: %s %s %v",
huaizong marked this conversation as resolved.
Show resolved Hide resolved
volID, me.StagingPath, targetPath, readOnly)
}
}
}
return nil
}

func cleanupMountPoint(mountPoint string) error {
if _, err := os.Stat(mountPoint); err != nil {
if IsCorruptedMnt(err) {
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
err := execCommandErr("umount", mountPoint)
if err != nil {
klog.Infof("mount-cache: unmount %s fail %v", mountPoint, err)
//ignore error return err
}
}
}
if _, err := os.Stat(mountPoint); err != nil {
klog.Errorf("mount-cache: mount point %s stat fail %v", mountPoint, err)
return err
}
return nil
}

func IsCorruptedMnt(err error) bool {
huaizong marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
huaizong marked this conversation as resolved.
Show resolved Hide resolved
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
huaizong marked this conversation as resolved.
Show resolved Hide resolved
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
huaizong marked this conversation as resolved.
Show resolved Hide resolved
underlyingError = pe.Err
}

return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}

func genVolumeMountCacheFileName(volID string) string {
cachePath := volumeMountCachePrefix + volID
return cachePath
}

func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
if len(MountCacheDir) == 0 {
//if mount cache dir unset, disable remount
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

lastTargetPaths := make(map[string]bool)
me, ok := volumeMountCache.Volumes[volID]
if ok {
if me.StagingPath == stagingTargetPath {
klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID)
return nil
}
lastTargetPaths = me.TargetPaths
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
}

me = volumeMountEntry{NodeID: mc.NodeID, DriverName: mc.DriverName, DriverVersion: mc.DriverVersion}

me.VolumeID = volID
me.Secrets = encodeCredentials(secrets)
me.StagingPath = stagingTargetPath
me.TargetPaths = lastTargetPaths

curTime := time.Now()
me.CreateTime = curTime
me.CreateTime = curTime
huaizong marked this conversation as resolved.
Show resolved Hide resolved
me.LoadCount = 0
volumeMountCache.Volumes[volID] = me
if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
klog.Errorf("mount-cache: node stage volume failed to store a cache entry for volume %s: %v", volID, err)
huaizong marked this conversation as resolved.
Show resolved Hide resolved
return err
}
klog.Infof("mount-cache: node stage volume succ to store a cache entry for volume %s: %v", volID, me)
huaizong marked this conversation as resolved.
Show resolved Hide resolved
return nil
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}

func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string, stagingTargetPath string) error {
if len(MountCacheDir) == 0 {
//if mount cache dir unset, disable remount
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
delete(volumeMountCache.Volumes, volID)
if err := mc.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil {
klog.Infof("mount-cache: node unstage volume failed to delete cache entry for volume %s: %s %v", volID, stagingTargetPath, err)
return err
}
return nil
}

func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error {
if len(MountCacheDir) == 0 {
//if mount cache dir unset, disable remount
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

_, ok := volumeMountCache.Volumes[volID]
if !ok {
klog.Errorf("mount-cache: node publish volume failed to find cache entry for volume %s", volID)
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
}
volumeMountCache.Volumes[volID].TargetPaths[targetPath] = readOnly
return mc.updateNodeCache(volID)
}

func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error {
if len(MountCacheDir) == 0 {
//if mount cache dir unset, disable remount
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

_, ok := volumeMountCache.Volumes[volID]
if !ok {
klog.Errorf("mount-cache: node unpublish volume failed to find cache entry for volume %s", volID)
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
}
delete(volumeMountCache.Volumes[volID].TargetPaths, targetPath)
return mc.updateNodeCache(volID)
}

func (mc *volumeMountCacheMap) updateNodeCache(volID string) error {
me := volumeMountCache.Volumes[volID]
if err := volumeMountCache.NodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
klog.Infof("mount-cache: metadata nofound, delete mount cache failed for volume %s", volID)
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}
if err := mc.NodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
klog.Errorf("mount-cache: mount cache failed to update for volume %s: %v", volID, err)
return err
}
return nil
}

func encodeCredentials(input map[string]string) (output map[string]string) {
output = make(map[string]string)
for key, value := range input {
nKey := base64.StdEncoding.EncodeToString([]byte(key))
nValue := base64.StdEncoding.EncodeToString([]byte(value))
output[nKey] = nValue
}
return output
}

func decodeCredentials(input map[string]string) (output map[string]string) {
output = make(map[string]string)
for key, value := range input {
nKey, err := base64.StdEncoding.DecodeString(key)
if err != nil {
klog.Errorf("mount-cache: decode secret fail")
continue
}
nValue, err := base64.StdEncoding.DecodeString(value)
if err != nil {
klog.Errorf("mount-cache: decode secret fail")
continue
}
output[string(nKey)] = string(nValue)
}
return output
}
38 changes: 38 additions & 0 deletions pkg/cephfs/mountcache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cephfs

import (
"testing"
)

func init() {
}

func TestMountOneCacheEntry(t *testing.T) {
}

func TestRemountHisMountedPath(t *testing.T) {
}

func TestNodeStageVolume(t *testing.T) {
}

func TestNodeUnStageVolume(t *testing.T) {
}

func TestNodePublishVolume(t *testing.T) {
}

func TestNodeUnpublishVolume(t *testing.T) {
}

func TestEncodeDecodeCredentials(t *testing.T) {
secrets := make(map[string]string)
secrets["user_1"] = "value_1"
enSecrets := encodeCredentials(secrets)
deSecrets := decodeCredentials(enSecrets)
for key, value := range secrets {
if deSecrets[key] != value {
t.Errorf("key %s value %s not equal %s after encode decode", key, value, deSecrets[key])
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Loading