Skip to content

Commit

Permalink
pub support custom workload (openkruise#982)
Browse files Browse the repository at this point in the history
  • Loading branch information
zmberg authored May 27, 2022
1 parent 44d530d commit a6f6547
Show file tree
Hide file tree
Showing 25 changed files with 343 additions and 137 deletions.
8 changes: 8 additions & 0 deletions apis/policy/v1alpha1/podunavailablebudget_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ import (
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

const (
// PubProtectOperationAnnotation indicates the pub protected Operation[DELETE,UPDATE]
// the following indicates the pub only protect DELETE,UPDATE Operation
// annotations[kruise.io/pub-protect-operations]=DELETE,UPDATE
// if the annotations do not exist, the default DELETE and UPDATE are protected
PubProtectOperationAnnotation = "kruise.io/pub-protect-operations"
)

// PodUnavailableBudgetSpec defines the desired state of PodUnavailableBudget
type PodUnavailableBudgetSpec struct {
// Selector label query over pods managed by the budget
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ rules:
- '*'
verbs:
- list
- apiGroups:
- '*'
resources:
- '*/scale'
verbs:
- get
- list
- watch
- apiGroups:
- admissionregistration.k8s.io
resources:
Expand Down
6 changes: 6 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"time"

"github.com/openkruise/kruise/pkg/util/controllerfinder"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -149,6 +150,11 @@ func main() {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
err = controllerfinder.InitControllerFinder(mgr)
if err != nil {
setupLog.Error(err, "unable to start ControllerFinder")
os.Exit(1)
}

setupLog.Info("register field index")
if err := fieldindex.RegisterFieldIndexes(mgr.GetCache()); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/control/pubcontrol/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ type PubControl interface {
}

func NewPubControl(client client.Client) PubControl {
controllerFinder := controllerfinder.NewControllerFinder(client)
controllerFinder := controllerfinder.Finder
return &commonControl{controllerFinder: controllerFinder, Client: client}
}
39 changes: 1 addition & 38 deletions pkg/control/pubcontrol/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
kubeClient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -55,7 +52,7 @@ type Operation string

const (
UpdateOperation = "UPDATE"
//DeleteOperation = "DELETE"
DeleteOperation = "DELETE"

// Marked pods will not be pub-protected, solving the scenario of force pod deletion
PodPubNoProtectionAnnotation = "pub.kruise.io/no-protect"
Expand Down Expand Up @@ -203,40 +200,6 @@ func isPodRecordedInPub(podName string, pub *policyv1alpha1.PodUnavailableBudget
return false
}

func GetPubForWorkload(c client.Client, workload *controllerfinder.ScaleAndSelector) (*policyv1alpha1.PodUnavailableBudget, error) {
pubList := &policyv1alpha1.PodUnavailableBudgetList{}
if err := c.List(context.TODO(), pubList, &client.ListOptions{Namespace: workload.Metadata.Namespace}, utilclient.DisableDeepCopy); err != nil {
return nil, err
}
for i := range pubList.Items {
pub := &pubList.Items[i]
// if targetReference isn't nil, priority to take effect
if pub.Spec.TargetReference != nil {
// belongs the same workload
if IsReferenceEqual(&policyv1alpha1.TargetReference{
APIVersion: workload.APIVersion,
Kind: workload.Kind,
Name: workload.Name,
}, pub.Spec.TargetReference) {
return pub, nil
}
} else {
// This error is irreversible, so continue
labelSelector, err := util.GetFastLabelSelector(pub.Spec.Selector)
if err != nil {
continue
}
// If a PUB with a nil or empty selector creeps in, it should match nothing, not everything.
if labelSelector.Empty() || !labelSelector.Matches(labels.Set(workload.TempLabels)) {
continue
}
return pub, nil
}
}
klog.V(6).Infof("could not find PodUnavailableBudget for workload %s in namespace %s with labels: %v", workload.Name, workload.Metadata.Namespace, workload.TempLabels)
return nil, nil
}

// check APIVersion, Kind, Name
func IsReferenceEqual(ref1, ref2 *policyv1alpha1.TargetReference) bool {
gv1, err := schema.ParseGroupVersion(ref1.APIVersion)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/cloneset/sync/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func New(c client.Client, recorder record.EventRecorder) Interface {
inplaceControl: inplaceupdate.New(c, clonesetutils.RevisionAdapterImpl),
lifecycleControl: lifecycle.New(c),
recorder: recorder,
controllerFinder: controllerfinder.NewControllerFinder(c),
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(c),
}
}
2 changes: 1 addition & 1 deletion pkg/controller/cloneset/sync/cloneset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ func TestUpdate(t *testing.T) {
lifecycle.New(fakeClient),
inplaceupdate.New(fakeClient, clonesetutils.RevisionAdapterImpl),
record.NewFakeRecorder(10),
controllerfinder.NewControllerFinder(fakeClient),
&controllerfinder.ControllerFinder{Client: fakeClient},
pubcontrol.NewPubControl(fakeClient),
}
currentRevision := mc.updateRevision
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcilePersistentPodState{
Client: cli,
scheme: mgr.GetScheme(),
finder: controllerfinder.NewControllerFinder(cli),
finder: controllerfinder.Finder,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func TestReconcilePersistentPodState(t *testing.T) {
fakeClient := clientBuilder.Build()
reconciler := ReconcilePersistentPodState{
Client: fakeClient,
finder: controllerfinder.NewControllerFinder(fakeClient),
finder: &controllerfinder.ControllerFinder{Client: fakeClient},
}
if _, err := reconciler.Reconcile(context.TODO(), request); err != nil {
t.Fatalf("reconcile failed, err: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("podunavailablebudget-controller"),
controllerFinder: controllerfinder.NewControllerFinder(mgr.GetClient()),
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(mgr.GetClient()),
}
}
Expand Down Expand Up @@ -208,6 +208,7 @@ type ReconcilePodUnavailableBudget struct {

// +kubebuilder:rbac:groups=policy.kruise.io,resources=podunavailablebudgets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=policy.kruise.io,resources=podunavailablebudgets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=*,resources=*/scale,verbs=get;list;watch

// pkg/controller/cloneset/cloneset_controller.go Watch for changes to CloneSet
func (r *ReconcilePodUnavailableBudget) Reconcile(_ context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/podunavailablebudget/pub_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,11 @@ func TestPubReconcile(t *testing.T) {
t.Fatalf("create pod failed: %s", err.Error())
}
}
controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient}
reconciler := ReconcilePodUnavailableBudget{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: controllerfinder.NewControllerFinder(fakeClient),
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
pubControl: pubcontrol.NewPubControl(fakeClient),
}

Expand Down
56 changes: 52 additions & 4 deletions pkg/controller/podunavailablebudget/pub_pod_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -45,7 +47,7 @@ var _ handler.EventHandler = &enqueueRequestForPod{}

func newEnqueueRequestForPod(c client.Client) handler.EventHandler {
e := &enqueueRequestForPod{client: c}
e.controllerFinder = controllerfinder.NewControllerFinder(c)
e.controllerFinder = controllerfinder.Finder
e.pubControl = pubcontrol.NewPubControl(c)
return e
}
Expand Down Expand Up @@ -74,9 +76,12 @@ func (p *enqueueRequestForPod) addPod(q workqueue.RateLimitingInterface, obj run
if !ok {
return
}

// reconcile pub
pub, _ := p.pubControl.GetPubForPod(pod)
var pub *policyv1alpha1.PodUnavailableBudget
if pod.Annotations[pubcontrol.PodRelatedPubAnnotation] == "" {
pub, _ = GetPubForPod(p.client, pod)
} else {
pub, _ = p.pubControl.GetPubForPod(pod)
}
if pub == nil {
return
}
Expand All @@ -89,6 +94,49 @@ func (p *enqueueRequestForPod) addPod(q workqueue.RateLimitingInterface, obj run
})
}

func GetPubForPod(c client.Client, pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error) {
ref := metav1.GetControllerOf(pod)
if ref == nil {
return nil, nil
}
workload, err := controllerfinder.Finder.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, pod.Namespace, ref.Name, "")
if err != nil {
return nil, err
} else if workload == nil {
return nil, nil
}
pubList := &policyv1alpha1.PodUnavailableBudgetList{}
if err = c.List(context.TODO(), pubList, &client.ListOptions{Namespace: pod.Namespace}, utilclient.DisableDeepCopy); err != nil {
return nil, err
}
for i := range pubList.Items {
pub := &pubList.Items[i]
// if targetReference isn't nil, priority to take effect
if pub.Spec.TargetReference != nil {
// belongs the same workload
if pubcontrol.IsReferenceEqual(&policyv1alpha1.TargetReference{
APIVersion: workload.APIVersion,
Kind: workload.Kind,
Name: workload.Name,
}, pub.Spec.TargetReference) {
return pub, nil
}
} else {
// This error is irreversible, so continue
labelSelector, err := util.GetFastLabelSelector(pub.Spec.Selector)
if err != nil {
continue
}
// If a PUB with a nil or empty selector creeps in, it should match nothing, not everything.
if labelSelector.Empty() || !labelSelector.Matches(labels.Set(pod.Labels)) {
continue
}
return pub, nil
}
}
return nil, nil
}

func (p *enqueueRequestForPod) updatePod(q workqueue.RateLimitingInterface, old, cur runtime.Object) {
newPod := cur.(*corev1.Pod)
oldPod := old.(*corev1.Pod)
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/workloadspread/reschedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,11 @@ func TestRescheduleSubset(t *testing.T) {
}

reconciler := ReconcileWorkloadSpread{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: controllerfinder.NewControllerFinder(fakeClient),
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: &controllerfinder.ControllerFinder{
Client: fakeClient,
},
}

err := reconciler.syncWorkloadSpread(workloadSpread)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workloadspread/workloadspread_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Client: cli,
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(controllerName),
controllerFinder: controllerfinder.NewControllerFinder(mgr.GetClient()),
controllerFinder: controllerfinder.Finder,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,7 +1409,7 @@ func TestWorkloadSpreadReconcile(t *testing.T) {
reconciler := ReconcileWorkloadSpread{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: controllerfinder.NewControllerFinder(fakeClient),
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
}

err := reconciler.syncWorkloadSpread(workloadSpread)
Expand Down Expand Up @@ -1609,7 +1609,7 @@ func TestDelayReconcile(t *testing.T) {
reconciler := ReconcileWorkloadSpread{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: controllerfinder.NewControllerFinder(fakeClient),
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
}

durationStore = requeueduration.DurationStore{}
Expand Down Expand Up @@ -2031,7 +2031,7 @@ func TestManagerExistingPods(t *testing.T) {
reconciler := ReconcileWorkloadSpread{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: controllerfinder.NewControllerFinder(fakeClient),
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
}

durationStore = requeueduration.DurationStore{}
Expand Down
Loading

0 comments on commit a6f6547

Please sign in to comment.