Skip to content

Commit

Permalink
feat: support pitr (apecloud#6779)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei authored Mar 8, 2024
1 parent 6a563a4 commit e5a3019
Show file tree
Hide file tree
Showing 12 changed files with 761 additions and 105 deletions.
64 changes: 2 additions & 62 deletions controllers/apps/transformer_component_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ import (
const (
// componentPhaseTransition the event reason indicates that the component transits to a new phase.
componentPhaseTransition = "ComponentPhaseTransition"

// podContainerFailedTimeout the timeout for container of pod failures, the component phase will be set to Failed/Abnormal after this time.
podContainerFailedTimeout = 10 * time.Second

// podScheduledFailedTimeout timeout for scheduling failure.
podScheduledFailedTimeout = 30 * time.Second
)

// componentStatusTransformer computes the current status: read the underlying rsm status and update the component status
Expand Down Expand Up @@ -569,7 +563,7 @@ func hasFailedAndTimedOutPod(pods []*corev1.Pod) (bool, appsv1alpha1.ComponentMe
requeueAfter time.Duration
)
for _, pod := range pods {
isFailed, isTimedOut, messageStr := isPodFailedAndTimedOut(pod)
isFailed, isTimedOut, messageStr := intctrlutil.IsPodFailedAndTimedOut(pod)
if !isFailed {
continue
}
Expand All @@ -581,65 +575,11 @@ func hasFailedAndTimedOutPod(pods []*corev1.Pod) (bool, appsv1alpha1.ComponentMe
}
}
if hasFailedPod && !hasTimedOutPod {
requeueAfter = podContainerFailedTimeout
requeueAfter = intctrlutil.PodContainerFailedTimeout
}
return hasTimedOutPod, messages, requeueAfter
}

// isPodFailedAndTimedOut reports, in order, whether the pod is failed, whether
// that failure has timed out, and the failure message.
// A scheduling failure is checked first, then the init containers, then the
// regular containers; the first failure found decides the result.
func isPodFailedAndTimedOut(pod *corev1.Pod) (bool, bool, string) {
	failed, timedOut, msg := isPodScheduledFailedAndTimedOut(pod)
	if failed {
		return true, timedOut, msg
	}
	// init container failures are timed against the PodInitialized condition.
	if failed, msg = isAnyContainerFailed(pod.Status.InitContainerStatuses); failed {
		return true, isContainerFailedAndTimedOut(pod, corev1.PodInitialized), msg
	}
	// container failures are timed against the ContainersReady condition.
	if failed, msg = isAnyContainerFailed(pod.Status.ContainerStatuses); failed {
		return true, isContainerFailedAndTimedOut(pod, corev1.ContainersReady), msg
	}
	return false, false, ""
}

// isPodScheduledFailedAndTimedOut inspects the first PodScheduled condition of the pod.
// It returns (true, timedOut, message) when that condition is not True, where
// timedOut tells whether podScheduledFailedTimeout has elapsed since the
// condition's last transition. In every other case it returns (false, false, "").
func isPodScheduledFailedAndTimedOut(pod *corev1.Pod) (bool, bool, string) {
	conditions := pod.Status.Conditions
	for i := range conditions {
		scheduled := &conditions[i]
		if scheduled.Type != corev1.PodScheduled {
			continue
		}
		// only the first PodScheduled condition is taken into account.
		if scheduled.Status == corev1.ConditionTrue {
			break
		}
		deadline := scheduled.LastTransitionTime.Add(podScheduledFailedTimeout)
		return true, time.Now().After(deadline), scheduled.Message
	}
	return false, false, ""
}

// isAnyContainerFailed scans the container statuses in order and returns true
// together with the message of the first container whose Waiting or Terminated
// state carries a non-empty message. It returns (false, "") when none does.
func isAnyContainerFailed(containersStatus []corev1.ContainerStatus) (bool, string) {
	for i := range containersStatus {
		state := containersStatus[i].State
		// the Waiting state takes precedence over the Terminated state.
		if waiting := state.Waiting; waiting != nil && waiting.Message != "" {
			return true, waiting.Message
		}
		if terminated := state.Terminated; terminated != nil && terminated.Message != "" {
			return true, terminated.Message
		}
	}
	return false, ""
}

// isContainerFailedAndTimedOut reports whether podContainerFailedTimeout has
// elapsed since the last transition of the given pod condition.
// It returns false when the condition is absent or has no transition time.
func isContainerFailedAndTimedOut(pod *corev1.Pod, podConditionType corev1.PodConditionType) bool {
	cond := intctrlutil.GetPodCondition(&pod.Status, podConditionType)
	if cond != nil && !cond.LastTransitionTime.IsZero() {
		deadline := cond.LastTransitionTime.Add(podContainerFailedTimeout)
		return time.Now().After(deadline)
	}
	return false
}

// newComponentStatusHandler creates a new componentStatusHandler
func newComponentStatusHandler(reqCtx intctrlutil.RequestCtx,
cli client.Client,
Expand Down
136 changes: 116 additions & 20 deletions controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

vsv1beta1 "github.com/kubernetes-csi/external-snapshotter/client/v3/apis/volumesnapshot/v1beta1"
vsv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -51,7 +52,6 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/dataprotection/action"
dpbackup "github.com/apecloud/kubeblocks/pkg/dataprotection/backup"
dperrors "github.com/apecloud/kubeblocks/pkg/dataprotection/errors"
dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types"
dputils "github.com/apecloud/kubeblocks/pkg/dataprotection/utils"
"github.com/apecloud/kubeblocks/pkg/dataprotection/utils/boolptr"
Expand Down Expand Up @@ -119,6 +119,11 @@ func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return r.handleCompletedPhase(reqCtx, backup)
case dpv1alpha1.BackupPhaseDeleting:
return r.handleDeletingPhase(reqCtx, backup)
case dpv1alpha1.BackupPhaseFailed:
if backup.Labels[dptypes.BackupTypeLabelKey] == string(dpv1alpha1.BackupTypeContinuous) {
return r.handleRunningPhase(reqCtx, backup)
}
return intctrlutil.Reconciled()
default:
return intctrlutil.Reconciled()
}
Expand All @@ -131,7 +136,9 @@ func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(maxConcurDataProtectionReconKey),
}).
Owns(&appsv1.StatefulSet{}).
Owns(&batchv1.Job{}).
Watches(&corev1.Pod{}, handler.EnqueueRequestsFromMapFunc(r.filterBackupPods)).
Watches(&batchv1.Job{}, handler.EnqueueRequestsFromMapFunc(r.parseBackupJob))

