From ef5c7823a13044a76456d514919d0b664e6a9f4f Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Wed, 18 Mar 2020 15:00:36 +0100 Subject: [PATCH] Ensure pprof endpoint is listening on startup (#17028) pprof HTTP endpoint was being initialized asynchronously. In tests we have found that sometimes after Metricbeat has started, it fails to connect to the pprof endpoint. This can happen if the server is not listening before the rest of the beat is initialized. This change ensures that the listener for this endpoint is started before continuing with the rest of the initialization. Some comments and error messages have been polished. (cherry picked from commit efdab6f5f54ef5ba4fe37756bca5209f2f7a6ec7) --- CHANGELOG.next.asciidoc | 2 ++ libbeat/service/service.go | 66 ++++++++++++++++++++++++-------------- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5f7f9d79f44..aa737cb01cd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -88,6 +88,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `NewContainerMetadataEnricher` to use default config for kubernetes module. {pull}16857[16857] - Improve some logging messages for add_kubernetes_metadata processor {pull}16866{16866} - Fix k8s metadata issue regarding node labels not shown up on root level of metadata. {pull}16834[16834] +- Fail to start if httpprof is used and it cannot be initialized. {pull}17028[17028] +- Fix concurrency issues in convert processor when used in the global context. {pull}17032[17032] *Auditbeat* diff --git a/libbeat/service/service.go b/libbeat/service/service.go index 2d88b25f782..ec6e0fca672 100644 --- a/libbeat/service/service.go +++ b/libbeat/service/service.go @@ -22,7 +22,7 @@ import ( "expvar" "flag" "fmt" - "log" + "net" "net/http" _ "net/http/pprof" "os" @@ -41,6 +41,7 @@ import ( // the service shut downs gracefully. func HandleSignals(stopFunction func(), cancel context.CancelFunc) { var callback sync.Once + logger := logp.NewLogger("service") // On termination signals, gracefully stop the Beat sigc := make(chan os.Signal, 1) @@ -50,9 +51,9 @@ func HandleSignals(stopFunction func(), cancel context.CancelFunc) { switch sig { case syscall.SIGINT, syscall.SIGTERM: - logp.Debug("service", "Received sigterm/sigint, stopping") + logger.Debug("Received sigterm/sigint, stopping") case syscall.SIGHUP: - logp.Debug("service", "Received sighup, stopping") + logger.Debug("Received sighup, stopping") } cancel() @@ -61,7 +62,7 @@ func HandleSignals(stopFunction func(), cancel context.CancelFunc) { // Handle the Windows service events go ProcessWindowsControlEvents(func() { - logp.Debug("service", "Received svc stop/shutdown request") + logger.Debug("Received svc stop/shutdown request") callback.Do(stopFunction) }) } @@ -87,34 +88,46 @@ func withCPUProfile() bool { return *cpuprofile != "" } // BeforeRun takes care of necessary actions such as creating files // before the beat should run. func BeforeRun() { + logger := logp.NewLogger("service") if withCPUProfile() { cpuOut, err := os.Create(*cpuprofile) if err != nil { - log.Fatal(err) + logger.Errorf("Failed to create CPU profile: %v", err) + os.Exit(1) } pprof.StartCPUProfile(cpuOut) } - if *httpprof != "" { - logp.Info("start pprof endpoint") - go func() { - mux := http.NewServeMux() + if *httpprof == "" { + return + } - // register pprof handler - mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { - http.DefaultServeMux.ServeHTTP(w, r) - }) + logger.Info("Start pprof endpoint") + mux := http.NewServeMux() - // register metrics handler - mux.HandleFunc("/debug/vars", metricsHandler) + // Register pprof handler + mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { + http.DefaultServeMux.ServeHTTP(w, r) + }) - endpoint := http.ListenAndServe(*httpprof, mux) - logp.Info("finished pprof endpoint: %v", endpoint) - }() + // Register metrics handler + mux.HandleFunc("/debug/vars", metricsHandler) + + // Ensure we are listening before returning + listener, err := net.Listen("tcp", *httpprof) + if err != nil { + logger.Errorf("Failed to start pprof listener: %v", err) + os.Exit(1) } + + go func() { + // Serve returns always a non-nil error + err := http.Serve(listener, mux) + logger.Infof("Finished pprof endpoint: %v", err) + }() } -// report expvar and all libbeat/monitoring metrics +// metricsHandler reports expvar and all libbeat/monitoring metrics func metricsHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") @@ -142,6 +155,7 @@ func metricsHandler(w http.ResponseWriter, r *http.Request) { // Cleanup handles cleaning up the runtime and OS environments. This includes // tasks such as stopping the CPU profile if it is running. func Cleanup() { + logger := logp.NewLogger("service") if withCPUProfile() { pprof.StopCPUProfile() cpuOut.Close() @@ -152,25 +166,29 @@ func Cleanup() { writeHeapProfile(*memprofile) - debugMemStats() + debugMemStats(logger.Named("mem")) } } -func debugMemStats() { +func debugMemStats(logger *logp.Logger) { + if !logger.IsDebug() { + return + } var m runtime.MemStats runtime.ReadMemStats(&m) - logp.Debug("mem", "Memory stats: In use: %d Total (even if freed): %d System: %d", + logger.Debug("Memory stats: In use: %d Total (even if freed): %d System: %d", m.Alloc, m.TotalAlloc, m.Sys) } func writeHeapProfile(filename string) { + logger := logp.NewLogger("service") f, err := os.Create(filename) if err != nil { - logp.Err("Failed creating file %s: %s", filename, err) + logger.Errorf("Failed creating file %s: %s", filename, err) return } pprof.WriteHeapProfile(f) f.Close() - logp.Info("Created memory profile file %s.", filename) + logger.Infof("Created memory profile file %s.", filename) }