diff --git a/pkg/config/v1alpha1/types.go b/pkg/config/v1alpha1/types.go index 1af58a0348..3cbc607bfe 100644 --- a/pkg/config/v1alpha1/types.go +++ b/pkg/config/v1alpha1/types.go @@ -68,6 +68,10 @@ type ControllerManagerConfigurationSpec struct { // +optional Health ControllerHealth `json:"health,omitempty"` + // Profiling contains the controller profiling configuration + // +optional + Profiling ControllerProfiling `json:"profiling,omitempty"` + // Webhook contains the controllers webhook configuration // +optional Webhook ControllerWebhook `json:"webhook,omitempty"` @@ -122,6 +126,17 @@ type ControllerHealth struct { LivenessEndpointName string `json:"livenessEndpointName,omitempty"` } +// ControllerProfiling defines profiling configuration for controllers. +type ControllerProfiling struct { + // PprofBindAddress is the TCP address that the controller should bind to + // for serving pprof. + // It can be set to "" or "0" to disable the pprof serving. + // Since pprof may contain sensitive information, make sure to protect it + // before exposing it to public. + // +optional + PprofBindAddress string `json:"pprofBindAddress,omitempty"` +} + // ControllerWebhook defines the webhook server for the controller. type ControllerWebhook struct { // Port is the port that the webhook server serves at. diff --git a/pkg/config/v1alpha1/zz_generated.deepcopy.go b/pkg/config/v1alpha1/zz_generated.deepcopy.go index 5329bef667..0fd9eab8f2 100644 --- a/pkg/config/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/config/v1alpha1/zz_generated.deepcopy.go @@ -104,6 +104,7 @@ func (in *ControllerManagerConfigurationSpec) DeepCopyInto(out *ControllerManage } out.Metrics = in.Metrics out.Health = in.Health + out.Profiling = in.Profiling in.Webhook.DeepCopyInto(&out.Webhook) } @@ -132,6 +133,21 @@ func (in *ControllerMetrics) DeepCopy() *ControllerMetrics { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerProfiling) DeepCopyInto(out *ControllerProfiling) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerProfiling. +func (in *ControllerProfiling) DeepCopy() *ControllerProfiling { + if in == nil { + return nil + } + out := new(ControllerProfiling) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ControllerWebhook) DeepCopyInto(out *ControllerWebhook) { *out = *in diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 073d252718..ca597e3e91 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -23,6 +23,7 @@ import ( "fmt" "net" "net/http" + "net/http/pprof" "sync" "sync/atomic" "time" @@ -107,6 +108,9 @@ type controllerManager struct { // Healthz probe handler healthzHandler *healthz.Handler + // pprofListener is used to serve pprof + pprofListener net.Listener + // controllerOptions are the global controller options. controllerOptions v1alpha1.ControllerConfigurationSpec @@ -362,6 +366,24 @@ func (cm *controllerManager) serveHealthProbes() { go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener) } +func (cm *controllerManager) addPprofServer() error { + mux := http.NewServeMux() + srv := httpserver.New(mux) + + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + return cm.Add(&server{ + Kind: "pprof", + Log: cm.logger, + Server: srv, + Listener: cm.pprofListener, + }) +} + func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) { log = log.WithValues("kind", kind, "addr", ln.Addr()) @@ -457,6 +479,13 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) { cm.serveHealthProbes() } + // Add pprof server + if cm.pprofListener != nil { + if err := cm.addPprofServer(); err != nil { + return fmt.Errorf("failed to add pprof server: %w", err) + } + } + // First start any webhook servers, which includes conversion, validation, and defaulting // webhooks that are registered. // diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2facb1c915..f00626d1d0 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -235,6 +235,13 @@ type Options struct { // Liveness probe endpoint name, defaults to "healthz" LivenessEndpointName string + // PprofBindAddress is the TCP address that the controller should bind to + // for serving pprof. + // It can be set to "" or "0" to disable the pprof serving. + // Since pprof may contain sensitive information, make sure to protect it + // before exposing it to public. + PprofBindAddress string + // Port is the port that the webhook server serves at. // It is used to set webhook.Server.Port if WebhookServer is not set. Port int @@ -310,6 +317,7 @@ type Options struct { newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) newMetricsListener func(addr string) (net.Listener, error) newHealthProbeListener func(addr string) (net.Listener, error) + newPprofListener func(addr string) (net.Listener, error) } // BaseContextFunc is a function used to provide a base Context to Runnables @@ -419,6 +427,13 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + // Create pprof listener. This will throw an error if the bind + // address is invalid or already in use. + pprofListener, err := options.newPprofListener(options.PprofBindAddress) + if err != nil { + return nil, fmt.Errorf("failed to new pprof listener: %w", err) + } + errChan := make(chan error) runnables := newRunnables(options.BaseContext, errChan) @@ -446,6 +461,7 @@ func New(config *rest.Config, options Options) (Manager, error) { healthProbeListener: healthProbeListener, readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, + pprofListener: pprofListener, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), @@ -495,6 +511,10 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options, o.LivenessEndpointName = newObj.Health.LivenessEndpointName } + if o.PprofBindAddress == "" && newObj.Profiling.PprofBindAddress != "" { + o.PprofBindAddress = newObj.Profiling.PprofBindAddress + } + if o.Port == 0 && newObj.Webhook.Port != nil { o.Port = *newObj.Webhook.Port } @@ -579,6 +599,19 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) { return ln, nil } +// defaultPprofListener creates the default pprof listener bound to the given address. +func defaultPprofListener(addr string) (net.Listener, error) { + if addr == "" || addr == "0" { + return nil, nil + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("error listening on %s: %w", addr, err) + } + return ln, nil +} + // defaultBaseContext is used as the BaseContext value in Options if one // has not already been set. func defaultBaseContext() context.Context { @@ -639,6 +672,10 @@ func setOptionsDefaults(options Options) Options { options.newHealthProbeListener = defaultHealthProbeListener } + if options.newPprofListener == nil { + options.newPprofListener = defaultPprofListener + } + if options.GracefulShutdownTimeout == nil { gracefulShutdownTimeout := defaultGracefulShutdownPeriod options.GracefulShutdownTimeout = &gracefulShutdownTimeout diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 6b01d48293..a811fb7e95 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -148,6 +148,9 @@ var _ = Describe("manger.Manager", func() { ReadinessEndpointName: "/readyz", LivenessEndpointName: "/livez", }, + Profiling: v1alpha1.ControllerProfiling{ + PprofBindAddress: ":6080", + }, Webhook: v1alpha1.ControllerWebhook{ Port: &port, Host: "localhost", @@ -172,6 +175,7 @@ var _ = Describe("manger.Manager", func() { Expect(m.HealthProbeBindAddress).To(Equal("6060")) Expect(m.ReadinessEndpointName).To(Equal("/readyz")) Expect(m.LivenessEndpointName).To(Equal("/livez")) + Expect(m.PprofBindAddress).To(Equal(":6080")) Expect(m.Port).To(Equal(port)) Expect(m.Host).To(Equal("localhost")) Expect(m.CertDir).To(Equal("/certs")) @@ -204,6 +208,9 @@ var _ = Describe("manger.Manager", func() { ReadinessEndpointName: "/readyz", LivenessEndpointName: "/livez", }, + Profiling: v1alpha1.ControllerProfiling{ + PprofBindAddress: ":6080", + }, Webhook: v1alpha1.ControllerWebhook{ Port: &port, Host: "localhost", @@ -229,6 +236,7 @@ var _ = Describe("manger.Manager", func() { HealthProbeBindAddress: "5000", ReadinessEndpointName: "/readiness", LivenessEndpointName: "/liveness", + PprofBindAddress: ":6000", Port: 8080, Host: "example.com", CertDir: "/pki", @@ -249,6 +257,7 @@ var _ = Describe("manger.Manager", func() { Expect(m.HealthProbeBindAddress).To(Equal("5000")) Expect(m.ReadinessEndpointName).To(Equal("/readiness")) Expect(m.LivenessEndpointName).To(Equal("/liveness")) + Expect(m.PprofBindAddress).To(Equal(":6000")) Expect(m.Port).To(Equal(8080)) Expect(m.Host).To(Equal("example.com")) Expect(m.CertDir).To(Equal("/pki")) @@ -288,6 +297,7 @@ var _ = Describe("manger.Manager", func() { LeaderElectionID: "test-leader-election-id-2", HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).To(BeNil()) @@ -333,6 +343,7 @@ var _ = Describe("manger.Manager", func() { LeaderElectionID: "test-leader-election-id-3", HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).To(BeNil()) @@ -367,6 +378,7 @@ var _ = Describe("manger.Manager", func() { }, HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).ToNot(HaveOccurred()) Expect(m1).ToNot(BeNil()) @@ -387,6 +399,7 @@ var _ = Describe("manger.Manager", func() { }, HealthProbeBindAddress: "0", MetricsBindAddress: "0", + PprofBindAddress: "0", }) Expect(err).ToNot(HaveOccurred()) Expect(m2).ToNot(BeNil()) @@ -1420,6 +1433,94 @@ var _ = Describe("manger.Manager", func() { }) }) + Context("should start serving pprof", func() { + var listener net.Listener + var opts Options + + BeforeEach(func() { + listener = nil + opts = Options{ + newPprofListener: func(addr string) (net.Listener, error) { + var err error + listener, err = defaultPprofListener(addr) + return listener, err + }, + } + }) + + AfterEach(func() { + if listener != nil { + listener.Close() + } + }) + + It("should stop serving pprof when stop is called", func() { + opts.PprofBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + // Check the pprof started + endpoint := fmt.Sprintf("http://%s", listener.Addr().String()) + _, err = http.Get(endpoint) + Expect(err).NotTo(HaveOccurred()) + + // Shutdown the server + cancel() + + // Expect the pprof server to shutdown + Eventually(func() error { + _, err = http.Get(endpoint) + return err + }).ShouldNot(Succeed()) + }) + + It("should serve pprof endpoints", func() { + opts.PprofBindAddress = ":0" + m, err := New(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer GinkgoRecover() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) + }() + <-m.Elected() + + pprofIndexEndpoint := fmt.Sprintf("http://%s/debug/pprof/", listener.Addr().String()) + resp, err := http.Get(pprofIndexEndpoint) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(200)) + + pprofCmdlineEndpoint := fmt.Sprintf("http://%s/debug/pprof/cmdline", listener.Addr().String()) + resp, err = http.Get(pprofCmdlineEndpoint) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(200)) + + pprofProfileEndpoint := fmt.Sprintf("http://%s/debug/pprof/profile", listener.Addr().String()) + resp, err = http.Get(pprofProfileEndpoint) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(200)) + + pprofSymbolEndpoint := fmt.Sprintf("http://%s/debug/pprof/symbol", listener.Addr().String()) + resp, err = http.Get(pprofSymbolEndpoint) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(200)) + + pprofTraceEndpoint := fmt.Sprintf("http://%s/debug/pprof/trace", listener.Addr().String()) + resp, err = http.Get(pprofTraceEndpoint) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.StatusCode).To(Equal(200)) + }) + }) + Describe("Add", func() { It("should immediately start the Component if the Manager has already Started another Component", func() { diff --git a/pkg/manager/server.go b/pkg/manager/server.go new file mode 100644 index 0000000000..b6509f48f2 --- /dev/null +++ b/pkg/manager/server.go @@ -0,0 +1,61 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manager + +import ( + "context" + "errors" + "net" + "net/http" + + "github.com/go-logr/logr" +) + +// server is a general purpose HTTP server Runnable for a manager +// to serve some internal handlers such as health probes, metrics and profiling. +type server struct { + Kind string + Log logr.Logger + Server *http.Server + Listener net.Listener +} + +func (s *server) Start(ctx context.Context) error { + log := s.Log.WithValues("kind", s.Kind, "addr", s.Listener.Addr()) + + serverShutdown := make(chan struct{}) + go func() { + <-ctx.Done() + log.Info("shutting down server") + if err := s.Server.Shutdown(context.Background()); err != nil { + log.Error(err, "error shutting down server") + } + close(serverShutdown) + }() + + log.Info("starting server") + if err := s.Server.Serve(s.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + + <-serverShutdown + return nil +} + +func (s *server) NeedLeaderElection() bool { + return false +}