Skip to content

Commit

Permalink
Add an event recorder for Admission Check Controller (#1271)
Browse files Browse the repository at this point in the history
* added recorders for admission checks controller

* review updates

* resolve comments of review

* review fixes
  • Loading branch information
Anton committed Nov 14, 2023
1 parent cf25c71 commit e7bb101
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
// setup provision admission check controller
if features.Enabled(features.ProvisioningACC) && provisioning.ServerSupportsProvisioningRequest(mgr) {
// A info message is added in setupIndexes if autoscaling is not supported by the cluster
if err := provisioning.NewController(mgr.GetClient()).SetupWithManager(mgr); err != nil {
if err := provisioning.NewController(mgr.GetClient(), mgr.GetEventRecorderFor("kueue-provisioning-request-controller")).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Could not setup provisioning controller")
os.Exit(1)
}
Expand Down
28 changes: 24 additions & 4 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
autoscaling "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand All @@ -41,6 +42,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/api"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand All @@ -59,6 +61,7 @@ var (
type Controller struct {
client client.Client
helper *storeHelper
record record.EventRecorder
}

var _ reconcile.Reconciler = (*Controller)(nil)
Expand All @@ -72,9 +75,10 @@ var _ reconcile.Reconciler = (*Controller)(nil)
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=admissionchecks,verbs=get;list;watch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=provisioningrequestconfigs,verbs=get;list;watch

func NewController(client client.Client) *Controller {
func NewController(client client.Client, record record.EventRecorder) *Controller {
return &Controller{
client: client,
record: record,
helper: &storeHelper{
client: client,
},
Expand Down Expand Up @@ -382,6 +386,7 @@ func requestHasParamaters(req *autoscaling.ProvisioningRequest, prc *kueue.Provi
func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, checks []string) error {
checksMap := slices.ToRefMap(wl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
wlPatch := workload.BaseSSAWorkload(wl)
recorderMessages := make([]string, 0, len(checks))
updated := false
for _, check := range checks {
checkState := *checksMap[check]
Expand All @@ -407,7 +412,7 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch

prFailed := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Failed)
prAccepted := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.Provisioned)
prAvaiable := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.CapacityAvailable)
prAvailable := apimeta.IsStatusConditionTrue(pr.Status.Conditions, autoscaling.CapacityAvailable)

switch {
case prFailed:
Expand All @@ -416,7 +421,7 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
checkState.State = kueue.CheckStateRejected
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message
}
case prAccepted || prAvaiable:
case prAccepted || prAvailable:
if checkState.State != kueue.CheckStateReady {
updated = true
checkState.State = kueue.CheckStateReady
Expand All @@ -430,10 +435,25 @@ func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, ch
}
}
}

existingCondition := workload.FindAdmissionCheck(wlPatch.Status.AdmissionChecks, checkState.Name)
if existingCondition != nil && existingCondition.State != checkState.State {
message := fmt.Sprintf("Admission check %s updated state from %s to %s", checkState.Name, existingCondition.State, checkState.State)
if checkState.Message != "" {
message += fmt.Sprintf(" with message %s", checkState.Message)
}
recorderMessages = append(recorderMessages, message)
}

workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState)
}
if updated {
return c.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
if err := c.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership); err != nil {
return err
}
for i := range recorderMessages {
c.record.Eventf(wl, corev1.EventTypeNormal, "AdmissionCheckUpdated", api.TruncateEventMessage(recorderMessages[i]))
}
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
autoscaling "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -580,7 +581,8 @@ func TestReconcile(t *testing.T) {
)

k8sclient := builder.Build()
controller := NewController(k8sclient)
recorder := record.NewBroadcaster().NewRecorder(k8sclient.Scheme(), corev1.EventSource{Component: "admission-checks-controller"})
controller := NewController(k8sclient, recorder)

req := reconcile.Request{
NamespacedName: types.NamespacedName{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func managerSetup(mgr manager.Manager, ctx context.Context) {
err = provisioning.SetupIndexer(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

reconciler := provisioning.NewController(mgr.GetClient())
reconciler := provisioning.NewController(mgr.GetClient(), mgr.GetEventRecorderFor("kueue-provisioning-request-controller"))
err = reconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
Expand Down

0 comments on commit e7bb101

Please sign in to comment.