diff --git a/pkg/controllers/common/constants.go b/pkg/controllers/common/constants.go
index 3bd228d4..81b9c1c1 100644
--- a/pkg/controllers/common/constants.go
+++ b/pkg/controllers/common/constants.go
@@ -116,6 +116,14 @@ const (
 	TemplateGeneratorMergePatchAnnotation = FederateControllerPrefix + "template-generator-merge-patch"
 
 	LatestReplicasetDigestsAnnotation = DefaultPrefix + "latest-replicaset-digests"
+
+	HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path"
+)
+
+// The following consts are label key-values used by the KubeAdmiral controllers.
+
+const (
+	FedHPAEnableKey = DefaultPrefix + "fed-hpa-enabled"
 )
 
 // PropagatedAnnotationKeys and PropagatedLabelKeys are used to store the keys of annotations and labels that are present
@@ -190,9 +198,3 @@ var (
 //
 //nolint:lll
 const MaxFederatedObjectNameLength = 253
-
-// HPAScaleTargetRefPath defines the fed hpa annotations and labels
-const (
-	HPAScaleTargetRefPath = DefaultPrefix + "scale-target-ref-path"
-	FedHPAEnableKey       = DefaultPrefix + "fed-hpa-enabled"
-)
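Note on the constants move: `HPAScaleTargetRefPath` remains an annotation key (read from FTC annotations below), while `FedHPAEnableKey` is split into a dedicated block because it is matched as a label. A minimal sketch of how such a label gate is consumed; the helper name here is hypothetical and not part of this diff:

```go
package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

// isFedHPAEnabled is a hypothetical helper illustrating the intended use of
// the relocated FedHPAEnableKey constant: it is looked up as a label key and
// compared against the shared "true" value constant.
func isFedHPAEnabled(obj metav1.Object) bool {
	return obj.GetLabels()[common.FedHPAEnableKey] == common.AnnotationValueTrue
}
```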
diff --git a/pkg/controllers/federatedhpa/controller.go b/pkg/controllers/federatedhpa/controller.go
index cd80fac6..f9f94bd6 100644
--- a/pkg/controllers/federatedhpa/controller.go
+++ b/pkg/controllers/federatedhpa/controller.go
@@ -19,6 +19,7 @@ package federatedhpa
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/pkg/errors"
@@ -80,9 +81,10 @@ type FederatedHPAController struct {
 	worker               worker.ReconcileWorker[Resource]
 	cacheSyncRateLimiter workqueue.RateLimiter
 
-	scaleTargetRefMapping map[schema.GroupVersionKind]string
-	workloadHPAMapping    *bijection.OneToManyRelation[Resource, Resource]
-	ppWorkloadMapping     *bijection.OneToManyRelation[Resource, Resource]
+	gvkToScaleTargetRefLock sync.RWMutex
+	gvkToScaleTargetRef     map[schema.GroupVersionKind]string
+	workloadHPAMapping      *bijection.OneToManyRelation[Resource, Resource]
+	ppWorkloadMapping       *bijection.OneToManyRelation[Resource, Resource]
 
 	metrics stats.Metrics
 	logger  klog.Logger
@@ -114,9 +116,9 @@ func NewFederatedHPAController(
 		cacheSyncRateLimiter: workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),
 
-		scaleTargetRefMapping: map[schema.GroupVersionKind]string{},
-		workloadHPAMapping:    bijection.NewOneToManyRelation[Resource, Resource](),
-		ppWorkloadMapping:     bijection.NewOneToManyRelation[Resource, Resource](),
+		gvkToScaleTargetRef: map[schema.GroupVersionKind]string{},
+		workloadHPAMapping:  bijection.NewOneToManyRelation[Resource, Resource](),
+		ppWorkloadMapping:   bijection.NewOneToManyRelation[Resource, Resource](),
 
 		metrics: metrics,
 		logger:  logger.WithValues("controller", FederatedHPAControllerName),
@@ -209,10 +211,13 @@ func (f *FederatedHPAController) enqueueFedHPAObjectsForPropagationPolicy(policy
 func (f *FederatedHPAController) enqueueFedHPAObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) {
 	logger := f.logger.WithValues("ftc", ftc.GetName())
 
+	f.gvkToScaleTargetRefLock.Lock()
+	defer f.gvkToScaleTargetRefLock.Unlock()
+
 	if scaleTargetRefPath, ok := ftc.GetAnnotations()[common.HPAScaleTargetRefPath]; ok {
-		f.scaleTargetRefMapping[ftc.GetSourceTypeGVK()] = scaleTargetRefPath
+		f.gvkToScaleTargetRef[ftc.GetSourceTypeGVK()] = scaleTargetRefPath
 	} else {
-		delete(f.scaleTargetRefMapping, ftc.GetSourceTypeGVK())
+		delete(f.gvkToScaleTargetRef, ftc.GetSourceTypeGVK())
 		return
 	}
 
@@ -329,8 +334,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
 		return worker.StatusAllOK
 	}
 
-	scaleTargetRef := f.scaleTargetRefMapping[key.gvk]
-	newWorkloadResource, err := scaleTargetRefToResource(hpaObject, scaleTargetRef)
+	newWorkloadResource, err := f.scaleTargetRefToResource(key.gvk, hpaObject)
 	if err != nil {
 		logger.Error(err, "Failed to get workload resource from hpa")
 		return worker.StatusError
@@ -360,8 +364,9 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
 	}
 
 	var pp fedcorev1a1.GenericPropagationPolicy
+	var newPPResource *Resource
 	if workloadExist {
-		newPPResource := getPropagationPolicyResourceFromFedWorkload(fedWorkload)
+		newPPResource = getPropagationPolicyResourceFromFedWorkload(fedWorkload)
 		if newPPResource != nil {
			_, exist = f.ppWorkloadMapping.LookupByT2(newWorkloadResource)
 			if exist {
@@ -388,11 +393,11 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
 			return worker.StatusError
 		}
 
-		if !workloadExist || isPropagationPolicyDividedMode(pp) {
+		if !workloadExist || isPropagationPolicyExist(pp) && isPropagationPolicyDividedMode(pp) {
 			isHPAObjectUpdated = removeFedHPANotWorkReasonAnno(hpaObject, FedHPANotWorkReason)
 			isHPAObjectUpdated = isHPAObjectUpdated || addHPALabel(hpaObject, common.FedHPAEnableKey, common.AnnotationValueTrue)
 		} else {
-			hpaNotWorkReason := generateFederationHPANotWorkReason(isPropagationPolicyExist(pp), isPropagationPolicyDividedMode(pp))
+			hpaNotWorkReason := generateFederationHPANotWorkReason(newPPResource, pp)
 			f.eventRecorder.Eventf(
 				hpaObject,
 				corev1.EventTypeWarning,
@@ -408,7 +413,8 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
 	case FederatedHPAModeDistributed, FederatedHPAModeDefault:
 		isHPAObjectUpdated = removeHPALabel(hpaObject, common.FedHPAEnableKey)
 
-		if !workloadExist || isPropagationPolicyDuplicateMode(pp) &&
+		if !workloadExist || isPropagationPolicyExist(pp) &&
+			isPropagationPolicyDuplicateMode(pp) &&
 			isPropagationPolicyFollowerEnabled(pp) &&
 			isWorkloadRetainReplicas(fedWorkload) &&
 			isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) {
@@ -417,12 +423,7 @@ func (f *FederatedHPAController) reconcile(ctx context.Context, key Resource) (s
 				return worker.StatusError
 			}
 		} else {
-			hpaNotWorkReason := generateDistributedHPANotWorkReason(
-				isPropagationPolicyExist(pp),
-				isPropagationPolicyDuplicateMode(pp),
-				isPropagationPolicyFollowerEnabled(pp),
-				isWorkloadRetainReplicas(fedWorkload),
-				isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload))
+			hpaNotWorkReason := generateDistributedHPANotWorkReason(ctx, newPPResource, pp, fedWorkload, hpaObject)
 			f.eventRecorder.Eventf(
 				hpaObject,
 				corev1.EventTypeWarning,
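Note on the renamed map: `gvkToScaleTargetRef` now comes with a `sync.RWMutex` because it is written from the FTC event handler and `isHPAType` while being read during reconciles. A minimal, self-contained sketch of the same pattern; the type and method names are illustrative, not from this diff:

```go
package example

import (
	"sync"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// gvkCache mirrors the locking pattern adopted here: a plain map guarded by
// a sync.RWMutex, writers taking the exclusive lock, readers the shared one.
type gvkCache struct {
	mu   sync.RWMutex
	data map[schema.GroupVersionKind]string
}

func newGVKCache() *gvkCache {
	return &gvkCache{data: map[schema.GroupVersionKind]string{}}
}

// set records the scale-target-ref path for a GVK under the write lock.
func (c *gvkCache) set(gvk schema.GroupVersionKind, path string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[gvk] = path
}

// get reads the path under the shared lock, allowing concurrent readers.
func (c *gvkCache) get(gvk schema.GroupVersionKind) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	path, ok := c.data[gvk]
	return path, ok
}
```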
diff --git a/pkg/controllers/federatedhpa/util.go b/pkg/controllers/federatedhpa/util.go
index 79ed6d33..62111a57 100644
--- a/pkg/controllers/federatedhpa/util.go
+++ b/pkg/controllers/federatedhpa/util.go
@@ -73,58 +73,81 @@ func fedObjectToSourceObjectResource(object metav1.Object) (Resource, error) {
 }
 
 func policyObjectToResource(object metav1.Object) Resource {
-	if cpp, ok := object.(*fedcorev1a1.ClusterPropagationPolicy); ok {
+	if pp, ok := object.(*fedcorev1a1.PropagationPolicy); ok {
 		return Resource{
-			name:      cpp.GetName(),
-			namespace: cpp.GetNamespace(),
-			gvk:       cpp.GroupVersionKind(),
+			name:      pp.GetName(),
+			namespace: pp.GetNamespace(),
+			gvk: schema.GroupVersionKind{
+				Group:   fedcorev1a1.SchemeGroupVersion.Group,
+				Version: fedcorev1a1.SchemeGroupVersion.Version,
+				Kind:    PropagationPolicyKind,
+			},
 		}
 	}
 
-	if pp, ok := object.(*fedcorev1a1.PropagationPolicy); ok {
+	if cpp, ok := object.(*fedcorev1a1.ClusterPropagationPolicy); ok {
 		return Resource{
-			name:      pp.GetName(),
-			namespace: pp.GetNamespace(),
-			gvk:       pp.GroupVersionKind(),
+			name:      cpp.GetName(),
+			namespace: cpp.GetNamespace(),
+			gvk: schema.GroupVersionKind{
+				Group:   fedcorev1a1.SchemeGroupVersion.Group,
+				Version: fedcorev1a1.SchemeGroupVersion.Version,
+				Kind:    ClusterPropagationPolicyKind,
+			},
 		}
 	}
+
 	return Resource{}
 }
 
-func generateFederationHPANotWorkReason(isPropagationPolicyExist, isPropagationPolicyDividedMode bool) string {
+func generateFederationHPANotWorkReason(newPPResource *Resource, pp fedcorev1a1.GenericPropagationPolicy) string {
 	var reasons []string
 
-	if !isPropagationPolicyExist {
-		reasons = append(reasons, "PropagationPolicy is not exist.")
+	if newPPResource == nil {
+		reasons = append(reasons, "The workload is not bound to any PropagationPolicy.")
+		return fmt.Sprintf("%v", reasons)
+	}
+
+	ppKind, ppName := newPPResource.gvk.Kind, newPPResource.name
+	if !isPropagationPolicyExist(pp) {
+		reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload does not exist.", ppKind, ppName))
+		return fmt.Sprintf("%v", reasons)
 	}
-	if isPropagationPolicyExist && !isPropagationPolicyDividedMode {
-		reasons = append(reasons, "PropagationPolicy is not divide.")
+	if !isPropagationPolicyDividedMode(pp) {
+		reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not in Divided mode.", ppKind, ppName))
 	}
 
 	return fmt.Sprintf("%v", reasons)
 }
 
 func generateDistributedHPANotWorkReason(
-	isPropagationPolicyExist,
-	isPropagationPolicyDuplicateMode,
-	isPropagationPolicyFollowerEnabled,
-	isWorkloadRetainReplicas,
-	isHPAFollowTheWorkload bool,
+	ctx context.Context,
+	newPPResource *Resource,
+	pp fedcorev1a1.GenericPropagationPolicy,
+	fedWorkload fedcorev1a1.GenericFederatedObject,
+	hpaObject *unstructured.Unstructured,
 ) string {
 	var reasons []string
 
-	if !isPropagationPolicyExist {
-		reasons = append(reasons, "PropagationPolicy is not exist.")
+	if newPPResource == nil {
+		reasons = append(reasons, "The workload is not bound to any PropagationPolicy.")
+		return fmt.Sprintf("%v", reasons)
 	}
-	if !isPropagationPolicyDuplicateMode {
-		reasons = append(reasons, "PropagationPolicy is not Duplicate.")
+
+	ppKind, ppName := newPPResource.gvk.Kind, newPPResource.name
+	if !isPropagationPolicyExist(pp) {
+		reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload does not exist.", ppKind, ppName))
+		return fmt.Sprintf("%v", reasons)
 	}
-	if !isPropagationPolicyFollowerEnabled {
-		reasons = append(reasons, "PropagationPolicy follower is not enable.")
+
+	if !isPropagationPolicyDuplicateMode(pp) {
+		reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not in Duplicate mode.", ppKind, ppName))
 	}
-	if !isWorkloadRetainReplicas {
-		reasons = append(reasons, "Workload is not retain replicas.")
+	if !isPropagationPolicyFollowerEnabled(pp) {
+		reasons = append(reasons, fmt.Sprintf("The %s %s bound to the workload is not enabled for follower scheduling.", ppKind, ppName))
 	}
-	if !isHPAFollowTheWorkload {
-		reasons = append(reasons, "Hpa is not follow the workload.")
+	if !isWorkloadRetainReplicas(fedWorkload) {
+		reasons = append(reasons, "The workload does not have retain-replicas enabled.")
+	}
+	if !isHPAFollowTheWorkload(ctx, hpaObject, fedWorkload) {
+		reasons = append(reasons, "The HPA does not follow the workload.")
 	}
 
 	return fmt.Sprintf("%v", reasons)
 }
@@ -139,11 +162,11 @@ func isPropagationPolicyExist(pp fedcorev1a1.GenericPropagationPolicy) bool {
 }
 
 func isPropagationPolicyDividedMode(pp fedcorev1a1.GenericPropagationPolicy) bool {
-	return pp != nil && pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDivide
+	return pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDivide
 }
 
 func isPropagationPolicyDuplicateMode(pp fedcorev1a1.GenericPropagationPolicy) bool {
-	return pp != nil && pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDuplicate
+	return pp.GetSpec().SchedulingMode == fedcorev1a1.SchedulingModeDuplicate
 }
 
 func isPropagationPolicyFollowerEnabled(pp fedcorev1a1.GenericPropagationPolicy) bool {
@@ -178,16 +201,19 @@ func (f *FederatedHPAController) isHPAType(resourceGVK schema.GroupVersionKind)
 		return false
 	}
 
+	f.gvkToScaleTargetRefLock.Lock()
+	defer f.gvkToScaleTargetRefLock.Unlock()
+
 	// HPA gvk has already been stored
-	if _, ok := f.scaleTargetRefMapping[resourceGVK]; ok {
+	if _, ok := f.gvkToScaleTargetRef[resourceGVK]; ok {
 		return true
 	}
 
 	if path, ok := ftc.Annotations[common.HPAScaleTargetRefPath]; ok {
-		f.scaleTargetRefMapping[resourceGVK] = path
+		f.gvkToScaleTargetRef[resourceGVK] = path
 		return true
 	} else {
-		delete(f.scaleTargetRefMapping, resourceGVK)
+		delete(f.gvkToScaleTargetRef, resourceGVK)
 		return false
 	}
 }
@@ -196,7 +222,12 @@ func isWorkloadRetainReplicas(fedObj metav1.Object) bool {
 	return fedObj.GetAnnotations()[common.RetainReplicasAnnotation] == common.AnnotationValueTrue
 }
 
-func scaleTargetRefToResource(hpaUns *unstructured.Unstructured, scaleTargetRef string) (Resource, error) {
+func (f *FederatedHPAController) scaleTargetRefToResource(gvk schema.GroupVersionKind, hpaUns *unstructured.Unstructured) (Resource, error) {
+	f.gvkToScaleTargetRefLock.RLock()
+	defer f.gvkToScaleTargetRefLock.RUnlock()
+
+	scaleTargetRef := f.gvkToScaleTargetRef[gvk]
+
 	fieldVal, found, err := unstructured.NestedFieldCopy(hpaUns.Object, strings.Split(scaleTargetRef, ".")...)
 	if err != nil || !found {
 		if err != nil {
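Note on the reason generators: both render their collected sentences with `fmt.Sprintf("%v", reasons)`, so the emitted event message is the slice's default formatting. A runnable illustration of that rendering; the sample reason text assumes a policy named `pp-sample`:

```go
package main

import "fmt"

func main() {
	// The generators append one sentence per failed precondition and render
	// the slice with the default %v verb, producing a bracketed list.
	reasons := []string{
		"The PropagationPolicy pp-sample bound to the workload is not in Duplicate mode.",
		"The HPA does not follow the workload.",
	}
	fmt.Printf("%v\n", reasons)
	// Output:
	// [The PropagationPolicy pp-sample bound to the workload is not in Duplicate mode. The HPA does not follow the workload.]
}
```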
diff --git a/pkg/controllers/sync/controller.go b/pkg/controllers/sync/controller.go
index ab90598a..a5dd00f1 100644
--- a/pkg/controllers/sync/controller.go
+++ b/pkg/controllers/sync/controller.go
@@ -409,7 +409,11 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
 		keyedLogger.Error(err, "Failed to get pending controllers")
 		return worker.StatusError
 	}
-	if len(pendingControllers) > 0 {
+	isFedHPAObject, err := isFedHPAObject(ctx, fedResource.Object())
+	if err != nil {
+		return worker.StatusError
+	}
+	if len(pendingControllers) > 0 && !isFedHPAObject {
 		// upstream controllers have not finished processing, we wait for our turn
 		return worker.StatusAllOK
 	}
@@ -422,7 +426,7 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
 		fedResource.RecordError("EnsureFinalizerError", errors.Wrap(err, "Failed to ensure finalizer"))
 		return worker.StatusError
 	}
-	clustersToSync, selectedClusters, err := s.prepareToSync(ctx, fedResource)
+	clustersToSync, selectedClusters, err := s.prepareToSync(ctx, fedResource, isFedHPAObject)
 	if err != nil {
 		fedResource.RecordError("PrepareToSyncError", errors.Wrap(err, "Failed to prepare to sync"))
 		return worker.StatusError
@@ -440,6 +444,7 @@ func (s *SyncController) reconcile(ctx context.Context, federatedName common.Qua
 func (s *SyncController) prepareToSync(
 	ctx context.Context,
 	fedResource FederatedResource,
+	isFedHPAObject bool,
 ) (
 	requireSync []*fedcorev1a1.FederatedCluster,
 	selectedClusters sets.Set[string],
@@ -459,6 +464,9 @@ func (s *SyncController) prepareToSync(
 		}
 		return nil, nil, err
 	}
+	if isFedHPAObject {
+		clusters = nil
+	}
 	clusterMap := make(map[string]*fedcorev1a1.FederatedCluster, len(clusters))
 	for _, cluster := range clusters {
 		clusterMap[cluster.Name] = cluster
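Note on the sync-controller hooks: a federated HPA source object neither waits for upstream controllers nor gets dispatched to member clusters, since clearing `clusters` makes `prepareToSync` treat every previously synced cluster as stale. A condensed sketch of the two gates; the standalone function shapes are hypothetical:

```go
package example

// shouldWaitForUpstream condenses the new reconcile gate: federated HPA
// objects skip the pending-controller wait so the sync controller can
// reconcile them immediately.
func shouldWaitForUpstream(pendingControllers int, isFedHPAObject bool) bool {
	return pendingControllers > 0 && !isFedHPAObject
}

// clustersToSync condenses the prepareToSync change: for federated HPA
// objects the cluster list is emptied, so member clusters only ever see
// removals of previously propagated resources.
func clustersToSync(clusters []string, isFedHPAObject bool) []string {
	if isFedHPAObject {
		return nil
	}
	return clusters
}
```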
diff --git a/pkg/controllers/sync/util.go b/pkg/controllers/sync/util.go
new file mode 100644
index 00000000..e787d1b2
--- /dev/null
+++ b/pkg/controllers/sync/util.go
@@ -0,0 +1,45 @@
+/*
+Copyright 2023 The KubeAdmiral Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sync
+
+import (
+	"context"
+
+	"k8s.io/klog/v2"
+
+	fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
+	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
+)
+
+// isFedHPAObject reports whether the federated object's source template
+// carries the FedHPAEnableKey label, i.e. whether its workload is managed
+// by the federated HPA controller.
+func isFedHPAObject(ctx context.Context, fedObject fedcorev1a1.GenericFederatedObject) (bool, error) {
+	logger := klog.FromContext(ctx)
+
+	templateMetadata, err := fedObject.GetSpec().GetTemplateMetadata()
+	if err != nil {
+		logger.Error(err, "Failed to get TemplateMetadata")
+		return false, err
+	}
+
+	if value, ok := templateMetadata.GetLabels()[common.FedHPAEnableKey]; ok && value == common.AnnotationValueTrue {
+		return true, nil
+	}
+
+	return false, nil
+}
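Note on the new helper: it inspects the labels of the template metadata inside the federated object's spec, not the FederatedObject's own metadata, because the federated HPA controller stamps `FedHPAEnableKey` onto the source workload's template. A small sketch of the kind of parse step `GetTemplateMetadata` performs, under the assumption that the template is stored as raw JSON; the helper name and return type here are illustrative:

```go
package example

import (
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// templateMetadataFromRaw illustrates extracting just enough of an embedded
// template to expose its ObjectMeta, so callers can inspect template-level
// labels such as FedHPAEnableKey.
func templateMetadataFromRaw(raw []byte) (*metav1.PartialObjectMetadata, error) {
	meta := &metav1.PartialObjectMetadata{}
	if err := json.Unmarshal(raw, meta); err != nil {
		return nil, err
	}
	return meta, nil
}
```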