Introduce a new API named WorkloadRebalancer to support rescheduling
Signed-off-by: chaosi-zju <chaosi@zju.edu.cn>
chaosi-zju committed Apr 20, 2024
1 parent 21c6dfc commit ed44bd0
Showing 1 changed file with 53 additions and 48 deletions.
101 changes: 53 additions & 48 deletions pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go
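
For orientation, the controller in this file plugs into a standard controller-runtime manager: SetupWithManager registers it to watch WorkloadRebalancer objects (filtered by predicateFunc) and routes each request to Reconcile. The wiring sketch below is illustrative only; the karmada import path and the assumption that RebalancerController exposes a Client field (the diff only shows it being used as c.Client) are not taken from this commit.

package main

import (
	"os"

	"k8s.io/klog/v2"
	controllerruntime "sigs.k8s.io/controller-runtime"

	// Assumed import path for illustration; the diff only names the file
	// pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go.
	"github.com/karmada-io/karmada/pkg/controllers/workloadrebalancer"
)

func main() {
	// Build a manager from the local kubeconfig or in-cluster config.
	mgr, err := controllerruntime.NewManager(controllerruntime.GetConfigOrDie(), controllerruntime.Options{})
	if err != nil {
		klog.Errorf("failed to create manager: %v", err)
		os.Exit(1)
	}

	// Register the rebalancer controller; SetupWithManager watches
	// WorkloadRebalancer objects and dispatches them to Reconcile.
	controller := &workloadrebalancer.RebalancerController{
		Client: mgr.GetClient(), // assumption: the struct exposes a client.Client field named Client
	}
	if err := controller.SetupWithManager(mgr); err != nil {
		klog.Errorf("failed to set up RebalancerController: %v", err)
		os.Exit(1)
	}

	if err := mgr.Start(controllerruntime.SetupSignalHandler()); err != nil {
		klog.Errorf("manager exited with error: %v", err)
		os.Exit(1)
	}
}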
@@ -53,22 +53,22 @@ var predicateFunc = predicate.Funcs{
 }
 
 // SetupWithManager creates a controller and register to controller manager.
-func (s *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error {
+func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error {
 	return controllerruntime.NewControllerManagedBy(mgr).
 		Named(ControllerName).
 		For(&appsv1alpha1.WorkloadRebalancer{}, builder.WithPredicates(predicateFunc)).
-		Complete(s)
+		Complete(c)
 }
 
 // Reconcile performs a full reconciliation for the object referred to by the Request.
 // The Controller will requeue the Request to be processed again if an error is non-nil or
 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
-func (s *RebalancerController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
-	klog.V(4).Infof("Reconciling for WorkloadRebalancer %s", req.Name)
+func (c *RebalancerController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
+	klog.V(4).Infof("Reconciling for WorkloadRebalancer %s", req.Name)
 
 	// 1. get latest WorkloadRebalancer
 	rebalancer := &appsv1alpha1.WorkloadRebalancer{}
-	if err := s.Client.Get(ctx, req.NamespacedName, rebalancer); err != nil {
+	if err := c.Client.Get(ctx, req.NamespacedName, rebalancer); err != nil {
 		if apierrors.IsNotFound(err) {
 			klog.Infof("no need to reconcile WorkloadRebalancer for it not found")
 			return controllerruntime.Result{}, nil
@@ -78,11 +78,37 @@ func (s *RebalancerController) Reconcile(ctx context.Context, req controllerrunt

 	// 2. build status of WorkloadRebalancer
 	if len(rebalancer.Status.ObservedWorkloads) == 0 {
-		s.buildWorkloadRebalancerStatus(rebalancer)
+		c.buildWorkloadRebalancerStatus(rebalancer)
 	}
 
 	// 3. get and update referenced binding to trigger a rescheduling
-	successNum, retryNum := int64(0), int64(0)
+	successNum, retryNum := c.doWorkloadRebalance(ctx, rebalancer)
+
+	// 4. update status of WorkloadRebalancer
+	if err := c.updateWorkloadRebalancerStatus(rebalancer); err != nil {
+		return controllerruntime.Result{}, err
+	}
+	klog.Infof("Finish handling WorkloadRebalancer (%s), %d/%d resource success in all, while %d resource need retry",
+		rebalancer.Name, successNum, len(rebalancer.Status.ObservedWorkloads), retryNum)
+
+	if retryNum > 0 {
+		return controllerruntime.Result{}, fmt.Errorf("%d resource reschedule triggered failed and need retry", retryNum)
+	}
+	return controllerruntime.Result{}, nil
+}
+
+func (c *RebalancerController) buildWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) {
+	resourceList := make([]appsv1alpha1.ObservedWorkload, 0)
+	for _, resource := range rebalancer.Spec.Workloads {
+		resourceList = append(resourceList, appsv1alpha1.ObservedWorkload{
+			Workload: resource,
+		})
+	}
+	rebalancer.Status.ObservedWorkloads = resourceList
+}
+
+func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) (successNum int64, retryNum int64) {
+	successNum, retryNum = int64(0), int64(0)
 	for i, resource := range rebalancer.Status.ObservedWorkloads {
 		if resource.State == appsv1alpha1.Success {
 			successNum++
@@ -96,91 +122,70 @@ func (s *RebalancerController) Reconcile(ctx context.Context, req controllerrunt
 		// resource with empty namespace represents it is a cluster wide resource.
 		if resource.Namespace != "" {
 			binding := &workv1alpha2.ResourceBinding{}
-			if err := s.Client.Get(ctx, client.ObjectKey{Namespace: resource.Namespace, Name: bindingName}, binding); err != nil {
+			if err := c.Client.Get(ctx, client.ObjectKey{Namespace: resource.Namespace, Name: bindingName}, binding); err != nil {
 				klog.Errorf("get binding failed: %+v", err)
-				s.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
+				c.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
 				continue
 			}
 			// update spec.rescheduleTriggeredAt of referenced binding to trigger a rescheduling
-			if s.needTriggerReschedule(rebalancer.CreationTimestamp, binding.Spec.RescheduleTriggeredAt) {
+			if c.needTriggerReschedule(rebalancer.CreationTimestamp, binding.Spec.RescheduleTriggeredAt) {
 				binding.Spec.RescheduleTriggeredAt = &rebalancer.CreationTimestamp
 
-				if err := s.Client.Update(ctx, binding); err != nil {
+				if err := c.Client.Update(ctx, binding); err != nil {
 					klog.Errorf("update binding failed: %+v", err)
-					s.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
+					c.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
 					continue
 				}
 			}
-			s.recordWorkloadRebalanceSuccess(&rebalancer.Status.ObservedWorkloads[i], &successNum)
+			c.recordWorkloadRebalanceSuccess(&rebalancer.Status.ObservedWorkloads[i], &successNum)
 		} else {
 			clusterbinding := &workv1alpha2.ClusterResourceBinding{}
-			if err := s.Client.Get(ctx, client.ObjectKey{Name: bindingName}, clusterbinding); err != nil {
+			if err := c.Client.Get(ctx, client.ObjectKey{Name: bindingName}, clusterbinding); err != nil {
 				klog.Errorf("get cluster binding failed: %+v", err)
-				s.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
+				c.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
 				continue
 			}
 			// update spec.rescheduleTriggeredAt of referenced clusterbinding to trigger a rescheduling
-			if s.needTriggerReschedule(rebalancer.CreationTimestamp, clusterbinding.Spec.RescheduleTriggeredAt) {
+			if c.needTriggerReschedule(rebalancer.CreationTimestamp, clusterbinding.Spec.RescheduleTriggeredAt) {
 				clusterbinding.Spec.RescheduleTriggeredAt = &rebalancer.CreationTimestamp
 
-				if err := s.Client.Update(ctx, clusterbinding); err != nil {
+				if err := c.Client.Update(ctx, clusterbinding); err != nil {
 					klog.Errorf("update cluster binding failed: %+v", err)
-					s.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
+					c.recordWorkloadRebalanceFailed(&rebalancer.Status.ObservedWorkloads[i], &retryNum, err)
 					continue
 				}
 			}
-			s.recordWorkloadRebalanceSuccess(&rebalancer.Status.ObservedWorkloads[i], &successNum)
+			c.recordWorkloadRebalanceSuccess(&rebalancer.Status.ObservedWorkloads[i], &successNum)
 		}
 	}

+
+	return successNum, retryNum
+}
-	// 4. update status of WorkloadRebalancer
-	if err := s.updateWorkloadRebalancerStatus(rebalancer); err != nil {
-		return controllerruntime.Result{}, err
-	}
-	klog.Infof("Finish handling WorkloadRebalancer (%s), %d/%d resource success in all, while %d resource need retry",
-		rebalancer.Name, successNum, len(rebalancer.Status.ObservedWorkloads), retryNum)
-
-	if retryNum > 0 {
-		return controllerruntime.Result{}, fmt.Errorf("%d resource reschedule triggered failed and need retry", retryNum)
-	}
-	return controllerruntime.Result{}, nil
-}
-
-func (s *RebalancerController) buildWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) {
-	resourceList := make([]appsv1alpha1.ObservedWorkload, 0)
-	for _, resource := range rebalancer.Spec.Workloads {
-		resourceList = append(resourceList, appsv1alpha1.ObservedWorkload{
-			Workload: resource,
-		})
-	}
-	rebalancer.Status.ObservedWorkloads = resourceList
-	return
-}

-func (s *RebalancerController) needTriggerReschedule(creationTimestamp metav1.Time, rescheduleTriggeredAt *metav1.Time) bool {
+func (c *RebalancerController) needTriggerReschedule(creationTimestamp metav1.Time, rescheduleTriggeredAt *metav1.Time) bool {
 	return rescheduleTriggeredAt == nil || creationTimestamp.After(rescheduleTriggeredAt.Time)
 }
 
-func (s *RebalancerController) recordWorkloadRebalanceSuccess(resource *appsv1alpha1.ObservedWorkload, successNum *int64) {
+func (c *RebalancerController) recordWorkloadRebalanceSuccess(resource *appsv1alpha1.ObservedWorkload, successNum *int64) {
 	resource.State = appsv1alpha1.Success
 	*successNum++
 }
 
-func (s *RebalancerController) recordWorkloadRebalanceFailed(resource *appsv1alpha1.ObservedWorkload, retryNum *int64, err error) {
+func (c *RebalancerController) recordWorkloadRebalanceFailed(resource *appsv1alpha1.ObservedWorkload, retryNum *int64, err error) {
 	resource.State = appsv1alpha1.Failed
 	resource.Reason = apierrors.ReasonForError(err)
 	if resource.Reason != metav1.StatusReasonNotFound {
 		*retryNum++
 	}
 }
 
-func (s *RebalancerController) updateWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) error {
+func (c *RebalancerController) updateWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) error {
 	rebalancerCopy := rebalancer.DeepCopy()
 	rebalancerPatch := client.MergeFrom(rebalancerCopy)
 
 	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
-		klog.V(4).Infof("Start to patch WorkloadRebalancer(%s) status", rebalancer.Name)
-		if err := s.Client.Patch(context.TODO(), rebalancerCopy, rebalancerPatch); err != nil {
-			klog.Errorf("Failed to patch WorkloadRebalancer (%s) status, err: %+v", rebalancer.Name, err)
+		klog.V(4).Infof("Start to patch WorkloadRebalancer(%s) status", rebalancer.Name)
+		if err := c.Client.Patch(context.TODO(), rebalancerCopy, rebalancerPatch); err != nil {
+			klog.Errorf("Failed to patch WorkloadRebalancer (%s) status, err: %+v", rebalancer.Name, err)
 			return err
 		}
 		return nil
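
The rescheduling trigger above is idempotent: spec.rescheduleTriggeredAt on a (Cluster)ResourceBinding is only advanced when it is unset or older than the WorkloadRebalancer's creation timestamp, so bindings that were already triggered are skipped on later reconciles; only failures other than NotFound are counted in retryNum, which makes Reconcile return an error and lets controller-runtime requeue the request. A minimal, self-contained sketch of that predicate, copying the needTriggerReschedule helper from the diff (the sample timestamps are made up):

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// needTriggerReschedule mirrors the helper in the diff: trigger only when the
// binding has never been triggered, or was last triggered before this
// WorkloadRebalancer was created.
func needTriggerReschedule(creationTimestamp metav1.Time, rescheduleTriggeredAt *metav1.Time) bool {
	return rescheduleTriggeredAt == nil || creationTimestamp.After(rescheduleTriggeredAt.Time)
}

func main() {
	created := metav1.NewTime(time.Now())
	older := metav1.NewTime(created.Add(-time.Hour))
	newer := metav1.NewTime(created.Add(time.Hour))

	fmt.Println(needTriggerReschedule(created, nil))    // true: never triggered before
	fmt.Println(needTriggerReschedule(created, &older)) // true: last trigger predates the rebalancer
	fmt.Println(needTriggerReschedule(created, &newer)) // false: already triggered, nothing to do
}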
