Skip to content

Commit

Permalink
Use OperationRef instead of DataloadRef and DataBackupRef (#2637)
Browse files Browse the repository at this point in the history
* Use OperationRef instead of DataloadRef and DataBackupRef

Signed-off-by: xliuqq <xlzq1992@gmail.com>

* renaming constants names

Signed-off-by: xliuqq <xlzq1992@gmail.com>

---------

Signed-off-by: xliuqq <xlzq1992@gmail.com>
  • Loading branch information
xliuqq authored Mar 1, 2023
1 parent 2c134c8 commit cac476a
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 20 deletions.
33 changes: 33 additions & 0 deletions api/v1alpha1/dataset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ type DatasetStatus struct {

// DataLoadRef specifies the running DataLoad job that targets this Dataset.
// This is mainly used as a lock to prevent concurrent DataLoad jobs.
// Deprecated, use OperationRef instead
DataLoadRef string `json:"dataLoadRef,omitempty"`

// DataMigrateRef specifies the running DataMigrate job that targets this Dataset.
Expand All @@ -219,8 +220,13 @@ type DatasetStatus struct {

// DataBackupRef specifies the running Backup job that targets this Dataset.
// This is mainly used as a lock to prevent concurrent DataBackup jobs.
// Deprecated, use OperationRef instead
DataBackupRef string `json:"dataBackupRef,omitempty"`

// OperationRef specifies the Operation that targets this Dataset.
// This is mainly used as a lock to prevent concurrent same Operation jobs.
OperationRef map[string]string `json:"operationRef,omitempty"`

// DatasetRef specifies the datasets namespaced name mounting this Dataset.
DatasetRef []string `json:"datasetRef,omitempty"`
}
Expand Down Expand Up @@ -331,3 +337,30 @@ func (dataset *Dataset) CanbeBound(name string, namespace string, category commo
func (dataset *Dataset) IsExclusiveMode() bool {
return dataset.Spec.PlacementMode == DefaultMode || dataset.Spec.PlacementMode == ExclusiveMode
}

// GetLockedNameForOperation get the name of operation for certain type running on this dataset, otherwise return empty string
func (dataset *Dataset) GetLockedNameForOperation(operationType string) string {
if dataset.Status.OperationRef == nil {
return ""
}

return dataset.Status.OperationRef[operationType]
}

// LockOperation lock Dataset for operation
func (dataset *Dataset) LockOperation(operationType string, name string) {
if dataset.Status.OperationRef == nil {
dataset.Status.OperationRef = map[string]string{}
}

dataset.Status.OperationRef[operationType] = name
}

// ReleaseOperation release Dataset for operation
func (dataset *Dataset) ReleaseOperation(operationType string) {
if dataset.Status.OperationRef == nil {
return
}

dataset.Status.OperationRef[operationType] = ""
}
20 changes: 18 additions & 2 deletions api/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions charts/fluid/fluid/crds/data.fluid.io_datasets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ spec:
dataBackupRef:
description: DataBackupRef specifies the running Backup job that targets
this Dataset. This is mainly used as a lock to prevent concurrent
DataBackup jobs.
DataBackup jobs. Deprecated, use OperationRef instead
type: string
dataLoadRef:
description: DataLoadRef specifies the running DataLoad job that targets
this Dataset. This is mainly used as a lock to prevent concurrent
DataLoad jobs.
DataLoad jobs. Deprecated, use OperationRef instead
type: string
dataMigrateRef:
description: DataMigrateRef specifies the running DataMigrate job
Expand Down Expand Up @@ -492,6 +492,13 @@ spec:
type: boolean
type: object
type: array
operationRef:
additionalProperties:
type: string
description: OperationRef specifies the Operation that targets this
Dataset. This is mainly used as a lock to prevent concurrent same
Operation jobs.
type: object
phase:
description: 'Dataset Phase. One of the four phases: `Pending`, `Bound`,
`NotBound` and `Failed`'
Expand Down
11 changes: 9 additions & 2 deletions config/crd/bases/data.fluid.io_datasets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ spec:
dataBackupRef:
description: DataBackupRef specifies the running Backup job that targets
this Dataset. This is mainly used as a lock to prevent concurrent
DataBackup jobs.
DataBackup jobs. Deprecated, use OperationRef instead
type: string
dataLoadRef:
description: DataLoadRef specifies the running DataLoad job that targets
this Dataset. This is mainly used as a lock to prevent concurrent
DataLoad jobs.
DataLoad jobs. Deprecated, use OperationRef instead
type: string
dataMigrateRef:
description: DataMigrateRef specifies the running DataMigrate job
Expand Down Expand Up @@ -492,6 +492,13 @@ spec:
type: boolean
type: object
type: array
operationRef:
additionalProperties:
type: string
description: OperationRef specifies the Operation that targets this
Dataset. This is mainly used as a lock to prevent concurrent same
Operation jobs.
type: object
phase:
description: 'Dataset Phase. One of the four phases: `Pending`, `Bound`,
`NotBound` and `Failed`'
Expand Down
8 changes: 5 additions & 3 deletions pkg/controllers/v1alpha1/databackup/databackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,14 @@ func (r *DataBackupReconciler) releaseLockOnTargetDataset(ctx reconcileRequestCo
// other error
return err
}
if dataset.Status.DataBackupRef != utils.GetDataBackupRef(ctx.DataBackup.Name, ctx.DataBackup.Namespace) {
log.Info("Found DataBackuRef inconsistent with the reconciling DataBack, won't release this lock, ignore it", "DataBackupRef", dataset.Status.DataBackupRef)
currentRef := dataset.GetLockedNameForOperation(cdatabackup.DataBackupLockName)

if currentRef != utils.GetDataBackupRef(ctx.DataBackup.Name, ctx.DataBackup.Namespace) {
log.Info("Found DataBackupRef inconsistent with the reconciling DataBack, won't release this lock, ignore it", "DataBackupRef", currentRef)
return nil
}
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Status.DataBackupRef = ""
datasetToUpdate.ReleaseOperation(cdatabackup.DataBackupLockName)
if !reflect.DeepEqual(datasetToUpdate.Status, dataset) {
if err := r.Status().Update(ctx, datasetToUpdate); err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/v1alpha1/databackup/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (r *DataBackupReconcilerImplement) reconcilePendingDataBackup(ctx reconcile
log := ctx.Log.WithName("reconcilePendingDataBackup")
targetDataset := ctx.Dataset
// 1. Check if there's any Backuping pods(conflict DataBackup)
conflictDataBackupRef := targetDataset.Status.DataBackupRef
conflictDataBackupRef := targetDataset.GetLockedNameForOperation(cdatabackup.DataBackupLockName)
myDataBackupRef := utils.GetDataBackupRef(ctx.DataBackup.Name, ctx.DataBackup.Namespace)
if len(conflictDataBackupRef) != 0 && conflictDataBackupRef != myDataBackupRef {
log.V(1).Info("Found other DataBackups that is in Executing phase, will backoff", "other DataBackup", conflictDataBackupRef)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (r *DataBackupReconcilerImplement) reconcilePendingDataBackup(ctx reconcile
// the losers not need to backup again
log.Info("No conflicts detected, try to lock the target dataset")
datasetToUpdate := targetDataset.DeepCopy()
datasetToUpdate.Status.DataBackupRef = myDataBackupRef
datasetToUpdate.LockOperation(cdatabackup.DataBackupLockName, myDataBackupRef)
if !reflect.DeepEqual(targetDataset.Status, datasetToUpdate.Status) {
if err := r.Client.Status().Update(context.TODO(), datasetToUpdate); err != nil {
log.V(1).Info("fail to get target dataset's lock, will requeue")
Expand Down
13 changes: 8 additions & 5 deletions pkg/controllers/v1alpha1/dataload/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (r *DataLoadReconcilerImplement) reconcilePendingDataLoad(ctx cruntime.Reco
log.V(1).Info("get target dataset", "targetDataset", targetDataset)

// 3. Check if there's any Executing DataLoad jobs(conflict DataLoad)
conflictDataLoadRef := targetDataset.Status.DataLoadRef
conflictDataLoadRef := targetDataset.GetLockedNameForOperation(cdataload.DataLoadLockName)
myDataLoadRef := utils.GetDataLoadRef(targetDataload.Name, targetDataload.Namespace)
if len(conflictDataLoadRef) != 0 && conflictDataLoadRef != myDataLoadRef {
log.V(1).Info("Found other DataLoads that is in Executing phase, will backoff", "other DataLoad", conflictDataLoadRef)
Expand All @@ -216,7 +216,8 @@ func (r *DataLoadReconcilerImplement) reconcilePendingDataLoad(ctx cruntime.Reco
// the losers have to requeue and go through the whole reconciliation loop.
log.Info("No conflicts detected, try to lock the target dataset")
datasetToUpdate := targetDataset.DeepCopy()
datasetToUpdate.Status.DataLoadRef = myDataLoadRef
datasetToUpdate.LockOperation(cdataload.DataLoadLockName, myDataLoadRef)

if !reflect.DeepEqual(targetDataset.Status, datasetToUpdate.Status) {
if err = r.Client.Status().Update(context.TODO(), datasetToUpdate); err != nil {
log.V(1).Info("fail to get target dataset's lock, will requeue")
Expand Down Expand Up @@ -367,12 +368,14 @@ func (r *DataLoadReconcilerImplement) releaseLockOnTargetDataset(ctx cruntime.Re
// other error
return err
}
if dataset.Status.DataLoadRef != utils.GetDataLoadRef(targetDataload.Name, targetDataload.Namespace) {
log.Info("Found DataLoadRef inconsistent with the reconciling DataLoad, won't release this lock, ignore it", "DataLoadRef", dataset.Status.DataLoadRef)

currentRef := dataset.GetLockedNameForOperation(cdataload.DataLoadLockName)
if currentRef != utils.GetDataLoadRef(targetDataload.Name, targetDataload.Namespace) {
log.Info("Found DataLoadRef inconsistent with the reconciling DataLoad, won't release this lock, ignore it", "DataLoadRef", currentRef)
return nil
}
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Status.DataLoadRef = ""
datasetToUpdate.ReleaseOperation(cdataload.DataLoadLockName)
if !reflect.DeepEqual(datasetToUpdate.Status, dataset) {
if err := r.Status().Update(ctx, datasetToUpdate); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions pkg/databackup/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package databackup

const (
DataBackupLockName = "DataBackup"

Finalizer = "fluid-databackup-controller-finalizer"
AlluxioBackupPathPod = "/alluxio_backups"
GooseFSBackupPathPod = "/goosefs_backups"
Expand Down
2 changes: 2 additions & 0 deletions pkg/dataload/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.
package dataload

const (
DataLoadLockName = "DataLoad"

DataloadFinalizer = "fluid-dataload-controller-finalizer"
DataloadChart = "fluid-dataloader"
DataloadDefaultImage = "registry.cn-hangzhou.aliyuncs.com/fluid/fluid-dataloader"
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/databackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
// GetDataBackupRef returns the identity of the Backup by combining its namespace and name.
// The identity is used for identifying current lock holder on the target dataset.
func GetDataBackupRef(name, namespace string) string {
return fmt.Sprintf("%s-%s", namespace, name)
// namespace may contain '-', use '/' as separator
return fmt.Sprintf("%s/%s", namespace, name)
}

// GetDataBackup gets the DataBackup given its name and namespace
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/databackup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func TestGetDataBackupRef(t *testing.T) {
name: "test",
namespace: "default",
},
want: "default-test",
want: "default/test",
},
}
for _, tt := range tests {
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/dataload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@ func GetDataLoadJobName(releaseName string) string {
// GetDataLoadRef returns the identity of the DataLoad by combining its namespace and name.
// The identity is used for identifying current lock holder on the target dataset.
func GetDataLoadRef(name, namespace string) string {
return fmt.Sprintf("%s-%s", namespace, name)
// namespace may contain '-', use '/' as separator
return fmt.Sprintf("%s/%s", namespace, name)
}
2 changes: 1 addition & 1 deletion pkg/utils/dataload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestGetDataLoadRef(t *testing.T) {
name: "hbase",
namespace: "default",
},
want: "default-hbase",
want: "default/hbase",
},
}
for _, tt := range tests {
Expand Down

0 comments on commit cac476a

Please sign in to comment.