Skip to content

Commit

Permalink
fix, register podopslifecycle webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs committed Aug 17, 2023
1 parent 035bb84 commit 7c2684b
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 32 deletions.
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ limitations under the License.

package v1alpha1

const (
PodOperationProtectionFinalizerPrefix = "prot.lifecycle.kafed.kusionstack.io"
)

// +kubebuilder:object:generate=false
type PodAvailableConditions struct {
ExpectedFinalizers []string `json:"expectedFinalizers,omitempty"` // indicate the expected finalizers of a pod
Expand Down
1 change: 1 addition & 0 deletions pkg/webhook/server/add_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/klog/v2"

"kusionstack.io/kafed/pkg/webhook/server/generic"
_ "kusionstack.io/kafed/pkg/webhook/server/generic/pod"
)

func init() {
Expand Down
64 changes: 54 additions & 10 deletions pkg/webhook/server/generic/pod/opslifecycle/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ import (
"kusionstack.io/kafed/pkg/log"
)

const (
waitingForLifecycleSeconds int64 = 5
)

var (
// some labels must exist together and have the same id, and they are a pair
pairLabelPrefixesMap = map[string]string{
Expand All @@ -52,17 +56,19 @@ var (
)

type ReadyToUpgrade func(pod *corev1.Pod) (bool, []string, *time.Duration)
type SatisfyExpectedFinalizers func(pod *corev1.Pod) (bool, []string, error)
type TimeLabelValue func() string

type OpsLifecycle struct {
readyToUpgrade ReadyToUpgrade

timeLabelValue TimeLabelValue
readyToUpgrade ReadyToUpgrade // for testing
satisfyExpectedFinalizers SatisfyExpectedFinalizers
timeLabelValue TimeLabelValue
}

func New(readyToUpgrade ReadyToUpgrade) *OpsLifecycle {
func New() *OpsLifecycle {
return &OpsLifecycle{
readyToUpgrade: readyToUpgrade,
readyToUpgrade: hasNoBlockingFinalizer,
satisfyExpectedFinalizers: satisfyExpectedFinalizers,
timeLabelValue: func() string {
return strconv.FormatInt(time.Now().Unix(), 10)
},
Expand Down Expand Up @@ -121,7 +127,7 @@ func (lc *OpsLifecycle) Validating(ctx context.Context, pod *corev1.Pod, logger
}

func (lc *OpsLifecycle) Mutating(ctx context.Context, oldPod, newPod *corev1.Pod, _ client.Client, logger *log.Logger) error {
lc.addReadinessGates(newPod, v1alpha1.ReadinessGatePodServiceReady)
addReadinessGates(newPod, v1alpha1.ReadinessGatePodServiceReady)

newIdToLabelsMap, typeToNumsMap, err := podopslifecycle.PodIDAndTypesMap(newPod)
if err != nil {
Expand Down Expand Up @@ -245,7 +251,7 @@ func (lc *OpsLifecycle) addLabelWithTime(pod *corev1.Pod, key string) {
pod.Labels[key] = lc.timeLabelValue()
}

func (lc *OpsLifecycle) addReadinessGates(pod *corev1.Pod, conditionType corev1.PodConditionType) {
func addReadinessGates(pod *corev1.Pod, conditionType corev1.PodConditionType) {
for _, v := range pod.Spec.ReadinessGates {
if v.ConditionType == conditionType {
return
Expand All @@ -256,11 +262,11 @@ func (lc *OpsLifecycle) addReadinessGates(pod *corev1.Pod, conditionType corev1.
})
}

func (lc *OpsLifecycle) satisfyExpectedFinalizers(pod *corev1.Pod) (bool, []string, error) {
func satisfyExpectedFinalizers(pod *corev1.Pod) (bool, []string, error) {
satisfy := true
var expectedFinalizer []string // expected finalizers that are not satisfied

availableConditions, err := lc.podAvailableConditions(pod)
availableConditions, err := podAvailableConditions(pod)
if err != nil {
return satisfy, expectedFinalizer, err
}
Expand All @@ -282,7 +288,7 @@ func (lc *OpsLifecycle) satisfyExpectedFinalizers(pod *corev1.Pod) (bool, []stri
return satisfy, expectedFinalizer, nil
}

func (lc *OpsLifecycle) podAvailableConditions(pod *corev1.Pod) (*v1alpha1.PodAvailableConditions, error) {
func podAvailableConditions(pod *corev1.Pod) (*v1alpha1.PodAvailableConditions, error) {
if pod.Annotations == nil {
return nil, nil
}
Expand All @@ -298,3 +304,41 @@ func (lc *OpsLifecycle) podAvailableConditions(pod *corev1.Pod) (*v1alpha1.PodAv
}
return availableConditions, nil
}

func hasNoBlockingFinalizer(pod *corev1.Pod) (bool, []string, *time.Duration) {
if pod == nil {
return true, nil, nil
}

hasReadinessGate := false
if pod.Spec.ReadinessGates != nil {
for _, readinessGate := range pod.Spec.ReadinessGates {
if readinessGate.ConditionType == v1alpha1.ReadinessGatePodServiceReady {
hasReadinessGate = true
break
}
}
}
if !hasReadinessGate {
// if has no service-ready ReadinessGate, treat it as normal pod.
return true, nil, nil
}

if pod.ObjectMeta.Finalizers == nil || len(pod.ObjectMeta.Finalizers) == 0 {
return true, nil, nil
}

var finalizers []string
for _, f := range pod.ObjectMeta.Finalizers {
if strings.HasPrefix(f, v1alpha1.PodOperationProtectionFinalizerPrefix) {
finalizers = append(finalizers, f)
}
}

if len(finalizers) > 0 {
requeneAfter := time.Duration(waitingForLifecycleSeconds) * time.Second
return false, finalizers, &requeneAfter
}

return true, nil, nil
}
37 changes: 36 additions & 1 deletion pkg/webhook/server/generic/pod/opslifecycle/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestMutating(t *testing.T) {
newPodLabels map[string]string
expectedLabels map[string]string

readyToUpgrade ReadyToUpgrade
satisfyExpectedFinalizers SatisfyExpectedFinalizers
readyToUpgrade ReadyToUpgrade

keyWords string // used to check the error message
}{
Expand Down Expand Up @@ -376,6 +377,27 @@ func TestMutating(t *testing.T) {
},
},

{
notes: "wait for removing finalizers",
newPodLabels: map[string]string{
fmt.Sprintf("%s/%s", v1alpha1.PodOperatedLabelPrefix, "123"): "1402144848",
fmt.Sprintf("%s/%s", v1alpha1.PodDoneOperationTypeLabelPrefix, "123"): "upgrade",
fmt.Sprintf("%s/%s", v1alpha1.PodPostCheckLabelPrefix, "123"): "1402144848",
fmt.Sprintf("%s/%s", v1alpha1.PodPostCheckedLabelPrefix, "123"): "1402144848",

fmt.Sprintf("%s/%s", v1alpha1.PodCompleteLabelPrefix, "123"): "1402144848",
},
expectedLabels: map[string]string{
fmt.Sprintf("%s/%s", v1alpha1.PodOperatedLabelPrefix, "123"): "1402144848",
fmt.Sprintf("%s/%s", v1alpha1.PodDoneOperationTypeLabelPrefix, "123"): "upgrade",
fmt.Sprintf("%s/%s", v1alpha1.PodPostCheckLabelPrefix, "123"): "1402144848",
fmt.Sprintf("%s/%s", v1alpha1.PodPostCheckedLabelPrefix, "123"): "1402144848",

fmt.Sprintf("%s/%s", v1alpha1.PodCompleteLabelPrefix, "123"): "1402144848",
},
satisfyExpectedFinalizers: satifyExpectedFinalizersReturnFalse,
},

{
notes: "all finished",
newPodLabels: map[string]string{
Expand Down Expand Up @@ -423,6 +445,11 @@ func TestMutating(t *testing.T) {
opslifecycle.readyToUpgrade = readyToUpgradeReturnTrue
}

opslifecycle.satisfyExpectedFinalizers = v.satisfyExpectedFinalizers
if opslifecycle.satisfyExpectedFinalizers == nil {
opslifecycle.satisfyExpectedFinalizers = satifyExpectedFinalizersReturnTrue
}

t.Logf("notes: %s", v.notes)
err := opslifecycle.Mutating(context.Background(), oldPod, newPod, nil, nil)
if v.keyWords == "" {
Expand All @@ -443,3 +470,11 @@ func readyToUpgradeReturnTrue(pod *corev1.Pod) (bool, []string, *time.Duration)
func readyToUpgradeReturnFalse(pod *corev1.Pod) (bool, []string, *time.Duration) {
return false, nil, nil
}

func satifyExpectedFinalizersReturnTrue(pod *corev1.Pod) (bool, []string, error) {
return true, nil, nil
}

func satifyExpectedFinalizersReturnFalse(pod *corev1.Pod) (bool, []string, error) {
return false, nil, nil
}
14 changes: 7 additions & 7 deletions pkg/webhook/server/generic/pod/pod_mutating_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ var (
)

type MutatingHandler struct {
needLifecycle NeedOpsLifecycle
opsLifecycle *opslifecycle.OpsLifecycle
needOpsLifecycle NeedOpsLifecycle
opsLifecycle *opslifecycle.OpsLifecycle
}

func NewMutatingHandler(needLifecycle NeedOpsLifecycle, readyToUpgrade opslifecycle.ReadyToUpgrade) *MutatingHandler {
func NewMutatingHandler(needOpsLifecycle NeedOpsLifecycle) *MutatingHandler {
return &MutatingHandler{
needLifecycle: needLifecycle,
opsLifecycle: opslifecycle.New(readyToUpgrade),
needOpsLifecycle: needOpsLifecycle,
opsLifecycle: opslifecycle.New(),
}
}

Expand All @@ -69,7 +69,7 @@ func (h *MutatingHandler) Handle(ctx context.Context, req admission.Request, c c
}

if req.Operation == admissionv1.Create {
if !h.needLifecycle(nil, pod) {
if h.needOpsLifecycle != nil && !h.needOpsLifecycle(nil, pod) {
return admission.Patched("Not need opslifecycle mutating")
}
}
Expand All @@ -80,7 +80,7 @@ func (h *MutatingHandler) Handle(ctx context.Context, req admission.Request, c c
return admission.Errored(http.StatusBadRequest, fmt.Errorf("fail to unmarshal old object: %s", err))
}

if !h.needLifecycle(old, pod) {
if h.needOpsLifecycle != nil && !h.needOpsLifecycle(old, pod) {
return admission.Patched("Not need opslifecycle mutating")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/webhook/server/generic/pod/pod_validating_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (h *ValidatingHandler) Handle(ctx context.Context, req admission.Request, c
s, _ := json.Marshal(req)
return admission.Errored(http.StatusBadRequest, fmt.Errorf("fail to decode old object from request %s: %s", s, err))
}
if !h.needOpsLifecycle(nil, obj) {
return admission.Allowed("pod is allowed by opslifecycle")
if h.needOpsLifecycle != nil && !h.needOpsLifecycle(nil, obj) {
return admission.Allowed("pod is ignored by opslifecycle")
}

if err := h.opslifecycle.Validating(ctx, obj, logger); err != nil {
Expand Down
23 changes: 11 additions & 12 deletions pkg/webhook/server/generic/pod/pod_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,24 @@ package pod
import (
v1 "k8s.io/api/core/v1"

"kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/webhook/server/generic"
"kusionstack.io/kafed/pkg/webhook/server/generic/pod/opslifecycle"
)

type NeedOpsLifecycle func(oldPod, newPod *v1.Pod) bool

type PodWebhook struct {
mutatingHandler *MutatingHandler
validatingHandler *ValidatingHandler
func init() {
generic.MutatingTypeHandlerMap["Pod"] = NewMutatingHandler(ControlledByKafed)
generic.ValidatingTypeHandlerMap["Pod"] = NewValidatingHandler(ControlledByKafed)
}

func NewPodWebhook(needLifecycle NeedOpsLifecycle, readyToUpgrade opslifecycle.ReadyToUpgrade) *PodWebhook {
return &PodWebhook{
mutatingHandler: NewMutatingHandler(needLifecycle, readyToUpgrade),
validatingHandler: NewValidatingHandler(needLifecycle),
func ControlledByKafed(oldPod, newPod *v1.Pod) bool {
if newPod == nil || newPod.Labels == nil {
return false
}
if v, ok := newPod.Labels[v1alpha1.KafedSystemLabel]; !ok || v != "true" {
return false
}
}

func (h *PodWebhook) RegisterToDispatcher() {
generic.MutatingTypeHandlerMap["Pod"] = h.mutatingHandler
generic.ValidatingTypeHandlerMap["Pod"] = h.validatingHandler
return true
}

0 comments on commit 7c2684b

Please sign in to comment.