Skip to content

Commit

Permalink
Merge pull request #1943 from zqzten/pprof
Browse files Browse the repository at this point in the history
✨ Introduce pprof server to manager
  • Loading branch information
k8s-ci-robot authored Apr 12, 2023
2 parents a33d038 + 482ed05 commit fbd6b94
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"net/http"
"net/http/pprof"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -105,6 +106,9 @@ type controllerManager struct {
// Healthz probe handler
healthzHandler *healthz.Handler

// pprofListener is used to serve pprof
pprofListener net.Listener

// controllerConfig are the global controller options.
controllerConfig config.Controller

Expand Down Expand Up @@ -326,6 +330,24 @@ func (cm *controllerManager) serveHealthProbes() {
go cm.httpServe("health probe", cm.logger, server, cm.healthProbeListener)
}

func (cm *controllerManager) addPprofServer() error {
mux := http.NewServeMux()
srv := httpserver.New(mux)

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

return cm.add(&server{
Kind: "pprof",
Log: cm.logger,
Server: srv,
Listener: cm.pprofListener,
})
}

func (cm *controllerManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
log = log.WithValues("kind", kind, "addr", ln.Addr())

Expand Down Expand Up @@ -423,6 +445,13 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
cm.serveHealthProbes()
}

// Add pprof server
if cm.pprofListener != nil {
if err := cm.addPprofServer(); err != nil {
return fmt.Errorf("failed to add pprof server: %w", err)
}
}

// First start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
//
Expand Down
33 changes: 33 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ type Options struct {
// Liveness probe endpoint name, defaults to "healthz"
LivenessEndpointName string

// PprofBindAddress is the TCP address that the controller should bind to
// for serving pprof.
// It can be set to "" or "0" to disable the pprof serving.
// Since pprof may contain sensitive information, make sure to protect it
// before exposing it to public.
PprofBindAddress string

// Port is the port that the webhook server serves at.
// It is used to set webhook.Server.Port if WebhookServer is not set.
//
Expand Down Expand Up @@ -347,6 +354,7 @@ type Options struct {
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
newMetricsListener func(addr string) (net.Listener, error)
newHealthProbeListener func(addr string) (net.Listener, error)
newPprofListener func(addr string) (net.Listener, error)
}

// BaseContextFunc is a function used to provide a base Context to Runnables
Expand Down Expand Up @@ -458,6 +466,13 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}

// Create pprof listener. This will throw an error if the bind
// address is invalid or already in use.
pprofListener, err := options.newPprofListener(options.PprofBindAddress)
if err != nil {
return nil, fmt.Errorf("failed to new pprof listener: %w", err)
}

errChan := make(chan error)
runnables := newRunnables(options.BaseContext, errChan)

Expand All @@ -481,6 +496,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
pprofListener: pprofListener,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
Expand Down Expand Up @@ -626,6 +642,19 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) {
return ln, nil
}

// defaultPprofListener creates the default pprof listener bound to the given address.
func defaultPprofListener(addr string) (net.Listener, error) {
if addr == "" || addr == "0" {
return nil, nil
}

ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error listening on %s: %w", addr, err)
}
return ln, nil
}

