Skip to content

Commit

Permalink
feat: support to specify namespaces that the operator should manage (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf authored Mar 1, 2024
1 parent 3e1aeb1 commit 15b0752
Show file tree
Hide file tree
Showing 34 changed files with 162 additions and 83 deletions.
8 changes: 8 additions & 0 deletions cmd/dataprotection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func main() {
"The leader election ID prefix for controller manager. "+
"This ID must be unique to controller manager.")

flag.String(constant.ManagedNamespacesFlag, "",
"The namespaces that the operator will manage, multiple namespaces are separated by commas.")

opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -171,6 +174,11 @@ func main() {
os.Exit(1)
}

managedNamespaces := viper.GetString(strings.ReplaceAll(constant.ManagedNamespacesFlag, "-", "_"))
if len(managedNamespaces) > 0 {
setupLog.Info(fmt.Sprintf("managed namespaces: %s", managedNamespaces))
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Expand Down
8 changes: 8 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ func setupFlags() {

flag.String(kubeContextsFlagKey.String(), "", "Kube contexts the manager will talk to.")

flag.String(constant.ManagedNamespacesFlag, "",
"The namespaces that the operator will manage, multiple namespaces are separated by commas.")

opts := zap.Options{
Development: false,
}
Expand Down Expand Up @@ -257,6 +260,11 @@ func main() {
os.Exit(1)
}

managedNamespaces := viper.GetString(strings.ReplaceAll(constant.ManagedNamespacesFlag, "-", "_"))
if len(managedNamespaces) > 0 {
setupLog.Info(fmt.Sprintf("managed namespaces: %s", managedNamespaces))
}

metricsAddr = viper.GetString(metricsAddrFlagKey.viperName())
probeAddr = viper.GetString(probeAddrFlagKey.viperName())
enableLeaderElection = viper.GetBool(leaderElectFlagKey.viperName())
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/backuppolicytemplate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (r *BackupPolicyTemplateReconciler) Reconcile(ctx context.Context, req reco

// SetupWithManager sets up the controller with the Manager.
func (r *BackupPolicyTemplateReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.BackupPolicyTemplate{}).
Complete(r)
}
4 changes: 3 additions & 1 deletion controllers/apps/class_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,7 @@ func (r *ComponentClassReconciler) Reconcile(ctx context.Context, req reconcile.

// SetupWithManager sets up the controller with the Manager.
func (r *ComponentClassReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).For(&appsv1alpha1.ComponentClassDefinition{}).Complete(r)
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.ComponentClassDefinition{}).
Complete(r)
}
2 changes: 1 addition & 1 deletion controllers/apps/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.Cluster{}).
Owns(&appsv1alpha1.Component{}).
Owns(&corev1.Service{}). // cluster services
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/clusterdefinition_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *ClusterDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.Re

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.ClusterDefinition{}).
Complete(r)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/clusterversion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (r *ClusterVersionReconciler) Reconcile(ctx context.Context, req ctrl.Reque

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterVersionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.ClusterVersion{}).
Complete(r)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/component_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (r *ComponentReconciler) SetupWithManager(mgr ctrl.Manager) error {
if retryDurationMS != 0 {
requeueDuration = time.Millisecond * time.Duration(retryDurationMS)
}
b := ctrl.NewControllerManagedBy(mgr).
b := intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.Component{}).
Watches(&workloads.ReplicatedStateMachine{}, handler.EnqueueRequestsFromMapFunc(r.filterComponentResources)).
Owns(&corev1.Service{}).
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/componentdefinition_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (r *ComponentDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.

// SetupWithManager sets up the controller with the Manager.
func (r *ComponentDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.ComponentDefinition{}).
Complete(r)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (r *ConfigConstraintReconciler) Reconcile(ctx context.Context, req ctrl.Req

// SetupWithManager sets up the controller with the Manager.
func (r *ConfigConstraintReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.ConfigConstraint{}).
// for other resource
Owns(&corev1.ConfigMap{}).
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/configuration/configuration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (r *ConfigurationReconciler) runTasks(taskCtx TaskContext, tasks []Task) (e

// SetupWithManager sets up the controller with the Manager.
func (r *ConfigurationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.Configuration{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/configuration/reconfigure_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *ReconfigureReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// SetupWithManager sets up the controller with the Manager.
func (r *ReconfigureReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&corev1.ConfigMap{}).
WithEventFilter(predicate.NewPredicateFuncs(checkConfigurationObject)).
Complete(r)
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/opsdefinition_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *OpsDefinitionReconciler) updateStatusUnavailable(reqCtx intctrlutil.Req

// SetupWithManager sets up the controller with the Manager.
func (r *OpsDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.OpsDefinition{}).
Complete(r)
}
2 changes: 1 addition & 1 deletion controllers/apps/opsrequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (r *OpsRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// SetupWithManager sets up the controller with the Manager.
func (r *OpsRequestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.OpsRequest{}).
Watches(&appsv1alpha1.Cluster{}, handler.EnqueueRequestsFromMapFunc(r.parseFirstOpsRequest)).
Watches(&workloadsv1alpha1.ReplicatedStateMachine{}, handler.EnqueueRequestsFromMapFunc(r.parseFirstOpsRequestForRSM)).
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/servicedescriptor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *ServiceDescriptorReconciler) Reconcile(ctx context.Context, req ctrl.Re

// SetupWithManager sets up the controller with the Manager.
func (r *ServiceDescriptorReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.ServiceDescriptor{}).
Complete(r)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/systemaccount_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (r *SystemAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// SetupWithManager sets up the controller with the Manager.
func (r *SystemAccountReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&appsv1alpha1.Cluster{}).
Owns(&corev1.Secret{}).
Watches(&batchv1.Job{}, r.jobCompletionHandler()).
Expand Down
5 changes: 3 additions & 2 deletions controllers/dataprotection/actionset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func (r *ActionSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// SetupWithManager sets up the controller with the Manager.
func (r *ActionSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&dpv1alpha1.ActionSet{}).Complete(r)
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&dpv1alpha1.ActionSet{}).
Complete(r)
}

func (r *ActionSetReconciler) deleteExternalResources(
Expand Down
2 changes: 1 addition & 1 deletion controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

// SetupWithManager sets up the controller with the Manager.
func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr).
b := intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&dpv1alpha1.Backup{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(maxConcurDataProtectionReconKey),
Expand Down
2 changes: 1 addition & 1 deletion controllers/dataprotection/backuppolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *BackupPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request

// SetupWithManager sets up the controller with the Manager.
func (r *BackupPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&dpv1alpha1.BackupPolicy{}).
Complete(r)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/dataprotection/backuprepo_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ func (r *BackupRepoReconciler) SetupWithManager(mgr ctrl.Manager) error {
}); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&dpv1alpha1.BackupRepo{}).
Watches(&storagev1alpha1.StorageProvider{}, handler.EnqueueRequestsFromMapFunc(r.mapProviderToRepos)).
Watches(&dpv1alpha1.Backup{}, handler.EnqueueRequestsFromMapFunc(r.mapBackupToRepo)).
Expand Down
2 changes: 1 addition & 1 deletion controllers/dataprotection/backupschedule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *BackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl.Reque

// SetupWithManager sets up the controller with the Manager.
func (r *BackupScheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr).
b := intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&dpv1alpha1.BackupSchedule{})

// Compatible with kubernetes versions prior to K8s 1.21, only supports batch v1beta1.
Expand Down
34 changes: 10 additions & 24 deletions controllers/dataprotection/gc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
ctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
dptypes "github.com/apecloud/kubeblocks/pkg/dataprotection/types"
dputils "github.com/apecloud/kubeblocks/pkg/dataprotection/utils"
viper "github.com/apecloud/kubeblocks/pkg/viperx"
Expand Down Expand Up @@ -63,21 +62,8 @@ func NewGCReconciler(mgr ctrl.Manager) *GCReconciler {
// taken care of. Other events will be filtered to decrease the load on the controller.
func (r *GCReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := dputils.NewPeriodicalEnqueueSource(mgr.GetClient(), &dpv1alpha1.BackupList{}, r.frequency, dputils.PeriodicalEnqueueSourceOption{})
return ctrl.NewControllerManagedBy(mgr).
For(&dpv1alpha1.Backup{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool {
return false
},
UpdateFunc: func(_ event.UpdateEvent) bool {
return false
},
DeleteFunc: func(_ event.DeleteEvent) bool {
return false
},
GenericFunc: func(_ event.GenericEvent) bool {
return false
},
})).
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&dpv1alpha1.Backup{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(client.Object) bool { return false }))).
WatchesRawSource(s, nil).
Complete(r)
}
Expand All @@ -88,7 +74,7 @@ func (r *GCReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// delete expired backups.
func (r *GCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqCtx := ctrlutil.RequestCtx{
reqCtx := intctrlutil.RequestCtx{
Ctx: ctx,
Req: req,
Log: log.FromContext(ctx).WithValues("gc backup", req.NamespacedName),
Expand All @@ -100,14 +86,14 @@ func (r *GCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re
if err := r.Get(reqCtx.Ctx, reqCtx.Req.NamespacedName, backup); err != nil {
if apierrors.IsNotFound(err) {
reqCtx.Log.Error(err, "backup ont found")
return ctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}
}

// backup is being deleted, skip
if !backup.DeletionTimestamp.IsZero() {
reqCtx.Log.V(1).Info("backup is being deleted, skipping")
return ctrlutil.Reconciled()
return intctrlutil.Reconciled()
}

reqCtx.Log.V(1).Info("gc reconcile", "backup", req.String(),
Expand All @@ -117,17 +103,17 @@ func (r *GCReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re
now := r.clock.Now()
if backup.Status.Expiration == nil || backup.Status.Expiration.After(now) {
reqCtx.Log.V(1).Info("backup is not expired yet, skipping")
return ctrlutil.Reconciled()
return intctrlutil.Reconciled()
}

reqCtx.Log.Info("backup has expired, delete it", "backup", req.String())
if err := ctrlutil.BackgroundDeleteObject(r.Client, reqCtx.Ctx, backup); err != nil {
if err := intctrlutil.BackgroundDeleteObject(r.Client, reqCtx.Ctx, backup); err != nil {
reqCtx.Log.Error(err, "failed to delete backup")
r.Recorder.Event(backup, corev1.EventTypeWarning, "RemoveExpiredBackupsFailed", err.Error())
return ctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}

return ctrlutil.Reconciled()
return intctrlutil.Reconciled()
}

func getGCFrequency() time.Duration {
Expand Down
65 changes: 34 additions & 31 deletions controllers/dataprotection/log_collection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (
"strings"

"golang.org/x/exp/slices"
"k8s.io/utils/pointer"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -101,35 +100,16 @@ func (r *LogCollectionReconciler) Reconcile(ctx context.Context, req ctrl.Reques

// SetupWithManager sets up the controller with the Manager.
func (r *LogCollectionReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&batchv1.Job{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(_ event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
job, ok := e.ObjectNew.(*batchv1.Job)
if !ok {
return false
}
if !r.ownedByDataProtection(job) {
return false
}
for _, c := range job.Status.Conditions {
if c.Type == batchv1.JobFailed {
return true
}
}
return false
},
DeleteFunc: func(_ event.DeleteEvent) bool {
return false
},
GenericFunc: func(_ event.GenericEvent) bool {
return false
},
})).WithOptions(controller.Options{
MaxConcurrentReconciles: runtime.NumCPU(),
}).Complete(r)
return intctrlutil.NewNamespacedControllerManagedBy(mgr).
For(&batchv1.Job{}, builder.WithPredicates(
failedJobUpdatePredicate{
Funcs: predicate.NewPredicateFuncs(func(object client.Object) bool { return false }),
r: r,
})).
WithOptions(controller.Options{
MaxConcurrentReconciles: runtime.NumCPU(),
}).
Complete(r)
}

func (r *LogCollectionReconciler) ownedByDataProtection(job *batchv1.Job) bool {
Expand Down Expand Up @@ -253,3 +233,26 @@ func (r *LogCollectionReconciler) patchRestoreStatus(reqCtx intctrlutil.RequestC
}
return r.Client.Status().Patch(reqCtx.Ctx, restore, patch)
}

type failedJobUpdatePredicate struct {
predicate.Funcs
r *LogCollectionReconciler
}

var _ predicate.Predicate = failedJobUpdatePredicate{}

func (p failedJobUpdatePredicate) Update(e event.UpdateEvent) bool {
job, ok := e.ObjectNew.(*batchv1.Job)
if !ok {
return false
}
if !p.r.ownedByDataProtection(job) {
return false
}
for _, c := range job.Status.Conditions {
if c.Type == batchv1.JobFailed {
return true
}
}
return false
}
Loading

0 comments on commit 15b0752

Please sign in to comment.