From adf55647f5eb6da9cf9723ddac2b4a792b00e814 Mon Sep 17 00:00:00 2001 From: AlbeeSo Date: Sun, 10 Dec 2023 16:02:57 +0800 Subject: [PATCH 1/3] fix: support sts authtype in containerized version --- pkg/mounter/fuse_containerized_mounter.go | 23 ++++++-- pkg/mounter/ossfs.go | 66 +++++++++++++---------- pkg/oss/nodeserver.go | 41 +++++++------- 3 files changed, 77 insertions(+), 53 deletions(-) diff --git a/pkg/mounter/fuse_containerized_mounter.go b/pkg/mounter/fuse_containerized_mounter.go index 9004f64bc..dd9218e1b 100644 --- a/pkg/mounter/fuse_containerized_mounter.go +++ b/pkg/mounter/fuse_containerized_mounter.go @@ -33,9 +33,20 @@ const ( FuseMountPathAnnoKey = "csi.alibabacloud.com/mount-path" ) +type AuthConfig struct { + AuthType string + // TODO: for RRSA + // RoleName string + // ServiceAccountName string +} + +const ( + AuthTypeSTS = "sts" +) + type FuseMounterType interface { name() string - buildPodSpec(source, target, fstype string, options, mountFlags []string, nodeName, volumeId string) (corev1.PodSpec, error) + buildPodSpec(source, target, fstype string, authCfg *AuthConfig, options, mountFlags []string, nodeName, volumeId string) (corev1.PodSpec, error) } type FuseContainerConfig struct { @@ -113,13 +124,14 @@ func NewContainerizedFuseMounterFactory( // This implies that mount operations will either succeed when the fuse pod is ready, // or fail and ensure that no fuse pods are left behind. func (fac *ContainerizedFuseMounterFactory) NewFuseMounter( - ctx context.Context, volumeId string, atomic bool) *ContainerizedFuseMounter { + ctx context.Context, volumeId string, authCfg *AuthConfig, atomic bool) *ContainerizedFuseMounter { return &ContainerizedFuseMounter{ ctx: ctx, atomic: atomic, volumeId: volumeId, nodeName: fac.nodeName, namespace: fac.namespace, + authCfg: authCfg, client: fac.client, log: logrus.WithFields(logrus.Fields{ "fuse-type": fac.fuseType.name(), @@ -136,6 +148,7 @@ type ContainerizedFuseMounter struct { volumeId string nodeName string namespace string + authCfg *AuthConfig client kubernetes.Interface log *logrus.Entry FuseMounterType @@ -151,7 +164,7 @@ func (mounter *ContainerizedFuseMounter) Mount(source string, target string, fst ctx, cancel := context.WithTimeout(mounter.ctx, fuseMountTimeout) defer cancel() - err := mounter.launchFusePod(ctx, source, target, fstype, options, nil) + err := mounter.launchFusePod(ctx, source, target, fstype, mounter.authCfg, options, nil) if err != nil { return err } @@ -188,7 +201,7 @@ func (mounter *ContainerizedFuseMounter) labelsAndListOptionsFor(target string) return labels, listOptions } -func (mounter *ContainerizedFuseMounter) launchFusePod(ctx context.Context, source, target, fstype string, options, mountFlags []string) error { +func (mounter *ContainerizedFuseMounter) launchFusePod(ctx context.Context, source, target, fstype string, authCfg *AuthConfig, options, mountFlags []string) error { podClient := mounter.client.CoreV1().Pods(mounter.namespace) labels, listOptions := mounter.labelsAndListOptionsFor(target) podList, err := podClient.List(ctx, listOptions) @@ -230,7 +243,7 @@ func (mounter *ContainerizedFuseMounter) launchFusePod(ctx context.Context, sour rawPod.GenerateName = fmt.Sprintf("csi-fuse-%s-", mounter.name()) rawPod.Labels = labels rawPod.Annotations = map[string]string{FuseMountPathAnnoKey: target} - rawPod.Spec, err = mounter.buildPodSpec(source, target, fstype, options, mountFlags, mounter.nodeName, mounter.volumeId) + rawPod.Spec, err = mounter.buildPodSpec(source, target, fstype, authCfg, options, mountFlags, mounter.nodeName, mounter.volumeId) if err != nil { return err } diff --git a/pkg/mounter/ossfs.go b/pkg/mounter/ossfs.go index 4a838de05..60c551577 100644 --- a/pkg/mounter/ossfs.go +++ b/pkg/mounter/ossfs.go @@ -55,7 +55,7 @@ func (f *fuseOssfs) name() string { } func (f *fuseOssfs) buildPodSpec( - source, target, fstype string, options, mountFlags []string, nodeName, volumeId string, + source, target, fstype string, authCfg *AuthConfig, options, mountFlags []string, nodeName, volumeId string, ) (spec corev1.PodSpec, _ error) { targetVolume := corev1.Volume{ @@ -78,25 +78,7 @@ func (f *fuseOssfs) buildPodSpec( }, } *metricsDirVolume.HostPath.Type = corev1.HostPathDirectoryOrCreate - passwdMountDir := "/etc/ossfs" - passwdFilename := "passwd-ossfs" - passwdSecretVolume := corev1.Volume{ - Name: "passwd-ossfs", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: OssfsCredentialSecretName, - Items: []corev1.KeyToPath{ - { - Key: fmt.Sprintf("%s.%s", nodeName, volumeId), - Path: passwdFilename, - Mode: pointer.Int32Ptr(0600), - }, - }, - Optional: pointer.BoolPtr(true), - }, - }, - } - spec.Volumes = []corev1.Volume{targetVolume, metricsDirVolume, passwdSecretVolume} + spec.Volumes = []corev1.Volume{targetVolume, metricsDirVolume} var mimeMountDir string if utils.IsFileExisting(filepath.Join(hostPrefix, OssfsDefMimeTypesFilePath)) { @@ -138,16 +120,10 @@ func (f *fuseOssfs) buildPodSpec( default: return spec, fmt.Errorf("invalid ossfs dbglevel: %q", dbglevel) } - options = append(options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename))) - args := mountutils.MakeMountArgs(source, target, "", options) - args = append(args, mountFlags...) - // FUSE foreground option - do not run as daemon - args = append(args, "-f") bidirectional := corev1.MountPropagationBidirectional container := corev1.Container{ Name: "fuse-mounter", Image: f.config.Image, - Args: args, Resources: f.config.Resources, VolumeMounts: []corev1.VolumeMount{ { @@ -157,9 +133,6 @@ func (f *fuseOssfs) buildPodSpec( }, { Name: metricsDirVolume.Name, MountPath: metricsDirVolume.HostPath.Path, - }, { - Name: passwdSecretVolume.Name, - MountPath: passwdMountDir, }, }, StartupProbe: &corev1.Probe{ @@ -184,6 +157,41 @@ func (f *fuseOssfs) buildPodSpec( } container.VolumeMounts = append(container.VolumeMounts, mimeVolumeMount) } + + if authCfg == nil || authCfg.AuthType != AuthTypeSTS{ + passwdMountDir := "/etc/ossfs" + passwdFilename := "passwd-ossfs" + passwdSecretVolume := corev1.Volume{ + Name: "passwd-ossfs", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: OssfsCredentialSecretName, + Items: []corev1.KeyToPath{ + { + Key: fmt.Sprintf("%s.%s", nodeName, volumeId), + Path: passwdFilename, + Mode: pointer.Int32Ptr(0600), + }, + }, + Optional: pointer.BoolPtr(true), + }, + }, + } + spec.Volumes = append(spec.Volumes, passwdSecretVolume) + passwdVolumeMont := corev1.VolumeMount{ + Name: passwdSecretVolume.Name, + MountPath: passwdMountDir, + } + container.VolumeMounts = append(container.VolumeMounts, passwdVolumeMont) + options = append(options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename))) + } + + args := mountutils.MakeMountArgs(source, target, "", options) + args = append(args, mountFlags...) + // FUSE foreground option - do not run as daemon + args = append(args, "-f") + container.Args = args + spec.Containers = []corev1.Container{container} spec.RestartPolicy = corev1.RestartPolicyOnFailure spec.NodeName = nodeName diff --git a/pkg/oss/nodeserver.go b/pkg/oss/nodeserver.go index 452cb7aa5..203418058 100644 --- a/pkg/oss/nodeserver.go +++ b/pkg/oss/nodeserver.go @@ -206,7 +206,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis // When useSharedPath options is set to false, // mount operations need to be atomic to ensure that no fuse pods are left behind in case of failure. // Because kubelet will not call NodeUnpublishVolume when NodePublishVolume never succeeded. - ossMounter = ns.ossfsMounterFac.NewFuseMounter(ctx, req.VolumeId, !opt.UseSharedPath) + authCfg := &mounter.AuthConfig{AuthType: opt.AuthType} + ossMounter = ns.ossfsMounterFac.NewFuseMounter(ctx, req.VolumeId, authCfg, !opt.UseSharedPath) default: return nil, status.Errorf(codes.InvalidArgument, "unknown fuseType: %q", opt.FuseType) } @@ -234,8 +235,14 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis mountOptions = req.VolumeCapability.GetMount().MountFlags } - // If you do not use sts authentication, save ak - if opt.AuthType != "sts" { + switch opt.AuthType { + case mounter.AuthTypeSTS: + if opt.FuseType == OssFsType { + mountOptions = append(mountOptions, GetRAMRoleOption()) + } else if opt.FuseType == JindoFsType { + mountOptions = append(mountOptions, "fs.oss.provider.endpoint=ECS_ROLE") + } + default: if opt.FuseType == OssFsType { // ossfs fuse pod will mount the secret to access credentials err := mounter.SetupOssfsCredentialSecret(ctx, ns.clientset, ns.nodeName, req.VolumeId, opt.Bucket, opt.AkID, opt.AkSecret) @@ -245,12 +252,6 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } else if opt.FuseType == JindoFsType { mountOptions = append(mountOptions, fmt.Sprintf("fs.oss.accessKeyId=%s,fs.oss.accessKeySecret=%s", opt.AkID, opt.AkSecret)) } - } else { - if opt.FuseType == OssFsType { - mountOptions = append(mountOptions, GetRAMRoleOption()) - } else if opt.FuseType == JindoFsType { - mountOptions = append(mountOptions, "fs.oss.provider.endpoint=ECS_ROLE") - } } if opt.ReadOnly { @@ -326,15 +327,17 @@ func checkOssOptions(opt *Options) error { return nil } - // if not input ak from user, use the default ak value - if opt.AkID == "" || opt.AkSecret == "" { - ac := utils.GetEnvAK() - opt.AkID = ac.AccessKeyID - opt.AkSecret = ac.AccessKeySecret - } - if opt.AkID == "" || opt.AkSecret == "" { - if opt.AuthType == "" { - return errors.New("Oss Parametes error: AK and authType are both empty ") + switch opt.AuthType { + case mounter.AuthTypeSTS: + default: + // if not input ak from user, use the default ak value + if opt.AkID == "" || opt.AkSecret == "" { + ac := utils.GetEnvAK() + opt.AkID = ac.AccessKeyID + opt.AkSecret = ac.AccessKeySecret + } + if opt.AkID == "" || opt.AkSecret == "" { + return errors.New("Oss Parametes error: AK and authType are both empty or invalid ") } } @@ -412,5 +415,5 @@ func (ns *nodeServer) cleanupMountPoint(ctx context.Context, volumeId string, mo if err != nil { return err } - return ns.ossfsMounterFac.NewFuseMounter(ctx, volumeId, false).Unmount(mountpoint) + return ns.ossfsMounterFac.NewFuseMounter(ctx, volumeId, nil, false).Unmount(mountpoint) } From b2b72f037ce5b923b30577aa9d520eb46802036d Mon Sep 17 00:00:00 2001 From: AlbeeSo Date: Mon, 11 Dec 2023 11:24:56 +0800 Subject: [PATCH 2/3] update-gofmt --- pkg/mounter/ossfs.go | 4 ++-- pkg/oss/nodeserver_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/mounter/ossfs.go b/pkg/mounter/ossfs.go index 60c551577..ce361789f 100644 --- a/pkg/mounter/ossfs.go +++ b/pkg/mounter/ossfs.go @@ -158,7 +158,7 @@ func (f *fuseOssfs) buildPodSpec( container.VolumeMounts = append(container.VolumeMounts, mimeVolumeMount) } - if authCfg == nil || authCfg.AuthType != AuthTypeSTS{ + if authCfg == nil || authCfg.AuthType != AuthTypeSTS { passwdMountDir := "/etc/ossfs" passwdFilename := "passwd-ossfs" passwdSecretVolume := corev1.Volume{ @@ -191,7 +191,7 @@ func (f *fuseOssfs) buildPodSpec( // FUSE foreground option - do not run as daemon args = append(args, "-f") container.Args = args - + spec.Containers = []corev1.Container{container} spec.RestartPolicy = corev1.RestartPolicyOnFailure spec.NodeName = nodeName diff --git a/pkg/oss/nodeserver_test.go b/pkg/oss/nodeserver_test.go index b915ad687..a53abe3ed 100644 --- a/pkg/oss/nodeserver_test.go +++ b/pkg/oss/nodeserver_test.go @@ -39,7 +39,7 @@ func TestGetDiskVolumeOptions(t *testing.T) { options.URL = "1.1.1.1" options.AkID = "" err = checkOssOptions(options) - assert.Equal(t, "Oss Parametes error: AK and authType are both empty ", err.Error()) + assert.Equal(t, "Oss Parametes error: AK and authType are both empty or invalid ", err.Error()) options.AkID = "2222" // reset AkSecret in checkOssOptions when AkID = "" From 2c15f4d0f6d2f127836fc3f2f0ff89b5b024a785 Mon Sep 17 00:00:00 2001 From: AlbeeSo Date: Tue, 12 Dec 2023 14:58:03 +0800 Subject: [PATCH 3/3] seal buildAuthSpec --- pkg/mounter/ossfs.go | 63 +++++++++++++++++++++---------------- pkg/mounter/ossfs_test.go | 66 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 27 deletions(-) create mode 100644 pkg/mounter/ossfs_test.go diff --git a/pkg/mounter/ossfs.go b/pkg/mounter/ossfs.go index ce361789f..373f7a96a 100644 --- a/pkg/mounter/ossfs.go +++ b/pkg/mounter/ossfs.go @@ -158,33 +158,7 @@ func (f *fuseOssfs) buildPodSpec( container.VolumeMounts = append(container.VolumeMounts, mimeVolumeMount) } - if authCfg == nil || authCfg.AuthType != AuthTypeSTS { - passwdMountDir := "/etc/ossfs" - passwdFilename := "passwd-ossfs" - passwdSecretVolume := corev1.Volume{ - Name: "passwd-ossfs", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: OssfsCredentialSecretName, - Items: []corev1.KeyToPath{ - { - Key: fmt.Sprintf("%s.%s", nodeName, volumeId), - Path: passwdFilename, - Mode: pointer.Int32Ptr(0600), - }, - }, - Optional: pointer.BoolPtr(true), - }, - }, - } - spec.Volumes = append(spec.Volumes, passwdSecretVolume) - passwdVolumeMont := corev1.VolumeMount{ - Name: passwdSecretVolume.Name, - MountPath: passwdMountDir, - } - container.VolumeMounts = append(container.VolumeMounts, passwdVolumeMont) - options = append(options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename))) - } + buildAuthSpec(nodeName, volumeId, authCfg, &spec, &container, &options) args := mountutils.MakeMountArgs(source, target, "", options) args = append(args, mountFlags...) @@ -271,3 +245,38 @@ func CleanupOssfsCredentialSecret(ctx context.Context, clientset kubernetes.Inte } return err } + +func buildAuthSpec(nodeName, volumeId string, authCfg *AuthConfig, + spec *corev1.PodSpec, container *corev1.Container, options *[]string) { + if spec == nil || container == nil || options == nil { + return + } + if authCfg != nil && authCfg.AuthType == AuthTypeSTS { + return + } + passwdMountDir := "/etc/ossfs" + passwdFilename := "passwd-ossfs" + passwdSecretVolume := corev1.Volume{ + Name: "passwd-ossfs", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: OssfsCredentialSecretName, + Items: []corev1.KeyToPath{ + { + Key: fmt.Sprintf("%s.%s", nodeName, volumeId), + Path: passwdFilename, + Mode: pointer.Int32Ptr(0600), + }, + }, + Optional: pointer.BoolPtr(true), + }, + }, + } + spec.Volumes = append(spec.Volumes, passwdSecretVolume) + passwdVolumeMont := corev1.VolumeMount{ + Name: passwdSecretVolume.Name, + MountPath: passwdMountDir, + } + container.VolumeMounts = append(container.VolumeMounts, passwdVolumeMont) + *options = append(*options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename))) +} diff --git a/pkg/mounter/ossfs_test.go b/pkg/mounter/ossfs_test.go new file mode 100644 index 000000000..57b21c2f0 --- /dev/null +++ b/pkg/mounter/ossfs_test.go @@ -0,0 +1,66 @@ +package mounter + +import ( + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/pointer" +) + +func Test_buildAuthSpec(t *testing.T) { + nodeName := "test-node-name" + volumeId := "test-pv-name" + authCfg := &AuthConfig{} + container := corev1.Container{ + Name: "fuse-mounter", + Image: "test-image", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "test-mounts", + MountPath: "target", + }, + }, + } + targetVolume := corev1.Volume{ + Name: "test-mounts", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "target", + Type: new(corev1.HostPathType), + }, + }, + } + spec := corev1.PodSpec{} + spec.Volumes = []corev1.Volume{targetVolume} + options := []string{"allow_other", "dbglevel=debug"} + // ak + passwdMountDir := "/etc/ossfs" + passwdFilename := "passwd-ossfs" + passwdSecretVolume := corev1.Volume{ + Name: "passwd-ossfs", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: OssfsCredentialSecretName, + Items: []corev1.KeyToPath{ + { + Key: fmt.Sprintf("%s.%s", nodeName, volumeId), + Path: passwdFilename, + Mode: pointer.Int32Ptr(0600), + }, + }, + Optional: pointer.BoolPtr(true), + }, + }, + } + passwdVolumeMont := corev1.VolumeMount{ + Name: passwdSecretVolume.Name, + MountPath: passwdMountDir, + } + buildAuthSpec(nodeName, volumeId, authCfg, &spec, &container, &options) + assert.Contains(t, spec.Volumes, passwdSecretVolume) + assert.Contains(t, container.VolumeMounts, passwdVolumeMont) + assert.Contains(t, options, fmt.Sprintf("passwd_file=%s", filepath.Join(passwdMountDir, passwdFilename))) +}