Skip to content

Commit

Permalink
[WIP] Configure VPA for reconciler, if enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
karlkfi committed Jun 16, 2023
1 parent 98a5161 commit d3c8734
Show file tree
Hide file tree
Showing 17 changed files with 1,361 additions and 32 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
k8s.io/api v0.25.9
k8s.io/apiextensions-apiserver v0.25.9
k8s.io/apimachinery v0.25.9
k8s.io/autoscaler/vertical-pod-autoscaler v0.13.0
k8s.io/cli-runtime v0.25.9
k8s.io/client-go v0.25.9
k8s.io/cluster-registry v0.0.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ k8s.io/apimachinery v0.25.9 h1:MPjgTz4dbAKJ/KiHIvDeYkFfIn7ueihqvT520HkV7v4=
k8s.io/apimachinery v0.25.9/go.mod h1:ZTl0drTQaFi5gMM3snYI5tWV1XJmRH1gfnDx2QCLsxk=
k8s.io/apiserver v0.25.9 h1:1tuxeA28SnoK30bhOa48c6tOCQypcJJYlsGE8BJpUko=
k8s.io/apiserver v0.25.9/go.mod h1:FHU743u4KKL79IpiQU/d8MiwA+JdHX26vfhG7gBJSYo=
k8s.io/autoscaler/vertical-pod-autoscaler v0.13.0 h1:pH6AsxeBZcyX6KBqcnl7SPIJqbN1d59RrEBuIE6Rq6c=
k8s.io/autoscaler/vertical-pod-autoscaler v0.13.0/go.mod h1:LraL5kR2xX7jb4VMCG6/tUH4I75uRHlnzC0VWQHcyWk=
k8s.io/cli-runtime v0.25.9 h1:XNJ82pj8ior9TqO22OGQ0Di6e1p/wXm7IJyqM8iq8I4=
k8s.io/cli-runtime v0.25.9/go.mod h1:Xl7L+X3Uxb8J9rfzICvIi9EP4CYYXXtMdaWSsdcFRHo=
k8s.io/client-go v0.25.9 h1:U0S3nc71NRfHXiA0utyCkPt3Mv1SWpQw0g5VfBCv5xg=
Expand Down
4 changes: 4 additions & 0 deletions pkg/core/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
autoscalingv1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/client-go/kubernetes/scheme"
clusterregistry "k8s.io/cluster-registry/pkg/apis/clusterregistry/v1alpha1"
k8sadmissionv1 "k8s.io/kubernetes/pkg/apis/admission/v1"
Expand Down Expand Up @@ -107,6 +108,9 @@ func mustRegisterKubernetesResources() {
utilruntime.Must(k8srbacv1beta1.RegisterConversions(scheme.Scheme))

utilruntime.Must(scheme.Scheme.SetVersionPriority(rbacv1.SchemeGroupVersion, rbacv1beta1.SchemeGroupVersion))

utilruntime.Must(autoscalingv1.AddToScheme(scheme.Scheme))
// autoscaling API has no generated defaults or conversions
}

