workloadspread support rolling update
Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
mingzhou.swx committed Dec 11, 2023
1 parent 28c0a72 commit 1ce2978
Showing 12 changed files with 1,008 additions and 343 deletions.
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/workloadspread_types.go
@@ -130,6 +130,11 @@ type WorkloadSpreadStatus struct {
// Contains the status of each subset. Each element in this array represents one subset
// +optional
SubsetStatuses []WorkloadSpreadSubsetStatus `json:"subsetStatuses,omitempty"`

// VersionedSubsetStatuses is to solve rolling-update problems, where a new-version pod
// may be created before the old-version pod is deleted. We have to calculate the pod subset
// distribution for each version.
VersionedSubsetStatuses map[string][]WorkloadSpreadSubsetStatus `json:"versionedSubsetStatuses,omitempty"`
}

type WorkloadSpreadSubsetConditionType string
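To make the new field concrete, here is a minimal sketch of a WorkloadSpreadStatus captured mid-rollout, written against the type above. The subset names, version keys, and counts are hypothetical and not taken from this commit.

package main

import (
    "fmt"

    appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)

func main() {
    // Hypothetical snapshot during a rolling update: SubsetStatuses aggregates pods of
    // all versions, while VersionedSubsetStatuses breaks the same pods down per version.
    status := appsv1alpha1.WorkloadSpreadStatus{
        SubsetStatuses: []appsv1alpha1.WorkloadSpreadSubsetStatus{
            {Name: "subset-a", Replicas: 3, MissingReplicas: 0},
            {Name: "subset-b", Replicas: 2, MissingReplicas: 1},
        },
        VersionedSubsetStatuses: map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus{
            // old revision, still being scaled down
            "6bf4d8c7b5": {
                {Name: "subset-a", Replicas: 1, MissingReplicas: 2},
                {Name: "subset-b", Replicas: 1, MissingReplicas: 2},
            },
            // latest revision, still being scaled up
            "7d9f6c8d4f": {
                {Name: "subset-a", Replicas: 2, MissingReplicas: 1},
                {Name: "subset-b", Replicas: 1, MissingReplicas: 2},
            },
        },
    }
    fmt.Printf("%+v\n", status)
}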
17 changes: 17 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go


90 changes: 90 additions & 0 deletions config/crd/bases/apps.kruise.io_workloadspreads.yaml
@@ -432,6 +432,96 @@ spec:
- replicas
type: object
type: array
versionedSubsetStatuses:
additionalProperties:
items:
description: WorkloadSpreadSubsetStatus defines the observed state
of subset
properties:
conditions:
description: Conditions is an array of current observed subset
conditions.
items:
properties:
lastTransitionTime:
description: Last time the condition transitioned from
one status to another.
format: date-time
type: string
message:
description: A human readable message indicating details
about the transition.
type: string
reason:
description: The reason for the condition's last transition.
type: string
status:
description: Status of the condition, one of True, False,
Unknown.
type: string
type:
description: Type of in place set condition.
type: string
required:
- status
- type
type: object
type: array
creatingPods:
additionalProperties:
format: date-time
type: string
description: CreatingPods contains information about pods
whose creation was processed by the webhook handler but has
not yet been observed by the WorkloadSpread controller. A
pod stays in this map from the time the webhook handler
processed its creation request until the controller observes
the pod. The key in the map is the name of the pod and the
value is the time when the webhook handler processed the
creation request. If the creation never actually happened
and a pod is still in this map, the WorkloadSpread controller
removes it automatically after some time. If everything goes
smoothly, this map should be empty most of the time. A large
number of entries in the map may indicate problems with pod
creations.
type: object
deletingPods:
additionalProperties:
format: date-time
type: string
description: DeletingPods is similar to CreatingPods and
contains information about pod deletions.
type: object
missingReplicas:
description: MissingReplicas is the number of active replicas
that belong to this subset but have not been found.
MissingReplicas > 0 indicates the subset is still missing
MissingReplicas pods to be created; MissingReplicas = 0
indicates the subset already has enough pods and none need
to be created; MissingReplicas = -1 indicates the subset's
MaxReplicas is not set, so there is no limit on the number
of pods.
format: int32
type: integer
name:
description: Name should be unique between all of the subsets
under one WorkloadSpread.
type: string
replicas:
description: Replicas is the most recently observed number
of active replicas for subset.
format: int32
type: integer
required:
- missingReplicas
- name
- replicas
type: object
type: array
description: VersionedSubsetStatuses is to solve rolling-update
problems, where a new-version pod may be created before the
old-version pod is deleted. We have to calculate the pod
subset distribution for each version.
type: object
type: object
type: object
served: true
61 changes: 57 additions & 4 deletions pkg/controller/workloadspread/update_pod_deletion_cost.go
@@ -23,6 +23,7 @@ import (
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
@@ -33,19 +34,71 @@
wsutil "github.com/openkruise/kruise/pkg/util/workloadspread"
)

const (
// RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence
RevisionAnnotation = "deployment.kubernetes.io/revision"
)

func (r *ReconcileWorkloadSpread) getWorkloadLatestVersion(ws *appsv1alpha1.WorkloadSpread) (string, error) {
targetRef := ws.Spec.TargetReference
if targetRef == nil {
return "", nil
}

gvk := schema.FromAPIVersionAndKind(targetRef.APIVersion, targetRef.Kind)
key := types.NamespacedName{Namespace: ws.Namespace, Name: targetRef.Name}

object := wsutil.GenerateEmptyWorkloadObject(gvk, key)
if err := r.Get(context.TODO(), key, object); err != nil {
return "", client.IgnoreNotFound(err)
}

return wsutil.GetWorkloadVersion(r.Client, object)
}
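The RevisionAnnotation constant hints at how the latest version of a Deployment-class target can be identified; the actual lookup is delegated to wsutil.GetWorkloadVersion, which is not part of this diff. Below is a minimal sketch of that idea, assuming the newest ReplicaSet's pod-template-hash is taken as the latest version; latestReplicaSetHash is a hypothetical helper, not Kruise API.

package workloadspread // hypothetical placement, illustration only

import (
    "strconv"

    appsv1 "k8s.io/api/apps/v1"
)

// latestReplicaSetHash returns the pod-template-hash of the ReplicaSet carrying the
// highest deployment.kubernetes.io/revision annotation (RevisionAnnotation above).
// wsutil.GetWorkloadVersion may resolve the version differently.
func latestReplicaSetHash(rsList []*appsv1.ReplicaSet) string {
    latest, hash := int64(-1), ""
    for _, rs := range rsList {
        rev, err := strconv.ParseInt(rs.Annotations["deployment.kubernetes.io/revision"], 10, 64)
        if err != nil {
            continue // skip ReplicaSets without a parsable revision
        }
        if rev > latest {
            latest, hash = rev, rs.Labels[appsv1.DefaultDeploymentUniqueLabelKey] // "pod-template-hash"
        }
    }
    return hash
}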

func (r *ReconcileWorkloadSpread) updateDeletionCost(ws *appsv1alpha1.WorkloadSpread,
podMap map[string][]*corev1.Pod,
versionedPodMap map[string]map[string][]*corev1.Pod,
workloadReplicas int32) error {
targetRef := ws.Spec.TargetReference
if targetRef == nil || !isEffectiveKindForDeletionCost(targetRef) {
return nil
}
// update Pod's deletion-cost annotation in each subset
for idx, subset := range ws.Spec.Subsets {
if err := r.syncSubsetPodDeletionCost(ws, &subset, idx, podMap[subset.Name], workloadReplicas); err != nil {

latestVersion, err := r.getWorkloadLatestVersion(ws)
if err != nil {
klog.Errorf("Failed to get the latest version for workload in workloadSpread %v, err: %v", klog.KObj(ws), err)
return err
}

// To try our best to keep the pod distribution described by the WorkloadSpread during workload rolling:
// - for the latest version, we hope to scale down the last subset preferentially;
// - for other, older versions, we hope to scale down the first subset preferentially;
for version, podMap := range versionedPodMap {
err = r.updateDeletionCostBySubset(ws, podMap, workloadReplicas, version != latestVersion)
if err != nil {
return err
}
}
return nil
}

func (r *ReconcileWorkloadSpread) updateDeletionCostBySubset(ws *appsv1alpha1.WorkloadSpread,
podMap map[string][]*corev1.Pod, workloadReplicas int32, reverseOrder bool) error {
// update Pod's deletion-cost annotation in each subset
if reverseOrder {
subsetNum := len(ws.Spec.Subsets)
for idx, subset := range ws.Spec.Subsets {
if err := r.syncSubsetPodDeletionCost(ws, &subset, subsetNum-idx-1, podMap[subset.Name], workloadReplicas); err != nil {
return err
}
}
} else {
for idx, subset := range ws.Spec.Subsets {
if err := r.syncSubsetPodDeletionCost(ws, &subset, idx, podMap[subset.Name], workloadReplicas); err != nil {
return err
}
}
}
// update the deletion-cost annotation for pods that do not match any real subset.
// these pods get the minimum deletion-cost and will be deleted preferentially.
if len(podMap[FakeSubsetName]) > 0 {
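The effect of the reverseOrder flag above can be seen in isolation: per the comment in updateDeletionCost, the latest version keeps the spec order so the last subset is scaled down preferentially, while older versions mirror the index handed to syncSubsetPodDeletionCost so the first subset is scaled down preferentially. A standalone sketch with hypothetical subset names:

package main

import "fmt"

func main() {
    subsets := []string{"subset-a", "subset-b", "subset-c"} // hypothetical spec order
    n := len(subsets)

    // Latest version: indices follow the spec order, as in the else branch above.
    for idx, name := range subsets {
        fmt.Printf("latest version: %s -> index %d\n", name, idx)
    }
    // Older versions: indices are mirrored, as in the reverseOrder branch above.
    for idx, name := range subsets {
        fmt.Printf("old version:    %s -> index %d\n", name, n-idx-1)
    }
}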
93 changes: 78 additions & 15 deletions pkg/controller/workloadspread/workloadspread_controller.go
@@ -319,20 +319,20 @@ func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ws *appsv1alpha1.WorkloadSpread
klog.Warningf("WorkloadSpread (%s/%s) has no matched pods, target workload's replicas[%d]", ws.Namespace, ws.Name, workloadReplicas)
}

// group Pods by subset
podMap, err := r.groupPod(ws, pods, workloadReplicas)
// group Pods by pod-revision and subset
versionedPodMap, subsetPodMap, err := r.groupVersionedPods(ws, pods, workloadReplicas)
if err != nil {
return err
}

// update deletion-cost for each subset
err = r.updateDeletionCost(ws, podMap, workloadReplicas)
err = r.updateDeletionCost(ws, versionedPodMap, workloadReplicas)
if err != nil {
return err
}

// calculate status and reschedule
status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ws, podMap, workloadReplicas)
status, scheduleFailedPodMap := r.calculateWorkloadSpreadStatus(ws, versionedPodMap, subsetPodMap, workloadReplicas)
if status == nil {
return nil
}
@@ -362,8 +362,33 @@ func getInjectWorkloadSpreadFromPod(pod *corev1.Pod) *wsutil.InjectWorkloadSpread
return injectWS
}

// groupPod returns a map, the key is the name of subset and the value represents the Pods of the corresponding subset.
func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) {
// groupVersionedPods will group pods by pod version and subset
func (r *ReconcileWorkloadSpread) groupVersionedPods(ws *appsv1alpha1.WorkloadSpread, allPods []*corev1.Pod, replicas int32) (map[string]map[string][]*corev1.Pod, map[string][]*corev1.Pod, error) {
versionedPods := map[string][]*corev1.Pod{}
for _, pod := range allPods {
version := wsutil.GetPodVersion(pod)
versionedPods[version] = append(versionedPods[version], pod)
}

subsetPodMap := map[string][]*corev1.Pod{}
versionedPodMap := map[string]map[string][]*corev1.Pod{}
// group pods by version
for version, pods := range versionedPods {
// group pods by subset
podMap, err := r.groupPodBySubset(ws, pods, replicas)
if err != nil {
return nil, nil, err
}
for subset, ps := range podMap {
subsetPodMap[subset] = append(subsetPodMap[subset], ps...)
}
versionedPodMap[version] = podMap
}
return versionedPodMap, subsetPodMap, nil
}
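To make the two return values concrete, here is a standalone sketch of the same two-level grouping. versionOf and subsetOf are hypothetical stand-ins for wsutil.GetPodVersion and the subset matching performed by groupPodBySubset; here they simply read labels.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func versionOf(p *corev1.Pod) string { return p.Labels["pod-template-hash"] } // hypothetical
func subsetOf(p *corev1.Pod) string  { return p.Labels["subset"] }            // hypothetical

func main() {
    pods := []*corev1.Pod{
        {ObjectMeta: metav1.ObjectMeta{Name: "a-1", Labels: map[string]string{"pod-template-hash": "v1", "subset": "subset-a"}}},
        {ObjectMeta: metav1.ObjectMeta{Name: "a-2", Labels: map[string]string{"pod-template-hash": "v2", "subset": "subset-a"}}},
        {ObjectMeta: metav1.ObjectMeta{Name: "b-1", Labels: map[string]string{"pod-template-hash": "v2", "subset": "subset-b"}}},
    }

    versionedPodMap := map[string]map[string][]*corev1.Pod{} // version -> subset -> pods
    subsetPodMap := map[string][]*corev1.Pod{}               // subset -> pods across all versions
    for _, p := range pods {
        v, s := versionOf(p), subsetOf(p)
        if versionedPodMap[v] == nil {
            versionedPodMap[v] = map[string][]*corev1.Pod{}
        }
        versionedPodMap[v][s] = append(versionedPodMap[v][s], p)
        subsetPodMap[s] = append(subsetPodMap[s], p)
    }
    fmt.Println(len(versionedPodMap["v2"]["subset-a"]), len(subsetPodMap["subset-a"])) // prints: 1 2
}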

// groupPodBySubset returns a map whose key is the subset name and whose value is the Pods of the corresponding subset.
func (r *ReconcileWorkloadSpread) groupPodBySubset(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) {
podMap := make(map[string][]*corev1.Pod, len(ws.Spec.Subsets)+1)
podMap[FakeSubsetName] = []*corev1.Pod{}
subsetMissingReplicas := make(map[string]int)
@@ -507,18 +532,58 @@ func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(pod *corev1.Pod
// 1. current WorkloadSpreadStatus
// 2. a map, the key is the subsetName, the value is the schedule-failed Pods belonging to the subset.
func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
podMap map[string][]*corev1.Pod, workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) {
// set the generation in the returned status
versionedPodMap map[string]map[string][]*corev1.Pod, subsetPodMap map[string][]*corev1.Pod,
workloadReplicas int32) (*appsv1alpha1.WorkloadSpreadStatus, map[string][]*corev1.Pod) {
status := appsv1alpha1.WorkloadSpreadStatus{}
// set the generation in the returned status
status.ObservedGeneration = ws.Generation
// status.ObservedWorkloadReplicas = workloadReplicas
status.SubsetStatuses = make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets))
status.VersionedSubsetStatuses = make(map[string][]appsv1alpha1.WorkloadSpreadSubsetStatus, len(versionedPodMap))

// overall subset statuses
var scheduleFailedPodMap map[string][]*corev1.Pod
status.SubsetStatuses, scheduleFailedPodMap = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.SubsetStatuses, subsetPodMap, workloadReplicas)

// versioned subset statuses calculated by observed pods
for version, podMap := range versionedPodMap {
status.VersionedSubsetStatuses[version], _ = r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], podMap, workloadReplicas)
}

// Consider this case:
// A Pod has been created and processed by the webhook, but the Pod has not yet been cached by the controller.
// We have to keep the subsetStatus for this version even though there is no Pod belonging to it.
for version := range ws.Status.VersionedSubsetStatuses {
if _, exist := versionedPodMap[version]; exist {
continue
}
versionSubsetStatues, _ := r.calculateWorkloadSpreadSubsetStatuses(ws, ws.Status.VersionedSubsetStatuses[version], nil, workloadReplicas)
if !isEmptySubsetStatuses(versionSubsetStatues) {
status.VersionedSubsetStatuses[version] = versionSubsetStatues
}
}

return &status, scheduleFailedPodMap
}

func isEmptySubsetStatuses(statues []appsv1alpha1.WorkloadSpreadSubsetStatus) bool {
replicas, creating, deleting := 0, 0, 0
for _, subset := range statues {
replicas += int(subset.Replicas)
creating += len(subset.CreatingPods)
deleting += len(subset.DeletingPods)
}
return replicas+creating+deleting == 0
}
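A short test-style sketch of the helper above, using hypothetical values, shows when a stale version's statuses count as empty and are therefore pruned from VersionedSubsetStatuses; this snippet is illustrative and not part of the commit.

package workloadspread // hypothetical test snippet in the controller package

import (
    "testing"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)

func TestIsEmptySubsetStatusesSketch(t *testing.T) {
    // No replicas and no in-flight creations or deletions: the version is empty.
    stale := []appsv1alpha1.WorkloadSpreadSubsetStatus{
        {Name: "subset-a", Replicas: 0, MissingReplicas: -1},
    }
    // Still has a pod creation recorded by the webhook: the version must be kept.
    active := []appsv1alpha1.WorkloadSpreadSubsetStatus{
        {Name: "subset-a", Replicas: 0, MissingReplicas: -1,
            CreatingPods: map[string]metav1.Time{"subset-a-xyz12": metav1.Now()}},
    }
    if !isEmptySubsetStatuses(stale) || isEmptySubsetStatuses(active) {
        t.Fatal("unexpected emptiness classification")
    }
}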

func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatuses(ws *appsv1alpha1.WorkloadSpread,
oldSubsetStatuses []appsv1alpha1.WorkloadSpreadSubsetStatus, podMap map[string][]*corev1.Pod, workloadReplicas int32,
) ([]appsv1alpha1.WorkloadSpreadSubsetStatus, map[string][]*corev1.Pod) {
subsetStatuses := make([]appsv1alpha1.WorkloadSpreadSubsetStatus, len(ws.Spec.Subsets))
scheduleFailedPodMap := make(map[string][]*corev1.Pod)

// Using a map to store the name and old status of each subset, because the user could adjust the spec's subset
// sequence to change the priority of a subset. We support that operation and use the subset name to match each
// subset with its old status.
oldSubsetStatuses := ws.Status.SubsetStatuses
oldSubsetStatusMap := make(map[string]*appsv1alpha1.WorkloadSpreadSubsetStatus, len(oldSubsetStatuses))
for i := range oldSubsetStatuses {
oldSubsetStatusMap[oldSubsetStatuses[i].Name] = &oldSubsetStatuses[i]
@@ -557,10 +622,10 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadStatus(ws *appsv1alpha1
removeWorkloadSpreadSubsetCondition(subsetStatus, appsv1alpha1.SubsetSchedulable)
}

status.SubsetStatuses[i] = *subsetStatus
subsetStatuses[i] = *subsetStatus
}

return &status, scheduleFailedPodMap
return subsetStatuses, scheduleFailedPodMap
}

// calculateWorkloadSpreadSubsetStatus returns the current subsetStatus for subset.
@@ -678,9 +743,7 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatus(ws *appsv1

func (r *ReconcileWorkloadSpread) UpdateWorkloadSpreadStatus(ws *appsv1alpha1.WorkloadSpread,
status *appsv1alpha1.WorkloadSpreadStatus) error {
if status.ObservedGeneration == ws.Status.ObservedGeneration &&
// status.ObservedWorkloadReplicas == ws.Status.ObservedWorkloadReplicas &&
apiequality.Semantic.DeepEqual(status.SubsetStatuses, ws.Status.SubsetStatuses) {
if apiequality.Semantic.DeepEqual(status, ws.Status) {
return nil
}