// defaultBaseContext is used as the BaseContext value in Options if one
// has not already been set.
func defaultBaseContext() context.Context {
Expand Down Expand Up @@ -686,6 +715,10 @@ func setOptionsDefaults(options Options) Options {
options.newHealthProbeListener = defaultHealthProbeListener
}

if options.newPprofListener == nil {
options.newPprofListener = defaultPprofListener
}

if options.GracefulShutdownTimeout == nil {
gracefulShutdownTimeout := defaultGracefulShutdownPeriod
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
Expand Down
97 changes: 97 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ var _ = Describe("manger.Manager", func() {
LeaderElectionID: "test-leader-election-id-2",
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).To(BeNil())

Expand Down Expand Up @@ -347,6 +348,7 @@ var _ = Describe("manger.Manager", func() {
LeaderElectionID: "test-leader-election-id-3",
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).To(BeNil())

Expand Down Expand Up @@ -381,6 +383,7 @@ var _ = Describe("manger.Manager", func() {
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).ToNot(HaveOccurred())
Expect(m1).ToNot(BeNil())
Expand All @@ -401,6 +404,7 @@ var _ = Describe("manger.Manager", func() {
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
PprofBindAddress: "0",
})
Expect(err).ToNot(HaveOccurred())
Expect(m2).ToNot(BeNil())
Expand Down Expand Up @@ -1478,6 +1482,99 @@ var _ = Describe("manger.Manager", func() {
})
})

Context("should start serving pprof", func() {
var listener net.Listener
var opts Options

BeforeEach(func() {
listener = nil
opts = Options{
newPprofListener: func(addr string) (net.Listener, error) {
var err error
listener, err = defaultPprofListener(addr)
return listener, err
},
}
})

AfterEach(func() {
if listener != nil {
listener.Close()
}
})

It("should stop serving pprof when stop is called", func() {
opts.PprofBindAddress = ":0"
m, err := New(cfg, opts)
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()
<-m.Elected()

// Check the pprof started
endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
_, err = http.Get(endpoint)
Expect(err).NotTo(HaveOccurred())

// Shutdown the server
cancel()

// Expect the pprof server to shutdown
Eventually(func() error {
_, err = http.Get(endpoint)
return err
}, 10*time.Second).ShouldNot(Succeed())
})

It("should serve pprof endpoints", func() {
opts.PprofBindAddress = ":0"
m, err := New(cfg, opts)
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).NotTo(HaveOccurred())
}()
<-m.Elected()

pprofIndexEndpoint := fmt.Sprintf("http://%s/debug/pprof/", listener.Addr().String())
resp, err := http.Get(pprofIndexEndpoint)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
Expect(resp.StatusCode).To(Equal(http.StatusOK))

pprofCmdlineEndpoint := fmt.Sprintf("http://%s/debug/pprof/cmdline", listener.Addr().String())
resp, err = http.Get(pprofCmdlineEndpoint)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
Expect(resp.StatusCode).To(Equal(http.StatusOK))

pprofProfileEndpoint := fmt.Sprintf("http://%s/debug/pprof/profile", listener.Addr().String())
resp, err = http.Get(pprofProfileEndpoint)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
Expect(resp.StatusCode).To(Equal(http.StatusOK))

pprofSymbolEndpoint := fmt.Sprintf("http://%s/debug/pprof/symbol", listener.Addr().String())
resp, err = http.Get(pprofSymbolEndpoint)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
Expect(resp.StatusCode).To(Equal(http.StatusOK))

pprofTraceEndpoint := fmt.Sprintf("http://%s/debug/pprof/trace", listener.Addr().String())
resp, err = http.Get(pprofTraceEndpoint)
Expect(err).NotTo(HaveOccurred())
defer resp.Body.Close()
Expect(resp.StatusCode).To(Equal(http.StatusOK))
})
})

Describe("Add", func() {
It("should immediately start the Component if the Manager has already Started another Component",
func() {
Expand Down
61 changes: 61 additions & 0 deletions pkg/manager/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package manager

import (
"context"
"errors"
"net"
"net/http"

"github.com/go-logr/logr"
)

// server is a general purpose HTTP server Runnable for a manager
// to serve some internal handlers such as health probes, metrics and profiling.
type server struct {
Kind string
Log logr.Logger
Server *http.Server
Listener net.Listener
}

func (s *server) Start(ctx context.Context) error {
log := s.Log.WithValues("kind", s.Kind, "addr", s.Listener.Addr())

serverShutdown := make(chan struct{})
go func() {
<-ctx.Done()
log.Info("shutting down server")
if err := s.Server.Shutdown(context.Background()); err != nil {
log.Error(err, "error shutting down server")
}
close(serverShutdown)
}()

log.Info("starting server")
if err := s.Server.Serve(s.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}

<-serverShutdown
return nil
}

func (s *server) NeedLeaderElection() bool {
return false
}

0 comments on commit fbd6b94

Please sign in to comment.