diff --git a/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml b/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml index ae61ad91c2..cf982c324f 100644 --- a/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml +++ b/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml @@ -135,12 +135,46 @@ spec: type: object type: array type: object + patches: + description: Patches means that the resource will be patched. + items: + description: StagePatch describes the patch for the resource. + properties: + impersonation: + description: |- + Impersonation indicates the impersonating configuration for client when patching status. + In most cases this will be empty, in which case the default client service account will be used. + When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. + The support for this field is not available in Pod and Node resources. + properties: + username: + description: Username the target username for the client + to impersonate + type: string + required: + - username + type: object + root: + description: Root indicates the root of the template calculated + by the patch. + type: string + subresource: + description: Subresource indicates the name of the subresource + that will be patched. + type: string + template: + description: Template indicates the template for modifying + the resource in the next. + type: string + type: object + type: array statusPatchAs: description: |- StatusPatchAs indicates the impersonating configuration for client when patching status. In most cases this will be empty, in which case the default client service account will be used. When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. The support for this field is not available in Pod and Node resources. + Deprecated: Use Patches instead. properties: username: description: Username the target username for the client to @@ -154,10 +188,12 @@ spec: description: |- StatusSubresource indicates the name of the subresource that will be patched. The support for this field is not available in Pod and Node resources. + Deprecated: Use Patches instead. type: string statusTemplate: - description: StatusTemplate indicates the template for modifying - the status of the resource in the next. + description: |- + StatusTemplate indicates the template for modifying the status of the resource in the next. + Deprecated: Use Patches instead. type: string type: object resourceRef: diff --git a/pkg/apis/internalversion/conversion.go b/pkg/apis/internalversion/conversion.go index f507354c90..bb0c7f50e3 100644 --- a/pkg/apis/internalversion/conversion.go +++ b/pkg/apis/internalversion/conversion.go @@ -17,6 +17,10 @@ limitations under the License. package internalversion import ( + "fmt" + + "k8s.io/apimachinery/pkg/conversion" + configv1alpha1 "sigs.k8s.io/kwok/pkg/apis/config/v1alpha1" "sigs.k8s.io/kwok/pkg/apis/v1alpha1" "sigs.k8s.io/kwok/pkg/utils/path" @@ -388,3 +392,34 @@ func ConvertToInternalMetric(in *v1alpha1.Metric) (*Metric, error) { } return &out, nil } + +// Convert_v1alpha1_StageNext_To_internalversion_StageNext converts a v1alpha1.StageNext to an internal version. +func Convert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.StageNext, out *StageNext, s conversion.Scope) error { + err := autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in, out, s) + if err != nil { + return err + } + + if in.StatusTemplate != "" { + if len(out.Patches) != 0 { + return fmt.Errorf("statusTemplate and patches are mutually exclusive") + } + subresource := "status" + if in.StatusSubresource != nil { + subresource = *in.StatusSubresource + } + oldPatch := StagePatch{ + Subresource: subresource, + Root: "status", + Template: in.StatusTemplate, + Impersonation: (*ImpersonationConfig)(in.StatusPatchAs), + } + out.Patches = []StagePatch{oldPatch} + } + return nil +} + +// Convert_internalversion_StagePatch_To_v1alpha1_StagePatch converts an internal version StagePatch to a v1alpha1.StagePatch. +func Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(in *StagePatch, out *v1alpha1.StagePatch, s conversion.Scope) error { + return autoConvert_internalversion_StagePatch_To_v1alpha1_StagePatch(in, out, s) +} diff --git a/pkg/apis/internalversion/stage_types.go b/pkg/apis/internalversion/stage_types.go index 3e368cb6a6..c569148932 100644 --- a/pkg/apis/internalversion/stage_types.go +++ b/pkg/apis/internalversion/stage_types.go @@ -81,14 +81,23 @@ type StageNext struct { Finalizers *StageFinalizers // Delete means that the resource will be deleted if true. Delete bool - // StatusTemplate indicates the template for modifying the status of the resource in the next. - StatusTemplate string - // StatusSubresource indicates the name of the subresource that will be patched. - StatusSubresource string - // StatusPatchAs indicates the impersonating configuration for client when patching status. + // Patches means that the resource will be patched. + Patches []StagePatch +} + +// StagePatch describes the patch for the resource. +type StagePatch struct { + // Subresource indicates the name of the subresource that will be patched. + Subresource string + // Root indicates the root of the template calculated by the patch. + Root string + // Template indicates the template for modifying the resource in the next. + Template string + // Impersonation indicates the impersonating configuration for client when patching status. // In most cases this will be empty, in which case the default client service account will be used. // When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. - StatusPatchAs *ImpersonationConfig + // The support for this field is not available in Pod and Node resources. + Impersonation *ImpersonationConfig } // ImpersonationConfig describes the configuration for impersonating clients diff --git a/pkg/apis/internalversion/zz_generated.conversion.go b/pkg/apis/internalversion/zz_generated.conversion.go index 87324d8212..0930580033 100644 --- a/pkg/apis/internalversion/zz_generated.conversion.go +++ b/pkg/apis/internalversion/zz_generated.conversion.go @@ -605,8 +605,8 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } - if err := s.AddGeneratedConversionFunc((*v1alpha1.StageNext)(nil), (*StageNext)(nil), func(a, b interface{}, scope conversion.Scope) error { - return Convert_v1alpha1_StageNext_To_internalversion_StageNext(a.(*v1alpha1.StageNext), b.(*StageNext), scope) + if err := s.AddGeneratedConversionFunc((*v1alpha1.StagePatch)(nil), (*StagePatch)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(a.(*v1alpha1.StagePatch), b.(*StagePatch), scope) }); err != nil { return err } @@ -650,6 +650,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*StagePatch)(nil), (*v1alpha1.StagePatch)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(a.(*StagePatch), b.(*v1alpha1.StagePatch), scope) + }); err != nil { + return err + } + if err := s.AddConversionFunc((*v1alpha1.StageNext)(nil), (*StageNext)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha1_StageNext_To_internalversion_StageNext(a.(*v1alpha1.StageNext), b.(*StageNext), scope) + }); err != nil { + return err + } return nil } @@ -2459,11 +2469,17 @@ func autoConvert_internalversion_StageNext_To_v1alpha1_StageNext(in *StageNext, out.Event = (*v1alpha1.StageEvent)(unsafe.Pointer(in.Event)) out.Finalizers = (*v1alpha1.StageFinalizers)(unsafe.Pointer(in.Finalizers)) out.Delete = in.Delete - out.StatusTemplate = in.StatusTemplate - if err := v1.Convert_string_To_Pointer_string(&in.StatusSubresource, &out.StatusSubresource, s); err != nil { - return err + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]v1alpha1.StagePatch, len(*in)) + for i := range *in { + if err := Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Patches = nil } - out.StatusPatchAs = (*v1alpha1.ImpersonationConfig)(unsafe.Pointer(in.StatusPatchAs)) return nil } @@ -2476,17 +2492,42 @@ func autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.St out.Event = (*StageEvent)(unsafe.Pointer(in.Event)) out.Finalizers = (*StageFinalizers)(unsafe.Pointer(in.Finalizers)) out.Delete = in.Delete - out.StatusTemplate = in.StatusTemplate - if err := v1.Convert_Pointer_string_To_string(&in.StatusSubresource, &out.StatusSubresource, s); err != nil { - return err + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]StagePatch, len(*in)) + for i := range *in { + if err := Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.Patches = nil } - out.StatusPatchAs = (*ImpersonationConfig)(unsafe.Pointer(in.StatusPatchAs)) + // WARNING: in.StatusTemplate requires manual conversion: does not exist in peer-type + // WARNING: in.StatusSubresource requires manual conversion: does not exist in peer-type + // WARNING: in.StatusPatchAs requires manual conversion: does not exist in peer-type + return nil +} + +func autoConvert_internalversion_StagePatch_To_v1alpha1_StagePatch(in *StagePatch, out *v1alpha1.StagePatch, s conversion.Scope) error { + out.Subresource = in.Subresource + out.Root = in.Root + out.Template = in.Template + out.Impersonation = (*v1alpha1.ImpersonationConfig)(unsafe.Pointer(in.Impersonation)) + return nil +} + +func autoConvert_v1alpha1_StagePatch_To_internalversion_StagePatch(in *v1alpha1.StagePatch, out *StagePatch, s conversion.Scope) error { + out.Subresource = in.Subresource + out.Root = in.Root + out.Template = in.Template + out.Impersonation = (*ImpersonationConfig)(unsafe.Pointer(in.Impersonation)) return nil } -// Convert_v1alpha1_StageNext_To_internalversion_StageNext is an autogenerated conversion function. -func Convert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.StageNext, out *StageNext, s conversion.Scope) error { - return autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in, out, s) +// Convert_v1alpha1_StagePatch_To_internalversion_StagePatch is an autogenerated conversion function. +func Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(in *v1alpha1.StagePatch, out *StagePatch, s conversion.Scope) error { + return autoConvert_v1alpha1_StagePatch_To_internalversion_StagePatch(in, out, s) } func autoConvert_internalversion_StageResourceRef_To_v1alpha1_StageResourceRef(in *StageResourceRef, out *v1alpha1.StageResourceRef, s conversion.Scope) error { diff --git a/pkg/apis/internalversion/zz_generated.deepcopy.go b/pkg/apis/internalversion/zz_generated.deepcopy.go index b3d4420a58..8ced5c9c71 100644 --- a/pkg/apis/internalversion/zz_generated.deepcopy.go +++ b/pkg/apis/internalversion/zz_generated.deepcopy.go @@ -1292,10 +1292,12 @@ func (in *StageNext) DeepCopyInto(out *StageNext) { *out = new(StageFinalizers) (*in).DeepCopyInto(*out) } - if in.StatusPatchAs != nil { - in, out := &in.StatusPatchAs, &out.StatusPatchAs - *out = new(ImpersonationConfig) - **out = **in + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]StagePatch, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } return } @@ -1310,6 +1312,27 @@ func (in *StageNext) DeepCopy() *StageNext { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StagePatch) DeepCopyInto(out *StagePatch) { + *out = *in + if in.Impersonation != nil { + in, out := &in.Impersonation, &out.Impersonation + *out = new(ImpersonationConfig) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StagePatch. +func (in *StagePatch) DeepCopy() *StagePatch { + if in == nil { + return nil + } + out := new(StagePatch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StageResourceRef) DeepCopyInto(out *StageResourceRef) { *out = *in diff --git a/pkg/apis/v1alpha1/stage_types.go b/pkg/apis/v1alpha1/stage_types.go index a7f6baede0..4683f6cde8 100644 --- a/pkg/apis/v1alpha1/stage_types.go +++ b/pkg/apis/v1alpha1/stage_types.go @@ -116,20 +116,41 @@ type StageNext struct { Finalizers *StageFinalizers `json:"finalizers,omitempty"` // Delete means that the resource will be deleted if true. Delete bool `json:"delete,omitempty"` + // Patches means that the resource will be patched. + Patches []StagePatch `json:"patches,omitempty"` + // StatusTemplate indicates the template for modifying the status of the resource in the next. + // Deprecated: Use Patches instead. StatusTemplate string `json:"statusTemplate,omitempty"` // StatusSubresource indicates the name of the subresource that will be patched. The support for // this field is not available in Pod and Node resources. // +default="status" // +kubebuilder:default=status + // Deprecated: Use Patches instead. StatusSubresource *string `json:"statusSubresource,omitempty"` // StatusPatchAs indicates the impersonating configuration for client when patching status. // In most cases this will be empty, in which case the default client service account will be used. // When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. // The support for this field is not available in Pod and Node resources. + // Deprecated: Use Patches instead. StatusPatchAs *ImpersonationConfig `json:"statusPatchAs,omitempty"` } +// StagePatch describes the patch for the resource. +type StagePatch struct { + // Subresource indicates the name of the subresource that will be patched. + Subresource string `json:"subresource,omitempty"` + // Root indicates the root of the template calculated by the patch. + Root string `json:"root,omitempty"` + // Template indicates the template for modifying the resource in the next. + Template string `json:"template,omitempty"` + // Impersonation indicates the impersonating configuration for client when patching status. + // In most cases this will be empty, in which case the default client service account will be used. + // When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege. + // The support for this field is not available in Pod and Node resources. + Impersonation *ImpersonationConfig `json:"impersonation,omitempty"` +} + // ImpersonationConfig describes the configuration for impersonating clients type ImpersonationConfig struct { // Username the target username for the client to impersonate diff --git a/pkg/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/v1alpha1/zz_generated.deepcopy.go index 2d726b62a5..7667c57fff 100644 --- a/pkg/apis/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/v1alpha1/zz_generated.deepcopy.go @@ -1801,6 +1801,13 @@ func (in *StageNext) DeepCopyInto(out *StageNext) { *out = new(StageFinalizers) (*in).DeepCopyInto(*out) } + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]StagePatch, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.StatusSubresource != nil { in, out := &in.StatusSubresource, &out.StatusSubresource *out = new(string) @@ -1824,6 +1831,27 @@ func (in *StageNext) DeepCopy() *StageNext { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StagePatch) DeepCopyInto(out *StagePatch) { + *out = *in + if in.Impersonation != nil { + in, out := &in.Impersonation, &out.Impersonation + *out = new(ImpersonationConfig) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StagePatch. +func (in *StagePatch) DeepCopy() *StagePatch { + if in == nil { + return nil + } + out := new(StagePatch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StageResourceRef) DeepCopyInto(out *StageResourceRef) { *out = *in diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index 76fb4bd11f..26f368dee7 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -17,7 +17,6 @@ limitations under the License. package controllers import ( - "bytes" "context" "encoding/json" "fmt" @@ -29,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" @@ -430,7 +428,6 @@ func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage var ( result *corev1.Node - patch []byte err error ) @@ -455,19 +452,21 @@ func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage if err != nil { return shouldRetry(err), fmt.Errorf("failed to delete node %s: %w", node.Name, err) } - } else if next.StatusTemplate != "" { - patch, err = c.computeStatusPatch(node, next.StatusTemplate) - if err != nil { - return shouldRetry(err), fmt.Errorf("failed to compute the status patch of node %s: %w", node.Name, err) - } - if patch == nil { - logger.Debug("Skip node", - "reason", "do not need to modify", - ) - } else { - result, err = c.patchResource(ctx, node, patch) + } else if len(next.Patches) != 0 { + for _, patch := range next.Patches { + patchData, err := c.computeMergePatch(node, patch.Root, patch.Template) if err != nil { - return shouldRetry(err), fmt.Errorf("failed to patch node %s: %w", node.Name, err) + return shouldRetry(err), fmt.Errorf("failed to compute the node %s: %w", node.Name, err) + } + if patchData == nil { + logger.Debug("Skip node", + "reason", "do not need to modify", + ) + } else { + result, err = c.patchResource(ctx, node, patchData, patch) + if err != nil { + return shouldRetry(err), fmt.Errorf("failed to patch node %s: %w", node.Name, err) + } } } } @@ -488,13 +487,20 @@ func (c *NodeController) readOnly(nodeName string) bool { } // patchResource patches the resource -func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, patch []byte) (*corev1.Node, error) { +func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, patchData []byte, patch internalversion.StagePatch) (*corev1.Node, error) { logger := log.FromContext(ctx) logger = logger.With( "node", node.Name, ) - result, err := c.typedClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status") + subresource := []string{} + if patch.Subresource != "" { + logger = logger.With( + "subresource", patch.Subresource, + ) + subresource = []string{patch.Subresource} + } + result, err := c.typedClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{}, subresource...) if err != nil { return nil, err } @@ -502,40 +508,43 @@ func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, p return result, nil } -func (c *NodeController) computeStatusPatch(node *corev1.Node, tpl string) ([]byte, error) { - patch, err := c.renderer.ToJSON(tpl, node) - if err != nil { - return nil, err - } - - original, err := json.Marshal(node.Status) +func (c *NodeController) computeMergePatch(node *corev1.Node, root, tpl string) ([]byte, error) { + patchData, err := c.renderer.ToJSON(tpl, node) if err != nil { return nil, err } - sum, err := strategicpatch.StrategicMergePatch(original, patch, node.Status) - if err != nil { - return nil, err - } - - nodeStatus := corev1.NodeStatus{} - err = json.Unmarshal(sum, &nodeStatus) - if err != nil { - return nil, err - } - - dist, err := json.Marshal(nodeStatus) - if err != nil { - return nil, err + var hasChange bool + switch root { + default: + return nil, fmt.Errorf("root %q is not supported", root) + case "": + hasChange, err = checkNeedPatch(*node, patchData) + if err != nil { + return nil, err + } + case "metadata": + hasChange, err = checkNeedPatch(node.ObjectMeta, patchData) + if err != nil { + return nil, err + } + case "spec": + hasChange, err = checkNeedPatch(node.Spec, patchData) + if err != nil { + return nil, err + } + case "status": + hasChange, err = checkNeedPatch(node.Status, patchData) + if err != nil { + return nil, err + } } - if bytes.Equal(original, dist) { + if !hasChange { return nil, nil } - return json.Marshal(map[string]json.RawMessage{ - "status": patch, - }) + return wrapPatchData(root, patchData), nil } // putNodeInfo puts node info diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go index 8a97c62f20..d59b2fdc83 100644 --- a/pkg/kwok/controllers/pod_controller.go +++ b/pkg/kwok/controllers/pod_controller.go @@ -17,7 +17,6 @@ limitations under the License. package controllers import ( - "bytes" "context" "encoding/json" "fmt" @@ -28,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/utils/clock" @@ -327,7 +325,6 @@ func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *L var ( result *corev1.Pod - patch []byte err error ) @@ -350,23 +347,24 @@ func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *L if next.Delete { err = c.deleteResource(ctx, pod) if err != nil { - if err != nil { - return shouldRetry(err), fmt.Errorf("failed to delete pod %s: %w", pod.Name, err) - } - } - } else if next.StatusTemplate != "" { - patch, err = c.computeStatusPatch(pod, next.StatusTemplate) - if err != nil { - return shouldRetry(err), fmt.Errorf("failed to compute the status patch of pod %s: %w", pod.Name, err) + return shouldRetry(err), fmt.Errorf("failed to delete pod %s: %w", pod.Name, err) } - if patch == nil { - logger.Debug("Skip pod", - "reason", "do not need to modify", - ) - } else { - result, err = c.patchResource(ctx, pod, patch) + } else if len(next.Patches) != 0 { + c.markPodIP(pod) + for _, patch := range next.Patches { + patchData, err := c.computeMergePatch(pod, patch.Root, patch.Template) if err != nil { - return shouldRetry(err), fmt.Errorf("failed to patch pod %s: %w", pod.Name, err) + return shouldRetry(err), fmt.Errorf("failed to compute the pod %s: %w", pod.Name, err) + } + if patchData == nil { + logger.Debug("Skip pod", + "reason", "do not need to modify", + ) + } else { + result, err = c.patchResource(ctx, pod, patchData, patch) + if err != nil { + return shouldRetry(err), fmt.Errorf("failed to patch pod %s: %w", pod.Name, err) + } } } } @@ -387,14 +385,21 @@ func (c *PodController) readOnly(nodeName string) bool { } // patchResource patches the resource -func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patch []byte) (*corev1.Pod, error) { +func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patchData []byte, patch internalversion.StagePatch) (*corev1.Pod, error) { logger := log.FromContext(ctx) logger = logger.With( "pod", log.KObj(pod), "node", pod.Spec.NodeName, ) - result, err := c.typedClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status") + subresource := []string{} + if patch.Subresource != "" { + logger = logger.With( + "subresource", patch.Subresource, + ) + subresource = []string{patch.Subresource} + } + result, err := c.typedClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{}, subresource...) if err != nil { return nil, err } @@ -547,71 +552,65 @@ func (c *PodController) recyclingPodIP(ctx context.Context, pod *corev1.Pod) { } } -func (c *PodController) computeStatusPatch(pod *corev1.Pod, template string) ([]byte, error) { - if !c.enableCNI { - // Mark the pod IP that existed before the kubelet was started - if _, has := c.nodeGetFunc(pod.Spec.NodeName); has { - if pod.Status.PodIP != "" && c.nodeCacheGetter != nil { - cidr := c.defaultCIDR - node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName) - if ok { - if node.Spec.PodCIDR != "" { - cidr = node.Spec.PodCIDR - } - pool, err := c.ipPool(cidr) - if err == nil { - pool.Use(pod.Status.PodIP) - } +func (c *PodController) markPodIP(pod *corev1.Pod) { + if c.enableCNI { + return + } + // Mark the pod IP that existed before the kubelet was started + if _, has := c.nodeGetFunc(pod.Spec.NodeName); has { + if pod.Status.PodIP != "" && c.nodeCacheGetter != nil { + cidr := c.defaultCIDR + node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName) + if ok { + if node.Spec.PodCIDR != "" { + cidr = node.Spec.PodCIDR + } + pool, err := c.ipPool(cidr) + if err == nil { + pool.Use(pod.Status.PodIP) } } } } - - patch, err := c.computePatch(pod, template) - if err != nil { - return nil, err - } - if patch == nil { - return nil, nil - } - - return json.Marshal(map[string]json.RawMessage{ - "status": patch, - }) } -func (c *PodController) computePatch(pod *corev1.Pod, tpl string) ([]byte, error) { - patch, err := c.renderer.ToJSON(tpl, pod) - if err != nil { - return nil, err - } - - original, err := json.Marshal(pod.Status) +func (c *PodController) computeMergePatch(pod *corev1.Pod, root, tpl string) ([]byte, error) { + patchData, err := c.renderer.ToJSON(tpl, pod) if err != nil { return nil, err } - sum, err := strategicpatch.StrategicMergePatch(original, patch, pod.Status) - if err != nil { - return nil, err - } - - podStatus := corev1.PodStatus{} - err = json.Unmarshal(sum, &podStatus) - if err != nil { - return nil, err - } - - dist, err := json.Marshal(podStatus) - if err != nil { - return nil, err + var hasChange bool + switch root { + default: + return nil, fmt.Errorf("root %q is not supported", root) + case "": + hasChange, err = checkNeedPatch(*pod, patchData) + if err != nil { + return nil, err + } + case "metadata": + hasChange, err = checkNeedPatch(pod.ObjectMeta, patchData) + if err != nil { + return nil, err + } + case "spec": + hasChange, err = checkNeedPatch(pod.Spec, patchData) + if err != nil { + return nil, err + } + case "status": + hasChange, err = checkNeedPatch(pod.Status, patchData) + if err != nil { + return nil, err + } } - if bytes.Equal(original, dist) { + if !hasChange { return nil, nil } - return patch, nil + return wrapPatchData(root, patchData), nil } func (c *PodController) funcNodeIP() string { diff --git a/pkg/kwok/controllers/stage_controller.go b/pkg/kwok/controllers/stage_controller.go index 9ce3fb7592..9056124e89 100644 --- a/pkg/kwok/controllers/stage_controller.go +++ b/pkg/kwok/controllers/stage_controller.go @@ -307,7 +307,6 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured. var ( result *unstructured.Unstructured - patch []byte err error ) @@ -332,19 +331,21 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured. if err != nil { return shouldRetry(err), fmt.Errorf("failed to delete resource %s: %w", resource.GetName(), err) } - } else if next.StatusTemplate != "" { - patch, err = c.computeStatusPatch(resource, next.StatusTemplate) - if err != nil { - return shouldRetry(err), fmt.Errorf("failed to compute the status patch of resource %s: %w", resource.GetName(), err) - } - if patch == nil { - logger.Debug("Skip resource", - "reason", "do not need to modify", - ) - } else { - result, err = c.patchResource(ctx, resource, patch, next) + } else if len(next.Patches) != 0 { + for _, patch := range next.Patches { + patchData, err := c.computeMergePatch(resource, patch.Root, patch.Template) if err != nil { - return shouldRetry(err), fmt.Errorf("failed to patch resource %s: %w", resource.GetName(), err) + return shouldRetry(err), fmt.Errorf("failed to compute resource %s: %w", resource.GetName(), err) + } + if patchData == nil { + logger.Debug("Skip resource", + "reason", "do not need to modify", + ) + } else { + result, err = c.patchResource(ctx, resource, patchData, patch) + if err != nil { + return shouldRetry(err), fmt.Errorf("failed to patch resource %s: %w", resource.GetName(), err) + } } } } @@ -358,19 +359,19 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured. } // patchResource patches the resource -func (c *StageController) patchResource(ctx context.Context, resource *unstructured.Unstructured, patch []byte, next *internalversion.StageNext) (*unstructured.Unstructured, error) { +func (c *StageController) patchResource(ctx context.Context, resource *unstructured.Unstructured, patchData []byte, patch internalversion.StagePatch) (*unstructured.Unstructured, error) { logger := log.FromContext(ctx) logger = logger.With( "resource", log.KObj(resource), ) nri := c.dynamicClient.Resource(c.gvr) - if next.StatusPatchAs != nil { + if patch.Impersonation != nil { logger.With( - "impersonate", next.StatusPatchAs.Username, + "impersonate", patch.Impersonation.Username, ) - dc, err := c.impersonatingDynamicClient.Impersonate(rest.ImpersonationConfig{UserName: next.StatusPatchAs.Username}) + dc, err := c.impersonatingDynamicClient.Impersonate(rest.ImpersonationConfig{UserName: patch.Impersonation.Username}) if err != nil { logger.Error("error getting impersonating client", err) return nil, err @@ -382,13 +383,13 @@ func (c *StageController) patchResource(ctx context.Context, resource *unstructu cli = nri.Namespace(ns) } subresource := []string{} - if next.StatusSubresource != "" { + if patch.Subresource != "" { logger = logger.With( - "subresource", next.StatusSubresource, + "subresource", patch.Subresource, ) - subresource = []string{next.StatusSubresource} + subresource = []string{patch.Subresource} } - result, err := cli.Patch(ctx, resource.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}, subresource...) + result, err := cli.Patch(ctx, resource.GetName(), types.MergePatchType, patchData, metav1.PatchOptions{}, subresource...) if err != nil { return nil, err } @@ -453,42 +454,34 @@ loop: logger.Info("Stop watch resources") } -func (c *StageController) computeStatusPatch(resource *unstructured.Unstructured, template string) ([]byte, error) { - patch, err := c.computePatch(resource, template) - if err != nil { - return nil, err - } - if patch == nil { - return nil, nil - } - - return json.Marshal(map[string]json.RawMessage{ - "status": patch, - }) -} - -func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl string) ([]byte, error) { +func (c *StageController) computeMergePatch(resource *unstructured.Unstructured, root, tpl string) ([]byte, error) { patchData, err := c.renderer.ToJSON(tpl, resource.Object) if err != nil { return nil, err } if c.schema != nil { - status, _, err := unstructured.NestedFieldNoCopy(resource.Object, "status") - if err != nil { - return nil, err - } + var base any = resource.Object + var patchMeta = c.schema - original, err := json.Marshal(status) - if err != nil { - return nil, err + if root != "" { + base, _, err = unstructured.NestedFieldNoCopy(resource.Object, root) + if err != nil { + return nil, err + } + + patchMeta, _, err = c.schema.LookupPatchMetadataForStruct(root) + if err != nil { + return nil, err + } } - statusPatchMeta, _, err := c.schema.LookupPatchMetadataForStruct("status") + original, err := json.Marshal(base) if err != nil { return nil, err } - sum, err := strategicpatch.StrategicMergePatchUsingLookupPatchMeta(original, patchData, statusPatchMeta) + + sum, err := strategicpatch.StrategicMergePatchUsingLookupPatchMeta(original, patchData, patchMeta) if err != nil { return nil, err } @@ -497,7 +490,8 @@ func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl return nil, nil } } - return patchData, nil + + return wrapPatchData(root, patchData), nil } // addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map diff --git a/pkg/kwok/controllers/stage_controller_test.go b/pkg/kwok/controllers/stage_controller_test.go index 5654d59c00..09994b0ad4 100644 --- a/pkg/kwok/controllers/stage_controller_test.go +++ b/pkg/kwok/controllers/stage_controller_test.go @@ -69,7 +69,13 @@ func TestStageController(t *testing.T) { }, }, Next: internalversion.StageNext{ - StatusTemplate: `phase: Available`, + Patches: []internalversion.StagePatch{ + { + Template: `phase: Available`, + Subresource: "status", + Root: "status", + }, + }, }, }, }, diff --git a/pkg/kwok/controllers/utils.go b/pkg/kwok/controllers/utils.go index a198710e0c..83917d3c03 100644 --- a/pkg/kwok/controllers/utils.go +++ b/pkg/kwok/controllers/utils.go @@ -17,6 +17,8 @@ limitations under the License. package controllers import ( + "bytes" + "encoding/json" "math" "net" "sync" @@ -25,9 +27,11 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/strategicpatch" utilsnet "sigs.k8s.io/kwok/pkg/utils/net" "sigs.k8s.io/kwok/pkg/utils/wait" + "strconv" ) func parseCIDR(s string) (*net.IPNet, error) { @@ -151,3 +155,41 @@ func shouldRetry(err error) bool { // ignore all other cases return false } + +// checkNeedPatch checks if the object needs to be patched +func checkNeedPatch[T any](obj T, patchData []byte) (bool, error) { + original, err := json.Marshal(obj) + if err != nil { + return false, err + } + + sum, err := strategicpatch.StrategicMergePatch(original, patchData, obj) + if err != nil { + return false, err + } + + var tmp T + err = json.Unmarshal(sum, &tmp) + if err != nil { + return false, err + } + + dist, err := json.Marshal(tmp) + if err != nil { + return false, err + } + + if bytes.Equal(original, dist) { + return false, nil + } + + return true, nil +} + +// wrapPatchData wraps the patch data with the parent key +func wrapPatchData(parent string, patchData []byte) []byte { + if parent == "" { + return patchData + } + return []byte("{" + strconv.Quote(parent) + ":" + string(patchData) + "}") +} diff --git a/site/content/en/docs/generated/apis.md b/site/content/en/docs/generated/apis.md index 68002a9ef9..64cf324bc5 100644 --- a/site/content/en/docs/generated/apis.md +++ b/site/content/en/docs/generated/apis.md @@ -4995,6 +4995,8 @@ ImpersonationConfig
Appears on: StageNext +, +StagePatch
ImpersonationConfig describes the configuration for impersonating clients
@@ -6132,13 +6134,27 @@ boolpatches
+
+
+[]StagePatch
+
+
+Patches means that the resource will be patched.
+statusTemplate
string
StatusTemplate indicates the template for modifying the status of the resource in the next.
+StatusTemplate indicates the template for modifying the status of the resource in the next. +Deprecated: Use Patches instead.
StatusSubresource indicates the name of the subresource that will be patched. The support for -this field is not available in Pod and Node resources.
+this field is not available in Pod and Node resources. +Deprecated: Use Patches instead.StatusPatchAs indicates the impersonating configuration for client when patching status.
In most cases this will be empty, in which case the default client service account will be used.
When this is not empty, a corresponding rbac change is required to grant impersonate
privilege.
+The support for this field is not available in Pod and Node resources.
+Deprecated: Use Patches instead.
+Appears on: +StageNext +
++
StagePatch describes the patch for the resource.
+ +Field | +Description | +
---|---|
+subresource
+
+string
+
+ |
+
+ Subresource indicates the name of the subresource that will be patched. + |
+
+root
+
+string
+
+ |
+
+ Root indicates the root of the template calculated by the patch. + |
+
+template
+
+string
+
+ |
+
+ Template indicates the template for modifying the resource in the next. + |
+
+impersonation
+
+
+ImpersonationConfig
+
+
+ |
+
+ Impersonation indicates the impersonating configuration for client when patching status.
+In most cases this will be empty, in which case the default client service account will be used.
+When this is not empty, a corresponding rbac change is required to grant |