Skip to content

Commit

Permalink
Ensure pprof endpoint is listening on startup (elastic#17028)
Browse files Browse the repository at this point in the history
pprof HTTP endpoint was being initialized asynchronously. In tests we
have found that sometimes after Metricbeat has started, it fails to
connect to the pprof endpoint. This can happen if the server is not
listening before the rest of the beat is initialized.

This change ensures that the listener for this endpoint is started
before continuing with the rest of the initialization.

Some comments and error messages have been polished.

(cherry picked from commit efdab6f)
  • Loading branch information
jsoriano committed Mar 18, 2020
1 parent 94ad401 commit ef5c782
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `NewContainerMetadataEnricher` to use default config for kubernetes module. {pull}16857[16857]
- Improve some logging messages for add_kubernetes_metadata processor {pull}16866{16866}
- Fix k8s metadata issue regarding node labels not shown up on root level of metadata. {pull}16834[16834]
- Fail to start if httpprof is used and it cannot be initialized. {pull}17028[17028]
- Fix concurrency issues in convert processor when used in the global context. {pull}17032[17032]

*Auditbeat*

Expand Down
66 changes: 42 additions & 24 deletions libbeat/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"expvar"
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -41,6 +41,7 @@ import (
// the service shut downs gracefully.
func HandleSignals(stopFunction func(), cancel context.CancelFunc) {
var callback sync.Once
logger := logp.NewLogger("service")

// On termination signals, gracefully stop the Beat
sigc := make(chan os.Signal, 1)
Expand All @@ -50,9 +51,9 @@ func HandleSignals(stopFunction func(), cancel context.CancelFunc) {

switch sig {
case syscall.SIGINT, syscall.SIGTERM:
logp.Debug("service", "Received sigterm/sigint, stopping")
logger.Debug("Received sigterm/sigint, stopping")
case syscall.SIGHUP:
logp.Debug("service", "Received sighup, stopping")
logger.Debug("Received sighup, stopping")
}

cancel()
Expand All @@ -61,7 +62,7 @@ func HandleSignals(stopFunction func(), cancel context.CancelFunc) {

// Handle the Windows service events
go ProcessWindowsControlEvents(func() {
logp.Debug("service", "Received svc stop/shutdown request")
logger.Debug("Received svc stop/shutdown request")
callback.Do(stopFunction)
})
}
Expand All @@ -87,34 +88,46 @@ func withCPUProfile() bool { return *cpuprofile != "" }
// BeforeRun takes care of necessary actions such as creating files
// before the beat should run.
func BeforeRun() {
logger := logp.NewLogger("service")
if withCPUProfile() {
cpuOut, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
logger.Errorf("Failed to create CPU profile: %v", err)
os.Exit(1)
}
pprof.StartCPUProfile(cpuOut)
}

if *httpprof != "" {
logp.Info("start pprof endpoint")
go func() {
mux := http.NewServeMux()
if *httpprof == "" {
return
}

// register pprof handler
mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) {
http.DefaultServeMux.ServeHTTP(w, r)
})
logger.Info("Start pprof endpoint")
mux := http.NewServeMux()

// register metrics handler
mux.HandleFunc("/debug/vars", metricsHandler)
// Register pprof handler
mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) {
http.DefaultServeMux.ServeHTTP(w, r)
})

endpoint := http.ListenAndServe(*httpprof, mux)
logp.Info("finished pprof endpoint: %v", endpoint)
}()
// Register metrics handler
mux.HandleFunc("/debug/vars", metricsHandler)

// Ensure we are listening before returning
listener, err := net.Listen("tcp", *httpprof)
if err != nil {
logger.Errorf("Failed to start pprof listener: %v", err)
os.Exit(1)
}

go func() {
// Serve returns always a non-nil error
err := http.Serve(listener, mux)
logger.Infof("Finished pprof endpoint: %v", err)
}()
}

// report expvar and all libbeat/monitoring metrics
// metricsHandler reports expvar and all libbeat/monitoring metrics
func metricsHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")

Expand Down Expand Up @@ -142,6 +155,7 @@ func metricsHandler(w http.ResponseWriter, r *http.Request) {
// Cleanup handles cleaning up the runtime and OS environments. This includes
// tasks such as stopping the CPU profile if it is running.
func Cleanup() {
logger := logp.NewLogger("service")
if withCPUProfile() {
pprof.StopCPUProfile()
cpuOut.Close()
Expand All @@ -152,25 +166,29 @@ func Cleanup() {

writeHeapProfile(*memprofile)

debugMemStats()
debugMemStats(logger.Named("mem"))
}
}

func debugMemStats() {
func debugMemStats(logger *logp.Logger) {
if !logger.IsDebug() {
return
}
var m runtime.MemStats
runtime.ReadMemStats(&m)
logp.Debug("mem", "Memory stats: In use: %d Total (even if freed): %d System: %d",
logger.Debug("Memory stats: In use: %d Total (even if freed): %d System: %d",
m.Alloc, m.TotalAlloc, m.Sys)
}

func writeHeapProfile(filename string) {
logger := logp.NewLogger("service")
f, err := os.Create(filename)
if err != nil {
logp.Err("Failed creating file %s: %s", filename, err)
logger.Errorf("Failed creating file %s: %s", filename, err)
return
}
pprof.WriteHeapProfile(f)
f.Close()

logp.Info("Created memory profile file %s.", filename)
logger.Infof("Created memory profile file %s.", filename)
}

0 comments on commit ef5c782

Please sign in to comment.