From 6cb9546aa95aeffb872fa3521ae4c379392b1a66 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Mon, 6 May 2024 10:43:09 +0800 Subject: [PATCH] support running workloads in visibility endpoint Signed-off-by: KunWuLuan --- .../v1alpha1/openapi/zz_generated.openapi.go | 140 +++++- apis/visibility/v1alpha1/types.go | 34 +- .../v1alpha1/zz_generated.deepcopy.go | 83 +++- client-go/applyconfiguration/utils.go | 4 + .../visibility/v1alpha1/clusterqueue.go | 19 +- .../visibility/v1alpha1/runningworkload.go | 187 ++++++++ .../v1alpha1/runningworkloadssummary.go | 212 +++++++++ .../typed/visibility/v1alpha1/clusterqueue.go | 14 + .../v1alpha1/fake/fake_clusterqueue.go | 10 + cmd/kueue/main.go | 2 +- keps/xxxx-list-admitted-workloads/README.md | 100 ++++ keps/xxxx-list-admitted-workloads/kep.yaml | 22 + pkg/cache/cache.go | 39 ++ pkg/visibility/api/install.go | 5 +- .../api/rest/running_workloads_cq.go | 99 ++++ .../api/rest/running_workloads_cq_test.go | 434 ++++++++++++++++++ pkg/visibility/api/rest/test_utils.go | 11 + pkg/visibility/api/rest/utils.go | 21 + pkg/visibility/server.go | 5 +- 19 files changed, 1428 insertions(+), 13 deletions(-) create mode 100644 client-go/applyconfiguration/visibility/v1alpha1/runningworkload.go create mode 100644 client-go/applyconfiguration/visibility/v1alpha1/runningworkloadssummary.go create mode 100644 keps/xxxx-list-admitted-workloads/README.md create mode 100644 keps/xxxx-list-admitted-workloads/kep.yaml create mode 100644 pkg/visibility/api/rest/running_workloads_cq.go create mode 100644 pkg/visibility/api/rest/running_workloads_cq_test.go diff --git a/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go b/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go index 1170a04aab..e804ec4be2 100644 --- a/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go +++ b/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go @@ -90,6 +90,9 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadOptions": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadOptions(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadsSummary(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummaryList": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadsSummaryList(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkload": schema_kueue_apis_visibility_v1alpha1_RunningWorkload(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary": schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummary(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummaryList": schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummaryList(ref), } } @@ -2544,12 +2547,18 @@ func schema_kueue_apis_visibility_v1alpha1_ClusterQueue(ref common.ReferenceCall Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"), }, }, + "runningWorkloadsSummary": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"), + }, + }, }, - Required: []string{"pendingWorkloadsSummary"}, + Required: []string{"pendingWorkloadsSummary", "runningWorkloadsSummary"}, }, }, Dependencies: []string{ - "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"}, + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", 
"sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"}, } } @@ -2883,3 +2892,130 @@ func schema_kueue_apis_visibility_v1alpha1_PendingWorkloadsSummaryList(ref commo "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"}, } } + +func schema_kueue_apis_visibility_v1alpha1_RunningWorkload(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "RunningWorkload is a user-facing representation of a running workload that summarizes the relevant information for assumed resources in the cluster queue.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "localQueueName": { + SchemaProps: spec.SchemaProps{ + Description: "LocalQueueName indicates the name of the LocalQueue the workload is submitted to", + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"localQueueName"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, + } +} + +func schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummary(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "RunningWorkloadsSummary contains a list of running workloads in the context of the query (within LocalQueue or ClusterQueue).", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkload"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkload"}, + } +} + +func schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummaryList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"}, + } +} diff --git a/apis/visibility/v1alpha1/types.go b/apis/visibility/v1alpha1/types.go index 1d7b6e9980..dc09571c78 100644 --- a/apis/visibility/v1alpha1/types.go +++ b/apis/visibility/v1alpha1/types.go @@ -25,11 +25,13 @@ import ( // +k8s:openapi-gen=true // +genclient:nonNamespaced // +genclient:method=GetPendingWorkloadsSummary,verb=get,subresource=pendingworkloads,result=sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary +// +genclient:method=GetRunningWorkloadsSummary,verb=get,subresource=runningWorkloads,result=sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary type ClusterQueue struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Summary PendingWorkloadsSummary `json:"pendingWorkloadsSummary"` + PendingWorkloadsSummary PendingWorkloadsSummary `json:"pendingWorkloadsSummary"` + RunningWorkloadsSummary RunningWorkloadsSummary `json:"runningWorkloadsSummary"` } // +kubebuilder:object:root=true @@ 
-77,6 +79,35 @@ type PendingWorkload struct { PositionInLocalQueue int32 `json:"positionInLocalQueue"` } +// RunningWorkload is a user-facing representation of a running workload that summarizes the relevant information for +// assumed resources in the cluster queue. +type RunningWorkload struct { + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Priority indicates the workload's priority + Priority int32 `json:"priority"` +} + +// +k8s:openapi-gen=true +// +kubebuilder:object:root=true + +// RunningWorkloadsSummary contains a list of running workloads in the context +// of the query (within LocalQueue or ClusterQueue). +type RunningWorkloadsSummary struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Items []RunningWorkload `json:"items"` +} + +// +kubebuilder:object:root=true +type RunningWorkloadsSummaryList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + + Items []RunningWorkloadsSummary `json:"items"` +} + // +k8s:openapi-gen=true // +kubebuilder:object:root=true @@ -117,5 +148,6 @@ func init() { SchemeBuilder.Register( &PendingWorkloadsSummary{}, &PendingWorkloadOptions{}, + &RunningWorkloadsSummary{}, ) } diff --git a/apis/visibility/v1alpha1/zz_generated.deepcopy.go b/apis/visibility/v1alpha1/zz_generated.deepcopy.go index 30ecbe25ab..b57c1ad0ee 100644 --- a/apis/visibility/v1alpha1/zz_generated.deepcopy.go +++ b/apis/visibility/v1alpha1/zz_generated.deepcopy.go @@ -29,7 +29,8 @@ func (in *ClusterQueue) DeepCopyInto(out *ClusterQueue) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - in.Summary.DeepCopyInto(&out.Summary) + in.PendingWorkloadsSummary.DeepCopyInto(&out.PendingWorkloadsSummary) + in.RunningWorkloadsSummary.DeepCopyInto(&out.RunningWorkloadsSummary) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueue. @@ -243,3 +244,83 @@ func (in *PendingWorkloadsSummaryList) DeepCopyObject() runtime.Object { } return nil } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunningWorkload) DeepCopyInto(out *RunningWorkload) { + *out = *in + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningWorkload. +func (in *RunningWorkload) DeepCopy() *RunningWorkload { + if in == nil { + return nil + } + out := new(RunningWorkload) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunningWorkloadsSummary) DeepCopyInto(out *RunningWorkloadsSummary) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]RunningWorkload, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningWorkloadsSummary. +func (in *RunningWorkloadsSummary) DeepCopy() *RunningWorkloadsSummary { + if in == nil { + return nil + } + out := new(RunningWorkloadsSummary) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. 
+func (in *RunningWorkloadsSummary) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunningWorkloadsSummaryList) DeepCopyInto(out *RunningWorkloadsSummaryList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]RunningWorkloadsSummary, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningWorkloadsSummaryList. +func (in *RunningWorkloadsSummaryList) DeepCopy() *RunningWorkloadsSummaryList { + if in == nil { + return nil + } + out := new(RunningWorkloadsSummaryList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RunningWorkloadsSummaryList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index 8bd03f6f73..2e4651c997 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -134,6 +134,10 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &applyconfigurationvisibilityv1alpha1.PendingWorkloadApplyConfiguration{} case visibilityv1alpha1.SchemeGroupVersion.WithKind("PendingWorkloadsSummary"): return &applyconfigurationvisibilityv1alpha1.PendingWorkloadsSummaryApplyConfiguration{} + case visibilityv1alpha1.SchemeGroupVersion.WithKind("RunningWorkload"): + return &applyconfigurationvisibilityv1alpha1.RunningWorkloadApplyConfiguration{} + case visibilityv1alpha1.SchemeGroupVersion.WithKind("RunningWorkloadsSummary"): + return &applyconfigurationvisibilityv1alpha1.RunningWorkloadsSummaryApplyConfiguration{} } return nil diff --git a/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go b/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go index dd6baa3506..b1def5ae4c 100644 --- a/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go +++ b/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go @@ -28,7 +28,8 @@ import ( type ClusterQueueApplyConfiguration struct { v1.TypeMetaApplyConfiguration `json:",inline"` *v1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` - Summary *PendingWorkloadsSummaryApplyConfiguration `json:"pendingWorkloadsSummary,omitempty"` + PendingWorkloadsSummary *PendingWorkloadsSummaryApplyConfiguration `json:"pendingWorkloadsSummary,omitempty"` + RunningWorkloadsSummary *RunningWorkloadsSummaryApplyConfiguration `json:"runningWorkloadsSummary,omitempty"` } // ClusterQueue constructs an declarative configuration of the ClusterQueue type for use with @@ -199,10 +200,18 @@ func (b *ClusterQueueApplyConfiguration) ensureObjectMetaApplyConfigurationExist } } -// WithSummary sets the Summary field in the declarative configuration to the given value +// WithPendingWorkloadsSummary sets the PendingWorkloadsSummary field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Summary field is set to the value of the last call. 
-func (b *ClusterQueueApplyConfiguration) WithSummary(value *PendingWorkloadsSummaryApplyConfiguration) *ClusterQueueApplyConfiguration { - b.Summary = value +// If called multiple times, the PendingWorkloadsSummary field is set to the value of the last call. +func (b *ClusterQueueApplyConfiguration) WithPendingWorkloadsSummary(value *PendingWorkloadsSummaryApplyConfiguration) *ClusterQueueApplyConfiguration { + b.PendingWorkloadsSummary = value + return b +} + +// WithRunningWorkloadsSummary sets the RunningWorkloadsSummary field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the RunningWorkloadsSummary field is set to the value of the last call. +func (b *ClusterQueueApplyConfiguration) WithRunningWorkloadsSummary(value *RunningWorkloadsSummaryApplyConfiguration) *ClusterQueueApplyConfiguration { + b.RunningWorkloadsSummary = value return b } diff --git a/client-go/applyconfiguration/visibility/v1alpha1/runningworkload.go b/client-go/applyconfiguration/visibility/v1alpha1/runningworkload.go new file mode 100644 index 0000000000..ce8063d884 --- /dev/null +++ b/client-go/applyconfiguration/visibility/v1alpha1/runningworkload.go @@ -0,0 +1,187 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + v1 "k8s.io/client-go/applyconfigurations/meta/v1" +) + +// RunningWorkloadApplyConfiguration represents an declarative configuration of the RunningWorkload type for use +// with apply. +type RunningWorkloadApplyConfiguration struct { + *v1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` + LocalQueueName *string `json:"localQueueName,omitempty"` +} + +// RunningWorkloadApplyConfiguration constructs an declarative configuration of the RunningWorkload type for use with +// apply. +func RunningWorkload() *RunningWorkloadApplyConfiguration { + return &RunningWorkloadApplyConfiguration{} +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithName(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Name = &value + return b +} + +// WithGenerateName sets the GenerateName field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the GenerateName field is set to the value of the last call. 
+func (b *RunningWorkloadApplyConfiguration) WithGenerateName(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.GenerateName = &value + return b +} + +// WithNamespace sets the Namespace field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Namespace field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithNamespace(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Namespace = &value + return b +} + +// WithUID sets the UID field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UID field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithUID(value types.UID) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.UID = &value + return b +} + +// WithResourceVersion sets the ResourceVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ResourceVersion field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithResourceVersion(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ResourceVersion = &value + return b +} + +// WithGeneration sets the Generation field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Generation field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithGeneration(value int64) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Generation = &value + return b +} + +// WithCreationTimestamp sets the CreationTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the CreationTimestamp field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithCreationTimestamp(value metav1.Time) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.CreationTimestamp = &value + return b +} + +// WithDeletionTimestamp sets the DeletionTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionTimestamp field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithDeletionTimestamp(value metav1.Time) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionTimestamp = &value + return b +} + +// WithDeletionGracePeriodSeconds sets the DeletionGracePeriodSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionGracePeriodSeconds field is set to the value of the last call. 
+func (b *RunningWorkloadApplyConfiguration) WithDeletionGracePeriodSeconds(value int64) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionGracePeriodSeconds = &value + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *RunningWorkloadApplyConfiguration) WithLabels(entries map[string]string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Labels == nil && len(entries) > 0 { + b.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Labels[k] = v + } + return b +} + +// WithAnnotations puts the entries into the Annotations field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Annotations field, +// overwriting an existing map entries in Annotations field with the same key. +func (b *RunningWorkloadApplyConfiguration) WithAnnotations(entries map[string]string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Annotations == nil && len(entries) > 0 { + b.Annotations = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Annotations[k] = v + } + return b +} + +// WithOwnerReferences adds the given value to the OwnerReferences field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OwnerReferences field. +func (b *RunningWorkloadApplyConfiguration) WithOwnerReferences(values ...*v1.OwnerReferenceApplyConfiguration) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + if values[i] == nil { + panic("nil value passed to WithOwnerReferences") + } + b.OwnerReferences = append(b.OwnerReferences, *values[i]) + } + return b +} + +// WithFinalizers adds the given value to the Finalizers field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Finalizers field. +func (b *RunningWorkloadApplyConfiguration) WithFinalizers(values ...string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + b.Finalizers = append(b.Finalizers, values[i]) + } + return b +} + +func (b *RunningWorkloadApplyConfiguration) ensureObjectMetaApplyConfigurationExists() { + if b.ObjectMetaApplyConfiguration == nil { + b.ObjectMetaApplyConfiguration = &v1.ObjectMetaApplyConfiguration{} + } +} + +// WithLocalQueueName sets the LocalQueueName field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the LocalQueueName field is set to the value of the last call. 
+func (b *RunningWorkloadApplyConfiguration) WithLocalQueueName(value string) *RunningWorkloadApplyConfiguration { + b.LocalQueueName = &value + return b +} diff --git a/client-go/applyconfiguration/visibility/v1alpha1/runningworkloadssummary.go b/client-go/applyconfiguration/visibility/v1alpha1/runningworkloadssummary.go new file mode 100644 index 0000000000..756788c18a --- /dev/null +++ b/client-go/applyconfiguration/visibility/v1alpha1/runningworkloadssummary.go @@ -0,0 +1,212 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + v1 "k8s.io/client-go/applyconfigurations/meta/v1" +) + +// RunningWorkloadsSummaryApplyConfiguration represents an declarative configuration of the RunningWorkloadsSummary type for use +// with apply. +type RunningWorkloadsSummaryApplyConfiguration struct { + v1.TypeMetaApplyConfiguration `json:",inline"` + *v1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` + Items []RunningWorkloadApplyConfiguration `json:"items,omitempty"` +} + +// RunningWorkloadsSummaryApplyConfiguration constructs an declarative configuration of the RunningWorkloadsSummary type for use with +// apply. +func RunningWorkloadsSummary() *RunningWorkloadsSummaryApplyConfiguration { + b := &RunningWorkloadsSummaryApplyConfiguration{} + b.WithKind("RunningWorkloadsSummary") + b.WithAPIVersion("visibility.kueue.x-k8s.io/v1alpha1") + return b +} + +// WithKind sets the Kind field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Kind field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithKind(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.Kind = &value + return b +} + +// WithAPIVersion sets the APIVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the APIVersion field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithAPIVersion(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.APIVersion = &value + return b +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. 
+func (b *RunningWorkloadsSummaryApplyConfiguration) WithName(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Name = &value + return b +} + +// WithGenerateName sets the GenerateName field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the GenerateName field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithGenerateName(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.GenerateName = &value + return b +} + +// WithNamespace sets the Namespace field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Namespace field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithNamespace(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Namespace = &value + return b +} + +// WithUID sets the UID field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UID field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithUID(value types.UID) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.UID = &value + return b +} + +// WithResourceVersion sets the ResourceVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ResourceVersion field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithResourceVersion(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ResourceVersion = &value + return b +} + +// WithGeneration sets the Generation field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Generation field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithGeneration(value int64) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Generation = &value + return b +} + +// WithCreationTimestamp sets the CreationTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the CreationTimestamp field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithCreationTimestamp(value metav1.Time) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.CreationTimestamp = &value + return b +} + +// WithDeletionTimestamp sets the DeletionTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionTimestamp field is set to the value of the last call. 
+func (b *RunningWorkloadsSummaryApplyConfiguration) WithDeletionTimestamp(value metav1.Time) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionTimestamp = &value + return b +} + +// WithDeletionGracePeriodSeconds sets the DeletionGracePeriodSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionGracePeriodSeconds field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithDeletionGracePeriodSeconds(value int64) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionGracePeriodSeconds = &value + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithLabels(entries map[string]string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Labels == nil && len(entries) > 0 { + b.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Labels[k] = v + } + return b +} + +// WithAnnotations puts the entries into the Annotations field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Annotations field, +// overwriting an existing map entries in Annotations field with the same key. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithAnnotations(entries map[string]string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Annotations == nil && len(entries) > 0 { + b.Annotations = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Annotations[k] = v + } + return b +} + +// WithOwnerReferences adds the given value to the OwnerReferences field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OwnerReferences field. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithOwnerReferences(values ...*v1.OwnerReferenceApplyConfiguration) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + if values[i] == nil { + panic("nil value passed to WithOwnerReferences") + } + b.OwnerReferences = append(b.OwnerReferences, *values[i]) + } + return b +} + +// WithFinalizers adds the given value to the Finalizers field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Finalizers field. 
+func (b *RunningWorkloadsSummaryApplyConfiguration) WithFinalizers(values ...string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + b.Finalizers = append(b.Finalizers, values[i]) + } + return b +} + +func (b *RunningWorkloadsSummaryApplyConfiguration) ensureObjectMetaApplyConfigurationExists() { + if b.ObjectMetaApplyConfiguration == nil { + b.ObjectMetaApplyConfiguration = &v1.ObjectMetaApplyConfiguration{} + } +} + +// WithItems adds the given value to the Items field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Items field. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithItems(values ...*RunningWorkloadApplyConfiguration) *RunningWorkloadsSummaryApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithItems") + } + b.Items = append(b.Items, *values[i]) + } + return b +} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go index a4b6fcbf52..ec6698ad94 100644 --- a/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go @@ -50,6 +50,7 @@ type ClusterQueueInterface interface { Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.ClusterQueue, err error) Apply(ctx context.Context, clusterQueue *visibilityv1alpha1.ClusterQueueApplyConfiguration, opts v1.ApplyOptions) (result *v1alpha1.ClusterQueue, err error) GetPendingWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (*v1alpha1.PendingWorkloadsSummary, error) + GetRunningWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (*v1alpha1.RunningWorkloadsSummary, error) ClusterQueueExpansion } @@ -209,3 +210,16 @@ func (c *clusterQueues) GetPendingWorkloadsSummary(ctx context.Context, clusterQ Into(result) return } + +// GetRunningWorkloadsSummary takes name of the clusterQueue, and returns the corresponding v1alpha1.RunningWorkloadsSummary object, and an error if there is any. +func (c *clusterQueues) GetRunningWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (result *v1alpha1.RunningWorkloadsSummary, err error) { + result = &v1alpha1.RunningWorkloadsSummary{} + err = c.client.Get(). + Resource("clusterqueues"). + Name(clusterQueueName). + SubResource("runningWorkloads"). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go index d0cc7fa7d7..8956ce734a 100644 --- a/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go @@ -152,3 +152,13 @@ func (c *FakeClusterQueues) GetPendingWorkloadsSummary(ctx context.Context, clus } return obj.(*v1alpha1.PendingWorkloadsSummary), err } + +// GetRunningWorkloadsSummary takes name of the clusterQueue, and returns the corresponding runningWorkloadsSummary object, and an error if there is any. 
+func (c *FakeClusterQueues) GetRunningWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (result *v1alpha1.RunningWorkloadsSummary, err error) {
+	obj, err := c.Fake.
+		Invokes(testing.NewRootGetSubresourceAction(clusterqueuesResource, "runningWorkloads", clusterQueueName), &v1alpha1.RunningWorkloadsSummary{})
+	if obj == nil {
+		return nil, err
+	}
+	return obj.(*v1alpha1.RunningWorkloadsSummary), err
+}
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go
index 4b0bd26bdd..739acdc9db 100644
--- a/cmd/kueue/main.go
+++ b/cmd/kueue/main.go
@@ -180,7 +180,7 @@ func main() {
 	}()
 
 	if features.Enabled(features.VisibilityOnDemand) {
-		go visibility.CreateAndStartVisibilityServer(queues, ctx)
+		go visibility.CreateAndStartVisibilityServer(queues, cCache, ctx)
 	}
 
 	setupScheduler(mgr, cCache, queues, &cfg)
diff --git a/keps/xxxx-list-admitted-workloads/README.md b/keps/xxxx-list-admitted-workloads/README.md
new file mode 100644
index 0000000000..e8877809c0
--- /dev/null
+++ b/keps/xxxx-list-admitted-workloads/README.md
@@ -0,0 +1,100 @@
+# KEP-1834: List Admitted And Not Finished Workloads
+
+
+- [Summary](#summary)
+- [Motivation](#motivation)
+  - [Goals](#goals)
+  - [Non-Goals](#non-goals)
+- [Proposal](#proposal)
+  - [Risks and Mitigations](#risks-and-mitigations)
+- [Design Details](#design-details)
+  - [Test Plan](#test-plan)
+    - [Prerequisite testing updates](#prerequisite-testing-updates)
+    - [Unit Tests](#unit-tests)
+    - [Integration tests](#integration-tests)
+  - [Graduation Criteria](#graduation-criteria)
+- [Implementation History](#implementation-history)
+- [Drawbacks](#drawbacks)
+- [Alternatives](#alternatives)
+
+
+## Summary
+
+Add a new visibility endpoint in Kueue for querying the list of Workloads that are still "running".
+
+## Motivation
+
+JSONPath support in kubectl is limited, and we cannot filter resources by condition that way. By adding a new
+visibility endpoint, users can list running workloads with `kubectl get`.
+
+### Goals
+
+* Support listing workloads that are admitted and not yet finished.
+
+### Non-Goals
+
+
+## Proposal
+
+We will add a new visibility endpoint in Kueue. Users can list the running workloads using
+``` bash
+kubectl get --raw "/apis/visibility.kueue.x-k8s.io/v1alpha1/clusterqueues/cluster-queue/runningworkloads"
+```
+and
+``` bash
+kubectl get --raw "/apis/visibility.kueue.x-k8s.io/v1alpha1/namespaces/default/localqueues/user-queue/runningworkloads"
+```
+
+The response includes the priority and LocalQueue of each workload, for example:
+```
+{"kind":"RunningWorkloadsSummary","apiVersion":"visibility.kueue.x-k8s.io/v1alpha1","metadata":{"creationTimestamp":null},"items":[{"metadata":{"name":"job-sample-job-jz228-ef938","namespace":"default","creationTimestamp":"2024-05-06T02:15:26Z","ownerReferences":[{"apiVersion":"batch/v1","kind":"Job","name":"sample-job-jz228","uid":"2de8a359-4c95-4159-b677-0279066149b6"}]},"priority":0,"localQueue":"lqA"}]}
+```
+
+### Risks and Mitigations
+
+None.
+
+## Design Details
+
+
+### Test Plan
+
+
+
+[x] I/we understand the owners of the involved components may require updates to
+existing tests to make this code solid enough prior to committing the changes necessary
+to implement this enhancement.
+
+##### Prerequisite testing updates
+
+
+
+#### Unit Tests
+
+New unit tests should be added covering the new endpoint for both ClusterQueue and LocalQueue queries.
+
+#### Integration tests
+
+The idea is to enhance the existing integration tests to check that the new endpoints return the workloads that are admitted and not yet finished, as sketched below.
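+
+As a sketch of how such a test (or an end user) could consume the new subresource, the snippet below uses the
+generated visibility clientset. This is illustrative only: the ClusterQueue name `cluster-queue`, the kubeconfig
+loading via controller-runtime, and the `VisibilityV1alpha1().ClusterQueues()` accessor are assumptions of this
+sketch, not part of the proposal itself.
+
+``` go
+package main
+
+import (
+	"context"
+	"fmt"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	ctrl "sigs.k8s.io/controller-runtime"
+
+	visibilityclient "sigs.k8s.io/kueue/client-go/clientset/versioned"
+)
+
+func main() {
+	// Load a kubeconfig/in-cluster config (assumption for this sketch).
+	cfg := ctrl.GetConfigOrDie()
+	clientset := visibilityclient.NewForConfigOrDie(cfg)
+
+	// Query the runningworkloads subresource of a ClusterQueue named "cluster-queue" (placeholder name).
+	summary, err := clientset.VisibilityV1alpha1().ClusterQueues().
+		GetRunningWorkloadsSummary(context.Background(), "cluster-queue", metav1.GetOptions{})
+	if err != nil {
+		panic(err)
+	}
+
+	// Each item carries the workload's ObjectMeta plus the fields exposed by this KEP.
+	for _, wl := range summary.Items {
+		fmt.Printf("%s/%s priority=%d\n", wl.Namespace, wl.Name, wl.Priority)
+	}
+}
+```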
+ +### Graduation Criteria + +## Implementation History + +* 2024-04-29 First draft + +## Drawbacks + +## Alternatives diff --git a/keps/xxxx-list-admitted-workloads/kep.yaml b/keps/xxxx-list-admitted-workloads/kep.yaml new file mode 100644 index 0000000000..f9a9385da6 --- /dev/null +++ b/keps/xxxx-list-admitted-workloads/kep.yaml @@ -0,0 +1,22 @@ +title: List Admitted And Not Finished Workloads +kep-number: 1834 +authors: + - "@kunwuluan" +status: draft +creation-date: 2024-04-29 +reviewers: + - "@alculquicondor" +approvers: + +# The target maturity stage in the current dev cycle for this KEP. +stage: beta + +# The most recent milestone for which work toward delivery of this KEP has been +# done. This can be the current (upcoming) milestone, if it is being actively +# worked on. +latest-milestone: "v0.9" + +# The milestone at which this feature was, or is targeted to be, at each stage. +milestone: + alpha: "v0.9" + beta: "v0.9" diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index a6ef5912e0..9312e012ce 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -23,6 +23,8 @@ import ( "sort" "sync" + apierr "k8s.io/apimachinery/pkg/api/errors" + "github.com/go-logr/logr" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,9 +34,11 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/metrics" + utilpriority "sigs.k8s.io/kueue/pkg/util/priority" "sigs.k8s.io/kueue/pkg/workload" ) @@ -657,6 +661,41 @@ func filterLocalQueueUsage(orig FlavorResourceQuantities, resourceGroups []Resou return qFlvUsages } +func workloadOrderingFunc(a, b *workload.Info) bool { + p1 := utilpriority.Priority(a.Obj) + p2 := utilpriority.Priority(b.Obj) + + if p1 != p2 { + return p1 > p2 + } + + return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp) +} + +func (c *Cache) RunningWorkload(name string) ([]*workload.Info, error) { + c.RLock() + defer c.RUnlock() + cq, ok := c.clusterQueues[name] + if !ok { + return []*workload.Info{}, apierr.NewNotFound(v1alpha1.Resource("clusterqueue"), name) + } + + count := 0 + wkls := make([]*workload.Info, 0, len(cq.Workloads)) + for k, wkl := range cq.Workloads { + if _, ok := c.assumedWorkloads[k]; ok { + continue + } + wkls = append(wkls, wkl) + count++ + } + wkls = wkls[:count] + sort.Slice(wkls, func(i, j int) bool { + return workloadOrderingFunc(wkls[i], wkls[j]) + }) + return wkls, nil +} + func (c *Cache) cleanupAssumedState(w *kueue.Workload) { k := workload.Key(w) assumedCQName, assumed := c.assumedWorkloads[k] diff --git a/pkg/visibility/api/install.go b/pkg/visibility/api/install.go index 48f8a9b761..f738960cbe 100644 --- a/pkg/visibility/api/install.go +++ b/pkg/visibility/api/install.go @@ -24,6 +24,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" v1alpha1 "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/queue" apirest "sigs.k8s.io/kueue/pkg/visibility/api/rest" ) @@ -43,9 +44,10 @@ func init() { } // Install installs API scheme defined in apis/v1alpha1 and registers storage -func Install(server *genericapiserver.GenericAPIServer, kueueMgr *queue.Manager) error { +func Install(server *genericapiserver.GenericAPIServer, kueueMgr *queue.Manager, cCache *cache.Cache) error { apiGroupInfo := 
genericapiserver.NewDefaultAPIGroupInfo(v1alpha1.GroupVersion.Group, Scheme, ParameterCodec, Codecs)
 	pendingWorkloadsInCqREST := apirest.NewPendingWorkloadsInCqREST(kueueMgr)
+	runningWorkloadsInCqREST := apirest.NewRunningWorkloadsInCqREST(cCache)
 	cqREST := apirest.NewCqREST()
 	pendingWorkloadsInLqREST := apirest.NewPendingWorkloadsInLqREST(kueueMgr)
 	lqREST := apirest.NewLqREST()
@@ -53,6 +55,7 @@ func Install(server *genericapiserver.GenericAPIServer, kueueMgr *queue.Manager)
 	visibilityServerResources := map[string]rest.Storage{
 		"clusterqueues":                  cqREST,
 		"clusterqueues/pendingworkloads": pendingWorkloadsInCqREST,
+		"clusterqueues/runningworkloads": runningWorkloadsInCqREST,
 		"localqueues":                    lqREST,
 		"localqueues/pendingworkloads":   pendingWorkloadsInLqREST,
 	}
diff --git a/pkg/visibility/api/rest/running_workloads_cq.go b/pkg/visibility/api/rest/running_workloads_cq.go
new file mode 100644
index 0000000000..c8e4efce80
--- /dev/null
+++ b/pkg/visibility/api/rest/running_workloads_cq.go
@@ -0,0 +1,99 @@
+// Copyright 2023 The Kubernetes Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rest
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/go-logr/logr"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apiserver/pkg/registry/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
+
+	"sigs.k8s.io/kueue/apis/visibility/v1alpha1"
+	"sigs.k8s.io/kueue/pkg/cache"
+	"sigs.k8s.io/kueue/pkg/constants"
+
+	_ "k8s.io/metrics/pkg/apis/metrics/install"
+)
+
+type runningWorkloadsInCqREST struct {
+	c   *cache.Cache
+	log logr.Logger
+}
+
+var _ rest.Storage = &runningWorkloadsInCqREST{}
+var _ rest.GetterWithOptions = &runningWorkloadsInCqREST{}
+var _ rest.Scoper = &runningWorkloadsInCqREST{}
+
+func NewRunningWorkloadsInCqREST(c *cache.Cache) *runningWorkloadsInCqREST {
+	return &runningWorkloadsInCqREST{
+		c:   c,
+		log: ctrl.Log.WithName("running-workload-in-cq"),
+	}
+}
+
+// New implements rest.Storage interface
+func (m *runningWorkloadsInCqREST) New() runtime.Object {
+	return &v1alpha1.RunningWorkloadsSummary{}
+}
+
+// Destroy implements rest.Storage interface
+func (m *runningWorkloadsInCqREST) Destroy() {}
+
+var (
+	debugLog = ctrl.Log.WithName("visibility-server")
+)
+
+// Get implements rest.GetterWithOptions interface
+// It fetches information about running workloads and returns them according to the query params
+func (m *runningWorkloadsInCqREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) {
+	debugLog.V(1).Info("got a request")
+	pendingWorkloadOpts, ok := opts.(*v1alpha1.PendingWorkloadOptions)
+	if !ok {
+		return nil, fmt.Errorf("invalid options object: %#v", opts)
+	}
+	limit := pendingWorkloadOpts.Limit
+	offset := pendingWorkloadOpts.Offset
+
+	wls := make([]v1alpha1.RunningWorkload, 0, limit)
+	runningWorkloadsInfo, err := m.c.RunningWorkload(name)
+	debugLog.V(1).Info("got runningWorkloads", "len", len(runningWorkloadsInfo))
+	if err != nil {
+		return nil, err
+	}
+
+	for index := int(offset); index < int(offset+limit) && index <
len(runningWorkloadsInfo); index++ { + // Update positions in LocalQueue + wlInfo := runningWorkloadsInfo[index] + wls = append(wls, *newRunningWorkload(wlInfo)) + } + debugLog.V(1).Info("response", "len", len(wls)) + return &v1alpha1.RunningWorkloadsSummary{Items: wls}, nil +} + +// NewGetOptions creates a new options object +func (m *runningWorkloadsInCqREST) NewGetOptions() (runtime.Object, bool, string) { + // If no query parameters were passed the generated defaults function are not executed so it's necessary to set default values here as well + return &v1alpha1.PendingWorkloadOptions{ + Limit: constants.DefaultPendingWorkloadsLimit, + }, false, "" +} + +// NamespaceScoped implements rest.Scoper interface +func (m *runningWorkloadsInCqREST) NamespaceScoped() bool { + return false +} diff --git a/pkg/visibility/api/rest/running_workloads_cq_test.go b/pkg/visibility/api/rest/running_workloads_cq_test.go new file mode 100644 index 0000000000..76183141b7 --- /dev/null +++ b/pkg/visibility/api/rest/running_workloads_cq_test.go @@ -0,0 +1,434 @@ +// Copyright 2024 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rest + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + corev1 "k8s.io/api/core/v1" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" +) + +func TestRunningWorkloadsInCQ(t *testing.T) { + const ( + nsName = "foo" + cqNameA = "cqA" + cqNameB = "cqB" + lqNameA = "lqA" + lqNameB = "lqB" + flavorName = "flavor" + lowPrio = 50 + highPrio = 100 + ) + + var ( + defaultQueryParams = &visibility.PendingWorkloadOptions{ + Offset: 0, + Limit: constants.DefaultPendingWorkloadsLimit, + } + ) + + var ( + f1 = utiltesting.MakeResourceFlavor("flavor1").Obj() + q1 = utiltesting.MakeFlavorQuotas("flavor1").Resource("cpu", "10").Obj() + ) + + podSets := []kueue.PodSet{ + *utiltesting.MakePodSet("driver", 1). + Request(corev1.ResourceCPU, "1"). 
+ Obj(), + } + podSetFlavors := []kueue.PodSetAssignment{ + { + Name: "driver", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "flavor1", + }, + ResourceUsage: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + } + + var ( + adA = utiltesting.MakeAdmission(cqNameA).Obj() + ) + + scheme := runtime.NewScheme() + if err := kueue.AddToScheme(scheme); err != nil { + t.Fatalf("Failed adding kueue scheme: %s", err) + } + if err := visibility.AddToScheme(scheme); err != nil { + t.Fatalf("Failed adding kueue scheme: %s", err) + } + + now := time.Now() + cases := map[string]struct { + clusterQueues []*kueue.ClusterQueue + queues []*kueue.LocalQueue + flavors []*kueue.ResourceFlavor + workloads []*kueue.Workload + req *runningReq + wantResp *runningResp + wantErrMatch func(error) bool + }{ + "single ClusterQueue and single LocalQueue setup with two workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + flavors: []*kueue.ResourceFlavor{ + f1, + }, + queues: []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue(lqNameA, nsName).ClusterQueue(cqNameA).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("a", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{ + ClusterQueue: cqNameA, + PodSetAssignments: podSetFlavors, + }).Queue(lqNameA).Priority(highPrio).Creation(now).Admission(adA).Obj(), + utiltesting.MakeWorkload("b", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{ + ClusterQueue: cqNameA, + PodSetAssignments: podSetFlavors, + }).Queue(lqNameA).Priority(lowPrio).Creation(now).Admission(adA).Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: defaultQueryParams, + }, + wantResp: &runningResp{ + wantRunningWorkloads: []visibility.RunningWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "a", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: highPrio, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: lowPrio, + }}, + }, + }, + "single ClusterQueue and two LocalQueue setup with four workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + flavors: []*kueue.ResourceFlavor{ + f1, + }, + queues: []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue(lqNameA, nsName).ClusterQueue(cqNameA).Obj(), + utiltesting.MakeLocalQueue(lqNameB, nsName).ClusterQueue(cqNameA).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("lqA-high-prio", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{ + ClusterQueue: cqNameA, + PodSetAssignments: podSetFlavors, + }).Queue(lqNameA).Priority(highPrio).Creation(now).Admission(adA).Obj(), + utiltesting.MakeWorkload("lqA-low-prio", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{ + ClusterQueue: cqNameA, + PodSetAssignments: podSetFlavors, + }).Queue(lqNameA).Priority(lowPrio).Creation(now).Admission(adA).Obj(), + utiltesting.MakeWorkload("lqB-high-prio", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{ + ClusterQueue: cqNameA, + PodSetAssignments: podSetFlavors, + }).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Admission(adA).Obj(), + utiltesting.MakeWorkload("lqB-low-prio", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{ + ClusterQueue: cqNameA, + PodSetAssignments: podSetFlavors, + 
+				}).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Admission(adA).Obj(),
+			},
+			req: &runningReq{
+				queueName:   cqNameA,
+				queryParams: defaultQueryParams,
+			},
+			wantResp: &runningResp{
+				wantRunningWorkloads: []visibility.RunningWorkload{
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "lqA-high-prio",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now),
+						},
+						Priority: highPrio,
+					},
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "lqB-high-prio",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now.Add(time.Second)),
+						},
+						Priority: highPrio,
+					},
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "lqA-low-prio",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now),
+						},
+						Priority: lowPrio,
+					},
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "lqB-low-prio",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now.Add(time.Second)),
+						},
+						Priority: lowPrio,
+					}},
+			},
+		},
+		"limit query parameter set": {
+			clusterQueues: []*kueue.ClusterQueue{
+				utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(),
+			},
+			flavors: []*kueue.ResourceFlavor{
+				f1,
+			},
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue(lqNameA, nsName).ClusterQueue(cqNameA).Obj(),
+			},
+			workloads: []*kueue.Workload{
+				utiltesting.MakeWorkload("a", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now).Obj(),
+				utiltesting.MakeWorkload("b", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now.Add(time.Second)).Obj(),
+				utiltesting.MakeWorkload("c", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now.Add(time.Second * 2)).Obj(),
+			},
+			req: &runningReq{
+				queueName: cqNameA,
+				queryParams: &visibility.PendingWorkloadOptions{
+					Limit: 2,
+				},
+			},
+			wantResp: &runningResp{
+				wantRunningWorkloads: []visibility.RunningWorkload{
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "a",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now),
+						},
+						Priority: highPrio,
+					},
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "b",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now.Add(time.Second)),
+						},
+						Priority: highPrio,
+					}},
+			},
+		},
+		"offset query parameter set": {
+			clusterQueues: []*kueue.ClusterQueue{
+				utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(),
+			},
+			flavors: []*kueue.ResourceFlavor{
+				f1,
+			},
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue(lqNameA, nsName).ClusterQueue(cqNameA).Obj(),
+			},
+			workloads: []*kueue.Workload{
+				utiltesting.MakeWorkload("a", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now).Obj(),
+				utiltesting.MakeWorkload("b", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now.Add(time.Second)).Obj(),
+				utiltesting.MakeWorkload("c", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now.Add(time.Second * 2)).Obj(),
+			},
+			req: &runningReq{
+				queueName: cqNameA,
+				queryParams: &visibility.PendingWorkloadOptions{
+					Offset: 1,
+					Limit:  constants.DefaultPendingWorkloadsLimit,
+				},
+			},
+			wantResp: &runningResp{
+				wantRunningWorkloads: []visibility.RunningWorkload{
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "b",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now.Add(time.Second)),
+						},
+						Priority: highPrio,
+					},
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "c",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now.Add(time.Second * 2)),
+						},
+						Priority: highPrio,
+					}},
+			},
+		},
+		"limit offset query parameters set": {
+			clusterQueues: []*kueue.ClusterQueue{
+				utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(),
+			},
+			flavors: []*kueue.ResourceFlavor{
+				f1,
+			},
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue(lqNameA, nsName).ClusterQueue(cqNameA).Obj(),
+			},
+			workloads: []*kueue.Workload{
+				utiltesting.MakeWorkload("a", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now).Obj(),
+				utiltesting.MakeWorkload("b", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now.Add(time.Second)).Obj(),
+				utiltesting.MakeWorkload("c", nsName).PodSets(podSets...).ReserveQuota(&kueue.Admission{
+					ClusterQueue:      cqNameA,
+					PodSetAssignments: podSetFlavors,
+				}).Queue(lqNameA).Priority(highPrio).Admission(adA).Creation(now.Add(time.Second * 2)).Obj(),
+			},
+			req: &runningReq{
+				queueName: cqNameA,
+				queryParams: &visibility.PendingWorkloadOptions{
+					Offset: 1,
+					Limit:  1,
+				},
+			},
+			wantResp: &runningResp{
+				wantRunningWorkloads: []visibility.RunningWorkload{
+					{
+						ObjectMeta: v1.ObjectMeta{
+							Name:              "b",
+							Namespace:         nsName,
+							CreationTimestamp: v1.NewTime(now.Add(time.Second)),
+						},
+						Priority: highPrio,
+					}},
+			},
+		},
+		"empty cluster queue": {
+			clusterQueues: []*kueue.ClusterQueue{
+				utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(),
+			},
+			flavors: []*kueue.ResourceFlavor{
+				f1,
+			},
+			req: &runningReq{
+				queueName:   cqNameA,
+				queryParams: defaultQueryParams,
+			},
+			wantResp: &runningResp{},
+		},
+		"nonexistent queue name": {
+			req: &runningReq{
+				queueName:   "nonexistent-queue",
+				queryParams: defaultQueryParams,
+			},
+			flavors: []*kueue.ResourceFlavor{
+				f1,
+			},
+			wantResp: &runningResp{
+				wantErr: errors.NewNotFound(visibility.Resource("clusterqueue"), "nonexistent-queue"),
+			},
+			wantErrMatch: errors.IsNotFound,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			client := utiltesting.NewFakeClient()
+			cCache := cache.New(client, cache.WithPodsReadyTracking(false))
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			runningWorkloadsInCqREST := NewRunningWorkloadsInCqREST(cCache)
+			for _, cq := range tc.clusterQueues {
+				if err := cCache.AddClusterQueue(ctx, cq); err != nil {
+					t.Fatalf("Adding cluster queue %s: %v", cq.Name, err)
+				}
+			}
+			for _, q := range tc.queues {
+				if err := cCache.AddLocalQueue(q); err != nil {
+					t.Fatalf("Adding queue %q: %v", q.Name, err)
+				}
+			}
+			for _, w := range tc.workloads {
+				cCache.AddOrUpdateWorkload(w)
+			}
+
+			info, err := runningWorkloadsInCqREST.Get(ctx, tc.req.queueName, tc.req.queryParams)
+			if tc.wantErrMatch != nil {
+				if !tc.wantErrMatch(err) {
+					t.Errorf("Error differs: (-want,+got):\n%s", cmp.Diff(tc.wantResp.wantErr.Error(), err.Error()))
+				}
+			} else if err != nil {
+				t.Error(err)
+			} else {
+				runningWorkloadsInfo := info.(*visibility.RunningWorkloadsSummary)
+				less := func(a, b visibility.RunningWorkload) bool {
+					p1 := a.Priority
+					p2 := b.Priority
+
+					if p1 != p2 {
+						return p1 > p2
+					}
+
+					return a.CreationTimestamp.Before(&b.CreationTimestamp)
+				}
+				sort.Slice(tc.wantResp.wantRunningWorkloads, func(i, j int) bool {
+					return less(tc.wantResp.wantRunningWorkloads[i], tc.wantResp.wantRunningWorkloads[j])
+				})
+				if diff := cmp.Diff(tc.wantResp.wantRunningWorkloads, runningWorkloadsInfo.Items, cmpopts.EquateEmpty()); diff != "" {
+					t.Errorf("Running workloads differ: (-want,+got):\n%s", diff)
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/visibility/api/rest/test_utils.go b/pkg/visibility/api/rest/test_utils.go
index 4a69b1fe93..3888739be6 100644
--- a/pkg/visibility/api/rest/test_utils.go
+++ b/pkg/visibility/api/rest/test_utils.go
@@ -28,3 +28,14 @@ type resp struct {
 	wantErr              error
 	wantPendingWorkloads []visibility.PendingWorkload
 }
+
+type runningReq struct {
+	nsName      string
+	queueName   string
+	queryParams *visibility.PendingWorkloadOptions
+}
+
+type runningResp struct {
+	wantErr              error
+	wantRunningWorkloads []visibility.RunningWorkload
+}
diff --git a/pkg/visibility/api/rest/utils.go b/pkg/visibility/api/rest/utils.go
index 81b73a7c10..ea271c3a06 100644
--- a/pkg/visibility/api/rest/utils.go
+++ b/pkg/visibility/api/rest/utils.go
@@ -44,3 +44,24 @@ func newPendingWorkload(wlInfo *workload.Info, positionInLq int32, positionInCq
 		PositionInLocalQueue: positionInLq,
 	}
 }
+
+func newRunningWorkload(wlInfo *workload.Info) *v1alpha1.RunningWorkload {
+	ownerReferences := make([]metav1.OwnerReference, 0, len(wlInfo.Obj.OwnerReferences))
+	for _, ref := range wlInfo.Obj.OwnerReferences {
+		ownerReferences = append(ownerReferences, metav1.OwnerReference{
+			APIVersion: ref.APIVersion,
+			Kind:       ref.Kind,
+			Name:       ref.Name,
+			UID:        ref.UID,
+		})
+	}
+	return &v1alpha1.RunningWorkload{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:              wlInfo.Obj.Name,
+			Namespace:         wlInfo.Obj.Namespace,
+			OwnerReferences:   ownerReferences,
+			CreationTimestamp: wlInfo.Obj.CreationTimestamp,
+		},
+		Priority: *wlInfo.Obj.Spec.Priority,
+	}
+}
diff --git a/pkg/visibility/server.go b/pkg/visibility/server.go
index 59b21b9003..a5e3e8ed77 100644
--- a/pkg/visibility/server.go
+++ b/pkg/visibility/server.go
@@ -22,6 +22,7 @@ import (
 	"sigs.k8s.io/kueue/apis/visibility/v1alpha1"
 	generatedopenapi "sigs.k8s.io/kueue/apis/visibility/v1alpha1/openapi"
+	"sigs.k8s.io/kueue/pkg/cache"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/visibility/api"
@@ -46,7 +47,7 @@ type server struct {
 // +kubebuilder:rbac:groups=flowcontrol.apiserver.k8s.io,resources=flowschemas/status,verbs=patch
 
 // CreateAndStartVisibilityServer creates visibility server injecting KueueManager and starts it
-func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, ctx context.Context) {
+func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, cCache *cache.Cache, ctx context.Context) {
 	config := newVisibilityServerConfig()
 	if err := applyVisibilityServerOptions(config); err != nil {
 		setupLog.Error(err, "Unable to apply VisibilityServerOptions")
@@ -57,7 +58,7 @@ func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, ctx context.Context
 		setupLog.Error(err, "Unable to create visibility server")
 	}
 
-	if err := api.Install(visibilityServer, kueueMgr); err != nil {
+	if err := api.Install(visibilityServer, kueueMgr, cCache); err != nil {
 		setupLog.Error(err, "Unable to install visibility.kueue.x-k8s.io/v1alpha1 API")
 	}