if dputils.SupportsVolumeSnapshotV1() {
Expand All @@ -142,6 +149,30 @@ func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
return b.Complete(r)
}

// filterBackupPods maps a watched object to a reconcile request for its backup.
// A request is produced only when the object carries the managed-by label set
// to constant.AppName, carries the backup name label, and is owned by a
// StatefulSet named after that backup. Otherwise no request is returned.
func (r *BackupReconciler) filterBackupPods(ctx context.Context, obj client.Object) []reconcile.Request {
	objLabels := obj.GetLabels()
	managedBy, found := objLabels[constant.AppManagedByLabelKey]
	if !found || managedBy != constant.AppName {
		return nil
	}
	backupName, found := objLabels[dptypes.BackupNameLabelKey]
	if !found {
		return nil
	}
	for _, owner := range obj.GetOwnerReferences() {
		if owner.Kind != constant.StatefulSetKind || owner.Name != backupName {
			continue
		}
		// the first matching owner is enough: enqueue the backup once.
		return []reconcile.Request{{
			NamespacedName: types.NamespacedName{
				Namespace: obj.GetNamespace(),
				Name:      backupName,
			},
		}}
	}
	return nil
}

func (r *BackupReconciler) parseBackupJob(_ context.Context, object client.Object) []reconcile.Request {
job := object.(*batchv1.Job)
var requests []reconcile.Request
Expand Down Expand Up @@ -230,9 +261,6 @@ func (r *BackupReconciler) handleNewPhase(
backup *dpv1alpha1.Backup) (ctrl.Result, error) {
request, err := r.prepareBackupRequest(reqCtx, backup)
if err != nil {
if intctrlutil.IsTargetError(err, dperrors.ErrorTypeWaitForExternalHandler) {
return RecorderEventAndRequeue(reqCtx, r.Recorder, backup, err)
}
return r.updateStatusIfFailed(reqCtx, backup.DeepCopy(), backup, err)
}

Expand Down Expand Up @@ -294,22 +322,10 @@ func (r *BackupReconciler) prepareBackupRequest(
actionSet, err := dputils.GetActionSetByName(reqCtx, r.Client, backupMethod.ActionSetName)
if err != nil {
return nil, err
} else if actionSet.Spec.BackupType != dpv1alpha1.BackupTypeFull {
// TODO: refactor it if supports other backup type.
return nil, intctrlutil.NewErrorf(dperrors.ErrorTypeWaitForExternalHandler,
`wait for external handler to handle this backup type "%s"`, actionSet.Spec.BackupType)
}
request.ActionSet = actionSet
}

if !snapshotVolumes && backupPolicy.Spec.UseKopia {
acked := backup.Annotations[dptypes.GeminiAcknowledgedAnnotationKey]
if acked != trueVal {
return nil, intctrlutil.NewErrorf(dperrors.ErrorTypeWaitForExternalHandler,
`wait for external handler to handle this backup because policy.spec.useKopia is true`)
}
}

// check encryption config
if backupPolicy.Spec.EncryptionConfig != nil {
secretKeyRef := backupPolicy.Spec.EncryptionConfig.PassPhraseSecretKeyRef
Expand Down Expand Up @@ -399,11 +415,16 @@ func (r *BackupReconciler) handleRunningPhase(
backup *dpv1alpha1.Backup) (ctrl.Result, error) {
request, err := r.prepareBackupRequest(reqCtx, backup)
if err != nil {
// external controller is already processing it, only mark reconciled
if intctrlutil.IsTargetError(err, dperrors.ErrorTypeWaitForExternalHandler) {
return r.updateStatusIfFailed(reqCtx, backup.DeepCopy(), backup, err)
}

if request.ActionSet != nil && request.ActionSet.Spec.BackupType == dpv1alpha1.BackupTypeContinuous {
// check if the continuous backup is completed.
if completed, err := r.checkIsCompletedDuringRunning(reqCtx, request); err != nil {
return RecorderEventAndRequeue(reqCtx, r.Recorder, backup, err)
} else if completed {
return intctrlutil.Reconciled()
}
return r.updateStatusIfFailed(reqCtx, backup.DeepCopy(), backup, err)
}

// there are actions not completed, continue to handle following actions
Expand Down Expand Up @@ -473,6 +494,55 @@ func (r *BackupReconciler) handleRunningPhase(
return intctrlutil.Reconciled()
}

// checkIsCompletedDuringRunning marks a running continuous backup as Completed
// when its continuous schedule is not enabled or its target cluster has been deleted.
//
// It returns true when the backup status has been switched to Completed (the
// accompanying error, if any, comes from patching the status), and false when
// the backup should keep running. A non-nil error with false is returned when
// listing the backup schedules or checking the cluster fails.
func (r *BackupReconciler) checkIsCompletedDuringRunning(reqCtx intctrlutil.RequestCtx,
	request *dpbackup.Request) (bool, error) {
	backupScheduleList := &dpv1alpha1.BackupScheduleList{}
	// Spec.BackupPolicyName is only a name, so restrict the lookup to the
	// backup's own namespace; otherwise a schedule of a same-named policy in
	// another namespace could decide whether this backup is completed.
	if err := r.Client.List(reqCtx.Ctx, backupScheduleList,
		client.InNamespace(request.Namespace),
		client.MatchingLabels{
			dptypes.BackupPolicyLabelKey: request.Backup.Spec.BackupPolicyName,
		}); err != nil {
		return false, err
	}
	var (
		enabled             *bool
		targetClusterExists = true
	)
	// check if Continuous backupMethod is enabled.
	// NOTE(review): the break only leaves the inner loop, so when several
	// schedules reference the same method the last one listed wins.
	for _, v := range backupScheduleList.Items {
		for _, method := range v.Spec.Schedules {
			if method.BackupMethod == request.Spec.BackupMethod {
				enabled = method.Enabled
				break
			}
		}
	}
	// check if target cluster exists; without a cluster label the cluster is
	// assumed to exist.
	clusterName := request.Labels[constant.AppInstanceLabelKey]
	if clusterName != "" {
		cluster := &appsv1alpha1.Cluster{}
		var err error
		targetClusterExists, err = intctrlutil.CheckResourceExists(reqCtx.Ctx, r.Client,
			client.ObjectKey{Name: clusterName, Namespace: request.Namespace}, cluster)
		if err != nil {
			return false, err
		}
	}
	if boolptr.IsSetToTrue(enabled) && targetClusterExists {
		return false, nil
	}
	patch := client.MergeFrom(request.Backup.DeepCopy())
	request.Status.Phase = dpv1alpha1.BackupPhaseCompleted
	request.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now().UTC()}
	// the error is deliberately ignored here so that the phase change is not blocked.
	// NOTE(review): confirm that a failure to set the expiration may be dropped.
	_ = dpbackup.SetExpirationByCreationTime(request.Backup)
	if !request.Status.StartTimestamp.IsZero() {
		// round the duration to a multiple of seconds.
		duration := request.Status.CompletionTimestamp.Sub(request.Status.StartTimestamp.Time).Round(time.Second)
		request.Status.Duration = &metav1.Duration{Duration: duration}
	}
	return true, r.Client.Status().Patch(reqCtx.Ctx, request.Backup, patch)
}

// handleCompletedPhase handles the backup object in completed phase.
// It will delete the reference workloads.
func (r *BackupReconciler) handleCompletedPhase(
Expand All @@ -490,6 +560,9 @@ func (r *BackupReconciler) updateStatusIfFailed(
original *dpv1alpha1.Backup,
backup *dpv1alpha1.Backup,
err error) (ctrl.Result, error) {
if intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeRequeue) {
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}
sendWarningEventForError(r.Recorder, backup, err)
backup.Status.Phase = dpv1alpha1.BackupPhaseFailed
backup.Status.FailureReason = err.Error()
Expand Down Expand Up @@ -522,11 +595,34 @@ func (r *BackupReconciler) deleteVolumeSnapshots(reqCtx intctrlutil.RequestCtx,
return deleter.DeleteVolumeSnapshots(backup)
}

// deleteExternalStatefulSet removes the statefulSet that shares the backup's
// namespace and name. A missing statefulSet is not an error.
func (r *BackupReconciler) deleteExternalStatefulSet(reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
	sts := &appsv1.StatefulSet{}
	stsKey := client.ObjectKey{Namespace: backup.Namespace, Name: backup.Name}
	err := r.Client.Get(reqCtx.Ctx, stsKey, sts)
	if err != nil {
		return client.IgnoreNotFound(err)
	}

	// strip the data protection finalizer before deleting,
	// presumably so that the deletion below is not held back by it.
	unmodified := sts.DeepCopy()
	controllerutil.RemoveFinalizer(sts, dptypes.DataProtectionFinalizerName)
	if err = r.Client.Patch(reqCtx.Ctx, sts, client.MergeFrom(unmodified)); err != nil {
		return err
	}
	reqCtx.Log.V(1).Info("delete statefulSet", "statefulSet", sts)
	return intctrlutil.BackgroundDeleteObject(r.Client, reqCtx.Ctx, sts)
}

// deleteExternalResources deletes the external workloads that execute backup.
// Currently, it supports two types of workloads: job and statefulSet.
func (r *BackupReconciler) deleteExternalResources(
reqCtx intctrlutil.RequestCtx, backup *dpv1alpha1.Backup) error {
return r.deleteExternalJobs(reqCtx, backup)
if err := r.deleteExternalJobs(reqCtx, backup); err != nil {
return err
}
return r.deleteExternalStatefulSet(reqCtx, backup)
}

// PatchBackupObjectMeta patches backup object metaObject include cluster snapshot.
Expand Down
3 changes: 3 additions & 0 deletions controllers/dataprotection/backupschedule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (r *BackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}

if err = r.handleSchedule(reqCtx, backupSchedule); err != nil {
if intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeRequeue) {
return intctrlutil.RequeueAfter(reconcileInterval, reqCtx.Log, "")
}
return r.patchStatusFailed(reqCtx, backupSchedule, "HandleBackupScheduleFailed", err)
}

Expand Down
11 changes: 4 additions & 7 deletions controllers/dataprotection/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
dperrors "github.com/apecloud/kubeblocks/pkg/dataprotection/errors"
dprestore "github.com/apecloud/kubeblocks/pkg/dataprotection/restore"
dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
Expand Down Expand Up @@ -88,7 +87,7 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
case "":
return r.newAction(reqCtx, restore)
case dpv1alpha1.RestorePhaseRunning:
return r.inProgressAction(reqCtx, restore)
return r.handleRunningPhase(reqCtx, restore)
case dpv1alpha1.RestorePhaseCompleted:
if err = r.deleteExternalResources(reqCtx, restore); err != nil {
return intctrlutil.RequeueWithError(err, reqCtx.Log, "")
Expand Down Expand Up @@ -170,14 +169,10 @@ func (r *RestoreReconciler) newAction(reqCtx intctrlutil.RequestCtx, restore *dp
return intctrlutil.Reconciled()
}

func (r *RestoreReconciler) inProgressAction(reqCtx intctrlutil.RequestCtx, restore *dpv1alpha1.Restore) (ctrl.Result, error) {
func (r *RestoreReconciler) handleRunningPhase(reqCtx intctrlutil.RequestCtx, restore *dpv1alpha1.Restore) (ctrl.Result, error) {
restoreMgr := dprestore.NewRestoreManager(restore, r.Recorder, r.Scheme)
// validate if the restore.spec is valid and build restore manager.
err := r.validateAndBuildMGR(reqCtx, restoreMgr)
// skip processing for ErrorTypeWaitForExternalHandler when Restore is Running
if intctrlutil.IsTargetError(err, dperrors.ErrorTypeWaitForExternalHandler) {
return intctrlutil.Reconciled()
}
if err == nil {
saName := restore.Spec.ServiceAccountName
if saName == "" {
Expand Down Expand Up @@ -209,6 +204,7 @@ func (r *RestoreReconciler) inProgressAction(reqCtx intctrlutil.RequestCtx, rest
}

func (r *RestoreReconciler) HandleRestoreActions(reqCtx intctrlutil.RequestCtx, restoreMgr *dprestore.RestoreManager) error {
reqCtx.Log.V(1).Info("start to prepare data", "restore", reqCtx.Req.NamespacedName)
// 1. handle the prepareData stage.
isCompleted, err := r.prepareData(reqCtx, restoreMgr)
if err != nil {
Expand All @@ -218,6 +214,7 @@ func (r *RestoreReconciler) HandleRestoreActions(reqCtx intctrlutil.RequestCtx,
if !isCompleted {
return nil
}
reqCtx.Log.V(1).Info("start to restore data after ready", "restore", reqCtx.Req.NamespacedName)
// 2. handle the postReady stage.
isCompleted, err = r.postReady(reqCtx, restoreMgr)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions controllers/dataprotection/volumepopulator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
dperrors "github.com/apecloud/kubeblocks/pkg/dataprotection/errors"
dprestore "github.com/apecloud/kubeblocks/pkg/dataprotection/restore"
dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types"
"github.com/apecloud/kubeblocks/pkg/dataprotection/utils"
Expand Down Expand Up @@ -88,7 +87,7 @@ func (r *VolumePopulatorReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return intctrlutil.RequeueWithError(patchErr, reqCtx.Log, "")
}
return intctrlutil.Reconciled()
} else if intctrlutil.IsTargetError(err, dperrors.ErrorTypeWaitForExternalHandler) && r.ContainPopulatingCondition(pvc) {
} else if r.ContainPopulatingCondition(pvc) {
// ignore the error if external controller handles it.
return intctrlutil.Reconciled()
}
Expand Down
Loading

0 comments on commit e5a3019

Please sign in to comment.