From cac476a6a0a4200efe57235872dff98762a8528e Mon Sep 17 00:00:00 2001 From: xliu1992 Date: Wed, 1 Mar 2023 10:44:41 +0800 Subject: [PATCH] Use OperationRef instead of DataloadRef and DataBackupRef (#2637) * Use OperationRef instead of DataloadRef and DataBackupRef Signed-off-by: xliuqq * renaming constants names Signed-off-by: xliuqq --------- Signed-off-by: xliuqq --- api/v1alpha1/dataset_types.go | 33 +++++++++++++++++++ api/v1alpha1/openapi_generated.go | 20 +++++++++-- api/v1alpha1/zz_generated.deepcopy.go | 7 ++++ .../fluid/crds/data.fluid.io_datasets.yaml | 11 +++++-- config/crd/bases/data.fluid.io_datasets.yaml | 11 +++++-- .../databackup/databackup_controller.go | 8 +++-- .../v1alpha1/databackup/implement.go | 4 +-- .../v1alpha1/dataload/implement.go | 13 +++++--- pkg/databackup/constants.go | 2 ++ pkg/dataload/constants.go | 2 ++ pkg/utils/databackup.go | 3 +- pkg/utils/databackup_test.go | 2 +- pkg/utils/dataload.go | 3 +- pkg/utils/dataload_test.go | 2 +- 14 files changed, 101 insertions(+), 20 deletions(-) diff --git a/api/v1alpha1/dataset_types.go b/api/v1alpha1/dataset_types.go index 5a993c6be9e..f4338573c7a 100644 --- a/api/v1alpha1/dataset_types.go +++ b/api/v1alpha1/dataset_types.go @@ -211,6 +211,7 @@ type DatasetStatus struct { // DataLoadRef specifies the running DataLoad job that targets this Dataset. // This is mainly used as a lock to prevent concurrent DataLoad jobs. + // Deprecated, use OperationRef instead DataLoadRef string `json:"dataLoadRef,omitempty"` // DataMigrateRef specifies the running DataMigrate job that targets this Dataset. @@ -219,8 +220,13 @@ type DatasetStatus struct { // DataBackupRef specifies the running Backup job that targets this Dataset. // This is mainly used as a lock to prevent concurrent DataBackup jobs. + // Deprecated, use OperationRef instead DataBackupRef string `json:"dataBackupRef,omitempty"` + // OperationRef specifies the Operation that targets this Dataset. + // This is mainly used as a lock to prevent concurrent same Operation jobs. + OperationRef map[string]string `json:"operationRef,omitempty"` + // DatasetRef specifies the datasets namespaced name mounting this Dataset. DatasetRef []string `json:"datasetRef,omitempty"` } @@ -331,3 +337,30 @@ func (dataset *Dataset) CanbeBound(name string, namespace string, category commo func (dataset *Dataset) IsExclusiveMode() bool { return dataset.Spec.PlacementMode == DefaultMode || dataset.Spec.PlacementMode == ExclusiveMode } + +// GetLockedNameForOperation get the name of operation for certain type running on this dataset, otherwise return empty string +func (dataset *Dataset) GetLockedNameForOperation(operationType string) string { + if dataset.Status.OperationRef == nil { + return "" + } + + return dataset.Status.OperationRef[operationType] +} + +// LockOperation lock Dataset for operation +func (dataset *Dataset) LockOperation(operationType string, name string) { + if dataset.Status.OperationRef == nil { + dataset.Status.OperationRef = map[string]string{} + } + + dataset.Status.OperationRef[operationType] = name +} + +// ReleaseOperation release Dataset for operation +func (dataset *Dataset) ReleaseOperation(operationType string) { + if dataset.Status.OperationRef == nil { + return + } + + dataset.Status.OperationRef[operationType] = "" +} diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index 29299b762ea..e91c1e808b9 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -1761,7 +1761,7 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetStatus(ref common.Refere }, "dataLoadRef": { SchemaProps: spec.SchemaProps{ - Description: "DataLoadRef specifies the running DataLoad job that targets this Dataset. This is mainly used as a lock to prevent concurrent DataLoad jobs.", + Description: "DataLoadRef specifies the running DataLoad job that targets this Dataset. This is mainly used as a lock to prevent concurrent DataLoad jobs. Deprecated, use OperationRef instead", Type: []string{"string"}, Format: "", }, @@ -1775,11 +1775,27 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DatasetStatus(ref common.Refere }, "dataBackupRef": { SchemaProps: spec.SchemaProps{ - Description: "DataBackupRef specifies the running Backup job that targets this Dataset. This is mainly used as a lock to prevent concurrent DataBackup jobs.", + Description: "DataBackupRef specifies the running Backup job that targets this Dataset. This is mainly used as a lock to prevent concurrent DataBackup jobs. Deprecated, use OperationRef instead", Type: []string{"string"}, Format: "", }, }, + "operationRef": { + SchemaProps: spec.SchemaProps{ + Description: "OperationRef specifies the Operation that targets this Dataset. This is mainly used as a lock to prevent concurrent same Operation jobs.", + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, "datasetRef": { SchemaProps: spec.SchemaProps{ Description: "DatasetRef specifies the datasets namespaced name mounting this Dataset.", diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 409918be2d6..153e17c03a0 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -808,6 +808,13 @@ func (in *DatasetStatus) DeepCopyInto(out *DatasetStatus) { *out = new(HCFSStatus) **out = **in } + if in.OperationRef != nil { + in, out := &in.OperationRef, &out.OperationRef + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.DatasetRef != nil { in, out := &in.DatasetRef, &out.DatasetRef *out = make([]string, len(*in)) diff --git a/charts/fluid/fluid/crds/data.fluid.io_datasets.yaml b/charts/fluid/fluid/crds/data.fluid.io_datasets.yaml index 2f65859289d..7d5bb06c388 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_datasets.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_datasets.yaml @@ -405,12 +405,12 @@ spec: dataBackupRef: description: DataBackupRef specifies the running Backup job that targets this Dataset. This is mainly used as a lock to prevent concurrent - DataBackup jobs. + DataBackup jobs. Deprecated, use OperationRef instead type: string dataLoadRef: description: DataLoadRef specifies the running DataLoad job that targets this Dataset. This is mainly used as a lock to prevent concurrent - DataLoad jobs. + DataLoad jobs. Deprecated, use OperationRef instead type: string dataMigrateRef: description: DataMigrateRef specifies the running DataMigrate job @@ -492,6 +492,13 @@ spec: type: boolean type: object type: array + operationRef: + additionalProperties: + type: string + description: OperationRef specifies the Operation that targets this + Dataset. This is mainly used as a lock to prevent concurrent same + Operation jobs. + type: object phase: description: 'Dataset Phase. One of the four phases: `Pending`, `Bound`, `NotBound` and `Failed`' diff --git a/config/crd/bases/data.fluid.io_datasets.yaml b/config/crd/bases/data.fluid.io_datasets.yaml index 2f65859289d..7d5bb06c388 100644 --- a/config/crd/bases/data.fluid.io_datasets.yaml +++ b/config/crd/bases/data.fluid.io_datasets.yaml @@ -405,12 +405,12 @@ spec: dataBackupRef: description: DataBackupRef specifies the running Backup job that targets this Dataset. This is mainly used as a lock to prevent concurrent - DataBackup jobs. + DataBackup jobs. Deprecated, use OperationRef instead type: string dataLoadRef: description: DataLoadRef specifies the running DataLoad job that targets this Dataset. This is mainly used as a lock to prevent concurrent - DataLoad jobs. + DataLoad jobs. Deprecated, use OperationRef instead type: string dataMigrateRef: description: DataMigrateRef specifies the running DataMigrate job @@ -492,6 +492,13 @@ spec: type: boolean type: object type: array + operationRef: + additionalProperties: + type: string + description: OperationRef specifies the Operation that targets this + Dataset. This is mainly used as a lock to prevent concurrent same + Operation jobs. + type: object phase: description: 'Dataset Phase. One of the four phases: `Pending`, `Bound`, `NotBound` and `Failed`' diff --git a/pkg/controllers/v1alpha1/databackup/databackup_controller.go b/pkg/controllers/v1alpha1/databackup/databackup_controller.go index ce2dd061a08..a7e00023147 100644 --- a/pkg/controllers/v1alpha1/databackup/databackup_controller.go +++ b/pkg/controllers/v1alpha1/databackup/databackup_controller.go @@ -253,12 +253,14 @@ func (r *DataBackupReconciler) releaseLockOnTargetDataset(ctx reconcileRequestCo // other error return err } - if dataset.Status.DataBackupRef != utils.GetDataBackupRef(ctx.DataBackup.Name, ctx.DataBackup.Namespace) { - log.Info("Found DataBackuRef inconsistent with the reconciling DataBack, won't release this lock, ignore it", "DataBackupRef", dataset.Status.DataBackupRef) + currentRef := dataset.GetLockedNameForOperation(cdatabackup.DataBackupLockName) + + if currentRef != utils.GetDataBackupRef(ctx.DataBackup.Name, ctx.DataBackup.Namespace) { + log.Info("Found DataBackupRef inconsistent with the reconciling DataBack, won't release this lock, ignore it", "DataBackupRef", currentRef) return nil } datasetToUpdate := dataset.DeepCopy() - datasetToUpdate.Status.DataBackupRef = "" + datasetToUpdate.ReleaseOperation(cdatabackup.DataBackupLockName) if !reflect.DeepEqual(datasetToUpdate.Status, dataset) { if err := r.Status().Update(ctx, datasetToUpdate); err != nil { return err diff --git a/pkg/controllers/v1alpha1/databackup/implement.go b/pkg/controllers/v1alpha1/databackup/implement.go index 06cca82c7c5..cf228798d1f 100644 --- a/pkg/controllers/v1alpha1/databackup/implement.go +++ b/pkg/controllers/v1alpha1/databackup/implement.go @@ -101,7 +101,7 @@ func (r *DataBackupReconcilerImplement) reconcilePendingDataBackup(ctx reconcile log := ctx.Log.WithName("reconcilePendingDataBackup") targetDataset := ctx.Dataset // 1. Check if there's any Backuping pods(conflict DataBackup) - conflictDataBackupRef := targetDataset.Status.DataBackupRef + conflictDataBackupRef := targetDataset.GetLockedNameForOperation(cdatabackup.DataBackupLockName) myDataBackupRef := utils.GetDataBackupRef(ctx.DataBackup.Name, ctx.DataBackup.Namespace) if len(conflictDataBackupRef) != 0 && conflictDataBackupRef != myDataBackupRef { log.V(1).Info("Found other DataBackups that is in Executing phase, will backoff", "other DataBackup", conflictDataBackupRef) @@ -186,7 +186,7 @@ func (r *DataBackupReconcilerImplement) reconcilePendingDataBackup(ctx reconcile // the losers not need to backup again log.Info("No conflicts detected, try to lock the target dataset") datasetToUpdate := targetDataset.DeepCopy() - datasetToUpdate.Status.DataBackupRef = myDataBackupRef + datasetToUpdate.LockOperation(cdatabackup.DataBackupLockName, myDataBackupRef) if !reflect.DeepEqual(targetDataset.Status, datasetToUpdate.Status) { if err := r.Client.Status().Update(context.TODO(), datasetToUpdate); err != nil { log.V(1).Info("fail to get target dataset's lock, will requeue") diff --git a/pkg/controllers/v1alpha1/dataload/implement.go b/pkg/controllers/v1alpha1/dataload/implement.go index adf007dd50a..dda52787a8b 100644 --- a/pkg/controllers/v1alpha1/dataload/implement.go +++ b/pkg/controllers/v1alpha1/dataload/implement.go @@ -189,7 +189,7 @@ func (r *DataLoadReconcilerImplement) reconcilePendingDataLoad(ctx cruntime.Reco log.V(1).Info("get target dataset", "targetDataset", targetDataset) // 3. Check if there's any Executing DataLoad jobs(conflict DataLoad) - conflictDataLoadRef := targetDataset.Status.DataLoadRef + conflictDataLoadRef := targetDataset.GetLockedNameForOperation(cdataload.DataLoadLockName) myDataLoadRef := utils.GetDataLoadRef(targetDataload.Name, targetDataload.Namespace) if len(conflictDataLoadRef) != 0 && conflictDataLoadRef != myDataLoadRef { log.V(1).Info("Found other DataLoads that is in Executing phase, will backoff", "other DataLoad", conflictDataLoadRef) @@ -216,7 +216,8 @@ func (r *DataLoadReconcilerImplement) reconcilePendingDataLoad(ctx cruntime.Reco // the losers have to requeue and go through the whole reconciliation loop. log.Info("No conflicts detected, try to lock the target dataset") datasetToUpdate := targetDataset.DeepCopy() - datasetToUpdate.Status.DataLoadRef = myDataLoadRef + datasetToUpdate.LockOperation(cdataload.DataLoadLockName, myDataLoadRef) + if !reflect.DeepEqual(targetDataset.Status, datasetToUpdate.Status) { if err = r.Client.Status().Update(context.TODO(), datasetToUpdate); err != nil { log.V(1).Info("fail to get target dataset's lock, will requeue") @@ -367,12 +368,14 @@ func (r *DataLoadReconcilerImplement) releaseLockOnTargetDataset(ctx cruntime.Re // other error return err } - if dataset.Status.DataLoadRef != utils.GetDataLoadRef(targetDataload.Name, targetDataload.Namespace) { - log.Info("Found DataLoadRef inconsistent with the reconciling DataLoad, won't release this lock, ignore it", "DataLoadRef", dataset.Status.DataLoadRef) + + currentRef := dataset.GetLockedNameForOperation(cdataload.DataLoadLockName) + if currentRef != utils.GetDataLoadRef(targetDataload.Name, targetDataload.Namespace) { + log.Info("Found DataLoadRef inconsistent with the reconciling DataLoad, won't release this lock, ignore it", "DataLoadRef", currentRef) return nil } datasetToUpdate := dataset.DeepCopy() - datasetToUpdate.Status.DataLoadRef = "" + datasetToUpdate.ReleaseOperation(cdataload.DataLoadLockName) if !reflect.DeepEqual(datasetToUpdate.Status, dataset) { if err := r.Status().Update(ctx, datasetToUpdate); err != nil { return err diff --git a/pkg/databackup/constants.go b/pkg/databackup/constants.go index 5daf741452c..03ec37b7e3c 100644 --- a/pkg/databackup/constants.go +++ b/pkg/databackup/constants.go @@ -17,6 +17,8 @@ limitations under the License. package databackup const ( + DataBackupLockName = "DataBackup" + Finalizer = "fluid-databackup-controller-finalizer" AlluxioBackupPathPod = "/alluxio_backups" GooseFSBackupPathPod = "/goosefs_backups" diff --git a/pkg/dataload/constants.go b/pkg/dataload/constants.go index 953772b1467..673d49665a1 100644 --- a/pkg/dataload/constants.go +++ b/pkg/dataload/constants.go @@ -16,6 +16,8 @@ limitations under the License. package dataload const ( + DataLoadLockName = "DataLoad" + DataloadFinalizer = "fluid-dataload-controller-finalizer" DataloadChart = "fluid-dataloader" DataloadDefaultImage = "registry.cn-hangzhou.aliyuncs.com/fluid/fluid-dataloader" diff --git a/pkg/utils/databackup.go b/pkg/utils/databackup.go index 073051d3d5c..5886ad877ae 100644 --- a/pkg/utils/databackup.go +++ b/pkg/utils/databackup.go @@ -16,7 +16,8 @@ import ( // GetDataBackupRef returns the identity of the Backup by combining its namespace and name. // The identity is used for identifying current lock holder on the target dataset. func GetDataBackupRef(name, namespace string) string { - return fmt.Sprintf("%s-%s", namespace, name) + // namespace may contain '-', use '/' as separator + return fmt.Sprintf("%s/%s", namespace, name) } // GetDataBackup gets the DataBackup given its name and namespace diff --git a/pkg/utils/databackup_test.go b/pkg/utils/databackup_test.go index 77f9941624c..b17db2d5e86 100644 --- a/pkg/utils/databackup_test.go +++ b/pkg/utils/databackup_test.go @@ -265,7 +265,7 @@ func TestGetDataBackupRef(t *testing.T) { name: "test", namespace: "default", }, - want: "default-test", + want: "default/test", }, } for _, tt := range tests { diff --git a/pkg/utils/dataload.go b/pkg/utils/dataload.go index fd095e8f612..bc3664f292d 100644 --- a/pkg/utils/dataload.go +++ b/pkg/utils/dataload.go @@ -48,5 +48,6 @@ func GetDataLoadJobName(releaseName string) string { // GetDataLoadRef returns the identity of the DataLoad by combining its namespace and name. // The identity is used for identifying current lock holder on the target dataset. func GetDataLoadRef(name, namespace string) string { - return fmt.Sprintf("%s-%s", namespace, name) + // namespace may contain '-', use '/' as separator + return fmt.Sprintf("%s/%s", namespace, name) } diff --git a/pkg/utils/dataload_test.go b/pkg/utils/dataload_test.go index 3feedb704c7..375c432a56b 100644 --- a/pkg/utils/dataload_test.go +++ b/pkg/utils/dataload_test.go @@ -168,7 +168,7 @@ func TestGetDataLoadRef(t *testing.T) { name: "hbase", namespace: "default", }, - want: "default-hbase", + want: "default/hbase", }, } for _, tt := range tests {