Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vpa-admission-controller: Wire contexts #6899

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions vertical-pod-autoscaler/pkg/admission-controller/logic/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ limitations under the License.
package logic

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"k8s.io/api/admission/v1"
admissionv1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod"
Expand Down Expand Up @@ -56,12 +57,12 @@ func (s *AdmissionServer) RegisterResourceHandler(resourceHandler resource.Handl
s.resourceHandlers[resourceHandler.GroupResource()] = resourceHandler
}

func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_admission.AdmissionStatus, metrics_admission.AdmissionResource) {
func (s *AdmissionServer) admit(ctx context.Context, data []byte) (*admissionv1.AdmissionResponse, metrics_admission.AdmissionStatus, metrics_admission.AdmissionResource) {
// we don't block the admission by default, even on unparsable JSON
response := v1.AdmissionResponse{}
response := admissionv1.AdmissionResponse{}
response.Allowed = true

ar := v1.AdmissionReview{}
ar := admissionv1.AdmissionReview{}
if err := json.Unmarshal(data, &ar); err != nil {
klog.Error(err)
return &response, metrics_admission.Error, metrics_admission.Unknown
Expand All @@ -80,7 +81,7 @@ func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_adm

handler, ok := s.resourceHandlers[admittedGroupResource]
if ok {
patches, err = handler.GetPatches(ar.Request)
patches, err = handler.GetPatches(ctx, ar.Request)
resource = handler.AdmissionResource()

if handler.DisallowIncorrectObjects() && err != nil {
Expand All @@ -106,7 +107,7 @@ func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_adm
klog.Errorf("Cannot marshal the patch %v: %v", patches, err)
return &response, metrics_admission.Error, resource
}
patchType := v1.PatchTypeJSONPatch
patchType := admissionv1.PatchTypeJSONPatch
response.PatchType = &patchType
response.Patch = patch
klog.V(4).Infof("Sending patches: %v", patches)
Expand All @@ -127,6 +128,8 @@ func (s *AdmissionServer) admit(data []byte) (*v1.AdmissionResponse, metrics_adm

// Serve is a handler function of AdmissionServer
func (s *AdmissionServer) Serve(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

executionTimer := metrics_admission.NewExecutionTimer()
defer executionTimer.ObserveTotal()
admissionLatency := metrics_admission.NewAdmissionLatency()
Expand All @@ -146,8 +149,8 @@ func (s *AdmissionServer) Serve(w http.ResponseWriter, r *http.Request) {
}
executionTimer.ObserveStep("read_request")

reviewResponse, status, resource := s.admit(body)
ar := v1.AdmissionReview{
reviewResponse, status, resource := s.admit(ctx, body)
ar := admissionv1.AdmissionReview{
Response: reviewResponse,
TypeMeta: metav1.TypeMeta{
Kind: "AdmissionReview",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package resource

import (
"context"

v1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/metrics/admission"
Expand All @@ -38,5 +40,5 @@ type Handler interface {
// DisallowIncorrectObjects returns whether incorrect objects (eg. unparsable, not passing validations) should be disallowed by Admission Server.
DisallowIncorrectObjects() bool
// GetPatches returns patches for given AdmissionRequest
GetPatches(*v1.AdmissionRequest) ([]PatchRecord, error)
GetPatches(context.Context, *v1.AdmissionRequest) ([]PatchRecord, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package pod

import (
"context"
"encoding/json"
"fmt"

admissionv1 "k8s.io/api/admission/v1"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
resource_admission "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/admission-controller/resource/pod/patch"
Expand Down Expand Up @@ -63,12 +64,12 @@ func (h *resourceHandler) DisallowIncorrectObjects() bool {
}

// GetPatches builds patches for Pod in given admission request.
func (h *resourceHandler) GetPatches(ar *admissionv1.AdmissionRequest) ([]resource_admission.PatchRecord, error) {
func (h *resourceHandler) GetPatches(ctx context.Context, ar *admissionv1.AdmissionRequest) ([]resource_admission.PatchRecord, error) {
if ar.Resource.Version != "v1" {
return nil, fmt.Errorf("only v1 Pods are supported")
}
raw, namespace := ar.Object.Raw, ar.Namespace
pod := v1.Pod{}
pod := corev1.Pod{}
if err := json.Unmarshal(raw, &pod); err != nil {
return nil, err
}
Expand All @@ -77,7 +78,7 @@ func (h *resourceHandler) GetPatches(ar *admissionv1.AdmissionRequest) ([]resour
pod.Namespace = namespace
}
klog.V(4).Infof("Admitting pod %s", klog.KObj(&pod))
controllingVpa := h.vpaMatcher.GetMatchingVPA(&pod)
controllingVpa := h.vpaMatcher.GetMatchingVPA(ctx, &pod)
if controllingVpa == nil {
klog.V(4).Infof("No matching VPA found for pod %s", klog.KObj(&pod))
return []resource_admission.PatchRecord{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pod

import (
"context"
"fmt"
"testing"

Expand All @@ -43,7 +44,7 @@ type fakeVpaMatcher struct {
vpa *vpa_types.VerticalPodAutoscaler
}

func (m *fakeVpaMatcher) GetMatchingVPA(_ *apiv1.Pod) *vpa_types.VerticalPodAutoscaler {
func (m *fakeVpaMatcher) GetMatchingVPA(_ context.Context, _ *apiv1.Pod) *vpa_types.VerticalPodAutoscaler {
return m.vpa
}

Expand Down Expand Up @@ -176,7 +177,7 @@ func TestGetPatches(t *testing.T) {
fppp := &fakePodPreProcessor{tc.podPreProcessorError}
fvm := &fakeVpaMatcher{vpa: tc.vpa}
h := NewResourceHandler(fppp, fvm, tc.calculators)
patches, err := h.GetPatches(&admissionv1.AdmissionRequest{
patches, err := h.GetPatches(context.Background(), &admissionv1.AdmissionRequest{
Resource: v1.GroupVersionResource{
Version: "v1",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vpa

import (
"context"
"encoding/json"
"fmt"

Expand Down Expand Up @@ -71,7 +72,7 @@ func (h *resourceHandler) DisallowIncorrectObjects() bool {
}

// GetPatches builds patches for VPA in given admission request.
func (h *resourceHandler) GetPatches(ar *v1.AdmissionRequest) ([]resource.PatchRecord, error) {
func (h *resourceHandler) GetPatches(_ context.Context, ar *v1.AdmissionRequest) ([]resource.PatchRecord, error) {
raw, isCreate := ar.Object.Raw, ar.Operation == v1.Create
vpa, err := parseVPA(raw)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package vpa

import (
"context"

core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
Expand All @@ -31,7 +33,7 @@ import (
// Matcher is capable of returning a single matching VPA object
// for a pod. Will return nil if no matching object is found.
type Matcher interface {
GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler
GetMatchingVPA(ctx context.Context, pod *core.Pod) *vpa_types.VerticalPodAutoscaler
}

type matcher struct {
Expand All @@ -49,7 +51,7 @@ func NewMatcher(vpaLister vpa_lister.VerticalPodAutoscalerLister,
controllerFetcher: controllerFetcher}
}

func (m *matcher) GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler {
func (m *matcher) GetMatchingVPA(ctx context.Context, pod *core.Pod) *vpa_types.VerticalPodAutoscaler {
configs, err := m.vpaLister.VerticalPodAutoscalers(pod.Namespace).List(labels.Everything())
if err != nil {
klog.Errorf("failed to get vpa configs: %v", err)
Expand All @@ -60,7 +62,7 @@ func (m *matcher) GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler
if vpa_api_util.GetUpdateMode(vpaConfig) == vpa_types.UpdateModeOff {
continue
}
selector, err := m.selectorFetcher.Fetch(vpaConfig)
selector, err := m.selectorFetcher.Fetch(ctx, vpaConfig)
if err != nil {
klog.V(3).Infof("skipping VPA object %s because we cannot fetch selector: %s", klog.KObj(vpaConfig), err)
continue
Expand All @@ -71,7 +73,7 @@ func (m *matcher) GetMatchingVPA(pod *core.Pod) *vpa_types.VerticalPodAutoscaler
})
}
klog.V(2).Infof("Let's choose from %d configs for pod %s", len(onConfigs), klog.KObj(pod))
result := vpa_api_util.GetControllingVPAForPod(pod, onConfigs, m.controllerFetcher)
result := vpa_api_util.GetControllingVPAForPod(ctx, pod, onConfigs, m.controllerFetcher)
if result != nil {
return result.Vpa
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vpa

import (
"context"
"testing"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -148,7 +149,7 @@ func TestGetMatchingVpa(t *testing.T) {
// The hierarchy part is being test in the "TestControllerFetcher" test.
matcher := NewMatcher(vpaLister, mockSelectorFetcher, controllerfetcher.FakeControllerFetcher{})

vpa := matcher.GetMatchingVPA(tc.pod)
vpa := matcher.GetMatchingVPA(context.Background(), tc.pod)
if tc.expectedFound && assert.NotNil(t, vpa) {
assert.Equal(t, tc.expectedVpaName, vpa.Name)
} else {
Expand Down
20 changes: 10 additions & 10 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type ClusterStateFeeder interface {
InitFromCheckpoints()

// LoadVPAs updates clusterState with current state of VPAs.
LoadVPAs()
LoadVPAs(ctx context.Context)

// LoadPods updates clusterState with current specification of Pods and their Containers.
LoadPods()
Expand Down Expand Up @@ -243,7 +243,7 @@ func (feeder *clusterStateFeeder) setVpaCheckpoint(checkpoint *vpa_types.Vertica

func (feeder *clusterStateFeeder) InitFromCheckpoints() {
klog.V(3).Info("Initializing VPA from checkpoints")
feeder.LoadVPAs()
feeder.LoadVPAs(context.TODO())

namespaces := make(map[string]bool)
for _, v := range feeder.clusterState.Vpas {
Expand All @@ -270,7 +270,7 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints() {

func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
klog.V(3).Info("Starting garbage collection of checkpoints")
feeder.LoadVPAs()
feeder.LoadVPAs(context.TODO())

namespaceList, err := feeder.coreClient.Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
Expand Down Expand Up @@ -338,7 +338,7 @@ func filterVPAs(feeder *clusterStateFeeder, allVpaCRDs []*vpa_types.VerticalPodA
}

// LoadVPAs fetches VPA objects and loads them into the cluster state.
func (feeder *clusterStateFeeder) LoadVPAs() {
func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
// List VPA API objects.
allVpaCRDs, err := feeder.vpaLister.List(labels.Everything())
if err != nil {
Expand All @@ -358,7 +358,7 @@ func (feeder *clusterStateFeeder) LoadVPAs() {
VpaName: vpaCRD.Name,
}

selector, conditions := feeder.getSelector(vpaCRD)
selector, conditions := feeder.getSelector(ctx, vpaCRD)
klog.V(4).Infof("Using selector %s for VPA %s", selector.String(), klog.KObj(vpaCRD))

if feeder.clusterState.AddOrUpdateVpa(vpaCRD, selector) == nil {
Expand Down Expand Up @@ -486,7 +486,7 @@ type condition struct {
message string
}

func (feeder *clusterStateFeeder) validateTargetRef(vpa *vpa_types.VerticalPodAutoscaler) (bool, condition) {
func (feeder *clusterStateFeeder) validateTargetRef(ctx context.Context, vpa *vpa_types.VerticalPodAutoscaler) (bool, condition) {
//
if vpa.Spec.TargetRef == nil {
return false, condition{}
Expand All @@ -499,7 +499,7 @@ func (feeder *clusterStateFeeder) validateTargetRef(vpa *vpa_types.VerticalPodAu
},
ApiVersion: vpa.Spec.TargetRef.APIVersion,
}
top, err := feeder.controllerFetcher.FindTopMostWellKnownOrScalable(&k)
top, err := feeder.controllerFetcher.FindTopMostWellKnownOrScalable(ctx, &k)
if err != nil {
return false, condition{conditionType: vpa_types.ConfigUnsupported, delete: false, message: fmt.Sprintf("Error checking if target is a topmost well-known or scalable controller: %s", err)}
}
Expand All @@ -512,10 +512,10 @@ func (feeder *clusterStateFeeder) validateTargetRef(vpa *vpa_types.VerticalPodAu
return true, condition{}
}

func (feeder *clusterStateFeeder) getSelector(vpa *vpa_types.VerticalPodAutoscaler) (labels.Selector, []condition) {
selector, fetchErr := feeder.selectorFetcher.Fetch(vpa)
func (feeder *clusterStateFeeder) getSelector(ctx context.Context, vpa *vpa_types.VerticalPodAutoscaler) (labels.Selector, []condition) {
selector, fetchErr := feeder.selectorFetcher.Fetch(ctx, vpa)
if selector != nil {
validTargetRef, unsupportedCondition := feeder.validateTargetRef(vpa)
validTargetRef, unsupportedCondition := feeder.validateTargetRef(ctx, vpa)
if !validTargetRef {
return labels.Nothing(), []condition{
unsupportedCondition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package input

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -32,7 +33,7 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/history"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input/spec"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
controllerfetcher "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/controller_fetcher"
target_mock "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/target/mock"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
)
Expand All @@ -42,7 +43,7 @@ type fakeControllerFetcher struct {
err error
}

func (f *fakeControllerFetcher) FindTopMostWellKnownOrScalable(_ *controllerfetcher.ControllerKeyWithAPIVersion) (*controllerfetcher.ControllerKeyWithAPIVersion, error) {
func (f *fakeControllerFetcher) FindTopMostWellKnownOrScalable(_ context.Context, _ *controllerfetcher.ControllerKeyWithAPIVersion) (*controllerfetcher.ControllerKeyWithAPIVersion, error) {
return f.key, f.err
}

Expand Down Expand Up @@ -315,7 +316,7 @@ func TestLoadPods(t *testing.T) {
if tc.expectedVpaFetch {
targetSelectorFetcher.EXPECT().Fetch(vpa).Return(tc.selector, tc.fetchSelectorError)
}
clusterStateFeeder.LoadVPAs()
clusterStateFeeder.LoadVPAs(context.Background())

vpaID := model.VpaID{
Namespace: vpa.Namespace,
Expand Down
Loading
Loading