func mustRegisterAPIExtensionsResources() {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kinds/kinds.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
rbacv1beta1 "k8s.io/api/rbac/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"
autoscalingv1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"kpt.dev/configsync/pkg/api/configmanagement"
v1 "kpt.dev/configsync/pkg/api/configmanagement/v1"
"kpt.dev/configsync/pkg/api/configsync"
Expand Down Expand Up @@ -263,3 +264,8 @@ func APIService() schema.GroupVersionKind {
func ValidatingWebhookConfiguration() schema.GroupVersionKind {
return admissionv1.SchemeGroupVersion.WithKind("ValidatingWebhookConfiguration")
}

// VerticalPodAutoscaler returns the VerticalPodAutoscaler kind.
func VerticalPodAutoscaler() schema.GroupVersionKind {
return autoscalingv1.SchemeGroupVersion.WithKind("VerticalPodAutoscaler")
}
8 changes: 8 additions & 0 deletions pkg/reconcilermanager/controllers/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
autoscalingv1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"kpt.dev/configsync/pkg/core"
"kpt.dev/configsync/pkg/kinds"
"kpt.dev/configsync/pkg/reconcilermanager"
Expand Down Expand Up @@ -204,3 +205,10 @@ func (r *RootSyncReconciler) deleteClusterRoleBinding(ctx context.Context, recon
}
return nil
}

func (r *reconcilerBase) deleteVerticalPodAutoscaler(ctx context.Context, reconcilerRef types.NamespacedName) error {
vpa := &autoscalingv1.VerticalPodAutoscaler{}
vpa.Name = reconcilerRef.Name
vpa.Namespace = reconcilerRef.Namespace
return r.cleanup(ctx, vpa)
}
126 changes: 101 additions & 25 deletions pkg/reconcilermanager/controllers/reconciler_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,19 @@ import (

"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
autoscaling "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
autoscalingv1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/client-go/dynamic"
"k8s.io/utils/pointer"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
hubv1 "kpt.dev/configsync/pkg/api/hub/v1"
Expand Down Expand Up @@ -74,9 +78,9 @@ const (
logFieldResourceVersion = "resourceVersion"
)

// The fields in reconcilerManagerAllowList are the fields that reconciler manager allow
// users or other controllers to modify.
var reconcilerManagerAllowList = []string{
// The fields in reconcilerDriftAllowList are the fields that reconciler manager
// allows users or other controllers to modify on the reconciler Deployment.
var reconcilerDriftAllowList = []string{
"$.spec.template.spec.containers[*].terminationMessagePath",
"$.spec.template.spec.containers[*].terminationMessagePolicy",
"$.spec.template.spec.containers[*].*.timeoutSeconds",
Expand All @@ -93,6 +97,10 @@ var reconcilerManagerAllowList = []string{
"$.spec.progressDeadlineSeconds",
}

// reconcilerContainerResourceField is the JSONPath to the resources field,
// which is optionally allowed to drift on the reconciler Deployment.
const reconcilerContainerResourceField = "$.spec.template.spec.containers[*].resources"

// reconcilerBase provides common data and methods for the RepoSync and RootSync reconcilers
type reconcilerBase struct {
loggingController
Expand All @@ -102,7 +110,7 @@ type reconcilerBase struct {
watcher client.WithWatch // non-caching
dynamicClient dynamic.Interface
scheme *runtime.Scheme
isAutopilotCluster *bool
autopilot *bool
reconcilerPollingPeriod time.Duration
hydrationPollingPeriod time.Duration
membership *hubv1.Membership
Expand Down Expand Up @@ -158,7 +166,7 @@ func (r *reconcilerBase) upsertServiceAccount(

type mutateFn func(client.Object) error

func (r *reconcilerBase) upsertDeployment(ctx context.Context, reconcilerRef types.NamespacedName, labelMap map[string]string, mutateObject mutateFn) (*unstructured.Unstructured, controllerutil.OperationResult, error) {
func (r *reconcilerBase) upsertDeployment(ctx context.Context, reconcilerRef types.NamespacedName, labelMap map[string]string, mutateObject mutateFn, ignoredFields []string) (*unstructured.Unstructured, controllerutil.OperationResult, error) {
reconcilerDeployment := &appsv1.Deployment{}
if err := parseDeployment(reconcilerDeployment); err != nil {
return nil, controllerutil.OperationResultNone, errors.Wrap(err, "failed to parse reconciler Deployment manifest from ConfigMap")
Expand Down Expand Up @@ -190,7 +198,7 @@ func (r *reconcilerBase) upsertDeployment(ctx context.Context, reconcilerRef typ
if err := mutateObject(reconcilerDeployment); err != nil {
return nil, controllerutil.OperationResultNone, err
}
appliedObj, op, err := r.createOrPatchDeployment(ctx, reconcilerDeployment)
appliedObj, op, err := r.createOrPatchDeployment(ctx, reconcilerDeployment, ignoredFields)

if op != controllerutil.OperationResultNone {
r.logger(ctx).Info("Managed object upsert successful",
Expand All @@ -204,7 +212,7 @@ func (r *reconcilerBase) upsertDeployment(ctx context.Context, reconcilerRef typ
// createOrPatchDeployment() first call Get() on the object. If the
// object does not exist, Create() will be called. If it does exist, Patch()
// will be called.
func (r *reconcilerBase) createOrPatchDeployment(ctx context.Context, declared *appsv1.Deployment) (*unstructured.Unstructured, controllerutil.OperationResult, error) {
func (r *reconcilerBase) createOrPatchDeployment(ctx context.Context, declared *appsv1.Deployment, ignoredFields []string) (*unstructured.Unstructured, controllerutil.OperationResult, error) {
id := core.ID{
ObjectKey: client.ObjectKeyFromObject(declared),
GroupKind: kinds.Deployment().GroupKind(),
Expand Down Expand Up @@ -232,14 +240,7 @@ func (r *reconcilerBase) createOrPatchDeployment(ctx context.Context, declared *
currentGeneration := currentDeploymentUnstructured.GetGeneration()
currentUID := currentDeploymentUnstructured.GetUID()

if r.isAutopilotCluster == nil {
isAutopilot, err := util.IsGKEAutopilotCluster(r.client)
if err != nil {
return nil, controllerutil.OperationResultNone, fmt.Errorf("unable to determine if it is an Autopilot cluster: %w", err)
}
r.isAutopilotCluster = &isAutopilot
}
dep, err := compareDeploymentsToCreatePatchData(*r.isAutopilotCluster, declared, currentDeploymentUnstructured, reconcilerManagerAllowList, r.scheme)
dep, err := r.compareDeploymentsToCreatePatchData(declared, currentDeploymentUnstructured, ignoredFields)
if err != nil {
return nil, controllerutil.OperationResultNone, err
}
Expand Down Expand Up @@ -285,6 +286,29 @@ func (r *reconcilerBase) createOrPatchDeployment(ctx context.Context, declared *
return appliedObj, controllerutil.OperationResultUpdated, nil
}

func (r *reconcilerBase) reconcilerFieldsAllowedToDrift(override *v1beta1.OverrideSpec) ([]string, error) {
vpaEnabled, err := r.isVPAEnabled(override)
if err != nil {
return nil, err
}
if !vpaEnabled {
return reconcilerDriftAllowList, nil
}
return append([]string{reconcilerContainerResourceField}, reconcilerDriftAllowList...), nil
}

func (r *reconcilerBase) isAutopilot() (bool, error) {
if r.autopilot != nil {
return *r.autopilot, nil
}
autopilot, err := util.IsGKEAutopilotCluster(r.client)
if err != nil {
return false, fmt.Errorf("unable to determine if it is an Autopilot cluster: %w", err)
}
r.autopilot = &autopilot
return autopilot, nil
}

// deleteDeploymentFields delete all the fields in allowlist from unstructured object and convert the unstructured object to Deployment object
func deleteDeploymentFields(allowList []string, unstructuredDeployment *unstructured.Unstructured) (*appsv1.Deployment, error) {
for _, path := range allowList {
Expand All @@ -306,30 +330,33 @@ type deploymentProcessResult struct {
}

// compareDeploymentsToCreatePatchData checks if current deployment is same with declared deployment when ignore the fields in allowlist. If not, it creates a byte array used for PATCH later
func compareDeploymentsToCreatePatchData(isAutopilot bool, declared *appsv1.Deployment, currentDeploymentUnstructured *unstructured.Unstructured, allowList []string, scheme *runtime.Scheme) (*deploymentProcessResult, error) {
processedCurrent, err := deleteDeploymentFields(allowList, currentDeploymentUnstructured)
func (r *reconcilerBase) compareDeploymentsToCreatePatchData(declared *appsv1.Deployment, currentDeploymentUnstructured *unstructured.Unstructured, ignoredFields []string) (*deploymentProcessResult, error) {
isAutopilot, err := r.isAutopilot()
if err != nil {
return &deploymentProcessResult{}, err
return nil, err
}
processedCurrent, err := deleteDeploymentFields(ignoredFields, currentDeploymentUnstructured)
if err != nil {
return nil, err
}
adjusted, err := adjustContainerResources(isAutopilot, declared, processedCurrent)
if err != nil {
return &deploymentProcessResult{}, err
return nil, err
}

unObjDeclared, err := kinds.ToUnstructured(declared, scheme)
unObjDeclared, err := kinds.ToUnstructured(declared, r.scheme)
if err != nil {
return &deploymentProcessResult{}, err
return nil, err
}
processedDeclared, err := deleteDeploymentFields(allowList, unObjDeclared)
processedDeclared, err := deleteDeploymentFields(ignoredFields, unObjDeclared)
if err != nil {
return &deploymentProcessResult{}, err
return nil, err
}
if equality.Semantic.DeepEqual(processedCurrent.Labels, processedDeclared.Labels) && equality.Semantic.DeepEqual(processedCurrent.Spec, processedDeclared.Spec) {
return &deploymentProcessResult{true, adjusted, nil}, nil
}
data, err := json.Marshal(unObjDeclared)
if err != nil {
return &deploymentProcessResult{}, err
return nil, err
}
return &deploymentProcessResult{false, adjusted, data}, nil
}
Expand Down Expand Up @@ -613,3 +640,52 @@ func (r *reconcilerBase) setupOrTeardown(ctx context.Context, syncObj client.Obj

return nil
}

func (r *reconcilerBase) upsertVerticalPodAutoscaler(ctx context.Context, deployID core.ID, labelMap map[string]string) (client.ObjectKey, error) {
vpaRef := deployID.ObjectKey
vpa := &autoscalingv1.VerticalPodAutoscaler{}
vpa.Name = vpaRef.Name
vpa.Namespace = vpaRef.Namespace
op, err := CreateOrUpdate(ctx, r.client, vpa, func() error {
r.addLabels(vpa, labelMap)
vpa.Spec.TargetRef = &autoscaling.CrossVersionObjectReference{
// TODO: APIVersion is optional but is it useful?
Kind: deployID.Kind,
Name: deployID.Name,
}
updateMode := autoscalingv1.UpdateModeAuto
vpa.Spec.UpdatePolicy = &autoscalingv1.PodUpdatePolicy{
UpdateMode: &updateMode,
// VPA is allowed to evict the last reconciler pod,
// because there's only one replica.
MinReplicas: pointer.Int32(0),
}
return nil
})
if err != nil {
return vpaRef, err
}
if op != controllerutil.OperationResultNone {
r.logger(ctx).Info("Managed object upsert successful",
logFieldObjectRef, vpaRef.String(),
logFieldObjectKind, "VerticalPodAutoscaler",
logFieldOperation, op)
}
return vpaRef, nil
}

func (r *reconcilerBase) isVPAEnabled(override *v1beta1.OverrideSpec) (bool, error) {
if override != nil && len(override.Resources) > 0 {
// Don't use VPA if resource overrides are specified by the user
return false, nil
}
vpaGVK := kinds.VerticalPodAutoscaler()
_, err := r.client.RESTMapper().RESTMapping(vpaGVK.GroupKind(), vpaGVK.Version)
if err != nil {
if errors.Is(err, &meta.NoKindMatchError{}) {
return false, nil
}
return false, err
}
return true, nil
}
13 changes: 12 additions & 1 deletion pkg/reconcilermanager/controllers/reconciler_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controllers

import (
"context"
"encoding/json"
"fmt"
"testing"
Expand All @@ -30,6 +31,7 @@ import (
"kpt.dev/configsync/pkg/kinds"
"kpt.dev/configsync/pkg/metadata"
"kpt.dev/configsync/pkg/reconcilermanager"
syncerFake "kpt.dev/configsync/pkg/syncer/syncertest/fake"
"kpt.dev/configsync/pkg/testing/fake"
"kpt.dev/configsync/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -608,7 +610,16 @@ func TestCompareDeploymentsToCreatePatchData(t *testing.T) {
} else {
testCurrent = tc.current.DeepCopy()
}
dep, err := compareDeploymentsToCreatePatchData(tc.isAutopilot, testDeclared, testCurrent, reconcilerManagerAllowList, core.Scheme)
fakeClient := syncerFake.NewClient(t, core.Scheme)
if tc.isAutopilot {
err := fakeClient.Create(context.Background(), util.FakeAutopilotWebhookObject())
require.NoError(t, err)
}
r := &reconcilerBase{
scheme: fakeClient.Scheme(),
client: fakeClient,
}
dep, err := r.compareDeploymentsToCreatePatchData(testDeclared, testCurrent, reconcilerDriftAllowList)
require.NoError(t, err)
require.Equal(t, tc.expectedSame, dep.same)
})
Expand Down
30 changes: 27 additions & 3 deletions pkg/reconcilermanager/controllers/reposync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,18 @@ func (r *RepoSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile
containerEnvs := r.populateContainerEnvs(ctx, rs, reconcilerRef.Name)
mut := r.mutationsFor(ctx, rs, containerEnvs)

vpaEnabled, err := r.isVPAEnabled(rs.Spec.Override)
if err != nil {
return err
}

ignoredFields := reconcilerDriftAllowList
if vpaEnabled {
ignoredFields = append([]string{reconcilerContainerResourceField}, ignoredFields...)
}

// Upsert Namespace reconciler deployment.
deployObj, op, err := r.upsertDeployment(ctx, reconcilerRef, labelMap, mut)
deployObj, op, err := r.upsertDeployment(ctx, reconcilerRef, labelMap, mut, ignoredFields)
if err != nil {
return err
}
Expand All @@ -254,13 +264,23 @@ func (r *RepoSyncReconciler) upsertManagedObjects(ctx context.Context, reconcile
}
}

gvk, err := kinds.Lookup(deployObj, r.scheme)
deployGVK, err := kinds.Lookup(deployObj, r.scheme)
if err != nil {
return err
}
deployID := core.ID{
ObjectKey: reconcilerRef,
GroupKind: gvk.GroupKind(),
GroupKind: deployGVK.GroupKind(),
}

if vpaEnabled {
if _, err := r.upsertVerticalPodAutoscaler(ctx, deployID, labelMap); err != nil {
return err
}
} else {
if err := r.deleteVerticalPodAutoscaler(ctx, reconcilerRef); err != nil {
return err
}
}

result, err := kstatus.Compute(deployObj)
Expand Down Expand Up @@ -393,6 +413,10 @@ func (r *RepoSyncReconciler) handleReconcileError(ctx context.Context, err error
func (r *RepoSyncReconciler) deleteManagedObjects(ctx context.Context, reconcilerRef, rsRef types.NamespacedName) error {
r.logger(ctx).Info("Deleting managed objects")

if err := r.deleteVerticalPodAutoscaler(ctx, reconcilerRef); err != nil {
return err
}

if err := r.deleteDeployment(ctx, reconcilerRef); err != nil {
return err
}
Expand Down
Loading

0 comments on commit d3c8734

Please sign in to comment.