From d219a0894220d3b4bd7ce8dac2bdb8a2ef1cec22 Mon Sep 17 00:00:00 2001
From: Shiming Zhang
Date: Tue, 5 Mar 2024 16:24:31 +0800
Subject: [PATCH] Update Stage API
---
kustomize/crd/bases/kwok.x-k8s.io_stages.yaml | 40 +++++-
pkg/apis/internalversion/conversion.go | 35 +++++
pkg/apis/internalversion/stage_types.go | 21 ++-
.../zz_generated.conversion.go | 67 +++++++--
.../internalversion/zz_generated.deepcopy.go | 31 +++-
pkg/apis/v1alpha1/stage_types.go | 21 +++
pkg/apis/v1alpha1/zz_generated.deepcopy.go | 28 ++++
pkg/kwok/controllers/node_controller.go | 93 ++++++------
pkg/kwok/controllers/pod_controller.go | 135 +++++++++---------
pkg/kwok/controllers/stage_controller.go | 86 ++++++-----
pkg/kwok/controllers/stage_controller_test.go | 8 +-
pkg/kwok/controllers/utils.go | 42 ++++++
site/content/en/docs/generated/apis.md | 92 +++++++++++-
13 files changed, 516 insertions(+), 183 deletions(-)
diff --git a/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml b/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml
index ae61ad91c2..cf982c324f 100644
--- a/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml
+++ b/kustomize/crd/bases/kwok.x-k8s.io_stages.yaml
@@ -135,12 +135,46 @@ spec:
type: object
type: array
type: object
+ patches:
+ description: Patches means that the resource will be patched.
+ items:
+ description: StagePatch describes the patch for the resource.
+ properties:
+ impersonation:
+ description: |-
+ Impersonation indicates the impersonating configuration for client when patching status.
+ In most cases this will be empty, in which case the default client service account will be used.
+ When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege.
+ The support for this field is not available in Pod and Node resources.
+ properties:
+ username:
+ description: Username the target username for the client
+ to impersonate
+ type: string
+ required:
+ - username
+ type: object
+ root:
+ description: Root indicates the root of the template calculated
+ by the patch.
+ type: string
+ subresource:
+ description: Subresource indicates the name of the subresource
+ that will be patched.
+ type: string
+ template:
+ description: Template indicates the template for modifying
+ the resource in the next.
+ type: string
+ type: object
+ type: array
statusPatchAs:
description: |-
StatusPatchAs indicates the impersonating configuration for client when patching status.
In most cases this will be empty, in which case the default client service account will be used.
When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege.
The support for this field is not available in Pod and Node resources.
+ Deprecated: Use Patches instead.
properties:
username:
description: Username the target username for the client to
@@ -154,10 +188,12 @@ spec:
description: |-
StatusSubresource indicates the name of the subresource that will be patched. The support for
this field is not available in Pod and Node resources.
+ Deprecated: Use Patches instead.
type: string
statusTemplate:
- description: StatusTemplate indicates the template for modifying
- the status of the resource in the next.
+ description: |-
+ StatusTemplate indicates the template for modifying the status of the resource in the next.
+ Deprecated: Use Patches instead.
type: string
type: object
resourceRef:
diff --git a/pkg/apis/internalversion/conversion.go b/pkg/apis/internalversion/conversion.go
index f507354c90..bb0c7f50e3 100644
--- a/pkg/apis/internalversion/conversion.go
+++ b/pkg/apis/internalversion/conversion.go
@@ -17,6 +17,10 @@ limitations under the License.
package internalversion
import (
+ "fmt"
+
+ "k8s.io/apimachinery/pkg/conversion"
+
configv1alpha1 "sigs.k8s.io/kwok/pkg/apis/config/v1alpha1"
"sigs.k8s.io/kwok/pkg/apis/v1alpha1"
"sigs.k8s.io/kwok/pkg/utils/path"
@@ -388,3 +392,34 @@ func ConvertToInternalMetric(in *v1alpha1.Metric) (*Metric, error) {
}
return &out, nil
}
+
+// Convert_v1alpha1_StageNext_To_internalversion_StageNext converts a v1alpha1.StageNext to an internal version.
+func Convert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.StageNext, out *StageNext, s conversion.Scope) error {
+ err := autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in, out, s)
+ if err != nil {
+ return err
+ }
+
+ if in.StatusTemplate != "" {
+ if len(out.Patches) != 0 {
+ return fmt.Errorf("statusTemplate and patches are mutually exclusive")
+ }
+ subresource := "status"
+ if in.StatusSubresource != nil {
+ subresource = *in.StatusSubresource
+ }
+ oldPatch := StagePatch{
+ Subresource: subresource,
+ Root: "status",
+ Template: in.StatusTemplate,
+ Impersonation: (*ImpersonationConfig)(in.StatusPatchAs),
+ }
+ out.Patches = []StagePatch{oldPatch}
+ }
+ return nil
+}
+
+// Convert_internalversion_StagePatch_To_v1alpha1_StagePatch converts an internal version StagePatch to a v1alpha1.StagePatch.
+func Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(in *StagePatch, out *v1alpha1.StagePatch, s conversion.Scope) error {
+ return autoConvert_internalversion_StagePatch_To_v1alpha1_StagePatch(in, out, s)
+}
diff --git a/pkg/apis/internalversion/stage_types.go b/pkg/apis/internalversion/stage_types.go
index 3e368cb6a6..c569148932 100644
--- a/pkg/apis/internalversion/stage_types.go
+++ b/pkg/apis/internalversion/stage_types.go
@@ -81,14 +81,23 @@ type StageNext struct {
Finalizers *StageFinalizers
// Delete means that the resource will be deleted if true.
Delete bool
- // StatusTemplate indicates the template for modifying the status of the resource in the next.
- StatusTemplate string
- // StatusSubresource indicates the name of the subresource that will be patched.
- StatusSubresource string
- // StatusPatchAs indicates the impersonating configuration for client when patching status.
+ // Patches means that the resource will be patched.
+ Patches []StagePatch
+}
+
+// StagePatch describes the patch for the resource.
+type StagePatch struct {
+ // Subresource indicates the name of the subresource that will be patched.
+ Subresource string
+ // Root indicates the root of the template calculated by the patch.
+ Root string
+ // Template indicates the template for modifying the resource in the next.
+ Template string
+ // Impersonation indicates the impersonating configuration for client when patching status.
// In most cases this will be empty, in which case the default client service account will be used.
// When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege.
- StatusPatchAs *ImpersonationConfig
+ // The support for this field is not available in Pod and Node resources.
+ Impersonation *ImpersonationConfig
}
// ImpersonationConfig describes the configuration for impersonating clients
diff --git a/pkg/apis/internalversion/zz_generated.conversion.go b/pkg/apis/internalversion/zz_generated.conversion.go
index af3d82a61b..18df585545 100644
--- a/pkg/apis/internalversion/zz_generated.conversion.go
+++ b/pkg/apis/internalversion/zz_generated.conversion.go
@@ -605,8 +605,8 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
- if err := s.AddGeneratedConversionFunc((*v1alpha1.StageNext)(nil), (*StageNext)(nil), func(a, b interface{}, scope conversion.Scope) error {
- return Convert_v1alpha1_StageNext_To_internalversion_StageNext(a.(*v1alpha1.StageNext), b.(*StageNext), scope)
+ if err := s.AddGeneratedConversionFunc((*v1alpha1.StagePatch)(nil), (*StagePatch)(nil), func(a, b interface{}, scope conversion.Scope) error {
+ return Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(a.(*v1alpha1.StagePatch), b.(*StagePatch), scope)
}); err != nil {
return err
}
@@ -650,6 +650,16 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
+ if err := s.AddConversionFunc((*StagePatch)(nil), (*v1alpha1.StagePatch)(nil), func(a, b interface{}, scope conversion.Scope) error {
+ return Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(a.(*StagePatch), b.(*v1alpha1.StagePatch), scope)
+ }); err != nil {
+ return err
+ }
+ if err := s.AddConversionFunc((*v1alpha1.StageNext)(nil), (*StageNext)(nil), func(a, b interface{}, scope conversion.Scope) error {
+ return Convert_v1alpha1_StageNext_To_internalversion_StageNext(a.(*v1alpha1.StageNext), b.(*StageNext), scope)
+ }); err != nil {
+ return err
+ }
return nil
}
@@ -2454,11 +2464,17 @@ func autoConvert_internalversion_StageNext_To_v1alpha1_StageNext(in *StageNext,
out.Event = (*v1alpha1.StageEvent)(unsafe.Pointer(in.Event))
out.Finalizers = (*v1alpha1.StageFinalizers)(unsafe.Pointer(in.Finalizers))
out.Delete = in.Delete
- out.StatusTemplate = in.StatusTemplate
- if err := v1.Convert_string_To_Pointer_string(&in.StatusSubresource, &out.StatusSubresource, s); err != nil {
- return err
+ if in.Patches != nil {
+ in, out := &in.Patches, &out.Patches
+ *out = make([]v1alpha1.StagePatch, len(*in))
+ for i := range *in {
+ if err := Convert_internalversion_StagePatch_To_v1alpha1_StagePatch(&(*in)[i], &(*out)[i], s); err != nil {
+ return err
+ }
+ }
+ } else {
+ out.Patches = nil
}
- out.StatusPatchAs = (*v1alpha1.ImpersonationConfig)(unsafe.Pointer(in.StatusPatchAs))
return nil
}
@@ -2471,17 +2487,42 @@ func autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.St
out.Event = (*StageEvent)(unsafe.Pointer(in.Event))
out.Finalizers = (*StageFinalizers)(unsafe.Pointer(in.Finalizers))
out.Delete = in.Delete
- out.StatusTemplate = in.StatusTemplate
- if err := v1.Convert_Pointer_string_To_string(&in.StatusSubresource, &out.StatusSubresource, s); err != nil {
- return err
+ if in.Patches != nil {
+ in, out := &in.Patches, &out.Patches
+ *out = make([]StagePatch, len(*in))
+ for i := range *in {
+ if err := Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(&(*in)[i], &(*out)[i], s); err != nil {
+ return err
+ }
+ }
+ } else {
+ out.Patches = nil
}
- out.StatusPatchAs = (*ImpersonationConfig)(unsafe.Pointer(in.StatusPatchAs))
+ // WARNING: in.StatusTemplate requires manual conversion: does not exist in peer-type
+ // WARNING: in.StatusSubresource requires manual conversion: does not exist in peer-type
+ // WARNING: in.StatusPatchAs requires manual conversion: does not exist in peer-type
+ return nil
+}
+
+func autoConvert_internalversion_StagePatch_To_v1alpha1_StagePatch(in *StagePatch, out *v1alpha1.StagePatch, s conversion.Scope) error {
+ out.Subresource = in.Subresource
+ out.Root = in.Root
+ out.Template = in.Template
+ out.Impersonation = (*v1alpha1.ImpersonationConfig)(unsafe.Pointer(in.Impersonation))
+ return nil
+}
+
+func autoConvert_v1alpha1_StagePatch_To_internalversion_StagePatch(in *v1alpha1.StagePatch, out *StagePatch, s conversion.Scope) error {
+ out.Subresource = in.Subresource
+ out.Root = in.Root
+ out.Template = in.Template
+ out.Impersonation = (*ImpersonationConfig)(unsafe.Pointer(in.Impersonation))
return nil
}
-// Convert_v1alpha1_StageNext_To_internalversion_StageNext is an autogenerated conversion function.
-func Convert_v1alpha1_StageNext_To_internalversion_StageNext(in *v1alpha1.StageNext, out *StageNext, s conversion.Scope) error {
- return autoConvert_v1alpha1_StageNext_To_internalversion_StageNext(in, out, s)
+// Convert_v1alpha1_StagePatch_To_internalversion_StagePatch is an autogenerated conversion function.
+func Convert_v1alpha1_StagePatch_To_internalversion_StagePatch(in *v1alpha1.StagePatch, out *StagePatch, s conversion.Scope) error {
+ return autoConvert_v1alpha1_StagePatch_To_internalversion_StagePatch(in, out, s)
}
func autoConvert_internalversion_StageResourceRef_To_v1alpha1_StageResourceRef(in *StageResourceRef, out *v1alpha1.StageResourceRef, s conversion.Scope) error {
diff --git a/pkg/apis/internalversion/zz_generated.deepcopy.go b/pkg/apis/internalversion/zz_generated.deepcopy.go
index b3d4420a58..8ced5c9c71 100644
--- a/pkg/apis/internalversion/zz_generated.deepcopy.go
+++ b/pkg/apis/internalversion/zz_generated.deepcopy.go
@@ -1292,10 +1292,12 @@ func (in *StageNext) DeepCopyInto(out *StageNext) {
*out = new(StageFinalizers)
(*in).DeepCopyInto(*out)
}
- if in.StatusPatchAs != nil {
- in, out := &in.StatusPatchAs, &out.StatusPatchAs
- *out = new(ImpersonationConfig)
- **out = **in
+ if in.Patches != nil {
+ in, out := &in.Patches, &out.Patches
+ *out = make([]StagePatch, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
}
return
}
@@ -1310,6 +1312,27 @@ func (in *StageNext) DeepCopy() *StageNext {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StagePatch) DeepCopyInto(out *StagePatch) {
+ *out = *in
+ if in.Impersonation != nil {
+ in, out := &in.Impersonation, &out.Impersonation
+ *out = new(ImpersonationConfig)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StagePatch.
+func (in *StagePatch) DeepCopy() *StagePatch {
+ if in == nil {
+ return nil
+ }
+ out := new(StagePatch)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StageResourceRef) DeepCopyInto(out *StageResourceRef) {
*out = *in
diff --git a/pkg/apis/v1alpha1/stage_types.go b/pkg/apis/v1alpha1/stage_types.go
index a7f6baede0..4683f6cde8 100644
--- a/pkg/apis/v1alpha1/stage_types.go
+++ b/pkg/apis/v1alpha1/stage_types.go
@@ -116,20 +116,41 @@ type StageNext struct {
Finalizers *StageFinalizers `json:"finalizers,omitempty"`
// Delete means that the resource will be deleted if true.
Delete bool `json:"delete,omitempty"`
+ // Patches means that the resource will be patched.
+ Patches []StagePatch `json:"patches,omitempty"`
+
// StatusTemplate indicates the template for modifying the status of the resource in the next.
+ // Deprecated: Use Patches instead.
StatusTemplate string `json:"statusTemplate,omitempty"`
// StatusSubresource indicates the name of the subresource that will be patched. The support for
// this field is not available in Pod and Node resources.
// +default="status"
// +kubebuilder:default=status
+ // Deprecated: Use Patches instead.
StatusSubresource *string `json:"statusSubresource,omitempty"`
// StatusPatchAs indicates the impersonating configuration for client when patching status.
// In most cases this will be empty, in which case the default client service account will be used.
// When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege.
// The support for this field is not available in Pod and Node resources.
+ // Deprecated: Use Patches instead.
StatusPatchAs *ImpersonationConfig `json:"statusPatchAs,omitempty"`
}
+// StagePatch describes the patch for the resource.
+type StagePatch struct {
+ // Subresource indicates the name of the subresource that will be patched.
+ Subresource string `json:"subresource,omitempty"`
+ // Root indicates the root of the template calculated by the patch.
+ Root string `json:"root,omitempty"`
+ // Template indicates the template for modifying the resource in the next.
+ Template string `json:"template,omitempty"`
+ // Impersonation indicates the impersonating configuration for client when patching status.
+ // In most cases this will be empty, in which case the default client service account will be used.
+ // When this is not empty, a corresponding rbac change is required to grant `impersonate` privilege.
+ // The support for this field is not available in Pod and Node resources.
+ Impersonation *ImpersonationConfig `json:"impersonation,omitempty"`
+}
+
// ImpersonationConfig describes the configuration for impersonating clients
type ImpersonationConfig struct {
// Username the target username for the client to impersonate
diff --git a/pkg/apis/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/v1alpha1/zz_generated.deepcopy.go
index 2d726b62a5..7667c57fff 100644
--- a/pkg/apis/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/v1alpha1/zz_generated.deepcopy.go
@@ -1801,6 +1801,13 @@ func (in *StageNext) DeepCopyInto(out *StageNext) {
*out = new(StageFinalizers)
(*in).DeepCopyInto(*out)
}
+ if in.Patches != nil {
+ in, out := &in.Patches, &out.Patches
+ *out = make([]StagePatch, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
if in.StatusSubresource != nil {
in, out := &in.StatusSubresource, &out.StatusSubresource
*out = new(string)
@@ -1824,6 +1831,27 @@ func (in *StageNext) DeepCopy() *StageNext {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StagePatch) DeepCopyInto(out *StagePatch) {
+ *out = *in
+ if in.Impersonation != nil {
+ in, out := &in.Impersonation, &out.Impersonation
+ *out = new(ImpersonationConfig)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StagePatch.
+func (in *StagePatch) DeepCopy() *StagePatch {
+ if in == nil {
+ return nil
+ }
+ out := new(StagePatch)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StageResourceRef) DeepCopyInto(out *StageResourceRef) {
*out = *in
diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go
index fffd96f575..e36c63b169 100644
--- a/pkg/kwok/controllers/node_controller.go
+++ b/pkg/kwok/controllers/node_controller.go
@@ -17,7 +17,6 @@ limitations under the License.
package controllers
import (
- "bytes"
"context"
"encoding/json"
"fmt"
@@ -29,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
@@ -430,7 +428,6 @@ func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage
var (
result *corev1.Node
- patch []byte
err error
)
@@ -456,19 +453,21 @@ func (c *NodeController) playStage(ctx context.Context, node *corev1.Node, stage
return shouldRetry(err), fmt.Errorf("failed to delete node %s: %w", node.Name, err)
}
result = nil
- } else if next.StatusTemplate != "" {
- patch, err = c.computeStatusPatch(node, next.StatusTemplate)
- if err != nil {
- return shouldRetry(err), fmt.Errorf("failed to compute the status patch of node %s: %w", node.Name, err)
- }
- if patch == nil {
- logger.Debug("Skip node",
- "reason", "do not need to modify",
- )
- } else {
- result, err = c.patchResource(ctx, node, patch)
+ } else if len(next.Patches) != 0 {
+ for _, patch := range next.Patches {
+ patchData, err := c.computeMergePatch(node, patch.Root, patch.Template)
if err != nil {
- return shouldRetry(err), fmt.Errorf("failed to patch node %s: %w", node.Name, err)
+ return shouldRetry(err), fmt.Errorf("failed to compute the node %s: %w", node.Name, err)
+ }
+ if patchData == nil {
+ logger.Debug("Skip node",
+ "reason", "do not need to modify",
+ )
+ } else {
+ result, err = c.patchResource(ctx, node, patchData, patch)
+ if err != nil {
+ return shouldRetry(err), fmt.Errorf("failed to patch node %s: %w", node.Name, err)
+ }
}
}
}
@@ -489,13 +488,20 @@ func (c *NodeController) readOnly(nodeName string) bool {
}
// patchResource patches the resource
-func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, patch []byte) (*corev1.Node, error) {
+func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, patchData []byte, patch internalversion.StagePatch) (*corev1.Node, error) {
logger := log.FromContext(ctx)
logger = logger.With(
"node", node.Name,
)
- result, err := c.typedClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status")
+ subresource := []string{}
+ if patch.Subresource != "" {
+ logger = logger.With(
+ "subresource", patch.Subresource,
+ )
+ subresource = []string{patch.Subresource}
+ }
+ result, err := c.typedClient.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{}, subresource...)
if err != nil {
return nil, err
}
@@ -503,40 +509,43 @@ func (c *NodeController) patchResource(ctx context.Context, node *corev1.Node, p
return result, nil
}
-func (c *NodeController) computeStatusPatch(node *corev1.Node, tpl string) ([]byte, error) {
- patch, err := c.renderer.ToJSON(tpl, node)
- if err != nil {
- return nil, err
- }
-
- original, err := json.Marshal(node.Status)
+func (c *NodeController) computeMergePatch(node *corev1.Node, root, tpl string) ([]byte, error) {
+ patchData, err := c.renderer.ToJSON(tpl, node)
if err != nil {
return nil, err
}
- sum, err := strategicpatch.StrategicMergePatch(original, patch, node.Status)
- if err != nil {
- return nil, err
- }
-
- nodeStatus := corev1.NodeStatus{}
- err = json.Unmarshal(sum, &nodeStatus)
- if err != nil {
- return nil, err
- }
-
- dist, err := json.Marshal(nodeStatus)
- if err != nil {
- return nil, err
+ var hasChange bool
+ switch root {
+ default:
+ return nil, fmt.Errorf("root %q is not supported", root)
+ case "":
+ hasChange, err = checkNeedPatch(*node, patchData)
+ if err != nil {
+ return nil, err
+ }
+ case "metadata":
+ hasChange, err = checkNeedPatch(node.ObjectMeta, patchData)
+ if err != nil {
+ return nil, err
+ }
+ case "spec":
+ hasChange, err = checkNeedPatch(node.Spec, patchData)
+ if err != nil {
+ return nil, err
+ }
+ case "status":
+ hasChange, err = checkNeedPatch(node.Status, patchData)
+ if err != nil {
+ return nil, err
+ }
}
- if bytes.Equal(original, dist) {
+ if !hasChange {
return nil, nil
}
- return json.Marshal(map[string]json.RawMessage{
- "status": patch,
- })
+ return wrapPatchData(root, patchData), nil
}
// putNodeInfo puts node info
diff --git a/pkg/kwok/controllers/pod_controller.go b/pkg/kwok/controllers/pod_controller.go
index bf173487b3..915b5646b2 100644
--- a/pkg/kwok/controllers/pod_controller.go
+++ b/pkg/kwok/controllers/pod_controller.go
@@ -17,7 +17,6 @@ limitations under the License.
package controllers
import (
- "bytes"
"context"
"encoding/json"
"fmt"
@@ -28,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/utils/clock"
@@ -327,7 +325,6 @@ func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *L
var (
result *corev1.Pod
- patch []byte
err error
)
@@ -353,19 +350,22 @@ func (c *PodController) playStage(ctx context.Context, pod *corev1.Pod, stage *L
return shouldRetry(err), fmt.Errorf("failed to delete pod %s: %w", pod.Name, err)
}
result = nil
- } else if next.StatusTemplate != "" {
- patch, err = c.computeStatusPatch(pod, next.StatusTemplate)
- if err != nil {
- return shouldRetry(err), fmt.Errorf("failed to compute the status patch of pod %s: %w", pod.Name, err)
- }
- if patch == nil {
- logger.Debug("Skip pod",
- "reason", "do not need to modify",
- )
- } else {
- result, err = c.patchResource(ctx, pod, patch)
+ } else if len(next.Patches) != 0 {
+ c.markPodIP(pod)
+ for _, patch := range next.Patches {
+ patchData, err := c.computeMergePatch(pod, patch.Root, patch.Template)
if err != nil {
- return shouldRetry(err), fmt.Errorf("failed to patch pod %s: %w", pod.Name, err)
+ return shouldRetry(err), fmt.Errorf("failed to compute the pod %s: %w", pod.Name, err)
+ }
+ if patchData == nil {
+ logger.Debug("Skip pod",
+ "reason", "do not need to modify",
+ )
+ } else {
+ result, err = c.patchResource(ctx, pod, patchData, patch)
+ if err != nil {
+ return shouldRetry(err), fmt.Errorf("failed to patch pod %s: %w", pod.Name, err)
+ }
}
}
}
@@ -386,14 +386,21 @@ func (c *PodController) readOnly(nodeName string) bool {
}
// patchResource patches the resource
-func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patch []byte) (*corev1.Pod, error) {
+func (c *PodController) patchResource(ctx context.Context, pod *corev1.Pod, patchData []byte, patch internalversion.StagePatch) (*corev1.Pod, error) {
logger := log.FromContext(ctx)
logger = logger.With(
"pod", log.KObj(pod),
"node", pod.Spec.NodeName,
)
- result, err := c.typedClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status")
+ subresource := []string{}
+ if patch.Subresource != "" {
+ logger = logger.With(
+ "subresource", patch.Subresource,
+ )
+ subresource = []string{patch.Subresource}
+ }
+ result, err := c.typedClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchData, metav1.PatchOptions{}, subresource...)
if err != nil {
return nil, err
}
@@ -546,71 +553,65 @@ func (c *PodController) recyclingPodIP(ctx context.Context, pod *corev1.Pod) {
}
}
-func (c *PodController) computeStatusPatch(pod *corev1.Pod, template string) ([]byte, error) {
- if !c.enableCNI {
- // Mark the pod IP that existed before the kubelet was started
- if _, has := c.nodeGetFunc(pod.Spec.NodeName); has {
- if pod.Status.PodIP != "" && c.nodeCacheGetter != nil {
- cidr := c.defaultCIDR
- node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName)
- if ok {
- if node.Spec.PodCIDR != "" {
- cidr = node.Spec.PodCIDR
- }
- pool, err := c.ipPool(cidr)
- if err == nil {
- pool.Use(pod.Status.PodIP)
- }
+func (c *PodController) markPodIP(pod *corev1.Pod) {
+ if c.enableCNI {
+ return
+ }
+ // Mark the pod IP that existed before the kubelet was started
+ if _, has := c.nodeGetFunc(pod.Spec.NodeName); has {
+ if pod.Status.PodIP != "" && c.nodeCacheGetter != nil {
+ cidr := c.defaultCIDR
+ node, ok := c.nodeCacheGetter.Get(pod.Spec.NodeName)
+ if ok {
+ if node.Spec.PodCIDR != "" {
+ cidr = node.Spec.PodCIDR
+ }
+ pool, err := c.ipPool(cidr)
+ if err == nil {
+ pool.Use(pod.Status.PodIP)
}
}
}
}
-
- patch, err := c.computePatch(pod, template)
- if err != nil {
- return nil, err
- }
- if patch == nil {
- return nil, nil
- }
-
- return json.Marshal(map[string]json.RawMessage{
- "status": patch,
- })
}
-func (c *PodController) computePatch(pod *corev1.Pod, tpl string) ([]byte, error) {
- patch, err := c.renderer.ToJSON(tpl, pod)
- if err != nil {
- return nil, err
- }
-
- original, err := json.Marshal(pod.Status)
- if err != nil {
- return nil, err
- }
-
- sum, err := strategicpatch.StrategicMergePatch(original, patch, pod.Status)
- if err != nil {
- return nil, err
- }
-
- podStatus := corev1.PodStatus{}
- err = json.Unmarshal(sum, &podStatus)
+func (c *PodController) computeMergePatch(pod *corev1.Pod, root, tpl string) ([]byte, error) {
+ patchData, err := c.renderer.ToJSON(tpl, pod)
if err != nil {
return nil, err
}
- dist, err := json.Marshal(podStatus)
- if err != nil {
- return nil, err
+ var hasChange bool
+ switch root {
+ default:
+ return nil, fmt.Errorf("root %q is not supported", root)
+ case "":
+ hasChange, err = checkNeedPatch(*pod, patchData)
+ if err != nil {
+ return nil, err
+ }
+ case "metadata":
+ hasChange, err = checkNeedPatch(pod.ObjectMeta, patchData)
+ if err != nil {
+ return nil, err
+ }
+ case "spec":
+ hasChange, err = checkNeedPatch(pod.Spec, patchData)
+ if err != nil {
+ return nil, err
+ }
+ case "status":
+ hasChange, err = checkNeedPatch(pod.Status, patchData)
+ if err != nil {
+ return nil, err
+ }
}
- if bytes.Equal(original, dist) {
+ if !hasChange {
return nil, nil
}
- return patch, nil
+ return wrapPatchData(root, patchData), nil
}
func (c *PodController) funcNodeIP() string {
diff --git a/pkg/kwok/controllers/stage_controller.go b/pkg/kwok/controllers/stage_controller.go
index a7c55d23d1..e9bf0636ae 100644
--- a/pkg/kwok/controllers/stage_controller.go
+++ b/pkg/kwok/controllers/stage_controller.go
@@ -307,7 +307,6 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured.
var (
result *unstructured.Unstructured
- patch []byte
err error
)
@@ -333,19 +332,21 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured.
return shouldRetry(err), fmt.Errorf("failed to delete resource %s: %w", resource.GetName(), err)
}
result = nil
- } else if next.StatusTemplate != "" {
- patch, err = c.computeStatusPatch(resource, next.StatusTemplate)
- if err != nil {
- return shouldRetry(err), fmt.Errorf("failed to compute the status patch of resource %s: %w", resource.GetName(), err)
- }
- if patch == nil {
- logger.Debug("Skip resource",
- "reason", "do not need to modify",
- )
- } else {
- result, err = c.patchResource(ctx, resource, patch, next)
+ } else if len(next.Patches) != 0 {
+ for _, patch := range next.Patches {
+ patchData, err := c.computeMergePatch(resource, patch.Root, patch.Template)
if err != nil {
- return shouldRetry(err), fmt.Errorf("failed to patch resource %s: %w", resource.GetName(), err)
+ return shouldRetry(err), fmt.Errorf("failed to compute resource %s: %w", resource.GetName(), err)
+ }
+ if patchData == nil {
+ logger.Debug("Skip resource",
+ "reason", "do not need to modify",
+ )
+ } else {
+ result, err = c.patchResource(ctx, resource, patchData, patch)
+ if err != nil {
+ return shouldRetry(err), fmt.Errorf("failed to patch resource %s: %w", resource.GetName(), err)
+ }
}
}
}
@@ -359,19 +360,19 @@ func (c *StageController) playStage(ctx context.Context, resource *unstructured.
}
// patchResource patches the resource
-func (c *StageController) patchResource(ctx context.Context, resource *unstructured.Unstructured, patch []byte, next *internalversion.StageNext) (*unstructured.Unstructured, error) {
+func (c *StageController) patchResource(ctx context.Context, resource *unstructured.Unstructured, patchData []byte, patch internalversion.StagePatch) (*unstructured.Unstructured, error) {
logger := log.FromContext(ctx)
logger = logger.With(
"resource", log.KObj(resource),
)
nri := c.dynamicClient.Resource(c.gvr)
- if next.StatusPatchAs != nil {
+ if patch.Impersonation != nil {
logger.With(
- "impersonate", next.StatusPatchAs.Username,
+ "impersonate", patch.Impersonation.Username,
)
- dc, err := c.impersonatingDynamicClient.Impersonate(rest.ImpersonationConfig{UserName: next.StatusPatchAs.Username})
+ dc, err := c.impersonatingDynamicClient.Impersonate(rest.ImpersonationConfig{UserName: patch.Impersonation.Username})
if err != nil {
logger.Error("error getting impersonating client", err)
return nil, err
@@ -383,13 +384,13 @@ func (c *StageController) patchResource(ctx context.Context, resource *unstructu
cli = nri.Namespace(ns)
}
subresource := []string{}
- if next.StatusSubresource != "" {
+ if patch.Subresource != "" {
logger = logger.With(
- "subresource", next.StatusSubresource,
+ "subresource", patch.Subresource,
)
- subresource = []string{next.StatusSubresource}
+ subresource = []string{patch.Subresource}
}
- result, err := cli.Patch(ctx, resource.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}, subresource...)
+ result, err := cli.Patch(ctx, resource.GetName(), types.MergePatchType, patchData, metav1.PatchOptions{}, subresource...)
if err != nil {
return nil, err
}
@@ -454,42 +455,34 @@ loop:
logger.Info("Stop watch resources")
}
-func (c *StageController) computeStatusPatch(resource *unstructured.Unstructured, template string) ([]byte, error) {
- patch, err := c.computePatch(resource, template)
- if err != nil {
- return nil, err
- }
- if patch == nil {
- return nil, nil
- }
-
- return json.Marshal(map[string]json.RawMessage{
- "status": patch,
- })
-}
-
-func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl string) ([]byte, error) {
+func (c *StageController) computeMergePatch(resource *unstructured.Unstructured, root, tpl string) ([]byte, error) {
patchData, err := c.renderer.ToJSON(tpl, resource.Object)
if err != nil {
return nil, err
}
if c.schema != nil {
- status, _, err := unstructured.NestedFieldNoCopy(resource.Object, "status")
- if err != nil {
- return nil, err
- }
+ var base any = resource.Object
+ var patchMeta = c.schema
- original, err := json.Marshal(status)
- if err != nil {
- return nil, err
+ if root != "" {
+ base, _, err = unstructured.NestedFieldNoCopy(resource.Object, root)
+ if err != nil {
+ return nil, err
+ }
+
+ patchMeta, _, err = c.schema.LookupPatchMetadataForStruct(root)
+ if err != nil {
+ return nil, err
+ }
}
- statusPatchMeta, _, err := c.schema.LookupPatchMetadataForStruct("status")
+ original, err := json.Marshal(base)
if err != nil {
return nil, err
}
- sum, err := strategicpatch.StrategicMergePatchUsingLookupPatchMeta(original, patchData, statusPatchMeta)
+
+ sum, err := strategicpatch.StrategicMergePatchUsingLookupPatchMeta(original, patchData, patchMeta)
if err != nil {
return nil, err
}
@@ -498,7 +491,8 @@ func (c *StageController) computePatch(resource *unstructured.Unstructured, tpl
return nil, nil
}
}
- return patchData, nil
+
+ return wrapPatchData(root, patchData), nil
}
// addStageJob adds a stage to be applied into the underlying weight delay queue and the associated helper map
diff --git a/pkg/kwok/controllers/stage_controller_test.go b/pkg/kwok/controllers/stage_controller_test.go
index 5654d59c00..09994b0ad4 100644
--- a/pkg/kwok/controllers/stage_controller_test.go
+++ b/pkg/kwok/controllers/stage_controller_test.go
@@ -69,7 +69,13 @@ func TestStageController(t *testing.T) {
},
},
Next: internalversion.StageNext{
- StatusTemplate: `phase: Available`,
+ Patches: []internalversion.StagePatch{
+ {
+ Template: `phase: Available`,
+ Subresource: "status",
+ Root: "status",
+ },
+ },
},
},
},
diff --git a/pkg/kwok/controllers/utils.go b/pkg/kwok/controllers/utils.go
index a198710e0c..3f5dec70d5 100644
--- a/pkg/kwok/controllers/utils.go
+++ b/pkg/kwok/controllers/utils.go
@@ -17,14 +17,18 @@ limitations under the License.
package controllers
import (
+ "bytes"
+ "encoding/json"
"math"
"net"
+ "strconv"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilnet "k8s.io/apimachinery/pkg/util/net"
+ "k8s.io/apimachinery/pkg/util/strategicpatch"
utilsnet "sigs.k8s.io/kwok/pkg/utils/net"
"sigs.k8s.io/kwok/pkg/utils/wait"
@@ -151,3 +155,41 @@ func shouldRetry(err error) bool {
// ignore all other cases
return false
}
+
+// checkNeedPatch checks if the object needs to be patched
+func checkNeedPatch[T any](obj T, patchData []byte) (bool, error) {
+ original, err := json.Marshal(obj)
+ if err != nil {
+ return false, err
+ }
+
+ sum, err := strategicpatch.StrategicMergePatch(original, patchData, obj)
+ if err != nil {
+ return false, err
+ }
+
+ var tmp T
+ err = json.Unmarshal(sum, &tmp)
+ if err != nil {
+ return false, err
+ }
+
+ dist, err := json.Marshal(tmp)
+ if err != nil {
+ return false, err
+ }
+
+ if bytes.Equal(original, dist) {
+ return false, nil
+ }
+
+ return true, nil
+}
+
+// wrapPatchData wraps the patch data with the parent key
+func wrapPatchData(parent string, patchData []byte) []byte {
+ if parent == "" {
+ return patchData
+ }
+ return []byte("{" + strconv.Quote(parent) + ":" + string(patchData) + "}")
+}
diff --git a/site/content/en/docs/generated/apis.md b/site/content/en/docs/generated/apis.md
index 34c3cd53f4..de86cf2578 100644
--- a/site/content/en/docs/generated/apis.md
+++ b/site/content/en/docs/generated/apis.md
@@ -4956,6 +4956,8 @@ ImpersonationConfig
Appears on:
StageNext
+,
+StagePatch
ImpersonationConfig describes the configuration for impersonating clients
@@ -6093,13 +6095,27 @@ bool
+patches
+
+
+[]StagePatch
+
+
+ |
+
+ Patches means that the resource will be patched.
+ |
+
+
+
statusTemplate
string
|
- StatusTemplate indicates the template for modifying the status of the resource in the next.
+StatusTemplate indicates the template for modifying the status of the resource in the next.
+Deprecated: Use Patches instead.
|
@@ -6111,7 +6127,8 @@ string
StatusSubresource indicates the name of the subresource that will be patched. The support for
-this field is not available in Pod and Node resources.
+this field is not available in Pod and Node resources.
+Deprecated: Use Patches instead.
|
@@ -6127,6 +6144,77 @@ ImpersonationConfig
StatusPatchAs indicates the impersonating configuration for client when patching status.
In most cases this will be empty, in which case the default client service account will be used.
When this is not empty, a corresponding rbac change is required to grant impersonate
privilege.
+The support for this field is not available in Pod and Node resources.
+Deprecated: Use Patches instead.
+
+
+
+
+
+StagePatch
+ #
+
+
+Appears on:
+StageNext
+
+
+
StagePatch describes the patch for the resource.
+
+
+
+
+Field |
+Description |
+
+
+
+
+
+subresource
+
+string
+
+ |
+
+ Subresource indicates the name of the subresource that will be patched.
+ |
+
+
+
+root
+
+string
+
+ |
+
+ Root indicates the root of the template calculated by the patch.
+ |
+
+
+
+template
+
+string
+
+ |
+
+ Template indicates the template for modifying the resource in the next.
+ |
+
+
+
+impersonation
+
+
+ImpersonationConfig
+
+
+ |
+
+ Impersonation indicates the impersonating configuration for client when patching status.
+In most cases this will be empty, in which case the default client service account will be used.
+When this is not empty, a corresponding rbac change is required to grant impersonate privilege.
The support for this field is not available in Pod and Node resources.
|