diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 67777ca260f0..b2c3822ef09c 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -27,20 +27,15 @@ import ( ) func main() { - options, manager := operator.NewOptionsWithManagerOrDie() - cloudProvider := cloudprovider.CloudProvider(awscloudprovider.NewCloudProvider(options.Ctx, cloudprovider.Options{ - ClientSet: options.Clientset, - KubeClient: options.KubeClient, - StartAsync: options.StartAsync, - })) - if hp, ok := cloudProvider.(operator.HealthCheck); ok { - utilruntime.Must(manager.AddHealthzCheck("cloud-provider", hp.LivenessProbe)) - } - cloudProvider = cloudprovidermetrics.Decorate(cloudProvider) - if err := operator.RegisterControllers(options.Ctx, - manager, - controllers.GetControllers(options, cloudProvider)..., - ).Start(options.Ctx); err != nil { + ctx, manager := operator.NewOrDie() + cloudProvider := awscloudprovider.NewCloudProvider(ctx, cloudprovider.Options{ + ClientSet: ctx.Clientset, + KubeClient: ctx.KubeClient, + StartAsync: ctx.StartAsync, + }) + utilruntime.Must(manager.AddHealthzCheck("cloud-provider", cloudProvider.LivenessProbe)) + operator.RegisterControllers(ctx, manager, controllers.GetControllers(ctx, cloudprovidermetrics.Decorate(cloudProvider))...) + if err := manager.Start(ctx); err != nil { panic(fmt.Sprintf("Unable to start manager, %s", err)) } } diff --git a/hack/docs/configuration_gen_docs.go b/hack/docs/configuration_gen_docs.go index e00958aa7c36..cc23839404ea 100644 --- a/hack/docs/configuration_gen_docs.go +++ b/hack/docs/configuration_gen_docs.go @@ -21,7 +21,7 @@ import ( "os" "strings" - "github.com/aws/karpenter/pkg/utils/options" + "github.com/aws/karpenter/pkg/operator" ) func main() { @@ -48,7 +48,7 @@ func main() { topDoc := fmt.Sprintf("%s%s\n\n", startDocSections[0], genStart) bottomDoc := fmt.Sprintf("\n%s%s", genEnd, endDocSections[1]) - opts := options.New() + opts := operator.NewOptions() envVarsBlock := "| Environment Variable | CLI Flag | Description |\n" envVarsBlock += "|--|--|--|\n" diff --git a/hack/docs/instancetypes_gen_docs.go b/hack/docs/instancetypes_gen_docs.go index e50cad1846a8..449778018a4a 100644 --- a/hack/docs/instancetypes_gen_docs.go +++ b/hack/docs/instancetypes_gen_docs.go @@ -35,8 +35,7 @@ import ( "github.com/aws/karpenter/pkg/cloudproviders/aws/apis/v1alpha1" awscloudprovider "github.com/aws/karpenter/pkg/cloudproviders/aws/cloudprovider" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/aws/karpenter/pkg/utils/options" + "github.com/aws/karpenter/pkg/operator" "github.com/aws/karpenter/pkg/utils/resources" ) @@ -52,9 +51,9 @@ func main() { os.Setenv("CLUSTER_ENDPOINT", "https://docs-gen.aws") os.Setenv("AWS_ISOLATED_VPC", "true") // disable pricing lookup - opts := options.New() + opts := operator.NewOptions() opts = opts.MustParse() - ctx := injection.WithOptions(context.Background(), *opts) + ctx := operator.WithOptions(context.Background(), *opts) cp := awscloudprovider.NewCloudProvider(ctx, cloudprovider.Options{}) provider := v1alpha1.AWS{SubnetSelector: map[string]string{ diff --git a/pkg/cloudproviders/aws/apis/v1alpha1/tags.go b/pkg/cloudproviders/aws/apis/v1alpha1/tags.go index abb052c5ba22..e2ca7ee55246 100644 --- a/pkg/cloudproviders/aws/apis/v1alpha1/tags.go +++ b/pkg/cloudproviders/aws/apis/v1alpha1/tags.go @@ -22,8 +22,9 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/samber/lo" - "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" "github.com/aws/karpenter/pkg/utils/injection" + + "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" ) func MergeTags(ctx context.Context, custom ...map[string]string) (result []*ec2.Tag) { diff --git a/pkg/cloudproviders/aws/cloudprovider/cloudprovider.go b/pkg/cloudproviders/aws/cloudprovider/cloudprovider.go index 9da693c5f8df..ae502b1c3ffa 100644 --- a/pkg/cloudproviders/aws/cloudprovider/cloudprovider.go +++ b/pkg/cloudproviders/aws/cloudprovider/cloudprovider.go @@ -41,6 +41,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/transport" "knative.dev/pkg/apis" + "knative.dev/pkg/injection" "knative.dev/pkg/logging" "knative.dev/pkg/ptr" k8sClient "sigs.k8s.io/controller-runtime/pkg/client" @@ -52,7 +53,6 @@ import ( "github.com/aws/karpenter/pkg/cloudproviders/aws/cloudprovider/amifamily" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" "github.com/aws/karpenter/pkg/utils/functional" - "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/project" ) diff --git a/pkg/cloudproviders/aws/cloudprovider/instance.go b/pkg/cloudproviders/aws/cloudprovider/instance.go index bbd65313e473..6a8665262f8b 100644 --- a/pkg/cloudproviders/aws/cloudprovider/instance.go +++ b/pkg/cloudproviders/aws/cloudprovider/instance.go @@ -37,13 +37,12 @@ import ( "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" awserrors "github.com/aws/karpenter/pkg/cloudproviders/aws/errors" + "github.com/aws/karpenter/pkg/operator" "github.com/aws/karpenter-core/pkg/scheduling" "github.com/aws/karpenter/pkg/cloudproviders/aws/apis/v1alpha1" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" "github.com/aws/karpenter/pkg/utils/functional" - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/aws/karpenter/pkg/utils/options" "github.com/aws/karpenter/pkg/utils/resources" ) @@ -145,7 +144,7 @@ func (p *InstanceProvider) launchInstance(ctx context.Context, provider *v1alpha logging.FromContext(ctx).Warn(err.Error()) } // Create fleet - tags := v1alpha1.MergeTags(ctx, provider.Tags, map[string]string{fmt.Sprintf("kubernetes.io/cluster/%s", injection.GetOptions(ctx).ClusterName): "owned"}) + tags := v1alpha1.MergeTags(ctx, provider.Tags, map[string]string{fmt.Sprintf("kubernetes.io/cluster/%s", operator.GetOptions(ctx).ClusterName): "owned"}) createFleetInput := &ec2.CreateFleetInput{ Type: aws.String(ec2.FleetTypeInstant), Context: provider.Context, @@ -317,7 +316,7 @@ func (p *InstanceProvider) getInstance(ctx context.Context, id string) (*ec2.Ins if *instance.State.Name == ec2.InstanceStateNameTerminated { return nil, awserrors.InstanceTerminatedError{Err: fmt.Errorf("instance is in terminated state")} } - if injection.GetOptions(ctx).GetAWSNodeNameConvention() == options.ResourceName { + if operator.GetOptions(ctx).GetAWSNodeNameConvention() == operator.ResourceName { return instance, nil } if len(aws.StringValue(instance.PrivateDnsName)) == 0 { @@ -330,7 +329,7 @@ func (p *InstanceProvider) instanceToNode(ctx context.Context, instance *ec2.Ins for _, instanceType := range instanceTypes { if instanceType.Name() == aws.StringValue(instance.InstanceType) { nodeName := strings.ToLower(aws.StringValue(instance.PrivateDnsName)) - if injection.GetOptions(ctx).GetAWSNodeNameConvention() == options.ResourceName { + if operator.GetOptions(ctx).GetAWSNodeNameConvention() == operator.ResourceName { nodeName = aws.StringValue(instance.InstanceId) } diff --git a/pkg/cloudproviders/aws/cloudprovider/instancetype.go b/pkg/cloudproviders/aws/cloudprovider/instancetype.go index 3cf307f0e03f..b076f6be7ff1 100644 --- a/pkg/cloudproviders/aws/cloudprovider/instancetype.go +++ b/pkg/cloudproviders/aws/cloudprovider/instancetype.go @@ -35,7 +35,7 @@ import ( "github.com/aws/karpenter/pkg/cloudproviders/aws/apis/v1alpha1" "github.com/aws/karpenter/pkg/cloudproviders/aws/cloudprovider/amifamily" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" - "github.com/aws/karpenter/pkg/utils/injection" + "github.com/aws/karpenter/pkg/operator" "github.com/aws/karpenter/pkg/utils/resources" ) @@ -69,8 +69,8 @@ func NewInstanceType(ctx context.Context, info *ec2.InstanceTypeInfo, kc *v1alph instanceType.maxPods = instanceType.computeMaxPods(ctx, kc) // Precompute to minimize memory/compute overhead - instanceType.resources = instanceType.computeResources(injection.GetOptions(ctx).AWSEnablePodENI) - instanceType.overhead = instanceType.computeOverhead(injection.GetOptions(ctx).VMMemoryOverhead, kc) + instanceType.resources = instanceType.computeResources(operator.GetOptions(ctx).AWSEnablePodENI) + instanceType.overhead = instanceType.computeOverhead(operator.GetOptions(ctx).VMMemoryOverhead, kc) instanceType.requirements = instanceType.computeRequirements() return instanceType } @@ -370,7 +370,7 @@ func (i *InstanceType) computeMaxPods(ctx context.Context, kc *v1alpha5.KubeletC switch { case kc != nil && kc.MaxPods != nil: mp = ptr.Int64(int64(ptr.Int32Value(kc.MaxPods))) - case !injection.GetOptions(ctx).AWSENILimitedPodDensity: + case !operator.GetOptions(ctx).AWSENILimitedPodDensity: mp = ptr.Int64(110) default: mp = ptr.Int64(i.eniLimitedPods()) diff --git a/pkg/cloudproviders/aws/cloudprovider/instancetypes.go b/pkg/cloudproviders/aws/cloudprovider/instancetypes.go index 3009df711d4f..1ab892a6e110 100644 --- a/pkg/cloudproviders/aws/cloudprovider/instancetypes.go +++ b/pkg/cloudproviders/aws/cloudprovider/instancetypes.go @@ -24,6 +24,7 @@ import ( "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" awscache "github.com/aws/karpenter/pkg/cloudproviders/aws/cache" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" + "github.com/aws/karpenter/pkg/operator" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -37,7 +38,6 @@ import ( "github.com/aws/karpenter/pkg/cloudproviders/aws/apis/v1alpha1" "github.com/aws/karpenter/pkg/utils/functional" - "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/pretty" ) @@ -71,7 +71,7 @@ func NewInstanceTypeProvider(ctx context.Context, sess *session.Session, options NewPricingAPI(sess, *sess.Config.Region), ec2api, *sess.Config.Region, - injection.GetOptions(ctx).AWSIsolatedVPC, options.StartAsync), + operator.GetOptions(ctx).AWSIsolatedVPC, options.StartAsync), cache: cache.New(InstanceTypesAndZonesCacheTTL, CacheCleanupInterval), unavailableOfferings: unavailableOfferings, cm: pretty.NewChangeMonitor(), diff --git a/pkg/cloudproviders/aws/cloudprovider/instancetypes_test.go b/pkg/cloudproviders/aws/cloudprovider/instancetypes_test.go index 3f5f2236ee9f..ecfd5103579d 100644 --- a/pkg/cloudproviders/aws/cloudprovider/instancetypes_test.go +++ b/pkg/cloudproviders/aws/cloudprovider/instancetypes_test.go @@ -37,9 +37,9 @@ import ( "github.com/aws/karpenter/pkg/cloudproviders/aws/fake" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" "github.com/aws/karpenter/pkg/controllers/provisioning" + "github.com/aws/karpenter/pkg/operator" "github.com/aws/karpenter/pkg/test" . "github.com/aws/karpenter/pkg/test/expectations" - "github.com/aws/karpenter/pkg/utils/injection" ) var _ = Describe("Instance Types", func() { @@ -126,7 +126,7 @@ var _ = Describe("Instance Types", func() { // ensure the pod ENI option is off optsCopy := opts optsCopy.AWSEnablePodENI = false - cancelCtx, cancelFunc := context.WithCancel(injection.WithOptions(ctx, optsCopy)) + cancelCtx, cancelFunc := context.WithCancel(operator.WithOptions(ctx, optsCopy)) // ensure the provisioner is shut down at the end of this test defer cancelFunc() @@ -228,7 +228,7 @@ var _ = Describe("Instance Types", func() { Expect(err).To(BeNil()) provisioner = test.Provisioner() for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).To(BeNumerically("==", 110)) } @@ -239,7 +239,7 @@ var _ = Describe("Instance Types", func() { Expect(err).To(BeNil()) provisioner = test.Provisioner() for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).ToNot(BeNumerically("==", 110)) } @@ -257,7 +257,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Cpu().String()).To(Equal("2080m")) }) @@ -271,7 +271,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("21473Mi")) }) @@ -292,7 +292,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("30820Mi")) Expect(overhead.Cpu().String()).To(Equal("3")) @@ -316,7 +316,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("31220Mi")) }) @@ -336,7 +336,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("33930241639")) }) @@ -356,7 +356,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("30Gi")) }) @@ -376,7 +376,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("30770Mi")) }) @@ -396,7 +396,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("31220Mi")) }) @@ -419,7 +419,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("33930241639")) }) @@ -439,7 +439,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("30Gi")) }) @@ -463,7 +463,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("31Gi")) }) @@ -486,7 +486,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("33Gi")) }) @@ -509,7 +509,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("33071248180")) }) @@ -532,7 +532,7 @@ var _ = Describe("Instance Types", func() { }, }, }) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("33930241639")) }) @@ -542,7 +542,7 @@ var _ = Describe("Instance Types", func() { Expect(err).To(BeNil()) provisioner = test.Provisioner(test.ProvisionerOptions{Kubelet: &v1alpha5.KubeletConfiguration{MaxPods: ptr.Int32(10)}}) for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).To(BeNumerically("==", 10)) } @@ -553,7 +553,7 @@ var _ = Describe("Instance Types", func() { Expect(err).To(BeNil()) provisioner = test.Provisioner(test.ProvisionerOptions{Kubelet: &v1alpha5.KubeletConfiguration{MaxPods: ptr.Int32(10)}}) for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).To(BeNumerically("==", 10)) } @@ -563,7 +563,7 @@ var _ = Describe("Instance Types", func() { Expect(err).To(BeNil()) provisioner = test.Provisioner(test.ProvisionerOptions{Kubelet: &v1alpha5.KubeletConfiguration{PodsPerCore: ptr.Int32(1)}}) for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).To(BeNumerically("==", ptr.Int64Value(info.VCpuInfo.DefaultVCpus))) } @@ -573,7 +573,7 @@ var _ = Describe("Instance Types", func() { Expect(err).To(BeNil()) provisioner = test.Provisioner(test.ProvisionerOptions{Kubelet: &v1alpha5.KubeletConfiguration{PodsPerCore: ptr.Int32(4), MaxPods: ptr.Int32(20)}}) for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).To(BeNumerically("==", lo.Min([]int64{20, ptr.Int64Value(info.VCpuInfo.DefaultVCpus) * 4}))) } @@ -584,7 +584,7 @@ var _ = Describe("Instance Types", func() { provider.AMIFamily = &awsv1alpha1.AMIFamilyBottlerocket provisioner = test.Provisioner(test.ProvisionerOptions{Kubelet: &v1alpha5.KubeletConfiguration{PodsPerCore: ptr.Int32(1)}, Provider: provider}) for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).To(BeNumerically("==", it.eniLimitedPods())) } @@ -595,7 +595,7 @@ var _ = Describe("Instance Types", func() { Expect(err).To(BeNil()) provisioner = test.Provisioner(test.ProvisionerOptions{Kubelet: &v1alpha5.KubeletConfiguration{PodsPerCore: ptr.Int32(0)}}) for _, info := range instanceInfo { - it := NewInstanceType(injection.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), info, provisioner.Spec.KubeletConfiguration, "", provider, nil) resources := it.Resources() Expect(resources.Pods().Value()).To(BeNumerically("==", 110)) } diff --git a/pkg/cloudproviders/aws/cloudprovider/launchtemplate.go b/pkg/cloudproviders/aws/cloudprovider/launchtemplate.go index f0f2c7f3710c..976c3fe082cf 100644 --- a/pkg/cloudproviders/aws/cloudprovider/launchtemplate.go +++ b/pkg/cloudproviders/aws/cloudprovider/launchtemplate.go @@ -40,7 +40,7 @@ import ( "github.com/aws/karpenter/pkg/cloudproviders/aws/cloudprovider/amifamily" awserrors "github.com/aws/karpenter/pkg/cloudproviders/aws/errors" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" - "github.com/aws/karpenter/pkg/utils/injection" + "github.com/aws/karpenter/pkg/operator" "github.com/aws/karpenter/pkg/utils/pretty" ) @@ -117,9 +117,9 @@ func (p *LaunchTemplateProvider) Get(ctx context.Context, provider *v1alpha1.AWS return nil, err } resolvedLaunchTemplates, err := p.amiFamily.Resolve(ctx, provider, nodeRequest, &amifamily.Options{ - ClusterName: injection.GetOptions(ctx).ClusterName, - ClusterEndpoint: injection.GetOptions(ctx).ClusterEndpoint, - AWSENILimitedPodDensity: injection.GetOptions(ctx).AWSENILimitedPodDensity, + ClusterName: operator.GetOptions(ctx).ClusterName, + ClusterEndpoint: operator.GetOptions(ctx).ClusterEndpoint, + AWSENILimitedPodDensity: operator.GetOptions(ctx).AWSENILimitedPodDensity, InstanceProfile: instanceProfile, SecurityGroupsIDs: securityGroupsIDs, Tags: provider.Tags, @@ -259,7 +259,7 @@ func (p *LaunchTemplateProvider) Invalidate(ctx context.Context, ltName string) // hydrateCache queries for existing Launch Templates created by Karpenter for the current cluster and adds to the LT cache. // Any error during hydration will result in a panic func (p *LaunchTemplateProvider) hydrateCache(ctx context.Context) { - clusterName := injection.GetOptions(ctx).ClusterName + clusterName := operator.GetOptions(ctx).ClusterName p.logger.Debugf("Hydrating the launch template cache with tags matching \"%s: %s\"", karpenterManagedTagKey, clusterName) if err := p.ec2api.DescribeLaunchTemplatesPagesWithContext(ctx, &ec2.DescribeLaunchTemplatesInput{ Filters: []*ec2.Filter{{Name: aws.String(fmt.Sprintf("tag:%s", karpenterManagedTagKey)), Values: []*string{aws.String(clusterName)}}}, @@ -295,7 +295,7 @@ func (p *LaunchTemplateProvider) getInstanceProfile(ctx context.Context, provide if provider.InstanceProfile != nil { return aws.StringValue(provider.InstanceProfile), nil } - defaultProfile := injection.GetOptions(ctx).AWSDefaultInstanceProfile + defaultProfile := operator.GetOptions(ctx).AWSDefaultInstanceProfile if defaultProfile == "" { return "", errors.New("neither spec.provider.instanceProfile nor --aws-default-instance-profile is specified") } diff --git a/pkg/cloudproviders/aws/cloudprovider/launchtemplate_test.go b/pkg/cloudproviders/aws/cloudprovider/launchtemplate_test.go index 2080cc95cb6d..446b2cb75371 100644 --- a/pkg/cloudproviders/aws/cloudprovider/launchtemplate_test.go +++ b/pkg/cloudproviders/aws/cloudprovider/launchtemplate_test.go @@ -45,9 +45,9 @@ import ( awsv1alpha1 "github.com/aws/karpenter/pkg/cloudproviders/aws/apis/v1alpha1" "github.com/aws/karpenter/pkg/cloudproviders/aws/cloudprovider/amifamily/bootstrap" "github.com/aws/karpenter/pkg/controllers/provisioning" + "github.com/aws/karpenter/pkg/operator" "github.com/aws/karpenter/pkg/test" . "github.com/aws/karpenter/pkg/test/expectations" - "github.com/aws/karpenter/pkg/utils/injection" "github.com/aws/karpenter/pkg/utils/ptr" ) @@ -565,7 +565,7 @@ var _ = Describe("LaunchTemplates", func() { provider.AMIFamily = &awsv1alpha1.AMIFamilyAL2 instanceInfo, err := instanceTypeProvider.getInstanceTypes(ctx) Expect(err).To(BeNil()) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("1093Mi")) }) @@ -575,7 +575,7 @@ var _ = Describe("LaunchTemplates", func() { provider.AMIFamily = &awsv1alpha1.AMIFamilyAL2 instanceInfo, err := instanceTypeProvider.getInstanceTypes(ctx) Expect(err).To(BeNil()) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("1093Mi")) }) @@ -587,7 +587,7 @@ var _ = Describe("LaunchTemplates", func() { provider.AMIFamily = &awsv1alpha1.AMIFamilyBottlerocket instanceInfo, err := instanceTypeProvider.getInstanceTypes(ctx) Expect(err).To(BeNil()) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("1093Mi")) }) @@ -597,7 +597,7 @@ var _ = Describe("LaunchTemplates", func() { provider.AMIFamily = &awsv1alpha1.AMIFamilyBottlerocket instanceInfo, err := instanceTypeProvider.getInstanceTypes(ctx) Expect(err).To(BeNil()) - it := NewInstanceType(injection.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) + it := NewInstanceType(operator.WithOptions(ctx, opts), instanceInfo["m5.xlarge"], provisioner.Spec.KubeletConfiguration, "", provider, nil) overhead := it.Overhead() Expect(overhead.Memory().String()).To(Equal("1665Mi")) }) @@ -605,7 +605,7 @@ var _ = Describe("LaunchTemplates", func() { Context("User Data", func() { It("should not specify --use-max-pods=false when using ENI-based pod density", func() { opts.AWSENILimitedPodDensity = true - prov := provisioning.NewProvisioner(injection.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) + prov := provisioning.NewProvisioner(operator.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) controllerWithOpts := provisioning.NewController(env.Client, prov, recorder) ExpectApplied(ctx, env.Client, test.Provisioner(test.ProvisionerOptions{Provider: provider})) pod := ExpectProvisioned(ctx, env.Client, controllerWithOpts, test.UnschedulablePod())[0] @@ -617,7 +617,7 @@ var _ = Describe("LaunchTemplates", func() { }) It("should specify --use-max-pods=false when not using ENI-based pod density", func() { opts.AWSENILimitedPodDensity = false - prov := provisioning.NewProvisioner(injection.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) + prov := provisioning.NewProvisioner(operator.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) controllerWithOpts := provisioning.NewController(env.Client, prov, recorder) ExpectApplied(ctx, env.Client, test.Provisioner(test.ProvisionerOptions{Provider: provider})) pod := ExpectProvisioned(ctx, env.Client, controllerWithOpts, test.UnschedulablePod())[0] @@ -887,7 +887,7 @@ var _ = Describe("LaunchTemplates", func() { Context("Bottlerocket", func() { It("should merge in custom user data", func() { opts.AWSENILimitedPodDensity = false - prov := provisioning.NewProvisioner(injection.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) + prov := provisioning.NewProvisioner(operator.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) controllerWithOpts := provisioning.NewController(env.Client, prov, recorder) provider.AMIFamily = &awsv1alpha1.AMIFamilyBottlerocket @@ -921,7 +921,7 @@ var _ = Describe("LaunchTemplates", func() { }) It("should bootstrap when custom user data is empty", func() { opts.AWSENILimitedPodDensity = false - prov := provisioning.NewProvisioner(injection.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) + prov := provisioning.NewProvisioner(operator.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) controllerWithOpts := provisioning.NewController(env.Client, prov, recorder) provider.AMIFamily = &awsv1alpha1.AMIFamilyBottlerocket @@ -951,7 +951,7 @@ var _ = Describe("LaunchTemplates", func() { }) It("should not bootstrap when provider ref points to a non-existent resource", func() { opts.AWSENILimitedPodDensity = false - prov := provisioning.NewProvisioner(injection.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) + prov := provisioning.NewProvisioner(operator.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) controllerWithOpts := provisioning.NewController(env.Client, prov, recorder) provider.AMIFamily = &awsv1alpha1.AMIFamilyBottlerocket @@ -1088,7 +1088,7 @@ var _ = Describe("LaunchTemplates", func() { Context("AL2 Custom UserData", func() { It("should merge in custom user data", func() { opts.AWSENILimitedPodDensity = false - prov := provisioning.NewProvisioner(injection.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) + prov := provisioning.NewProvisioner(operator.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) controllerWithOpts := provisioning.NewController(env.Client, prov, recorder) content, _ := os.ReadFile("testdata/al2_userdata_input.golden") @@ -1110,7 +1110,7 @@ var _ = Describe("LaunchTemplates", func() { }) It("should handle empty custom user data", func() { opts.AWSENILimitedPodDensity = false - prov := provisioning.NewProvisioner(injection.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) + prov := provisioning.NewProvisioner(operator.WithOptions(ctx, opts), cfg, env.Client, corev1.NewForConfigOrDie(env.Config), recorder, cloudProvider, cluster) controllerWithOpts := provisioning.NewController(env.Client, prov, recorder) nodeTemplate := test.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{ UserData: nil, diff --git a/pkg/cloudproviders/aws/cloudprovider/suite_test.go b/pkg/cloudproviders/aws/cloudprovider/suite_test.go index f9885f7a79d9..283e183befcc 100644 --- a/pkg/cloudproviders/aws/cloudprovider/suite_test.go +++ b/pkg/cloudproviders/aws/cloudprovider/suite_test.go @@ -39,6 +39,7 @@ import ( awscache "github.com/aws/karpenter/pkg/cloudproviders/aws/cache" "github.com/aws/karpenter/pkg/cloudproviders/aws/cloudprovider/amifamily" "github.com/aws/karpenter/pkg/cloudproviders/common/cloudprovider" + "github.com/aws/karpenter/pkg/operator" . "github.com/aws/karpenter/pkg/test/expectations" "github.com/aws/karpenter-core/pkg/apis/provisioning/v1alpha5" @@ -48,14 +49,12 @@ import ( "github.com/aws/karpenter/pkg/controllers/provisioning" "github.com/aws/karpenter/pkg/controllers/state" "github.com/aws/karpenter/pkg/test" - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/aws/karpenter/pkg/utils/options" "github.com/aws/karpenter/pkg/utils/pretty" ) var ctx context.Context var stop context.CancelFunc -var opts options.Options +var opts operator.Options var env *test.Environment var launchTemplateCache *cache.Cache var securityGroupCache *cache.Cache @@ -79,10 +78,10 @@ var provisioner *v1alpha5.Provisioner var provider *awsv1alpha1.AWS var pricingProvider *PricingProvider -var defaultOpts = options.Options{ +var defaultOpts = operator.Options{ ClusterName: "test-cluster", ClusterEndpoint: "https://test-cluster", - AWSNodeNameConvention: string(options.IPName), + AWSNodeNameConvention: string(operator.IPName), AWSENILimitedPodDensity: true, AWSEnablePodENI: true, AWSDefaultInstanceProfile: "test-instance-profile", @@ -98,7 +97,7 @@ var _ = BeforeSuite(func() { env = test.NewEnvironment(ctx, func(e *test.Environment) { opts = defaultOpts Expect(opts.Validate()).To(Succeed(), "Failed to validate options") - ctx = injection.WithOptions(ctx, opts) + ctx = operator.WithOptions(ctx, opts) ctx, stop = context.WithCancel(ctx) launchTemplateCache = cache.New(CacheTTL, CacheCleanupInterval) internalUnavailableOfferingsCache = cache.New(awscache.UnavailableOfferingsTTL, CacheCleanupInterval) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 389b3ece7f2d..5c927d806745 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -33,22 +33,22 @@ func init() { metrics.MustRegister() // Registers cross-controller metrics } -func GetControllers(opts operator.Options, cloudProvider cloudprovider.CloudProvider) []operator.Controller { - cluster := state.NewCluster(opts.Clock, opts.Config, opts.KubeClient, cloudProvider) - provisioner := provisioning.NewProvisioner(opts.Ctx, opts.Config, opts.KubeClient, opts.Clientset.CoreV1(), opts.Recorder, cloudProvider, cluster) +func GetControllers(ctx operator.Context, cloudProvider cloudprovider.CloudProvider) []operator.Controller { + cluster := state.NewCluster(ctx.Clock, ctx.Config, ctx.KubeClient, cloudProvider) + provisioner := provisioning.NewProvisioner(ctx, ctx.Config, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.Recorder, cloudProvider, cluster) - metricsstate.StartMetricScraper(opts.Ctx, cluster) + metricsstate.StartMetricScraper(ctx, cluster) return []operator.Controller{ - provisioning.NewController(opts.KubeClient, provisioner, opts.Recorder), - state.NewNodeController(opts.KubeClient, cluster), - state.NewPodController(opts.KubeClient, cluster), - state.NewProvisionerController(opts.KubeClient, cluster), - node.NewController(opts.Clock, opts.KubeClient, cloudProvider, cluster), - termination.NewController(opts.Ctx, opts.Clock, opts.KubeClient, opts.Clientset.CoreV1(), opts.Recorder, cloudProvider), - metricspod.NewController(opts.KubeClient), - metricsprovisioner.NewController(opts.KubeClient), - counter.NewController(opts.KubeClient, cluster), - consolidation.NewController(opts.Clock, opts.KubeClient, provisioner, cloudProvider, opts.Recorder, cluster), + provisioning.NewController(ctx.KubeClient, provisioner, ctx.Recorder), + state.NewNodeController(ctx.KubeClient, cluster), + state.NewPodController(ctx.KubeClient, cluster), + state.NewProvisionerController(ctx.KubeClient, cluster), + node.NewController(ctx.Clock, ctx.KubeClient, cloudProvider, cluster), + termination.NewController(ctx, ctx.Clock, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.Recorder, cloudProvider), + metricspod.NewController(ctx.KubeClient), + metricsprovisioner.NewController(ctx.KubeClient), + counter.NewController(ctx.KubeClient, cluster), + consolidation.NewController(ctx.Clock, ctx.KubeClient, provisioner, cloudProvider, ctx.Recorder, cluster), } } diff --git a/pkg/operator/context.go b/pkg/operator/context.go new file mode 100644 index 000000000000..507d3c291aae --- /dev/null +++ b/pkg/operator/context.go @@ -0,0 +1,114 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operator + +import ( + "context" + "runtime/debug" + + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/utils/clock" + "knative.dev/pkg/configmap/informer" + knativeinjection "knative.dev/pkg/injection" + "knative.dev/pkg/logging" + "knative.dev/pkg/system" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/aws/karpenter/pkg/apis" + "github.com/aws/karpenter/pkg/config" + "github.com/aws/karpenter/pkg/events" + "github.com/aws/karpenter/pkg/utils/injection" + "github.com/aws/karpenter/pkg/utils/project" +) + +const ( + appName = "karpenter" + component = "controller" +) + +var ( + scheme = runtime.NewScheme() +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(apis.AddToScheme(scheme)) +} + +// Context is the root context for the operator and exposes shared components used by the entire process +type Context struct { + context.Context + Recorder events.Recorder + Config config.Config + KubeClient client.Client + Clientset *kubernetes.Clientset + Clock clock.Clock + Options *Options + StartAsync <-chan struct{} +} + +func NewOrDie() (Context, manager.Manager) { + options := NewOptions().MustParse() + + // Setup Client + restConfig := controllerruntime.GetConfigOrDie() + restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(options.KubeClientQPS), options.KubeClientBurst) + restConfig.UserAgent = appName + clientSet := kubernetes.NewForConfigOrDie(restConfig) + + // Set up logger and watch for changes to log level defined in the ConfigMap `config-logging` + cmw := informer.NewInformedWatcher(clientSet, system.Namespace()) + ctx := injection.LoggingContextOrDie(component, restConfig, cmw) + ctx = knativeinjection.WithConfig(ctx, restConfig) + ctx = WithOptions(ctx, *options) + + logging.FromContext(ctx).Infof("Initializing with version %s", project.Version) + if options.MemoryLimit > 0 { + newLimit := int64(float64(options.MemoryLimit) * 0.9) + logging.FromContext(ctx).Infof("Setting GC memory limit to %d, container limit = %d", newLimit, options.MemoryLimit) + debug.SetMemoryLimit(newLimit) + } + + cfg, err := config.New(ctx, clientSet, cmw) + if err != nil { + // this does not happen if the config map is missing or invalid, only if some other error occurs + logging.FromContext(ctx).Fatalf("unable to load config, %s", err) + } + if err := cmw.Start(ctx.Done()); err != nil { + logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err) + } + + manager := NewManagerOrDie(ctx, restConfig, options) + recorder := events.NewRecorder(manager.GetEventRecorderFor(appName)) + recorder = events.NewLoadSheddingRecorder(recorder) + recorder = events.NewDedupeRecorder(recorder) + + return Context{ + Context: ctx, + Recorder: recorder, + Config: cfg, + Clientset: clientSet, + KubeClient: manager.GetClient(), + Clock: clock.RealClock{}, + Options: options, + StartAsync: manager.Elected(), + }, manager +} diff --git a/pkg/operator/manager.go b/pkg/operator/manager.go index d3dc62241ffa..9712b1936ce1 100644 --- a/pkg/operator/manager.go +++ b/pkg/operator/manager.go @@ -21,20 +21,17 @@ import ( "net/http/pprof" "github.com/go-logr/zapr" - "go.uber.org/zap" v1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" + "knative.dev/pkg/injection" "knative.dev/pkg/logging" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/aws/karpenter/pkg/utils/options" ) // Controller is an interface implemented by Karpenter custom resources. @@ -53,16 +50,25 @@ type HealthCheck interface { } // NewManagerOrDie instantiates a controller manager or panics -func NewManagerOrDie(ctx context.Context, config *rest.Config, opts *options.Options) manager.Manager { +func NewManagerOrDie(ctx context.Context, config *rest.Config, options *Options) manager.Manager { + logger := logging.FromContext(ctx) newManager, err := controllerruntime.NewManager(config, controllerruntime.Options{ - Logger: ignoreDebugEvents(zapr.NewLogger(logging.FromContext(ctx).Desugar())), - LeaderElection: opts.EnableLeaderElection, + Logger: ignoreDebugEvents(zapr.NewLogger(logger.Desugar())), + LeaderElection: options.EnableLeaderElection, LeaderElectionID: "karpenter-leader-election", LeaderElectionResourceLock: resourcelock.LeasesResourceLock, Scheme: scheme, - MetricsBindAddress: fmt.Sprintf(":%d", opts.MetricsPort), - HealthProbeBindAddress: fmt.Sprintf(":%d", opts.HealthProbePort), - BaseContext: newRunnableContext(config, opts, logging.FromContext(ctx)), + MetricsBindAddress: fmt.Sprintf(":%d", options.MetricsPort), + HealthProbeBindAddress: fmt.Sprintf(":%d", options.HealthProbePort), + // Controller runtime injects this base context into internal + // controllers which are unsafe to shut down require a fresh context. + BaseContext: func() context.Context { + baseCtx := context.Background() + baseCtx = logging.WithLogger(baseCtx, logger) + baseCtx = injection.WithConfig(baseCtx, config) + baseCtx = WithOptions(baseCtx, *options) + return baseCtx + }, }) if err != nil { panic(fmt.Sprintf("Failed to create controller newManager, %s", err)) @@ -72,7 +78,7 @@ func NewManagerOrDie(ctx context.Context, config *rest.Config, opts *options.Opt }); err != nil { panic(fmt.Sprintf("Failed to setup pod indexer, %s", err)) } - if opts.EnableProfiling { + if options.EnableProfiling { utilruntime.Must(registerPprof(newManager)) } @@ -80,33 +86,22 @@ func NewManagerOrDie(ctx context.Context, config *rest.Config, opts *options.Opt } // RegisterControllers registers a set of controllers to the controller manager -func RegisterControllers(ctx context.Context, m manager.Manager, controllers ...Controller) manager.Manager { +func RegisterControllers(ctx Context, mgr manager.Manager, controllers ...Controller) { for _, c := range controllers { - if err := c.Register(ctx, m); err != nil { + if err := c.Register(ctx, mgr); err != nil { panic(err) } // if the controller implements a liveness check, connect it if lp, ok := c.(HealthCheck); ok { - utilruntime.Must(m.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe)) + utilruntime.Must(mgr.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe)) } } - if err := m.AddHealthzCheck("healthz", healthz.Ping); err != nil { + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { panic(fmt.Sprintf("Failed to add health probe, %s", err)) } - if err := m.AddReadyzCheck("readyz", healthz.Ping); err != nil { + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { panic(fmt.Sprintf("Failed to add ready probe, %s", err)) } - return m -} - -func newRunnableContext(config *rest.Config, options *options.Options, logger *zap.SugaredLogger) func() context.Context { - return func() context.Context { - ctx := context.Background() - ctx = logging.WithLogger(ctx, logger) - ctx = injection.WithConfig(ctx, config) - ctx = injection.WithOptions(ctx, *options) - return ctx - } } func registerPprof(manager manager.Manager) error { diff --git a/pkg/operator/options.go b/pkg/operator/options.go index fd3f05e6e0e9..a3b58e455e0b 100644 --- a/pkg/operator/options.go +++ b/pkg/operator/options.go @@ -16,99 +16,126 @@ package operator import ( "context" - "runtime/debug" - - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/kubernetes" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/util/flowcontrol" - "k8s.io/utils/clock" - "knative.dev/pkg/configmap/informer" - "knative.dev/pkg/logging" - "knative.dev/pkg/system" - controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" - - "github.com/aws/karpenter/pkg/apis" - "github.com/aws/karpenter/pkg/config" - "github.com/aws/karpenter/pkg/events" - "github.com/aws/karpenter/pkg/utils/injection" - "github.com/aws/karpenter/pkg/utils/options" - "github.com/aws/karpenter/pkg/utils/project" -) + "errors" + "flag" + "fmt" + "net/url" + "os" -const ( - appName = "karpenter" - component = "controller" + "go.uber.org/multierr" + + "github.com/aws/karpenter/pkg/utils/env" ) -var ( - scheme = runtime.NewScheme() +type AWSNodeNameConvention string + +const ( + IPName AWSNodeNameConvention = "ip-name" + ResourceName AWSNodeNameConvention = "resource-name" ) -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(apis.AddToScheme(scheme)) +// Options for running this binary +type Options struct { + *flag.FlagSet + // Vendor Neutral + MetricsPort int + HealthProbePort int + KubeClientQPS int + KubeClientBurst int + EnableProfiling bool + EnableLeaderElection bool + MemoryLimit int64 + // AWS Specific + ClusterName string + ClusterEndpoint string + VMMemoryOverhead float64 + AWSNodeNameConvention string + AWSENILimitedPodDensity bool + AWSDefaultInstanceProfile string + AWSEnablePodENI bool + AWSIsolatedVPC bool } -// Options exposes shared components that are initialized by the startup.Initialize() call -type Options struct { - Ctx context.Context - Recorder events.Recorder - Config config.Config - KubeClient client.Client - Clientset *kubernetes.Clientset - Clock clock.Clock - Options *options.Options - StartAsync <-chan struct{} +// New creates an Options struct and registers CLI flags and environment variables to fill-in the Options struct fields +func NewOptions() *Options { + opts := &Options{} + f := flag.NewFlagSet("karpenter", flag.ContinueOnError) + opts.FlagSet = f + + // Vendor Neutral + f.IntVar(&opts.MetricsPort, "metrics-port", env.WithDefaultInt("METRICS_PORT", 8080), "The port the metric endpoint binds to for operating metrics about the controller itself") + f.IntVar(&opts.HealthProbePort, "health-probe-port", env.WithDefaultInt("HEALTH_PROBE_PORT", 8081), "The port the health probe endpoint binds to for reporting controller health") + f.IntVar(&opts.KubeClientQPS, "kube-client-qps", env.WithDefaultInt("KUBE_CLIENT_QPS", 200), "The smoothed rate of qps to kube-apiserver") + f.IntVar(&opts.KubeClientBurst, "kube-client-burst", env.WithDefaultInt("KUBE_CLIENT_BURST", 300), "The maximum allowed burst of queries to the kube-apiserver") + f.BoolVar(&opts.EnableProfiling, "enable-profiling", env.WithDefaultBool("ENABLE_PROFILING", false), "Enable the profiling on the metric endpoint") + f.BoolVar(&opts.EnableLeaderElection, "leader-elect", env.WithDefaultBool("LEADER_ELECT", true), "Start leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") + f.Int64Var(&opts.MemoryLimit, "memory-limit", env.WithDefaultInt64("MEMORY_LIMIT", -1), "Memory limit on the container running the controller. The GC soft memory limit is set to 90% of this value.") + + // AWS Specific + f.StringVar(&opts.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "The kubernetes cluster name for resource discovery") + f.StringVar(&opts.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with") + f.Float64Var(&opts.VMMemoryOverhead, "vm-memory-overhead", env.WithDefaultFloat64("VM_MEMORY_OVERHEAD", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types") + f.StringVar(&opts.AWSNodeNameConvention, "aws-node-name-convention", env.WithDefaultString("AWS_NODE_NAME_CONVENTION", string(IPName)), "The node naming convention used by the AWS cloud provider. DEPRECATION WARNING: this field may be deprecated at any time") + f.BoolVar(&opts.AWSENILimitedPodDensity, "aws-eni-limited-pod-density", env.WithDefaultBool("AWS_ENI_LIMITED_POD_DENSITY", true), "Indicates whether new nodes should use ENI-based pod density. DEPRECATED: Use `.spec.kubeletConfiguration.maxPods` to set pod density on a per-provisioner basis") + f.StringVar(&opts.AWSDefaultInstanceProfile, "aws-default-instance-profile", env.WithDefaultString("AWS_DEFAULT_INSTANCE_PROFILE", ""), "The default instance profile to use when provisioning nodes in AWS") + f.BoolVar(&opts.AWSEnablePodENI, "aws-enable-pod-eni", env.WithDefaultBool("AWS_ENABLE_POD_ENI", false), "If true then instances that support pod ENI will report a vpc.amazonaws.com/pod-eni resource") + f.BoolVar(&opts.AWSIsolatedVPC, "aws-isolated-vpc", env.WithDefaultBool("AWS_ISOLATED_VPC", false), "If true then assume we can't reach AWS services which don't have a VPC endpoint. This also has the effect of disabling look-ups to the AWS pricing endpoint.") + return opts } -func NewOptionsWithManagerOrDie() (Options, manager.Manager) { - opts := options.New().MustParse() - - // Setup Client - controllerRuntimeConfig := controllerruntime.GetConfigOrDie() - controllerRuntimeConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(opts.KubeClientQPS), opts.KubeClientBurst) - controllerRuntimeConfig.UserAgent = appName - clientSet := kubernetes.NewForConfigOrDie(controllerRuntimeConfig) - - // Set up logger and watch for changes to log level - cmw := informer.NewInformedWatcher(clientSet, system.Namespace()) - ctx := injection.LoggingContextOrDie(component, controllerRuntimeConfig, cmw) - ctx = injection.WithConfig(ctx, controllerRuntimeConfig) - ctx = injection.WithOptions(ctx, *opts) - - logging.FromContext(ctx).Infof("Initializing with version %s", project.Version) - if opts.MemoryLimit > 0 { - newLimit := int64(float64(opts.MemoryLimit) * 0.9) - logging.FromContext(ctx).Infof("Setting GC memory limit to %d, container limit = %d", newLimit, opts.MemoryLimit) - debug.SetMemoryLimit(newLimit) - } +// MustParse reads the user passed flags, environment variables, and default values. +// Options are valided and panics if an error is returned +func (o *Options) MustParse() *Options { + err := o.Parse(os.Args[1:]) - cfg, err := config.New(ctx, clientSet, cmw) + if errors.Is(err, flag.ErrHelp) { + os.Exit(0) + } if err != nil { - // this does not happen if the config map is missing or invalid, only if some other error occurs - logging.FromContext(ctx).Fatalf("unable to load config, %s", err) + panic(err) } - if err := cmw.Start(ctx.Done()); err != nil { - logging.FromContext(ctx).Errorf("watching configmaps, config changes won't be applied immediately, %s", err) + if err := o.Validate(); err != nil { + panic(err) } + return o +} - manager := NewManagerOrDie(ctx, controllerRuntimeConfig, opts) - recorder := events.NewRecorder(manager.GetEventRecorderFor(appName)) - recorder = events.NewLoadSheddingRecorder(recorder) - recorder = events.NewDedupeRecorder(recorder) - - return Options{ - Ctx: ctx, - Recorder: recorder, - Config: cfg, - Clientset: clientSet, - KubeClient: manager.GetClient(), - Clock: clock.RealClock{}, - Options: opts, - StartAsync: manager.Elected(), - }, manager +func (o Options) Validate() (err error) { + err = multierr.Append(err, o.validateEndpoint()) + if o.ClusterName == "" { + err = multierr.Append(err, fmt.Errorf("CLUSTER_NAME is required")) + } + awsNodeNameConvention := AWSNodeNameConvention(o.AWSNodeNameConvention) + if awsNodeNameConvention != IPName && awsNodeNameConvention != ResourceName { + err = multierr.Append(err, fmt.Errorf("aws-node-name-convention may only be either ip-name or resource-name")) + } + return err +} + +func (o Options) validateEndpoint() error { + endpoint, err := url.Parse(o.ClusterEndpoint) + // url.Parse() will accept a lot of input without error; make + // sure it's a real URL + if err != nil || !endpoint.IsAbs() || endpoint.Hostname() == "" { + return fmt.Errorf("\"%s\" not a valid CLUSTER_ENDPOINT URL", o.ClusterEndpoint) + } + return nil +} + +func (o Options) GetAWSNodeNameConvention() AWSNodeNameConvention { + return AWSNodeNameConvention(o.AWSNodeNameConvention) +} + +type optionsKey struct{} + +func WithOptions(ctx context.Context, opts Options) context.Context { + return context.WithValue(ctx, optionsKey{}, opts) +} + +func GetOptions(ctx context.Context) Options { + retval := ctx.Value(optionsKey{}) + if retval == nil { + return Options{} + } + return retval.(Options) } diff --git a/pkg/utils/injection/injection.go b/pkg/utils/injection/injection.go index 2b2f12b3077e..fb5533242e75 100644 --- a/pkg/utils/injection/injection.go +++ b/pkg/utils/injection/injection.go @@ -20,12 +20,10 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "knative.dev/pkg/configmap/informer" - knativeinjection "knative.dev/pkg/injection" + "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/logging" "knative.dev/pkg/signals" - - "github.com/aws/karpenter/pkg/utils/options" ) type resourceKey struct{} @@ -33,7 +31,7 @@ type resourceKey struct{} // LoggingContextOrDie injects a logger into the returned context. The logger is // configured by the ConfigMap `config-logging` and live updates the level. func LoggingContextOrDie(componentName string, config *rest.Config, cmw *informer.InformedWatcher) context.Context { - ctx, startinformers := knativeinjection.EnableInjectionOrDie(signals.NewContext(), config) + ctx, startinformers := injection.EnableInjectionOrDie(signals.NewContext(), config) logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, componentName) ctx = logging.WithLogger(ctx, logger) rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger}) @@ -54,34 +52,6 @@ func GetNamespacedName(ctx context.Context) types.NamespacedName { return retval.(types.NamespacedName) } -type optionsKey struct{} - -func WithOptions(ctx context.Context, opts options.Options) context.Context { - return context.WithValue(ctx, optionsKey{}, opts) -} - -func GetOptions(ctx context.Context) options.Options { - retval := ctx.Value(optionsKey{}) - if retval == nil { - return options.Options{} - } - return retval.(options.Options) -} - -type configKey struct{} - -func WithConfig(ctx context.Context, config *rest.Config) context.Context { - return context.WithValue(ctx, configKey{}, config) -} - -func GetConfig(ctx context.Context) *rest.Config { - retval := ctx.Value(configKey{}) - if retval == nil { - return nil - } - return retval.(*rest.Config) -} - type controllerNameKeyType struct{} var controllerNameKey = controllerNameKeyType{} diff --git a/pkg/utils/options/options.go b/pkg/utils/options/options.go deleted file mode 100644 index c72754bb4e64..000000000000 --- a/pkg/utils/options/options.go +++ /dev/null @@ -1,126 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package options - -import ( - "errors" - "flag" - "fmt" - "net/url" - "os" - - "go.uber.org/multierr" - - "github.com/aws/karpenter/pkg/utils/env" -) - -type AWSNodeNameConvention string - -const ( - IPName AWSNodeNameConvention = "ip-name" - ResourceName AWSNodeNameConvention = "resource-name" -) - -// Options for running this binary -type Options struct { - *flag.FlagSet - // Vendor Neutral - MetricsPort int - HealthProbePort int - KubeClientQPS int - KubeClientBurst int - EnableProfiling bool - EnableLeaderElection bool - MemoryLimit int64 - // AWS Specific - ClusterName string - ClusterEndpoint string - VMMemoryOverhead float64 - AWSNodeNameConvention string - AWSENILimitedPodDensity bool - AWSDefaultInstanceProfile string - AWSEnablePodENI bool - AWSIsolatedVPC bool -} - -// New creates an Options struct and registers CLI flags and environment variables to fill-in the Options struct fields -func New() *Options { - opts := &Options{} - f := flag.NewFlagSet("karpenter", flag.ContinueOnError) - opts.FlagSet = f - - // Vendor Neutral - f.IntVar(&opts.MetricsPort, "metrics-port", env.WithDefaultInt("METRICS_PORT", 8080), "The port the metric endpoint binds to for operating metrics about the controller itself") - f.IntVar(&opts.HealthProbePort, "health-probe-port", env.WithDefaultInt("HEALTH_PROBE_PORT", 8081), "The port the health probe endpoint binds to for reporting controller health") - f.IntVar(&opts.KubeClientQPS, "kube-client-qps", env.WithDefaultInt("KUBE_CLIENT_QPS", 200), "The smoothed rate of qps to kube-apiserver") - f.IntVar(&opts.KubeClientBurst, "kube-client-burst", env.WithDefaultInt("KUBE_CLIENT_BURST", 300), "The maximum allowed burst of queries to the kube-apiserver") - f.BoolVar(&opts.EnableProfiling, "enable-profiling", env.WithDefaultBool("ENABLE_PROFILING", false), "Enable the profiling on the metric endpoint") - f.BoolVar(&opts.EnableLeaderElection, "leader-elect", env.WithDefaultBool("LEADER_ELECT", true), "Start leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") - f.Int64Var(&opts.MemoryLimit, "memory-limit", env.WithDefaultInt64("MEMORY_LIMIT", -1), "Memory limit on the container running the controller. The GC soft memory limit is set to 90% of this value.") - - // AWS Specific - f.StringVar(&opts.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "The kubernetes cluster name for resource discovery") - f.StringVar(&opts.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with") - f.Float64Var(&opts.VMMemoryOverhead, "vm-memory-overhead", env.WithDefaultFloat64("VM_MEMORY_OVERHEAD", 0.075), "The VM memory overhead as a percent that will be subtracted from the total memory for all instance types") - f.StringVar(&opts.AWSNodeNameConvention, "aws-node-name-convention", env.WithDefaultString("AWS_NODE_NAME_CONVENTION", string(IPName)), "The node naming convention used by the AWS cloud provider. DEPRECATION WARNING: this field may be deprecated at any time") - f.BoolVar(&opts.AWSENILimitedPodDensity, "aws-eni-limited-pod-density", env.WithDefaultBool("AWS_ENI_LIMITED_POD_DENSITY", true), "Indicates whether new nodes should use ENI-based pod density. DEPRECATED: Use `.spec.kubeletConfiguration.maxPods` to set pod density on a per-provisioner basis") - f.StringVar(&opts.AWSDefaultInstanceProfile, "aws-default-instance-profile", env.WithDefaultString("AWS_DEFAULT_INSTANCE_PROFILE", ""), "The default instance profile to use when provisioning nodes in AWS") - f.BoolVar(&opts.AWSEnablePodENI, "aws-enable-pod-eni", env.WithDefaultBool("AWS_ENABLE_POD_ENI", false), "If true then instances that support pod ENI will report a vpc.amazonaws.com/pod-eni resource") - f.BoolVar(&opts.AWSIsolatedVPC, "aws-isolated-vpc", env.WithDefaultBool("AWS_ISOLATED_VPC", false), "If true then assume we can't reach AWS services which don't have a VPC endpoint. This also has the effect of disabling look-ups to the AWS pricing endpoint.") - return opts -} - -// MustParse reads the user passed flags, environment variables, and default values. -// Options are valided and panics if an error is returned -func (o *Options) MustParse() *Options { - err := o.Parse(os.Args[1:]) - - if errors.Is(err, flag.ErrHelp) { - os.Exit(0) - } - if err != nil { - panic(err) - } - if err := o.Validate(); err != nil { - panic(err) - } - return o -} - -func (o Options) Validate() (err error) { - err = multierr.Append(err, o.validateEndpoint()) - if o.ClusterName == "" { - err = multierr.Append(err, fmt.Errorf("CLUSTER_NAME is required")) - } - awsNodeNameConvention := AWSNodeNameConvention(o.AWSNodeNameConvention) - if awsNodeNameConvention != IPName && awsNodeNameConvention != ResourceName { - err = multierr.Append(err, fmt.Errorf("aws-node-name-convention may only be either ip-name or resource-name")) - } - return err -} - -func (o Options) validateEndpoint() error { - endpoint, err := url.Parse(o.ClusterEndpoint) - // url.Parse() will accept a lot of input without error; make - // sure it's a real URL - if err != nil || !endpoint.IsAbs() || endpoint.Hostname() == "" { - return fmt.Errorf("\"%s\" not a valid CLUSTER_ENDPOINT URL", o.ClusterEndpoint) - } - return nil -} - -func (o Options) GetAWSNodeNameConvention() AWSNodeNameConvention { - return AWSNodeNameConvention(o.AWSNodeNameConvention) -}