Skip to content

Commit

Permalink
Merge pull request #925 from AlbeeSo/fix/authtype-sts
Browse files Browse the repository at this point in the history
fix: support sts authtype in containerized version
  • Loading branch information
k8s-ci-robot committed Dec 12, 2023
2 parents 74a68b8 + 2c15f4d commit ef311fd
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 54 deletions.
23 changes: 18 additions & 5 deletions pkg/mounter/fuse_containerized_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,20 @@ const (
FuseMountPathAnnoKey = "csi.alibabacloud.com/mount-path"
)

type AuthConfig struct {
AuthType string
// TODO: for RRSA
// RoleName string
// ServiceAccountName string
}

const (
AuthTypeSTS = "sts"
)

type FuseMounterType interface {
name() string
buildPodSpec(source, target, fstype string, options, mountFlags []string, nodeName, volumeId string) (corev1.PodSpec, error)
buildPodSpec(source, target, fstype string, authCfg *AuthConfig, options, mountFlags []string, nodeName, volumeId string) (corev1.PodSpec, error)
}

type FuseContainerConfig struct {
Expand Down Expand Up @@ -113,13 +124,14 @@ func NewContainerizedFuseMounterFactory(
// This implies that mount operations will either succeed when the fuse pod is ready,
// or fail and ensure that no fuse pods are left behind.
func (fac *ContainerizedFuseMounterFactory) NewFuseMounter(
ctx context.Context, volumeId string, atomic bool) *ContainerizedFuseMounter {
ctx context.Context, volumeId string, authCfg *AuthConfig, atomic bool) *ContainerizedFuseMounter {
return &ContainerizedFuseMounter{
ctx: ctx,
atomic: atomic,
volumeId: volumeId,
nodeName: fac.nodeName,
namespace: fac.namespace,
authCfg: authCfg,
client: fac.client,
log: logrus.WithFields(logrus.Fields{
"fuse-type": fac.fuseType.name(),
Expand All @@ -136,6 +148,7 @@ type ContainerizedFuseMounter struct {
volumeId string
nodeName string
namespace string
authCfg *AuthConfig
client kubernetes.Interface
log *logrus.Entry
FuseMounterType
Expand All @@ -151,7 +164,7 @@ func (mounter *ContainerizedFuseMounter) Mount(source string, target string, fst

ctx, cancel := context.WithTimeout(mounter.ctx, fuseMountTimeout)
defer cancel()
err := mounter.launchFusePod(ctx, source, target, fstype, options, nil)
err := mounter.launchFusePod(ctx, source, target, fstype, mounter.authCfg, options, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -188,7 +201,7 @@ func (mounter *ContainerizedFuseMounter) labelsAndListOptionsFor(target string)
return labels, listOptions
}

func (mounter *ContainerizedFuseMounter) launchFusePod(ctx context.Context, source, target, fstype string, options, mountFlags []string) error {
func (mounter *ContainerizedFuseMounter) launchFusePod(ctx context.Context, source, target, fstype string, authCfg *AuthConfig, options, mountFlags []string) error {
podClient := mounter.client.CoreV1().Pods(mounter.namespace)
labels, listOptions := mounter.labelsAndListOptionsFor(target)
podList, err := podClient.List(ctx, listOptions)
Expand Down Expand Up @@ -230,7 +243,7 @@ func (mounter *ContainerizedFuseMounter) launchFusePod(ctx context.Context, sour
rawPod.GenerateName = fmt.Sprintf("csi-fuse-%s-", mounter.name())
rawPod.Labels = labels
rawPod.Annotations = map[string]string{FuseMountPathAnnoKey: target}
rawPod.Spec, err = mounter.buildPodSpec(source, target, fstype, options, mountFlags, mounter.nodeName, mounter.volumeId)
rawPod.Spec, err = mounter.buildPodSpec(source, target, fstype, authCfg, options, mountFlags, mounter.nodeName, mounter.volumeId)
if err != nil {
return err
}
Expand Down
75 changes: 46 additions & 29 deletions pkg/mounter/ossfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *fuseOssfs) name() string {
}

func (f *fuseOssfs) buildPodSpec(
source, target, fstype string, options, mountFlags []string, nodeName, volumeId string,
source, target, fstype string, authCfg *AuthConfig, options, mountFlags []string, nodeName, volumeId string,
) (spec corev1.PodSpec, _ error) {

targetVolume := corev1.Volume{
Expand All @@ -78,25 +78,7 @@ func (f *fuseOssfs) buildPodSpec(
},
}
*metricsDirVolume.HostPath.Type = corev1.HostPathDirectoryOrCreate
passwdMountDir := "/etc/ossfs"
passwdFilename := "passwd-ossfs"
passwdSecretVolume := corev1.Volume{
Name: "passwd-ossfs",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: OssfsCredentialSecretName,
Items: []corev1.KeyToPath{
{
Key: fmt.Sprintf("%s.%s", nodeName, volumeId),
Path: passwdFilename,
Mode: pointer.Int32Ptr(0600),
},
},
Optional: pointer.BoolPtr(true),
},
},
}
spec.Volumes = []corev1.Volume{targetVolume, metricsDirVolume, passwdSecretVolume}
spec.Volumes = []corev1.Volume{targetVolume, metricsDirVolume}

var mimeMountDir string
if utils.IsFileExisting(filepath.Join(hostPrefix, OssfsDefMimeTypesFilePath)) {
Expand Down Expand Up @@ -138,16 +120,10 @@ func (f *fuseOssfs) buildPodSpec(
default:
return spec, fmt.Errorf("invalid ossfs dbglevel: %q", dbglevel)
}
options = append(options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename)))
args := mountutils.MakeMountArgs(source, target, "", options)
args = append(args, mountFlags...)
// FUSE foreground option - do not run as daemon
args = append(args, "-f")
bidirectional := corev1.MountPropagationBidirectional
container := corev1.Container{
Name: "fuse-mounter",
Image: f.config.Image,
Args: args,
Resources: f.config.Resources,
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -157,9 +133,6 @@ func (f *fuseOssfs) buildPodSpec(
}, {
Name: metricsDirVolume.Name,
MountPath: metricsDirVolume.HostPath.Path,
}, {
Name: passwdSecretVolume.Name,
MountPath: passwdMountDir,
},
},
StartupProbe: &corev1.Probe{
Expand All @@ -184,6 +157,15 @@ func (f *fuseOssfs) buildPodSpec(
}
container.VolumeMounts = append(container.VolumeMounts, mimeVolumeMount)
}

buildAuthSpec(nodeName, volumeId, authCfg, &spec, &container, &options)

args := mountutils.MakeMountArgs(source, target, "", options)
args = append(args, mountFlags...)
// FUSE foreground option - do not run as daemon
args = append(args, "-f")
container.Args = args

spec.Containers = []corev1.Container{container}
spec.RestartPolicy = corev1.RestartPolicyOnFailure
spec.NodeName = nodeName
Expand Down Expand Up @@ -263,3 +245,38 @@ func CleanupOssfsCredentialSecret(ctx context.Context, clientset kubernetes.Inte
}
return err
}

func buildAuthSpec(nodeName, volumeId string, authCfg *AuthConfig,
spec *corev1.PodSpec, container *corev1.Container, options *[]string) {
if spec == nil || container == nil || options == nil {
return
}
if authCfg != nil && authCfg.AuthType == AuthTypeSTS {
return
}
passwdMountDir := "/etc/ossfs"
passwdFilename := "passwd-ossfs"
passwdSecretVolume := corev1.Volume{
Name: "passwd-ossfs",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: OssfsCredentialSecretName,
Items: []corev1.KeyToPath{
{
Key: fmt.Sprintf("%s.%s", nodeName, volumeId),
Path: passwdFilename,
Mode: pointer.Int32Ptr(0600),
},
},
Optional: pointer.BoolPtr(true),
},
},
}
spec.Volumes = append(spec.Volumes, passwdSecretVolume)
passwdVolumeMont := corev1.VolumeMount{
Name: passwdSecretVolume.Name,
MountPath: passwdMountDir,
}
container.VolumeMounts = append(container.VolumeMounts, passwdVolumeMont)
*options = append(*options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename)))
}
66 changes: 66 additions & 0 deletions pkg/mounter/ossfs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package mounter

import (
"fmt"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"
)

func Test_buildAuthSpec(t *testing.T) {
nodeName := "test-node-name"
volumeId := "test-pv-name"
authCfg := &AuthConfig{}
container := corev1.Container{
Name: "fuse-mounter",
Image: "test-image",
VolumeMounts: []corev1.VolumeMount{
{
Name: "test-mounts",
MountPath: "target",
},
},
}
targetVolume := corev1.Volume{
Name: "test-mounts",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "target",
Type: new(corev1.HostPathType),
},
},
}
spec := corev1.PodSpec{}
spec.Volumes = []corev1.Volume{targetVolume}
options := []string{"allow_other", "dbglevel=debug"}
// ak
passwdMountDir := "/etc/ossfs"
passwdFilename := "passwd-ossfs"
passwdSecretVolume := corev1.Volume{
Name: "passwd-ossfs",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: OssfsCredentialSecretName,
Items: []corev1.KeyToPath{
{
Key: fmt.Sprintf("%s.%s", nodeName, volumeId),
Path: passwdFilename,
Mode: pointer.Int32Ptr(0600),
},
},
Optional: pointer.BoolPtr(true),
},
},
}
passwdVolumeMont := corev1.VolumeMount{
Name: passwdSecretVolume.Name,
MountPath: passwdMountDir,
}
buildAuthSpec(nodeName, volumeId, authCfg, &spec, &container, &options)
assert.Contains(t, spec.Volumes, passwdSecretVolume)
assert.Contains(t, container.VolumeMounts, passwdVolumeMont)
assert.Contains(t, options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename)))
}
41 changes: 22 additions & 19 deletions pkg/oss/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
// When useSharedPath options is set to false,
// mount operations need to be atomic to ensure that no fuse pods are left behind in case of failure.
// Because kubelet will not call NodeUnpublishVolume when NodePublishVolume never succeeded.
ossMounter = ns.ossfsMounterFac.NewFuseMounter(ctx, req.VolumeId, !opt.UseSharedPath)
authCfg := &mounter.AuthConfig{AuthType: opt.AuthType}
ossMounter = ns.ossfsMounterFac.NewFuseMounter(ctx, req.VolumeId, authCfg, !opt.UseSharedPath)
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown fuseType: %q", opt.FuseType)
}
Expand Down Expand Up @@ -234,8 +235,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
mountOptions = req.VolumeCapability.GetMount().MountFlags
}

