Skip to content

Commit

Permalink
feat: add pod index label
Browse files Browse the repository at this point in the history
Signed-off-by: Se7en <chengzw258@163.com>
  • Loading branch information
cr7258 committed Jul 23, 2024
1 parent 2d3e0be commit 6c2aa29
Show file tree
Hide file tree
Showing 14 changed files with 498 additions and 10 deletions.
5 changes: 3 additions & 2 deletions apis/apps/v1alpha1/daemonset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package v1alpha1

import (
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

appspub "github.com/openkruise/kruise/apis/apps/pub"
)

// DaemonSetUpdateStrategy is a struct used to control the update strategy for a DaemonSet.
Expand Down Expand Up @@ -91,7 +92,7 @@ type RollingUpdateDaemonSet struct {
// pod is available (Ready for at least minReadySeconds) the old DaemonSet pod
// on that node is marked deleted. If the old pod becomes unavailable for any
// reason (Ready transitions to false, is evicted, or is drained) an updated
// pod is immediatedly created on that node without considering surge limits.
// pod is immediately created on that node without considering surge limits.
// Allowing surge implies the possibility that the resources consumed by the
// daemonset on any given node can double if the readiness check fails, and
// so resource intensive daemonsets should take into account that they may
Expand Down
2 changes: 1 addition & 1 deletion config/crd/bases/apps.kruise.io_daemonsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ spec:
pod is available (Ready for at least minReadySeconds) the old DaemonSet pod
on that node is marked deleted. If the old pod becomes unavailable for any
reason (Ready transitions to false, is evicted, or is drained) an updated
pod is immediatedly created on that node without considering surge limits.
pod is immediately created on that node without considering surge limits.
Allowing surge implies the possibility that the resources consumed by the
daemonset on any given node can double if the readiness check fails, and
so resource intensive daemonsets should take into account that they may
Expand Down
2 changes: 1 addition & 1 deletion config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ spec:
- --enable-leader-election
- --logtostderr=true
- --v=5
- --feature-gates=AllAlpha=true
- --feature-gates=AllAlpha=true,EnableExternalCerts=false
image: controller:latest
imagePullPolicy: Always
securityContext:
Expand Down
161 changes: 161 additions & 0 deletions docs/proposals/20240309-cloneset-support-progressDeadlineSeconds.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
---
title: CloneSet
authors:
- "@hantmac"
reviewers:
- "@Fei-Guo"
- "@furykerry"
- "@FillZpp"
creation-date: 2024-03-10
last-updated: 2024-03-10
status: implementable
---

# Support progressDeadlineSeconds in CloneSet
Table of Contents
=================

- [Support progressDeadlineSeconds in CloneSet](#Support progressDeadlineSeconds in CloneSet)
- [Table of Contents](#table-of-contents)
- [Motivation](#motivation)
- [Proposal](#proposal)
- [1. add .spec.progressDeadlineSeconds field](#1add-.spec.progressDeadlineSeconds-field)
- [2. The behavior of progressDeadlineSeconds](#2the-behavior-of-progressDeadlineSeconds)
- [3. handle the logic](#2handle-the-logic)

## Motivation

`.spec.progressDeadlineSeconds` is an optional field in Deployment that specifies the number of seconds one wants to wait for their Deployment to progress before the system reports back that the Deployment has failed progressing.
Once the deadline has been exceeded, the Deployment controller adds a DeploymentCondition with the following attributes to the Deployment's `.status.conditions`:
```
type: Progressing
status: "False"
reason: ProgressDeadlineExceeded
```

This is useful for users to control the progress of the deployment.
So we should add support for `progressDeadlineSeconds` in CloneSet as well.

## Proposal
Firstly, add the `progressDeadlineSeconds` field to the CloneSetSpec.
Then add the handle logic in cloneSet controller to handle the `progressDeadlineSeconds` field.

### 1. add .spec.progressDeadlineSeconds field
The default value of `progressDeadlineSeconds` is 600 seconds according to the [official document](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#progress-deadline-seconds).
```yaml
apiVersion: apps.kruise.io/v1alpha1
kind: CloneSet
metadata:
name: cloneset-example
spec:
replicas: 3
progressDeadlineSeconds: 600
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:1.14.2
```
### 2. The behavior of progressDeadlineSeconds
However, the behavior of `progressDeadlineSeconds` in CloneSet might differ from its behavior in Deployment due to the support of partition in CloneSet. If a CloneSet is paused due to partition, it's debatable whether the paused time should be included in the progress deadline.
Here are two possible interpretations of `progressDeadlineSeconds` in the context of CloneSet:
1. `progressDeadlineSeconds` could be redefined as the time taken for the CloneSet to reach completion or the "paused" state due to partition. In this case, the time during which the CloneSet is paused would NOT be included in the progress deadline.
2. Secondly, `progressDeadlineSeconds` could only be supported if the partition is not set. This means that if a partition is set, the `progressDeadlineSeconds` field would not be applicable or has no effect.

After the discussion of the community, we choose the first interpretation.So we should re-set the progressDeadlineSeconds when the CloneSet reach completion OR "the paused state".

### 3. handle the logic
In cloneset controller, we should add the logic to handle the `progressDeadlineSeconds` field. We firstly add a timer to check the progress of the CloneSet.
If the progress exceeds the `progressDeadlineSeconds`, we should add a CloneSetCondition to the CloneSet's `.status.conditions`:
```go
// add a timer to check the progress of the CloneSet
if cloneSet.Spec.ProgressDeadlineSeconds != nil {
// handle the logic
starttime := time.Now()
...
if time.Now().After(starttime.Add(time.Duration(*cloneSet.Spec.ProgressDeadlineSeconds) * time.Second)) {
newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
Type: appsv1alpha1.CloneSetProgressing,
Status: corev1.ConditionFalse,
Reason: appsv1alpha1.CloneSetProgressDeadlineExceeded,
})
}
}
```

When the CloneSet reaches the "paused" state, we should reset the timer to avoid the progress deadline being exceeded.
And we check the progress of the CloneSet in the `syncCloneSetStatus` function. If the progress exceeds the `progressDeadlineSeconds`, we should add a CloneSetCondition to the CloneSet's `.status.conditions`:

```go
const (
CloneSetProgressDeadlineExceeded CloneSetConditionReason = "ProgressDeadlineExceeded"
CloneSetConditionTypeProgressing CloneSetConditionType = "Progressing"
)
```

```go
func (c *CloneSetController) syncCloneSetStatus(cloneSet *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus) error {
...
if cloneSet.Spec.ProgressDeadlineSeconds != nil {
// handle the logic
if time.Now().After(starttime.Add(time.Duration(*cloneSet.Spec.ProgressDeadlineSeconds) * time.Second)) {
newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
Type: appsv1alpha1.CloneSetProgressing,
Status: corev1.ConditionFalse,
Reason: appsv1alpha1.CloneSetProgressDeadlineExceeded,
})
}
}
...
}
```

When the CloneSet reaches the "paused" state, we should reset the timer to avoid the progress deadline being exceeded.
```go
func (c *CloneSetController) syncCloneSetStatus(cloneSet *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus) error {
...
// reset the starttime when the CloneSet reaches the "paused" state or complete state
if cloneSet.Status.UpdatedReadyReplicas == cloneSet.Status.Replicas || replicas - updatedReplicas = partition {
starttime = time.Now()
}
if cloneSet.Spec.Paused {
starttime = time.Now()
}
...
}
...
}
```

And we can save the starttime in the `LastUpdateTime` in the CloneSet's `.status.conditions`:
```
status:
conditions:
- lastTransitionTime: "2021-11-26T20:52:12Z"
lastUpdateTime: "2021-11-26T20:52:12Z"
message: CloneSet has minimum availability.
reason: MinimumReplicasAvailable
status: "True"
type: Available
- lastTransitionTime: "2021-11-26T20:52:12Z"
lastUpdateTime: "2021-11-26T20:52:12Z"
message: 'progress deadline exceeded'
reason: ProgressDeadlineExceeded
status: "False"
type: Progressing
```

## Implementation History

- [ ] 06/07/2024: Proposal submission


8 changes: 7 additions & 1 deletion pkg/controller/statefulset/stateful_set_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/revision"
)
Expand Down Expand Up @@ -371,12 +373,16 @@ func initIdentity(set *appsv1beta1.StatefulSet, pod *v1.Pod) {
// updateIdentity updates pod's name, hostname, and subdomain, and StatefulSetPodNameLabel to conform to set's name
// and headless service.
func updateIdentity(set *appsv1beta1.StatefulSet, pod *v1.Pod) {
pod.Name = getPodName(set, getOrdinal(pod))
ordinal := getOrdinal(pod)
pod.Name = getPodName(set, ordinal)
pod.Namespace = set.Namespace
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[apps.StatefulSetPodNameLabel] = pod.Name
if utilfeature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
pod.Labels[apps.PodIndexLabel] = strconv.Itoa(ordinal)
}
}

// isRunningAndAvailable returns true if pod is in the PodRunning Phase,
Expand Down
17 changes: 17 additions & 0 deletions pkg/controller/statefulset/stateful_set_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,23 @@ func TestUpdateIdentity(t *testing.T) {
}
}

func TestUpdateIdentityWithPodIndexLabel(t *testing.T) {
defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodIndexLabel, true)()

set := newStatefulSet(3)
pod := newStatefulSetPod(set, 1)
updateIdentity(set, pod)

podIndexFromLabel, exists := pod.Labels[apps.PodIndexLabel]
if !exists {
t.Errorf("Missing pod index label: %s", apps.PodIndexLabel)
}
podIndexFromName := strconv.Itoa(getOrdinal(pod))
if podIndexFromLabel != podIndexFromName {
t.Errorf("Pod index label value (%s) does not match pod index in pod name (%s)", podIndexFromLabel, podIndexFromName)
}
}

func TestUpdateStorage(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 1)
Expand Down
8 changes: 8 additions & 0 deletions pkg/features/kruise_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ const (

// Enables a StatefulSet to start from an arbitrary non zero ordinal
StatefulSetStartOrdinal featuregate.Feature = "StatefulSetStartOrdinal"

// Set pod completion index as a pod label for Indexed Jobs.
PodIndexLabel featuregate.Feature = "PodIndexLabel"

// Use certs generated externally
EnableExternalCerts featuregate.Feature = "EnableExternalCerts"
)

var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
Expand Down Expand Up @@ -154,6 +160,8 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
EnhancedLivenessProbeGate: {Default: false, PreRelease: featuregate.Alpha},
RecreatePodWhenChangeVCTInCloneSetGate: {Default: false, PreRelease: featuregate.Alpha},
StatefulSetStartOrdinal: {Default: false, PreRelease: featuregate.Alpha},
PodIndexLabel: {Default: true, PreRelease: featuregate.Beta},
EnableExternalCerts: {Default: false, PreRelease: featuregate.Alpha},
}

func init() {
Expand Down
20 changes: 20 additions & 0 deletions pkg/webhook/util/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/webhook/types"
webhookutil "github.com/openkruise/kruise/pkg/webhook/util"
)
Expand All @@ -46,6 +48,24 @@ func Ensure(kubeClient clientset.Interface, handlers map[string]types.HandlerGet
if err != nil {
return fmt.Errorf("not found ValidatingWebhookConfiguration %s", validatingWebhookConfigurationName)
}

if utilfeature.DefaultFeatureGate.Enabled(features.EnableExternalCerts) {
// if using external certs, only check the caBundle of webhook
for _, wh := range mutatingConfig.Webhooks {
if wh.ClientConfig.CABundle == nil {
return fmt.Errorf("caBundle of MutatingWebhookConfiguration %s is nil", mutatingWebhookConfigurationName)

}
}

for _, wh := range validatingConfig.Webhooks {
if wh.ClientConfig.CABundle == nil {
return fmt.Errorf("caBundle of ValidatingWebhookConfiguration %s is nil", mutatingWebhookConfigurationName)
}
}
return nil
}
// if using certs generated by kruise, update webhook configurations
oldMutatingConfig := mutatingConfig.DeepCopy()
oldValidatingConfig := validatingConfig.DeepCopy()

Expand Down
9 changes: 7 additions & 2 deletions pkg/webhook/util/controller/webhook_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"k8s.io/klog/v2"

extclient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
webhooktypes "github.com/openkruise/kruise/pkg/webhook/types"
webhookutil "github.com/openkruise/kruise/pkg/webhook/util"
"github.com/openkruise/kruise/pkg/webhook/util/configuration"
Expand Down Expand Up @@ -233,7 +235,11 @@ func (c *Controller) sync() error {
var err error

certWriterType := webhookutil.GetCertWriter()
if certWriterType == writer.FsCertWriter || (len(certWriterType) == 0 && len(webhookutil.GetHost()) != 0) {
if utilfeature.DefaultFeatureGate.Enabled(features.EnableExternalCerts) {
certWriter, err = writer.NewExternalCertWriter(writer.ExternalCertWriterOptions{
Clientset: c.kubeClient,
})
} else if certWriterType == writer.FsCertWriter || (len(certWriterType) == 0 && len(webhookutil.GetHost()) != 0) {
certWriter, err = writer.NewFSCertWriter(writer.FSCertWriterOptions{
Path: webhookutil.GetCertDir(),
})
Expand All @@ -254,7 +260,6 @@ func (c *Controller) sync() error {
if err := writer.WriteCertsToDir(webhookutil.GetCertDir(), certs); err != nil {
return fmt.Errorf("failed to write certs to dir: %v", err)
}

if err := configuration.Ensure(c.kubeClient, c.handlers, certs.CACert); err != nil {
return fmt.Errorf("failed to ensure configuration: %v", err)
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/webhook/util/crd/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/openkruise/kruise/apis"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
webhookutil "github.com/openkruise/kruise/pkg/webhook/util"
)

Expand All @@ -49,6 +51,21 @@ func Ensure(client apiextensionsclientset.Interface, lister apiextensionslisters
return fmt.Errorf("failed to list crds: %v", err)
}

if utilfeature.DefaultFeatureGate.Enabled(features.EnableExternalCerts) {
for _, crd := range crdList {
if len(crd.Spec.Versions) == 0 || crd.Spec.Conversion == nil || crd.Spec.Conversion.Strategy != apiextensionsv1.WebhookConverter {
continue
}
if !kruiseScheme.Recognizes(schema.GroupVersionKind{Group: crd.Spec.Group, Version: crd.Spec.Versions[0].Name, Kind: crd.Spec.Names.Kind}) {
continue
}

if crd.Spec.Conversion.Webhook == nil || crd.Spec.Conversion.Webhook.ClientConfig == nil || crd.Spec.Conversion.Webhook.ClientConfig.CABundle == nil {
return fmt.Errorf("bad conversion configuration of CRD %s", crd.Name)
}
}
return nil
}
webhookConfig := apiextensionsv1.WebhookClientConfig{
CABundle: caBundle,
}
Expand Down Expand Up @@ -85,5 +102,6 @@ func Ensure(client apiextensionsclientset.Interface, lister apiextensionslisters
}
}
}

return nil
}
Loading

0 comments on commit 6c2aa29

Please sign in to comment.