From 287af6daebf427e05e1c4f52c5e7f9adaae266fd Mon Sep 17 00:00:00 2001 From: zwwhdls Date: Tue, 28 Feb 2023 17:54:36 +0800 Subject: [PATCH 1/3] allocate metrics port for multi runtime Signed-off-by: zwwhdls --- charts/juicefs/templates/fuse/daemonset.yaml | 4 +- .../templates/worker/statefuleset.yaml | 6 +- charts/juicefs/values.yaml | 4 +- cmd/juicefs/app/juicefs.go | 23 ++++- pkg/ddc/jindo/types.go | 3 +- pkg/ddc/juicefs/const.go | 7 +- pkg/ddc/juicefs/port_parser.go | 84 +++++++++++++++++++ pkg/ddc/juicefs/shutdown.go | 35 ++++++++ pkg/ddc/juicefs/transform.go | 77 ++++++++++++++++- pkg/ddc/juicefs/transform_fuse.go | 6 +- pkg/ddc/juicefs/type.go | 23 ++--- pkg/ddc/juicefs/utils.go | 21 +++++ 12 files changed, 269 insertions(+), 24 deletions(-) create mode 100644 pkg/ddc/juicefs/port_parser.go diff --git a/charts/juicefs/templates/fuse/daemonset.yaml b/charts/juicefs/templates/fuse/daemonset.yaml index 40e91bf3522..1ba792be2a4 100644 --- a/charts/juicefs/templates/fuse/daemonset.yaml +++ b/charts/juicefs/templates/fuse/daemonset.yaml @@ -125,9 +125,11 @@ spec: successThreshold: 1 timeoutSeconds: 1 ports: - - containerPort: 9567 + {{- if .Values.fuse.metricsPort }} + - containerPort: {{ .Values.fuse.metricsPort }} name: metrics protocol: TCP + {{- end }} securityContext: runAsUser: 0 {{- if .Values.fuse.privileged }} diff --git a/charts/juicefs/templates/worker/statefuleset.yaml b/charts/juicefs/templates/worker/statefuleset.yaml index 33b15b7a0af..538a418bdf4 100644 --- a/charts/juicefs/templates/worker/statefuleset.yaml +++ b/charts/juicefs/templates/worker/statefuleset.yaml @@ -82,9 +82,11 @@ spec: {{- if .Values.worker.privileged }} privileged: true {{- end }} - {{- if .Values.worker.ports }} ports: -{{ toYaml .Values.worker.ports | trim | indent 10 }} + {{- if .Values.worker.metricsPort }} + - containerPort: {{ .Values.worker.metricsPort }} + name: metrics + protocol: TCP {{- end }} env: - name: JFS_FOREGROUND diff --git a/charts/juicefs/values.yaml b/charts/juicefs/values.yaml index d5328d91706..fded4fdaa5c 100644 --- a/charts/juicefs/values.yaml +++ b/charts/juicefs/values.yaml @@ -37,13 +37,13 @@ worker: imagePullPolicy: "" nodeSelector: "" cacheDir: "" - ports: [] envs: [] command: "" mountPath: /mnt/jfs statCmd: "stat -c %i /mnt/jfs" podManagementPolicy: Parallel hostNetwork: false + metricsPort: 9567 resources: requests: # cpu: "0.5" @@ -57,6 +57,7 @@ worker: volumes: [] volumeMounts: [] + configs: name: "" accesskeySecret: "" @@ -75,6 +76,7 @@ configs: fuse: hostNetwork: false + metricsPort: 9567 subPath: "" criticalPod: false enabled: true diff --git a/cmd/juicefs/app/juicefs.go b/cmd/juicefs/app/juicefs.go index 2379cfe9792..d5fee75b726 100644 --- a/cmd/juicefs/app/juicefs.go +++ b/cmd/juicefs/app/juicefs.go @@ -19,16 +19,19 @@ package app import ( "os" - "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/spf13/cobra" zapOpt "go.uber.org/zap" "go.uber.org/zap/zapcore" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/net" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" juicefsctl "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/juicefs" @@ -47,8 +50,10 @@ var ( enableLeaderElection bool leaderElectionNamespace string development bool + portRange string maxConcurrentReconciles int pprofAddr string + portAllocatePolicy string ) var startCmd = &cobra.Command{ @@ -69,6 +74,8 @@ func init() { startCmd.Flags().StringVarP(&pprofAddr, "pprof-addr", "", "", "The address for pprof to use while exporting profiling results") startCmd.Flags().BoolVarP(&development, "development", "", true, "Enable development mode for fluid controller.") startCmd.Flags().BoolVar(&eventDriven, "event-driven", true, "The reconciler's loop strategy. if it's false, it indicates period driven.") + startCmd.Flags().StringVar(&portRange, "runtime-node-port-range", "14000-15999", "Set available port range for JuiceFS") + startCmd.Flags().StringVar(&portAllocatePolicy, "port-allocate-policy", "random", "Set port allocating policy, available choice is bitmap or random(default random).") } func handle() { @@ -116,6 +123,20 @@ func handle() { os.Exit(1) } + pr, err := net.ParsePortRange(portRange) + if err != nil { + setupLog.Error(err, "can't parse port range. Port range must be like -") + os.Exit(1) + } + setupLog.Info("port range parsed", "port range", pr.String()) + + err = portallocator.SetupRuntimePortAllocator(mgr.GetClient(), pr, portAllocatePolicy, juicefs.GetReservedPorts) + if err != nil { + setupLog.Error(err, "failed to setup runtime port allocator") + os.Exit(1) + } + setupLog.Info("Set up runtime port allocator", "policy", portAllocatePolicy) + setupLog.Info("starting juicefsruntime-controller") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem juicefsruntime-controller") diff --git a/pkg/ddc/jindo/types.go b/pkg/ddc/jindo/types.go index 8270f20fc16..a0572cd81c2 100644 --- a/pkg/ddc/jindo/types.go +++ b/pkg/ddc/jindo/types.go @@ -17,8 +17,9 @@ limitations under the License. package jindo import ( - "github.com/fluid-cloudnative/fluid/pkg/common" v1 "k8s.io/api/core/v1" + + "github.com/fluid-cloudnative/fluid/pkg/common" ) type Jindo struct { diff --git a/pkg/ddc/juicefs/const.go b/pkg/ddc/juicefs/const.go index 66443142060..90cec2dae73 100644 --- a/pkg/ddc/juicefs/const.go +++ b/pkg/ddc/juicefs/const.go @@ -31,9 +31,10 @@ const ( PodRoleType = "role" - WorkerPodRole = "juicefs-worker" - EnterpriseEdition = "enterprise" - CommunityEdition = "community" + WorkerPodRole = "juicefs-worker" + EnterpriseEdition = "enterprise" + CommunityEdition = "community" + DefaultMetricsPort = 9567 MetadataSyncNotDoneMsg = "[Calculating]" CheckMetadataSyncDoneTimeoutMillisec = 500 diff --git a/pkg/ddc/juicefs/port_parser.go b/pkg/ddc/juicefs/port_parser.go new file mode 100644 index 00000000000..05582761c38 --- /dev/null +++ b/pkg/ddc/juicefs/port_parser.go @@ -0,0 +1,84 @@ +/* + Copyright 2023 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package juicefs + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" + + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" +) + +// GetReservedPorts defines restoration logic for JuiceFSRuntime +func GetReservedPorts(client client.Client) (ports []int, err error) { + var datasets v1alpha1.DatasetList + err = client.List(context.TODO(), &datasets) + if err != nil { + return nil, errors.Wrap(err, "can't list datasets when GetReservedPorts") + } + + for _, dataset := range datasets.Items { + if len(dataset.Status.Runtimes) != 0 { + // Assume there is only one runtime with category "Accelerate" + accelerateRuntime := dataset.Status.Runtimes[0] + if accelerateRuntime.Type != common.JuiceFSRuntime { + continue + } + configMapName := fmt.Sprintf("%s-%s-values", accelerateRuntime.Name, accelerateRuntime.Type) + configMap, err := kubeclient.GetConfigmapByName(client, configMapName, accelerateRuntime.Namespace) + if err != nil { + return nil, errors.Wrap(err, "GetConfigMapByName when GetReservedPorts") + } + + if configMap == nil { + continue + } + + reservedPorts, err := parsePortsFromConfigMap(configMap) + if err != nil { + return nil, errors.Wrap(err, "parsePortsFromConfigMap when GetReservedPorts") + } + ports = append(ports, reservedPorts...) + } + } + return ports, nil +} + +// parsePortsFromConfigMap extracts port usage information given a configMap +func parsePortsFromConfigMap(configMap *corev1.ConfigMap) (ports []int, err error) { + var value JuiceFS + if v, ok := configMap.Data["data"]; ok { + if err := yaml.Unmarshal([]byte(v), &value); err != nil { + return nil, err + } + + if value.Worker.HostNetwork && value.Worker.MetricsPort != nil { + ports = append(ports, *value.Worker.MetricsPort) + } + if value.Fuse.HostNetwork && value.Fuse.MetricsPort != nil { + ports = append(ports, *value.Fuse.MetricsPort) + } + } + return ports, nil +} diff --git a/pkg/ddc/juicefs/shutdown.go b/pkg/ddc/juicefs/shutdown.go index 876c780c6e7..79cff2f8f7f 100644 --- a/pkg/ddc/juicefs/shutdown.go +++ b/pkg/ddc/juicefs/shutdown.go @@ -23,6 +23,7 @@ import ( "regexp" "strings" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/util/retry" @@ -31,6 +32,7 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" "github.com/fluid-cloudnative/fluid/pkg/ddc/juicefs/operations" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/dataset/lifecycle" @@ -54,6 +56,11 @@ func (j *JuiceFSEngine) Shutdown() (err error) { return } + err = j.releasePorts() + if err != nil { + return + } + err = j.destroyMaster() if err != nil { return @@ -80,6 +87,34 @@ func (j *JuiceFSEngine) destroyMaster() (err error) { return } +func (j *JuiceFSEngine) releasePorts() (err error) { + var valueConfigMapName = j.getConfigmapName() + + allocator, err := portallocator.GetRuntimePortAllocator() + if err != nil { + return errors.Wrap(err, "GetRuntimePortAllocator when releasePorts") + } + + cm, err := kubeclient.GetConfigmapByName(j.Client, valueConfigMapName, j.namespace) + if err != nil { + return errors.Wrap(err, "GetConfigmapByName when releasePorts") + } + + // The value configMap is not found + if cm == nil { + j.Log.Info("value configMap not found, there might be some unreleased ports", "valueConfigMapName", valueConfigMapName) + return nil + } + + portsToRelease, err := parsePortsFromConfigMap(cm) + if err != nil { + return errors.Wrap(err, "parsePortsFromConfigMap when releasePorts") + } + + allocator.ReleaseReservedPorts(portsToRelease) + return nil +} + // cleanupCache cleans up the cache func (j *JuiceFSEngine) cleanupCache() (err error) { runtime, err := j.getRuntime() diff --git a/pkg/ddc/juicefs/transform.go b/pkg/ddc/juicefs/transform.go index 19686521641..9493d0a6c87 100644 --- a/pkg/ddc/juicefs/transform.go +++ b/pkg/ddc/juicefs/transform.go @@ -24,6 +24,7 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/transfromer" ) @@ -53,13 +54,20 @@ func (j *JuiceFSEngine) transform(runtime *datav1alpha1.JuiceFSRuntime) (value * // transform toleration j.transformTolerations(dataset, value) - // transform the fuse value.Fuse = Fuse{ Privileged: true, } value.Worker = Worker{ Privileged: true, } + + // allocate ports + err = j.allocatePorts(dataset, runtime, value) + if err != nil { + return + } + + // transform the fuse err = j.transformFuse(runtime, dataset, value) if err != nil { return @@ -89,7 +97,6 @@ func (j *JuiceFSEngine) transformWorkers(runtime *datav1alpha1.JuiceFSRuntime, v imagePullPolicy := runtime.Spec.JuiceFSVersion.ImagePullPolicy value.Worker.Envs = runtime.Spec.Worker.Env - value.Worker.Ports = runtime.Spec.Worker.Ports value.Image, value.ImageTag, value.ImagePullPolicy = j.parseRuntimeImage(image, imageTag, imagePullPolicy) @@ -151,3 +158,69 @@ func (j *JuiceFSEngine) transformPodMetadata(runtime *datav1alpha1.JuiceFSRuntim return nil } + +func (j *JuiceFSEngine) allocatePorts(dataset *datav1alpha1.Dataset, runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) error { + if j.getEdition(dataset.Spec.Mounts[0], dataset.Spec.SharedEncryptOptions) == EnterpriseEdition { + // enterprise edition do not need metrics port + return nil + } + fuseMetricsPort := GetMetricsPort(dataset.Spec.Mounts[0].Options) + workerMetricsPort := DefaultMetricsPort + if runtime.Spec.Worker.Options == nil { + workerMetricsPort = fuseMetricsPort + } + + // if not use hostnetwork then use default port + // use hostnetwork to choose port from port allocator + + expectedPortNum := 2 + if !datav1alpha1.IsHostNetwork(runtime.Spec.Worker.NetworkMode) { + value.Worker.MetricsPort = &workerMetricsPort + expectedPortNum-- + } + if !datav1alpha1.IsHostNetwork(runtime.Spec.Fuse.NetworkMode) { + value.Fuse.MetricsPort = &fuseMetricsPort + expectedPortNum-- + } + if expectedPortNum == 0 { + return nil + } + + allocator, err := portallocator.GetRuntimePortAllocator() + if err != nil { + j.Log.Error(err, "can't get runtime port allocator") + return err + } + + allocatedPorts, err := allocator.GetAvailablePorts(expectedPortNum) + if err != nil { + j.Log.Error(err, "can't get available ports", "expected port num", expectedPortNum) + return err + } + + index := 0 + value.Worker.MetricsPort = &allocatedPorts[index] + index++ + value.Fuse.MetricsPort = &allocatedPorts[index] + return nil +} + +func (j *JuiceFSEngine) getEdition(mount datav1alpha1.Mount, SharedEncryptOptions []datav1alpha1.EncryptOption) (edition string) { + edition = EnterpriseEdition + + for _, encryptOption := range SharedEncryptOptions { + if encryptOption.Name == JuiceMetaUrl { + edition = CommunityEdition + break + } + } + + for _, encryptOption := range mount.EncryptOptions { + if encryptOption.Name == JuiceMetaUrl { + edition = CommunityEdition + break + } + } + + return +} diff --git a/pkg/ddc/juicefs/transform_fuse.go b/pkg/ddc/juicefs/transform_fuse.go index 25ca367b78b..54fb030827c 100644 --- a/pkg/ddc/juicefs/transform_fuse.go +++ b/pkg/ddc/juicefs/transform_fuse.go @@ -301,11 +301,13 @@ func (j *JuiceFSEngine) genMount(value *JuiceFS, runtime *datav1alpha1.JuiceFSRu optionMap["attr-cache"] = "7200" optionMap["entry-cache"] = "7200" } + + // set metrics port if _, ok := optionMap["metrics"]; !ok { - optionMap["metrics"] = "0.0.0.0:9567" + optionMap["metrics"] = fmt.Sprintf("0.0.0.0:%d", *value.Fuse.MetricsPort) } if _, ok := workerOptionMap["metrics"]; !ok { - workerOptionMap["metrics"] = "0.0.0.0:9567" + workerOptionMap["metrics"] = fmt.Sprintf("0.0.0.0:%d", *value.Worker.MetricsPort) } mountArgs = []string{common.JuiceFSCeMountPath, value.Source, value.Fuse.MountPath, "-o", strings.Join(genOption(optionMap), ",")} mountArgsWorker = []string{common.JuiceFSCeMountPath, value.Source, value.Worker.MountPath, "-o", strings.Join(genOption(workerOptionMap), ",")} diff --git a/pkg/ddc/juicefs/type.go b/pkg/ddc/juicefs/type.go index 668b3575560..9cd11391ae0 100644 --- a/pkg/ddc/juicefs/type.go +++ b/pkg/ddc/juicefs/type.go @@ -59,17 +59,17 @@ type Configs struct { } type Worker struct { - Privileged bool `json:"privileged"` - Image string `json:"image,omitempty"` - NodeSelector map[string]string `json:"nodeSelector,omitempty"` - ImageTag string `json:"imageTag,omitempty"` - ImagePullPolicy string `json:"imagePullPolicy,omitempty"` - Resources common.Resources `json:"resources,omitempty"` - Envs []corev1.EnvVar `json:"envs,omitempty"` - Ports []corev1.ContainerPort `json:"ports,omitempty"` - VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` - Volumes []corev1.Volume `json:"volumes,omitempty"` - HostNetwork bool `json:"hostNetwork,omitempty"` + Privileged bool `json:"privileged"` + Image string `json:"image,omitempty"` + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + ImageTag string `json:"imageTag,omitempty"` + ImagePullPolicy string `json:"imagePullPolicy,omitempty"` + Resources common.Resources `json:"resources,omitempty"` + Envs []corev1.EnvVar `json:"envs,omitempty"` + VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` + Volumes []corev1.Volume `json:"volumes,omitempty"` + HostNetwork bool `json:"hostNetwork,omitempty"` + MetricsPort *int `json:"metricsPort,omitempty"` MountPath string `json:"mountPath,omitempty"` StatCmd string `json:"statCmd,omitempty"` @@ -91,6 +91,7 @@ type Fuse struct { VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` Volumes []corev1.Volume `json:"volumes,omitempty"` HostNetwork bool `json:"hostNetwork,omitempty"` + MetricsPort *int `json:"metricsPort,omitempty"` SubPath string `json:"subPath,omitempty"` MountPath string `json:"mountPath,omitempty"` diff --git a/pkg/ddc/juicefs/utils.go b/pkg/ddc/juicefs/utils.go index a3725993fd3..0adea6591f4 100644 --- a/pkg/ddc/juicefs/utils.go +++ b/pkg/ddc/juicefs/utils.go @@ -19,6 +19,7 @@ package juicefs import ( "context" "fmt" + "regexp" "strconv" "strings" @@ -283,3 +284,23 @@ func ParseSubPathFromMountPoint(mountPoint string) (string, error) { } return jPath[1], nil } + +func GetMetricsPort(options map[string]string) int { + port := int64(9567) + if options == nil { + return int(port) + } + + for k, v := range options { + if k == "metrics" { + re := regexp.MustCompile(`.*:([0-9]{1,6})`) + match := re.FindStringSubmatch(v) + if len(match) > 0 { + port, _ = strconv.ParseInt(match[1], 10, 32) + break + } + } + } + + return int(port) +} From eb9fd800e9840899a1b7c1417b3a44354e326e83 Mon Sep 17 00:00:00 2001 From: zwwhdls Date: Wed, 1 Mar 2023 11:24:32 +0800 Subject: [PATCH 2/3] add unit test Signed-off-by: zwwhdls --- pkg/ddc/juicefs/data_load_test.go | 7 +- pkg/ddc/juicefs/master_internal_test.go | 22 ++++- pkg/ddc/juicefs/port_parser_test.go | 118 ++++++++++++++++++++++++ pkg/ddc/juicefs/transform.go | 5 +- pkg/ddc/juicefs/transform_fuse.go | 12 ++- pkg/ddc/juicefs/transform_fuse_test.go | 14 ++- pkg/ddc/juicefs/transform_test.go | 70 +++++++++++++- pkg/ddc/juicefs/utils.go | 13 +-- pkg/ddc/juicefs/utils_test.go | 63 +++++++++++++ 9 files changed, 304 insertions(+), 20 deletions(-) create mode 100644 pkg/ddc/juicefs/port_parser_test.go diff --git a/pkg/ddc/juicefs/data_load_test.go b/pkg/ddc/juicefs/data_load_test.go index e763a805b4e..41a53c9e402 100644 --- a/pkg/ddc/juicefs/data_load_test.go +++ b/pkg/ddc/juicefs/data_load_test.go @@ -12,12 +12,13 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/brahma-adshonor/gohook" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/helm" @@ -33,6 +34,7 @@ user: 0 group: 0 fsGroup: 0 fuse: + metricsPort: 14001 prepare: subPath: /dir1 name: pics @@ -52,8 +54,11 @@ fuse: statCmd: stat -c %i /runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse enabled: true criticalPod: true + hostNetwork: true worker: + metricsPort: 14000 cacheDir: /tmp/jfs-cache + hostNetwork: true placement: Exclusive Events: diff --git a/pkg/ddc/juicefs/master_internal_test.go b/pkg/ddc/juicefs/master_internal_test.go index 144878af3f4..5a681561a15 100644 --- a/pkg/ddc/juicefs/master_internal_test.go +++ b/pkg/ddc/juicefs/master_internal_test.go @@ -21,16 +21,20 @@ import ( "fmt" "testing" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubectl" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/net" + + "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubectl" "github.com/brahma-adshonor/gohook" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/utils/helm" ) @@ -65,6 +69,9 @@ func TestSetupMasterInternal(t *testing.T) { mockExecInstallReleaseErr := func(name string, namespace string, valueFile string, chartName string) error { return errors.New("fail to install dataload chart") } + mockExecCreateConfigMapFromFileErr := func(name string, key, fileName string, namespace string) (err error) { + return errors.New("fail to exec command") + } wrappedUnhookCheckRelease := func() { err := gohook.UnHook(helm.CheckRelease) @@ -134,10 +141,17 @@ func TestSetupMasterInternal(t *testing.T) { }, }, } - ////portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, GetReservedPorts) + err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + if err != nil { + t.Fatal(err.Error()) + } + err = gohook.Hook(kubectl.CreateConfigMapFromFile, mockExecCreateConfigMapFromFileErr, nil) + if err != nil { + t.Fatal(err.Error()) + } // check release found - err := gohook.Hook(helm.CheckRelease, mockExecCheckReleaseCommonFound, nil) + err = gohook.Hook(helm.CheckRelease, mockExecCheckReleaseCommonFound, nil) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/ddc/juicefs/port_parser_test.go b/pkg/ddc/juicefs/port_parser_test.go new file mode 100644 index 00000000000..ca085ebf591 --- /dev/null +++ b/pkg/ddc/juicefs/port_parser_test.go @@ -0,0 +1,118 @@ +/* + Copyright 2023 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package juicefs + +import ( + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +func TestGetReservedPorts(t *testing.T) { + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myjfs-juicefs-values", + Namespace: "fluid", + }, + Data: map[string]string{ + "data": valuesConfigMapData, + }, + } + dataSets := []*v1alpha1.Dataset{ + { + ObjectMeta: metav1.ObjectMeta{Name: "myjfs", Namespace: "fluid"}, + Status: v1alpha1.DatasetStatus{ + Runtimes: []v1alpha1.Runtime{{ + Name: "myjfs", + Namespace: "fluid", + Type: "juicefs", + }}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "no-runtime", Namespace: "fluid"}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "runtime-type", Namespace: "fluid"}, + Status: v1alpha1.DatasetStatus{ + Runtimes: []v1alpha1.Runtime{{Type: "not-juicefs"}}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "no-map", Namespace: "fluid"}, + Status: v1alpha1.DatasetStatus{ + Runtimes: []v1alpha1.Runtime{{Type: "juicefs"}}, + }, + }, + } + runtimeObjs := []runtime.Object{} + runtimeObjs = append(runtimeObjs, configMap) + for _, dataSet := range dataSets { + runtimeObjs = append(runtimeObjs, dataSet.DeepCopy()) + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, runtimeObjs...) + wantPorts := []int{14000, 14001} + ports, err := GetReservedPorts(fakeClient) + if err != nil { + t.Errorf("GetReservedPorts failed.") + } + if !reflect.DeepEqual(ports, wantPorts) { + t.Errorf("gotPorts = %v, want %v", ports, wantPorts) + } + +} + +func Test_parsePortsFromConfigMap(t *testing.T) { + type args struct { + configMap *corev1.ConfigMap + } + tests := []struct { + name string + args args + wantPorts []int + wantErr bool + }{ + { + name: "parsePortsFromConfigMap", + args: args{configMap: &corev1.ConfigMap{ + Data: map[string]string{ + "data": valuesConfigMapData, + }, + }}, + wantPorts: []int{14000, 14001}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotPorts, err := parsePortsFromConfigMap(tt.args.configMap) + if (err != nil) != tt.wantErr { + t.Errorf("parsePortsFromConfigMap() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotPorts, tt.wantPorts) { + t.Errorf("parsePortsFromConfigMap() gotPorts = %v, want %v", gotPorts, tt.wantPorts) + } + }) + } +} diff --git a/pkg/ddc/juicefs/transform.go b/pkg/ddc/juicefs/transform.go index 9493d0a6c87..8c5587c7d74 100644 --- a/pkg/ddc/juicefs/transform.go +++ b/pkg/ddc/juicefs/transform.go @@ -164,7 +164,10 @@ func (j *JuiceFSEngine) allocatePorts(dataset *datav1alpha1.Dataset, runtime *da // enterprise edition do not need metrics port return nil } - fuseMetricsPort := GetMetricsPort(dataset.Spec.Mounts[0].Options) + fuseMetricsPort, err := GetMetricsPort(dataset.Spec.Mounts[0].Options) + if err != nil { + return err + } workerMetricsPort := DefaultMetricsPort if runtime.Spec.Worker.Options == nil { workerMetricsPort = fuseMetricsPort diff --git a/pkg/ddc/juicefs/transform_fuse.go b/pkg/ddc/juicefs/transform_fuse.go index 54fb030827c..010f468c3a2 100644 --- a/pkg/ddc/juicefs/transform_fuse.go +++ b/pkg/ddc/juicefs/transform_fuse.go @@ -304,10 +304,18 @@ func (j *JuiceFSEngine) genMount(value *JuiceFS, runtime *datav1alpha1.JuiceFSRu // set metrics port if _, ok := optionMap["metrics"]; !ok { - optionMap["metrics"] = fmt.Sprintf("0.0.0.0:%d", *value.Fuse.MetricsPort) + metricsPort := DefaultMetricsPort + if value.Fuse.MetricsPort != nil { + metricsPort = *value.Fuse.MetricsPort + } + optionMap["metrics"] = fmt.Sprintf("0.0.0.0:%d", metricsPort) } if _, ok := workerOptionMap["metrics"]; !ok { - workerOptionMap["metrics"] = fmt.Sprintf("0.0.0.0:%d", *value.Worker.MetricsPort) + metricsPort := DefaultMetricsPort + if value.Fuse.MetricsPort != nil { + metricsPort = *value.Worker.MetricsPort + } + workerOptionMap["metrics"] = fmt.Sprintf("0.0.0.0:%d", metricsPort) } mountArgs = []string{common.JuiceFSCeMountPath, value.Source, value.Fuse.MountPath, "-o", strings.Join(genOption(optionMap), ",")} mountArgsWorker = []string{common.JuiceFSCeMountPath, value.Source, value.Worker.MountPath, "-o", strings.Join(genOption(workerOptionMap), ",")} diff --git a/pkg/ddc/juicefs/transform_fuse_test.go b/pkg/ddc/juicefs/transform_fuse_test.go index 64b411c5323..9977b2c8590 100644 --- a/pkg/ddc/juicefs/transform_fuse_test.go +++ b/pkg/ddc/juicefs/transform_fuse_test.go @@ -20,16 +20,18 @@ import ( "encoding/base64" "testing" - "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/go-logr/logr" + "github.com/fluid-cloudnative/fluid/pkg/common" + "k8s.io/apimachinery/pkg/api/resource" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" ) @@ -119,9 +121,11 @@ func TestTransformFuse(t *testing.T) { }}, }}, }}, - juicefsValue: &JuiceFS{}, - expect: "", - wantErr: false, + juicefsValue: &JuiceFS{ + Worker: Worker{}, + }, + expect: "", + wantErr: false, }, { name: "test-secret-wrong-1", diff --git a/pkg/ddc/juicefs/transform_test.go b/pkg/ddc/juicefs/transform_test.go index 22d8e4f7b2f..148c83f2caf 100644 --- a/pkg/ddc/juicefs/transform_test.go +++ b/pkg/ddc/juicefs/transform_test.go @@ -22,15 +22,23 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/net" + "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" ) +var dummy = func(client client.Client) (ports []int, err error) { + return []int{14000, 14001}, nil +} + func TestJuiceFSEngine_transform(t *testing.T) { juicefsSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -224,3 +232,63 @@ func TestJuiceFSEngine_transformPodMetadata(t *testing.T) { } } } + +func TestJuiceFSEngine_allocatePorts(t *testing.T) { + pr := net.ParsePortRangeOrDie("14000-15999") + err := portallocator.SetupRuntimePortAllocator(nil, pr, "bitmap", dummy) + if err != nil { + t.Fatal(err.Error()) + } + type args struct { + dataset *datav1alpha1.Dataset + runtime *datav1alpha1.JuiceFSRuntime + value *JuiceFS + } + tests := []struct { + name string + args args + wantErr bool + wantWorkerMetricsPort bool + wantFuseMetricsPort bool + }{ + { + name: "test", + args: args{ + dataset: &datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{ + Mounts: []datav1alpha1.Mount{{EncryptOptions: []datav1alpha1.EncryptOption{{Name: JuiceMetaUrl}}}}, + }, + }, + runtime: &datav1alpha1.JuiceFSRuntime{}, + value: &JuiceFS{}, + }, + wantErr: false, + wantWorkerMetricsPort: true, + wantFuseMetricsPort: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + j := &JuiceFSEngine{} + if err := j.allocatePorts(tt.args.dataset, tt.args.runtime, tt.args.value); (err != nil) != tt.wantErr { + t.Errorf("allocatePorts() error = %v, wantErr %v", err, tt.wantErr) + } + if tt.wantWorkerMetricsPort { + if tt.args.value.Worker.MetricsPort == nil { + t.Error("allocatePorts() got worker port nil") + } + if *tt.args.value.Worker.MetricsPort < 14000 || *tt.args.value.Worker.MetricsPort > 15999 { + t.Errorf("allocatePorts() got worker port = %v, but want in range [14000, 15999]", *tt.args.value.Worker.MetricsPort) + } + } + if tt.wantFuseMetricsPort { + if tt.args.value.Fuse.MetricsPort == nil { + t.Error("allocatePorts() got fuse port nil") + } + if *tt.args.value.Fuse.MetricsPort < 14000 || *tt.args.value.Fuse.MetricsPort > 15999 { + t.Errorf("allocatePorts() got fuse port = %v, but want in range [14000, 15999]", *tt.args.value.Fuse.MetricsPort) + } + } + }) + } +} diff --git a/pkg/ddc/juicefs/utils.go b/pkg/ddc/juicefs/utils.go index 0adea6591f4..9b531877b28 100644 --- a/pkg/ddc/juicefs/utils.go +++ b/pkg/ddc/juicefs/utils.go @@ -285,22 +285,23 @@ func ParseSubPathFromMountPoint(mountPoint string) (string, error) { return jPath[1], nil } -func GetMetricsPort(options map[string]string) int { +func GetMetricsPort(options map[string]string) (int, error) { port := int64(9567) if options == nil { - return int(port) + return int(port), nil } for k, v := range options { if k == "metrics" { re := regexp.MustCompile(`.*:([0-9]{1,6})`) match := re.FindStringSubmatch(v) - if len(match) > 0 { - port, _ = strconv.ParseInt(match[1], 10, 32) - break + if len(match) == 0 { + return DefaultMetricsPort, fmt.Errorf("invalid metrics port: %s", v) } + port, _ = strconv.ParseInt(match[1], 10, 32) + break } } - return int(port) + return int(port), nil } diff --git a/pkg/ddc/juicefs/utils_test.go b/pkg/ddc/juicefs/utils_test.go index ff0270b936b..149ba9a2376 100644 --- a/pkg/ddc/juicefs/utils_test.go +++ b/pkg/ddc/juicefs/utils_test.go @@ -966,3 +966,66 @@ func TestJuiceFSEngine_GetEdition(t *testing.T) { }) } } + +func TestGetMetricsPort(t *testing.T) { + type args struct { + options map[string]string + } + tests := []struct { + name string + args args + want int + wantErr bool + }{ + { + name: "test", + args: args{ + options: map[string]string{ + "metrics": "0.0.0.0:9567", + }, + }, + want: 9567, + wantErr: false, + }, + { + name: "test-default", + args: args{ + options: map[string]string{}, + }, + want: 9567, + wantErr: false, + }, + { + name: "test-wrong1", + args: args{ + options: map[string]string{ + "metrics": "0.0.0.0:test", + }, + }, + want: 9567, + wantErr: true, + }, + { + name: "test-wrong2", + args: args{ + options: map[string]string{ + "metrics": "0.0.0.0", + }, + }, + want: 9567, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetMetricsPort(tt.args.options) + if (err != nil) != tt.wantErr { + t.Errorf("GetMetricsPort() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("GetMetricsPort() got = %v, want %v", got, tt.want) + } + }) + } +} From 1f0e50649ce9cb000b3ae70b82f99e5cfa3632e4 Mon Sep 17 00:00:00 2001 From: zwwhdls Date: Wed, 1 Mar 2023 13:28:31 +0800 Subject: [PATCH 3/3] fix unit test Signed-off-by: zwwhdls --- pkg/ddc/juicefs/data_load_test.go | 100 +++++++++++++++++++++--------- 1 file changed, 72 insertions(+), 28 deletions(-) diff --git a/pkg/ddc/juicefs/data_load_test.go b/pkg/ddc/juicefs/data_load_test.go index 41a53c9e402..1a4b3e6e5b9 100644 --- a/pkg/ddc/juicefs/data_load_test.go +++ b/pkg/ddc/juicefs/data_load_test.go @@ -25,43 +25,87 @@ import ( ) var valuesConfigMapData = ` -fullnameOverride: test-dataset +cacheDirs: + "1": + path: /jfs/cache3:/jfs/cache4 + type: hostPath +configs: + accesskeySecret: jfs-secret + accesskeySecretKey: accesskey + bucket: http://minio.default.svc.cluster.local:9000/minio/test2 + formatCmd: /usr/local/bin/juicefs format --trash-days=0 --access-key=${ACCESS_KEY} + --secret-key=${SECRET_KEY} --storage=minio --bucket=http://minio.default.svc.cluster.local:9000/minio/test2 + ${METAURL} minio + metaurlSecret: jfs-secret + metaurlSecretKey: metaurl + name: minio + secretkeySecret: jfs-secret + secretkeySecretKey: secretkey + storage: minio edition: community -image: juicedata/juicefs-csi-driver -imageTag: v0.11.0 -imagePullPolicy: IfNotPresent -user: 0 -group: 0 fsGroup: 0 +fullnameOverride: jfsdemo fuse: metricsPort: 14001 - prepare: - subPath: /dir1 - name: pics - accesskeySecret: juicefs-secret - secretkeySecret: juicefs-secret - bucket: http://xx.xx.xx.xx/pics - metaurlSecret: juicefs-secret - storage: minio - image: juicedata/juicefs-csi-driver - nodeSelector: - fluid.io/f-fluid-test-dataset: "true" - imageTag: v0.11.0 - mountPath: /runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse - cacheDir: /tmp/jfs-cache - hostMountPath: /runtime-mnt/juicefs/fluid/test-dataset - command: /bin/mount.juicefs redis://xx.xx.xx.xx:6379/1 /runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse -o metrics=0.0.0.0:9567,subdir=/dir1,cache-size=4096,free-space-ratio=0.1,cache-dir=/tmp/jfs-cache - statCmd: stat -c %i /runtime-mnt/juicefs/fluid/test-dataset/juicefs-fuse - enabled: true + command: /bin/mount.juicefs ${METAURL} /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse + -o metrics=0.0.0.0:9567,cache-size=1024,free-space-ratio=0.1,cache-dir=/jfs/cache3:/jfs/cache4 criticalPod: true + enabled: true + hostMountPath: /runtime-mnt/juicefs/default/jfsdemo hostNetwork: true + image: registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse + imagePullPolicy: IfNotPresent + imageTag: v1.0.0-4.8.0 + mountPath: /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse + nodeSelector: + fluid.io/f-default-jfsdemo: "true" + privileged: true + resources: {} + statCmd: stat -c %i /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse + volumeMounts: + - mountPath: /jfs/cache3:/jfs/cache4 + name: cache-dir-1 + volumes: + - hostPath: + path: /jfs/cache3:/jfs/cache4 + type: DirectoryOrCreate + name: cache-dir-1 +group: 0 +image: registry.cn-hangzhou.aliyuncs.com/juicefs/juicefs-fuse +imagePullPolicy: IfNotPresent +imagePullSecrets: null +imageTag: v1.0.0-4.8.0 +owner: + apiVersion: data.fluid.io/v1alpha1 + blockOwnerDeletion: false + controller: true + enabled: true + kind: JuiceFSRuntime + name: jfsdemo + uid: 9ae3312b-d5b6-4a3d-895c-7712bfa7d74e +placement: Exclusive +runtimeIdentity: + name: jfsdemo + namespace: default +source: ${METAURL} +user: 0 worker: metricsPort: 14000 - cacheDir: /tmp/jfs-cache + command: /bin/mount.juicefs ${METAURL} /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse + -o cache-size=1024,free-space-ratio=0.1,cache-dir=/jfs/cache1:/jfs/cache2,metrics=0.0.0.0:9567 hostNetwork: true -placement: Exclusive - -Events: + mountPath: /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse + privileged: true + resources: {} + statCmd: stat -c %i /runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse + volumeMounts: + - mountPath: /jfs/cache1:/jfs/cache2 + name: cache-dir-1 + volumes: + - hostPath: + path: /jfs/cache1:/jfs/cache2 + type: DirectoryOrCreate + name: cache-dir-1 ` func TestJuiceFSEngine_CreateDataLoadJob(t *testing.T) {