// If you do not use sts authentication, save ak
if opt.AuthType != "sts" {
switch opt.AuthType {
case mounter.AuthTypeSTS:
if opt.FuseType == OssFsType {
mountOptions = append(mountOptions, GetRAMRoleOption())
} else if opt.FuseType == JindoFsType {
mountOptions = append(mountOptions, "fs.oss.provider.endpoint=ECS_ROLE")
}
default:
if opt.FuseType == OssFsType {
// ossfs fuse pod will mount the secret to access credentials
err := mounter.SetupOssfsCredentialSecret(ctx, ns.clientset, ns.nodeName, req.VolumeId, opt.Bucket, opt.AkID, opt.AkSecret)
Expand All @@ -245,12 +252,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
} else if opt.FuseType == JindoFsType {
mountOptions = append(mountOptions, fmt.Sprintf("fs.oss.accessKeyId=%s,fs.oss.accessKeySecret=%s", opt.AkID, opt.AkSecret))
}
} else {
if opt.FuseType == OssFsType {
mountOptions = append(mountOptions, GetRAMRoleOption())
} else if opt.FuseType == JindoFsType {
mountOptions = append(mountOptions, "fs.oss.provider.endpoint=ECS_ROLE")
}
}

if opt.ReadOnly {
Expand Down Expand Up @@ -326,15 +327,17 @@ func checkOssOptions(opt *Options) error {
return nil
}

// if not input ak from user, use the default ak value
if opt.AkID == "" || opt.AkSecret == "" {
ac := utils.GetEnvAK()
opt.AkID = ac.AccessKeyID
opt.AkSecret = ac.AccessKeySecret
}
if opt.AkID == "" || opt.AkSecret == "" {
if opt.AuthType == "" {
return errors.New("Oss Parametes error: AK and authType are both empty ")
switch opt.AuthType {
case mounter.AuthTypeSTS:
default:
// if not input ak from user, use the default ak value
if opt.AkID == "" || opt.AkSecret == "" {
ac := utils.GetEnvAK()
opt.AkID = ac.AccessKeyID
opt.AkSecret = ac.AccessKeySecret
}
if opt.AkID == "" || opt.AkSecret == "" {
return errors.New("Oss Parametes error: AK and authType are both empty or invalid ")
}
}

Expand Down Expand Up @@ -412,5 +415,5 @@ func (ns *nodeServer) cleanupMountPoint(ctx context.Context, volumeId string, mo
if err != nil {
return err
}
return ns.ossfsMounterFac.NewFuseMounter(ctx, volumeId, false).Unmount(mountpoint)
return ns.ossfsMounterFac.NewFuseMounter(ctx, volumeId, nil, false).Unmount(mountpoint)
}
2 changes: 1 addition & 1 deletion pkg/oss/nodeserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestGetDiskVolumeOptions(t *testing.T) {
options.URL = "1.1.1.1"
options.AkID = ""
err = checkOssOptions(options)
assert.Equal(t, "Oss Parametes error: AK and authType are both empty ", err.Error())
assert.Equal(t, "Oss Parametes error: AK and authType are both empty or invalid ", err.Error())

options.AkID = "2222"
// reset AkSecret in checkOssOptions when AkID = ""
Expand Down

0 comments on commit ef311fd

Please sign in to comment.