diff --git a/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml b/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml index ae61ad91c..cf982c324 100644 --- a/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml +++ b/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml @@ -135,12 +135,46 @@ spec: type: object type: array type: object + patches: + description: Patches means that the resource will be patched. + items: + description: StagePatch describes the patch for the resource. + properties: + impersonation: + description: |- + Impersonation indicates the impersonating configuration for client when patching status. + In most cases this will be empty, in which case the default client service account will be used. + When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. + The support for this field is not available in Pod and Node resources. + properties: + username: + description: Username the target username for the client + to impersonate + type: string + required: + - username + type: object + root: + description: Root indicates the root of the template calculated + by the patch. + type: string + subresource: + description: Subresource indicates the name of the subresource + that will be patched. + type: string + template: + description: Template indicates the template for modifying + the resource in the next. + type: string + type: object + type: array statusPatchAs: description: |- StatusPatchAs indicates the impersonating configuration for client when patching status. In most cases this will be empty, in which case the default client service account will be used. When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. The support for this field is not available in Pod and Node resources. + Deprecated: Use Patches instead. properties: username: description: Username the target username for the client to @@ -154,10 +188,12 @@ spec: description: |- StatusSubresource indicates the name of the subresource that will be patched. The support for this field is not available in Pod and Node resources. + Deprecated: Use Patches instead. type: string statusTemplate: - description: StatusTemplate indicates the template for modifying - the status of the resource in the next. + description: |- + StatusTemplate indicates the template for modifying the status of the resource in the next. + Deprecated: Use Patches instead. type: string type: object resourceRef: diff --git a/pkg/apis/internalversion/conversion.go b/pkg/apis/internalversion/conversion.go index f507354c9..22bd71567 100644 --- a/pkg/apis/internalversion/conversion.go +++ b/pkg/apis/internalversion/conversion.go @@ -17,6 +17,8 @@ limitations under the License. package internalversion import ( + "k8s.io/apimachinery/pkg/conversion" + configv1alpha1 "sigs.k8s.io/kwok/pkg/apis/config/v1alpha1" "sigs.k8s.io/kwok/pkg/apis/v1alpha1" "sigs.k8s.io/kwok/pkg/utils/path" @@ -388,3 +390,41 @@ func ConvertToInternalMetric(in *v1alpha1.Metric) (*Metric, error) { } return &out, nil } + +// Convert_v1alpha1_StageNext_To_internalversion_StageNext converts a v1alpha1.StageNext to an internal version. +func Convert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.StageNext, out *StageNext, s conversion.Scope) error { + err := autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in, out, s) + if err != nil { + return err + } + + if in.StatusTemplate != "" { + template := in.StatusTemplate + + subresource := "status" + if in.StatusSubresource != nil { + subresource = *in.StatusSubresource + } + + var impersonation *ImpersonationConfig + if in.StatusPatchAs != nil { + impersonation = &ImpersonationConfig{ + Username: in.StatusPatchAs.Username, + } + } + patch := StagePatch{ + Subresource: subresource, + Root: "status", + Template: template, + Impersonation: impersonation, + } + + out.Patches = append([]StagePatch{patch}, out.Patches...) + } + return nil +} + +// Convert_internalversion_StagePatch_To_v1alpha1_StagePatch converts an internal version StagePatch to a v1alpha1.StagePatch. +func Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(in *StagePatch, out *v1alpha1.StagePatch, s conversion.Scope) error { + return autoConvert_internalversion_StagePatch_To_v1alpha1_StagePatch(in, out, s) +} diff --git a/pkg/apis/internalversion/stage_types.go b/pkg/apis/internalversion/stage_types.go index 3e368cb6a..c56914893 100644 --- a/pkg/apis/internalversion/stage_types.go +++ b/pkg/apis/internalversion/stage_types.go @@ -81,14 +81,23 @@ type StageNext struct { Finalizers *StageFinalizers // Delete means that the resource will be deleted if true. Delete bool - // StatusTemplate indicates the template for modifying the status of the resource in the next. - StatusTemplate string - // StatusSubresource indicates the name of the subresource that will be patched. - StatusSubresource string - // StatusPatchAs indicates the impersonating configuration for client when patching status. + // Patches means that the resource will be patched. + Patches []StagePatch +} + +// StagePatch describes the patch for the resource. +type StagePatch struct { + // Subresource indicates the name of the subresource that will be patched. + Subresource string + // Root indicates the root of the template calculated by the patch. + Root string + // Template indicates the template for modifying the resource in the next. + Template string + // Impersonation indicates the impersonating configuration for client when patching status. // In most cases this will be empty, in which case the default client service account will be used. // When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. - StatusPatchAs *ImpersonationConfig + // The support for this field is not available in Pod and Node resources. + Impersonation *ImpersonationConfig } // ImpersonationConfig describes the configuration for impersonating clients diff --git a/pkg/apis/internalversion/zz_generated.conversion.go b/pkg/apis/internalversion/zz_generated.conversion.go index 0b9a3c6aa..0e8df25da 100644 --- a/pkg/apis/internalversion/zz_generated.conversion.go +++ b/pkg/apis/internalversion/zz_generated.conversion.go @@ -605,8 +605,8 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*v1alpha1.StageNext)(nil), (*StageNext)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_v1alpha1_StageNext_To_internalversion_StageNext(a.(*v1alpha1.StageNext), b.(*StageNext), scope) + if err := s.AddGeneratedConversionFunc((*v1alpha1.StagePatch)(nil), (*StagePatch)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(a.(*v1alpha1.StagePatch), b.(*StagePatch), scope) }); err != nil { return err } @@ -650,6 +650,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*StagePatch)(nil), (*v1alpha1.StagePatch)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(a.(*StagePatch), b.(*v1alpha1.StagePatch), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*v1alpha1.StageNext)(nil), (*StageNext)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_StageNext_To_internalversion_StageNext(a.(*v1alpha1.StageNext), b.(*StageNext), scope) + }); err != nil { + return err + } return nil } @@ -2460,11 +2470,17 @@ func autoConvert_internalversion_StageNext_To_v1alpha1_StageNext(in *StageNext, out.Event = (*v1alpha1.StageEvent)(unsafe.Pointer(in.Event)) out.Finalizers = (*v1alpha1.StageFinalizers)(unsafe.Pointer(in.Finalizers)) out.Delete = in.Delete - out.StatusTemplate = in.StatusTemplate - if err := v1.Convert_string_To_Pointer_string(&in.StatusSubresource, &out.StatusSubresource, s); err != nil { - return err + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]v1alpha1.StagePatch, len(*in)) + for i := range *in { + if err := Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Patches = nil } - out.StatusPatchAs = (*v1alpha1.ImpersonationConfig)(unsafe.Pointer(in.StatusPatchAs)) return nil } @@ -2477,17 +2493,42 @@ func autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.St out.Event = (*StageEvent)(unsafe.Pointer(in.Event)) out.Finalizers = (*StageFinalizers)(unsafe.Pointer(in.Finalizers)) out.Delete = in.Delete - out.StatusTemplate = in.StatusTemplate - if err := v1.Convert_Pointer_string_To_string(&in.StatusSubresource, &out.StatusSubresource, s); err != nil { - return err + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]StagePatch, len(*in)) + for i := range *in { + if err := Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Patches = nil } - out.StatusPatchAs = (*ImpersonationConfig)(unsafe.Pointer(in.StatusPatchAs)) + // INFO: in.StatusTemplate opted out of conversion generation + // INFO: in.StatusSubresource opted out of conversion generation + // INFO: in.StatusPatchAs opted out of conversion generation + return nil +} + +func autoConvert_internalversion_StagePatch_To_v1alpha1_StagePatch(in *StagePatch, out *v1alpha1.StagePatch, s conversion.Scope) error { + out.Subresource = in.Subresource + out.Root = in.Root + out.Template = in.Template + out.Impersonation = (*v1alpha1.ImpersonationConfig)(unsafe.Pointer(in.Impersonation)) + return nil +} + +func autoConvert_v1alpha1_StagePatch_To_internalversion_StagePatch(in *v1alpha1.StagePatch, out *StagePatch, s conversion.Scope) error { + out.Subresource = in.Subresource + out.Root = in.Root + out.Template = in.Template + out.Impersonation = (*ImpersonationConfig)(unsafe.Pointer(in.Impersonation)) return nil } -// Convert_v1alpha1_StageNext_To_internalversion_StageNext is an autogenerated conversion function. -func Convert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.StageNext, out *StageNext, s conversion.Scope) error { - return autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in, out, s) +// Convert_v1alpha1_StagePatch_To_internalversion_StagePatch is an autogenerated conversion function. +func Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(in *v1alpha1.StagePatch, out *StagePatch, s conversion.Scope) error { + return autoConvert_v1alpha1_StagePatch_To_internalversion_StagePatch(in, out, s) } func autoConvert_internalversion_StageResourceRef_To_v1alpha1_StageResourceRef(in *StageResourceRef, out *v1alpha1.StageResourceRef, s conversion.Scope) error { diff --git a/pkg/apis/internalversion/zz_generated.deepcopy.go b/pkg/apis/internalversion/zz_generated.deepcopy.go index b3d4420a5..8ced5c9c7 100644 --- a/pkg/apis/internalversion/zz_generated.deepcopy.go +++ b/pkg/apis/internalversion/zz_generated.deepcopy.go @@ -1292,10 +1292,12 @@ func (in *StageNext) DeepCopyInto(out *StageNext) { *out = new(StageFinalizers) (*in).DeepCopyInto(*out) } - if in.StatusPatchAs != nil { - in, out := &in.StatusPatchAs, &out.StatusPatchAs - *out = new(ImpersonationConfig) - **out = **in + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]StagePatch, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } return } @@ -1310,6 +1312,27 @@ func (in *StageNext) DeepCopy() *StageNext { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StagePatch) DeepCopyInto(out *StagePatch) { + *out = *in + if in.Impersonation != nil { + in, out := &in.Impersonation, &out.Impersonation + *out = new(ImpersonationConfig) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StagePatch. +func (in *StagePatch) DeepCopy() *StagePatch { + if in == nil { + return nil + } + out := new(StagePatch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StageResourceRef) DeepCopyInto(out *StageResourceRef) { *out = *in diff --git a/pkg/apis/v1alpha1/stage_types.go b/pkg/apis/v1alpha1/stage_types.go index a7f6baede..29a10584e 100644 --- a/pkg/apis/v1alpha1/stage_types.go +++ b/pkg/apis/v1alpha1/stage_types.go @@ -116,20 +116,44 @@ type StageNext struct { Finalizers *StageFinalizers `json:"finalizers,omitempty"` // Delete means that the resource will be deleted if true. Delete bool `json:"delete,omitempty"` + // Patches means that the resource will be patched. + Patches []StagePatch `json:"patches,omitempty"` + // StatusTemplate indicates the template for modifying the status of the resource in the next. + // Deprecated: Use Patches instead. + //+k8s:conversion-gen=false StatusTemplate string `json:"statusTemplate,omitempty"` // StatusSubresource indicates the name of the subresource that will be patched. The support for // this field is not available in Pod and Node resources. // +default="status" // +kubebuilder:default=status + // Deprecated: Use Patches instead. + //+k8s:conversion-gen=false StatusSubresource *string `json:"statusSubresource,omitempty"` // StatusPatchAs indicates the impersonating configuration for client when patching status. // In most cases this will be empty, in which case the default client service account will be used. // When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. // The support for this field is not available in Pod and Node resources. + // Deprecated: Use Patches instead. + //+k8s:conversion-gen=false StatusPatchAs *ImpersonationConfig `json:"statusPatchAs,omitempty"` } +// StagePatch describes the patch for the resource. +type StagePatch struct { + // Subresource indicates the name of the subresource that will be patched. + Subresource string `json:"subresource,omitempty"` + // Root indicates the root of the template calculated by the patch. + Root string `json:"root,omitempty"` + // Template indicates the template for modifying the resource in the next. + Template string `json:"template,omitempty"` + // Impersonation indicates the impersonating configuration for client when patching status. + // In most cases this will be empty, in which case the default client service account will be used. + // When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. + // The support for this field is not available in Pod and Node resources. + Impersonation *ImpersonationConfig `json:"impersonation,omitempty"` +} + // ImpersonationConfig describes the configuration for impersonating clients type ImpersonationConfig struct { // Username the target username for the client to impersonate diff --git a/pkg/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/v1alpha1/zz_generated.deepcopy.go index 2d726b62a..7667c57ff 100644 --- a/pkg/apis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/v1alpha1/zz_generated.deepcopy.go @@ -1801,6 +1801,13 @@ func (in *StageNext) DeepCopyInto(out *StageNext) { *out = new(StageFinalizers) (*in).DeepCopyInto(*out) } + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]StagePatch, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.StatusSubresource != nil { in, out := &in.StatusSubresource, &out.StatusSubresource *out = new(string) @@ -1824,6 +1831,27 @@ func (in *StageNext) DeepCopy() *StageNext { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StagePatch) DeepCopyInto(out *StagePatch) { + *out = *in + if in.Impersonation != nil { + in, out := &in.Impersonation, &out.Impersonation + *out = new(ImpersonationConfig) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StagePatch. +func (in *StagePatch) DeepCopy() *StagePatch { + if in == nil { + return nil + } + out := new(StagePatch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StageResourceRef) DeepCopyInto(out *StageResourceRef) { *out = *in diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index fffd96f57..e36c63b16 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -17,7 +17,6 @@ limitations under the License. package controllers import ( - "bytes" "context" "encoding/json" "fmt" @@ -29,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" @@ -430,7 +428,6 @@ func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage var ( result *corev1.Node - patch []byte err error ) @@ -456,19 +453,21 @@ func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage return shouldRetry(err), fmt.Errorf("failed to delete node %s: %w", node.Name, err) } result = nil - } else if next.StatusTemplate != "" { - patch, err = c.computeStatusPatch(node, next.StatusTemplate) - if err != nil { - return shouldRetry(err), fmt.Errorf("failed to compute the status patch of node %s: %w", node.Name, err) - } - if patch == nil { - logger.Debug("Skip node", - "reason", "do not need to modify", - ) - } else { - result, err = c.patchResource(ctx, node, patch) + } else if len(next.Patches) != 0 { + for _, patch := range next.Patches { + patchData, err := c.computeMergePatch(node, patch.Root, patch.Template) if err != nil { - return shouldRetry(err), fmt.Errorf("failed to patch node %s: %w", node.Name, err) + return shouldRetry(err), fmt.Errorf("failed to compute the node %s: %w", node.Name, err) + } + if patchData == nil { + logger.Debug("Skip node", + "reason", "do not need to modify", + ) + } else { + result, err = c.patchResource(ctx, node, patchData, patch) + if err != nil { + return shouldRetry(err), fmt.Errorf("failed to patch node %s: %w", node.Name, err) + } } } } @@ -489,13 +488,20 @@ func (c *NodeController) readOnly(nodeName string) bool { } // patchResource patches the resource -func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, patch []byte) (*corev1.Node, error) { +func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, patchData []byte, patch internalversion.StagePatch) (*corev1.Node, error) { logger := log.FromContext(ctx) logger = logger.With( "node", node.Name, ) - result, err := c.typedClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status") + subresource := []string{} + if patch.Subresource != "" { + logger = logger.With( + "subresource", patch.Subresource, + ) + subresource = []string{patch.Subresource} + } + result, err := c.typedClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{}, subresource...) if err != nil { return nil, err } @@ -503,40 +509,43 @@ func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, p return result, nil } -func (c *NodeController) computeStatusPatch(node *corev1.Node, tpl string) ([]byte, error) { - patch, err := c.renderer.ToJSON(tpl, node) - if err != nil { - return nil, err - } - - original, err := json.Marshal(node.Status) +func (c *NodeController) computeMergePatch(node *corev1.Node, root, tpl string) ([]byte, error) { + patchData, err := c.renderer.ToJSON(tpl, node) if err != nil { return nil, err } - sum, err := strategicpatch.StrategicMergePatch(original, patch, node.Status) - if err != nil { - return nil, err - } - - nodeStatus := corev1.NodeStatus{} - err = json.Unmarshal(sum, &nodeStatus) - if err != nil { - return nil, err - } - - dist, err := json.Marshal(nodeStatus) - if err != nil { - return nil, err + var hasChange bool + switch root { + default: + return nil, fmt.Errorf("root %q is not supported", root) + case "": + hasChange, err = checkNeedPatch(*node, patchData) + if err != nil { + return nil, err + } + case "metadata": + hasChange, err = checkNeedPatch(node.ObjectMeta, patchData) + if err != nil { + return nil, err + } + case "spec": + hasChange, err = checkNeedPatch(node.Spec, patchData) + if err != nil { + return nil, err + } + case "status": + hasChange, err = checkNeedPatch(node.Status, patchData) + if err != nil { + return nil, err + } } - if bytes.Equal(original, dist) { + if !hasChange { return nil, nil } - return json.Marshal(map[string]json.RawMessage{ - "status": patch, - }) + return wrapPatchData(root, patchData), nil } // putNodeInfo puts node info diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index bf173487b..915b5646b 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -17,7 +17,6 @@ limitations under the License. package controllers import ( - "bytes" "context" "encoding/json" "fmt" @@ -28,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" @@ -327,7 +325,6 @@ func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *L var ( result *corev1.Pod - patch []byte err error ) @@ -353,19 +350,22 @@ func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *L return shouldRetry(err), fmt.Errorf("failed to delete pod %s: %w", pod.Name, err) } result = nil - } else if next.StatusTemplate != "" { - patch, err = c.computeStatusPatch(pod, next.StatusTemplate) - if err != nil { - return shouldRetry(err), fmt.Errorf("failed to compute the status patch of pod %s: %w", pod.Name, err) - } - if patch == nil { - logger.Debug("Skip pod", - "reason", "do not need to modify", - ) - } else { - result, err = c.patchResource(ctx, pod, patch) + } else if len(next.Patches) != 0 { + c.markPodIP(pod) + for _, patch := range next.Patches { + patchData, err := c.computeMergePatch(pod, patch.Root, patch.Template) if err != nil { - return shouldRetry(err), fmt.Errorf("failed to patch pod %s: %w", pod.Name, err) + return shouldRetry(err), fmt.Errorf("failed to compute the pod %s: %w", pod.Name, err) + } + if patchData == nil { + logger.Debug("Skip pod", + "reason", "do not need to modify", + ) + } else { + result, err = c.patchResource(ctx, pod, patchData, patch) + if err != nil { + return shouldRetry(err), fmt.Errorf("failed to patch pod %s: %w", pod.Name, err) + } } } } @@ -386,14 +386,21 @@ func (c *PodController) readOnly(nodeName string) bool { } // patchResource patches the resource -func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patch []byte) (*corev1.Pod, error) { +func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patchData []byte, patch internalversion.StagePatch) (*corev1.Pod, error) { logger := log.FromContext(ctx) logger = logger.With( "pod", log.KObj(pod), "node", pod.Spec.NodeName, ) - result, err := c.typedClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status") + subresource := []string{} + if patch.Subresource != "" { + logger = logger.With( + "subresource", patch.Subresource, + ) + subresource = []string{patch.Subresource} + } + result, err := c.typedClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{}, subresource...) if err != nil { return nil, err } @@ -546,71 +553,65 @@ func (c *PodController) recyclingPodIP(ctx context.Context, pod *corev1.Pod) { } } -func (c *PodController) computeStatusPatch(pod *corev1.Pod, template string) ([]byte, error) { - if !c.enableCNI { - // Mark the pod IP that existed before the kubelet was started - if _, has := c.nodeGetFunc(pod.Spec.NodeName); has { - if pod.Status.PodIP != "" && c.nodeCacheGetter != nil { - cidr := c.defaultCIDR - node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName) - if ok { - if node.Spec.PodCIDR != "" { - cidr = node.Spec.PodCIDR - } - pool, err := c.ipPool(cidr) - if err == nil { - pool.Use(pod.Status.PodIP) - } +func (c *PodController) markPodIP(pod *corev1.Pod) { + if c.enableCNI { + return + } + // Mark the pod IP that existed before the kubelet was started + if _, has := c.nodeGetFunc(pod.Spec.NodeName); has { + if pod.Status.PodIP != "" && c.nodeCacheGetter != nil { + cidr := c.defaultCIDR + node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName) + if ok { + if node.Spec.PodCIDR != "" { + cidr = node.Spec.PodCIDR + } + pool, err := c.ipPool(cidr) + if err == nil { + pool.Use(pod.Status.PodIP) } } } } - - patch, err := c.computePatch(pod, template) - if err != nil { - return nil, err - } - if patch == nil { - return nil, nil - } - - return json.Marshal(map[string]json.RawMessage{ - "status": patch, - }) } -func (c *PodController) computePatch(pod *corev1.Pod, tpl string) ([]byte, error) { - patch, err := c.renderer.ToJSON(tpl, pod) - if err != nil { - return nil, err - } - - original, err := json.Marshal(pod.Status) - if err != nil { - return nil, err - } - - sum, err := strategicpatch.StrategicMergePatch(original, patch, pod.Status) - if err != nil { - return nil, err - } - - podStatus := corev1.PodStatus{} - err = json.Unmarshal(sum, &podStatus) +func (c *PodController) computeMergePatch(pod *corev1.Pod, root, tpl string) ([]byte, error) { + patchData, err := c.renderer.ToJSON(tpl, pod) if err != nil { return nil, err } - dist, err := json.Marshal(podStatus) - if err != nil { - return nil, err + var hasChange bool + switch root { + default: + return nil, fmt.Errorf("root %q is not supported", root) + case "": + hasChange, err = checkNeedPatch(*pod, patchData) + if err != nil { + return nil, err + } + case "metadata": + hasChange, err = checkNeedPatch(pod.ObjectMeta, patchData) + if err != nil { + return nil, err + } + case "spec": + hasChange, err = checkNeedPatch(pod.Spec, patchData) + if err != nil { + return nil, err + } + case "status": + hasChange, err = checkNeedPatch(pod.Status, patchData) + if err != nil { + return nil, err + } } - if bytes.Equal(original, dist) { + if !hasChange { return nil, nil } - return patch, nil + return wrapPatchData(root, patchData), nil } func (c *PodController) funcNodeIP() string { diff --git a/pkg/kwok/controllers/stage_controller.go b/pkg/kwok/controllers/stage_controller.go index 7d2a99116..67dcfdd75 100644 --- a/pkg/kwok/controllers/stage_controller.go +++ b/pkg/kwok/controllers/stage_controller.go @@ -307,7 +307,6 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured. var ( result *unstructured.Unstructured - patch []byte err error ) @@ -334,19 +333,21 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured. return shouldRetry(err), fmt.Errorf("failed to delete resource %s: %w", resource.GetName(), err) } result = nil - } else if next.StatusTemplate != "" { - patch, err = c.computeStatusPatch(resource, next.StatusTemplate) - if err != nil { - return shouldRetry(err), fmt.Errorf("failed to compute the status patch of resource %s: %w", resource.GetName(), err) - } - if patch == nil { - logger.Debug("Skip resource", - "reason", "do not need to modify", - ) - } else { - result, err = c.patchResource(ctx, resource, patch, next) + } else if len(next.Patches) != 0 { + for _, patch := range next.Patches { + patchData, err := c.computeMergePatch(resource, patch.Root, patch.Template) if err != nil { - return shouldRetry(err), fmt.Errorf("failed to patch resource %s: %w", resource.GetName(), err) + return shouldRetry(err), fmt.Errorf("failed to compute resource %s: %w", resource.GetName(), err) + } + if patchData == nil { + logger.Debug("Skip resource", + "reason", "do not need to modify", + ) + } else { + result, err = c.patchResource(ctx, resource, patchData, patch) + if err != nil { + return shouldRetry(err), fmt.Errorf("failed to patch resource %s: %w", resource.GetName(), err) + } } } } @@ -360,19 +361,19 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured. } // patchResource patches the resource -func (c *StageController) patchResource(ctx context.Context, resource *unstructured.Unstructured, patch []byte, next *internalversion.StageNext) (*unstructured.Unstructured, error) { +func (c *StageController) patchResource(ctx context.Context, resource *unstructured.Unstructured, patchData []byte, patch internalversion.StagePatch) (*unstructured.Unstructured, error) { logger := log.FromContext(ctx) logger = logger.With( "resource", log.KObj(resource), ) nri := c.dynamicClient.Resource(c.gvr) - if next.StatusPatchAs != nil { + if patch.Impersonation != nil { logger.With( - "impersonate", next.StatusPatchAs.Username, + "impersonate", patch.Impersonation.Username, ) - dc, err := c.impersonatingDynamicClient.Impersonate(rest.ImpersonationConfig{UserName: next.StatusPatchAs.Username}) + dc, err := c.impersonatingDynamicClient.Impersonate(rest.ImpersonationConfig{UserName: patch.Impersonation.Username}) if err != nil { logger.Error("error getting impersonating client", err) return nil, err @@ -384,13 +385,13 @@ func (c *StageController) patchResource(ctx context.Context, resource *unstructu cli = nri.Namespace(ns) } subresource := []string{} - if next.StatusSubresource != "" { + if patch.Subresource != "" { logger = logger.With( - "subresource", next.StatusSubresource, + "subresource", patch.Subresource, ) - subresource = []string{next.StatusSubresource} + subresource = []string{patch.Subresource} } - result, err := cli.Patch(ctx, resource.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}, subresource...) + result, err := cli.Patch(ctx, resource.GetName(), types.MergePatchType, patchData, metav1.PatchOptions{}, subresource...) if err != nil { return nil, err } @@ -455,42 +456,34 @@ loop: logger.Info("Stop watch resources") } -func (c *StageController) computeStatusPatch(resource *unstructured.Unstructured, template string) ([]byte, error) { - patch, err := c.computePatch(resource, template) - if err != nil { - return nil, err - } - if patch == nil { - return nil, nil - } - - return json.Marshal(map[string]json.RawMessage{ - "status": patch, - }) -} - -func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl string) ([]byte, error) { +func (c *StageController) computeMergePatch(resource *unstructured.Unstructured, root, tpl string) ([]byte, error) { patchData, err := c.renderer.ToJSON(tpl, resource.Object) if err != nil { return nil, err } if c.schema != nil { - status, _, err := unstructured.NestedFieldNoCopy(resource.Object, "status") - if err != nil { - return nil, err - } + var base any = resource.Object + var patchMeta = c.schema - original, err := json.Marshal(status) - if err != nil { - return nil, err + if root != "" { + base, _, err = unstructured.NestedFieldNoCopy(resource.Object, root) + if err != nil { + return nil, err + } + + patchMeta, _, err = c.schema.LookupPatchMetadataForStruct(root) + if err != nil { + return nil, err + } } - statusPatchMeta, _, err := c.schema.LookupPatchMetadataForStruct("status") + original, err := json.Marshal(base) if err != nil { return nil, err } - sum, err := strategicpatch.StrategicMergePatchUsingLookupPatchMeta(original, patchData, statusPatchMeta) + + sum, err := strategicpatch.StrategicMergePatchUsingLookupPatchMeta(original, patchData, patchMeta) if err != nil { return nil, err } @@ -499,7 +492,8 @@ func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl return nil, nil } } - return patchData, nil + + return wrapPatchData(root, patchData), nil } // addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map diff --git a/pkg/kwok/controllers/stage_controller_test.go b/pkg/kwok/controllers/stage_controller_test.go index 5654d59c0..09994b0ad 100644 --- a/pkg/kwok/controllers/stage_controller_test.go +++ b/pkg/kwok/controllers/stage_controller_test.go @@ -69,7 +69,13 @@ func TestStageController(t *testing.T) { }, }, Next: internalversion.StageNext{ - StatusTemplate: `phase: Available`, + Patches: []internalversion.StagePatch{ + { + Template: `phase: Available`, + Subresource: "status", + Root: "status", + }, + }, }, }, }, diff --git a/pkg/kwok/controllers/utils.go b/pkg/kwok/controllers/utils.go index a198710e0..3f5dec70d 100644 --- a/pkg/kwok/controllers/utils.go +++ b/pkg/kwok/controllers/utils.go @@ -17,14 +17,18 @@ limitations under the License. package controllers import ( + "bytes" + "encoding/json" "math" "net" + "strconv" "sync" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/strategicpatch" utilsnet "sigs.k8s.io/kwok/pkg/utils/net" "sigs.k8s.io/kwok/pkg/utils/wait" @@ -151,3 +155,41 @@ func shouldRetry(err error) bool { // ignore all other cases return false } + +// checkNeedPatch checks if the object needs to be patched +func checkNeedPatch[T any](obj T, patchData []byte) (bool, error) { + original, err := json.Marshal(obj) + if err != nil { + return false, err + } + + sum, err := strategicpatch.StrategicMergePatch(original, patchData, obj) + if err != nil { + return false, err + } + + var tmp T + err = json.Unmarshal(sum, &tmp) + if err != nil { + return false, err + } + + dist, err := json.Marshal(tmp) + if err != nil { + return false, err + } + + if bytes.Equal(original, dist) { + return false, nil + } + + return true, nil +} + +// wrapPatchData wraps the patch data with the parent key +func wrapPatchData(parent string, patchData []byte) []byte { + if parent == "" { + return patchData + } + return []byte("{" + strconv.Quote(parent) + ":" + string(patchData) + "}") +} diff --git a/site/content/en/docs/generated/apis.md b/site/content/en/docs/generated/apis.md index ce9a5bf42..28ced467f 100644 --- a/site/content/en/docs/generated/apis.md +++ b/site/content/en/docs/generated/apis.md @@ -4994,6 +4994,8 @@ ImpersonationConfig
Appears on: StageNext +, +StagePatch
ImpersonationConfig describes the configuration for impersonating clients
@@ -6131,13 +6133,27 @@ boolpatches
+
+
+[]StagePatch
+
+
+Patches means that the resource will be patched.
+statusTemplate
string
StatusTemplate indicates the template for modifying the status of the resource in the next.
+StatusTemplate indicates the template for modifying the status of the resource in the next. +Deprecated: Use Patches instead.
StatusSubresource indicates the name of the subresource that will be patched. The support for -this field is not available in Pod and Node resources.
+this field is not available in Pod and Node resources. +Deprecated: Use Patches instead.StatusPatchAs indicates the impersonating configuration for client when patching status.
In most cases this will be empty, in which case the default client service account will be used.
When this is not empty, a corresponding rbac change is required to grant impersonate
privilege.
+The support for this field is not available in Pod and Node resources.
+Deprecated: Use Patches instead.
+Appears on: +StageNext +
++
StagePatch describes the patch for the resource.
+ +Field | +Description | +
---|---|
+subresource
+
+string
+
+ |
+
+ Subresource indicates the name of the subresource that will be patched. + |
+
+root
+
+string
+
+ |
+
+ Root indicates the root of the template calculated by the patch. + |
+
+template
+
+string
+
+ |
+
+ Template indicates the template for modifying the resource in the next. + |
+
+impersonation
+
+
+ImpersonationConfig
+
+
+ |
+
+ Impersonation indicates the impersonating configuration for client when patching status.
+In most cases this will be empty, in which case the default client service account will be used.
+When this is not empty, a corresponding rbac change is required to grant |