diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5f7f9d79f44..aa737cb01cd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -88,6 +88,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `NewContainerMetadataEnricher` to use default config for kubernetes module. {pull}16857[16857] - Improve some logging messages for add_kubernetes_metadata processor {pull}16866{16866} - Fix k8s metadata issue regarding node labels not shown up on root level of metadata. {pull}16834[16834] +- Fail to start if httpprof is used and it cannot be initialized. {pull}17028[17028] +- Fix concurrency issues in convert processor when used in the global context. {pull}17032[17032] *Auditbeat* diff --git a/libbeat/service/service.go b/libbeat/service/service.go index 2d88b25f782..ec6e0fca672 100644 --- a/libbeat/service/service.go +++ b/libbeat/service/service.go @@ -22,7 +22,7 @@ import ( "expvar" "flag" "fmt" - "log" + "net" "net/http" _ "net/http/pprof" "os" @@ -41,6 +41,7 @@ import ( // the service shut downs gracefully. func HandleSignals(stopFunction func(), cancel context.CancelFunc) { var callback sync.Once + logger := logp.NewLogger("service") // On termination signals, gracefully stop the Beat sigc := make(chan os.Signal, 1) @@ -50,9 +51,9 @@ func HandleSignals(stopFunction func(), cancel context.CancelFunc) { switch sig { case syscall.SIGINT, syscall.SIGTERM: - logp.Debug("service", "Received sigterm/sigint, stopping") + logger.Debug("Received sigterm/sigint, stopping") case syscall.SIGHUP: - logp.Debug("service", "Received sighup, stopping") + logger.Debug("Received sighup, stopping") } cancel() @@ -61,7 +62,7 @@ func HandleSignals(stopFunction func(), cancel context.CancelFunc) { // Handle the Windows service events go ProcessWindowsControlEvents(func() { - logp.Debug("service", "Received svc stop/shutdown request") + logger.Debug("Received svc stop/shutdown request") callback.Do(stopFunction) }) } @@ -87,34 +88,46 @@ func withCPUProfile() bool { return *cpuprofile != "" } // BeforeRun takes care of necessary actions such as creating files // before the beat should run. func BeforeRun() { + logger := logp.NewLogger("service") if withCPUProfile() { cpuOut, err := os.Create(*cpuprofile) if err != nil { - log.Fatal(err) + logger.Errorf("Failed to create CPU profile: %v", err) + os.Exit(1) } pprof.StartCPUProfile(cpuOut) } - if *httpprof != "" { - logp.Info("start pprof endpoint") - go func() { - mux := http.NewServeMux() + if *httpprof == "" { + return + } - // register pprof handler - mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { - http.DefaultServeMux.ServeHTTP(w, r) - }) + logger.Info("Start pprof endpoint") + mux := http.NewServeMux() - // register metrics handler - mux.HandleFunc("/debug/vars", metricsHandler) + // Register pprof handler + mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) { + http.DefaultServeMux.ServeHTTP(w, r) + }) - endpoint := http.ListenAndServe(*httpprof, mux) - logp.Info("finished pprof endpoint: %v", endpoint) - }() + // Register metrics handler + mux.HandleFunc("/debug/vars", metricsHandler) + + // Ensure we are listening before returning + listener, err := net.Listen("tcp", *httpprof) + if err != nil { + logger.Errorf("Failed to start pprof listener: %v", err) + os.Exit(1) } + + go func() { + // Serve returns always a non-nil error + err := http.Serve(listener, mux) + logger.Infof("Finished pprof endpoint: %v", err) + }() } -// report expvar and all libbeat/monitoring metrics +// metricsHandler reports expvar and all libbeat/monitoring metrics func metricsHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") @@ -142,6 +155,7 @@ func metricsHandler(w http.ResponseWriter, r *http.Request) { // Cleanup handles cleaning up the runtime and OS environments. This includes // tasks such as stopping the CPU profile if it is running. func Cleanup() { + logger := logp.NewLogger("service") if withCPUProfile() { pprof.StopCPUProfile() cpuOut.Close() @@ -152,25 +166,29 @@ func Cleanup() { writeHeapProfile(*memprofile) - debugMemStats() + debugMemStats(logger.Named("mem")) } } -func debugMemStats() { +func debugMemStats(logger *logp.Logger) { + if !logger.IsDebug() { + return + } var m runtime.MemStats runtime.ReadMemStats(&m) - logp.Debug("mem", "Memory stats: In use: %d Total (even if freed): %d System: %d", + logger.Debug("Memory stats: In use: %d Total (even if freed): %d System: %d", m.Alloc, m.TotalAlloc, m.Sys) } func writeHeapProfile(filename string) { + logger := logp.NewLogger("service") f, err := os.Create(filename) if err != nil { - logp.Err("Failed creating file %s: %s", filename, err) + logger.Errorf("Failed creating file %s: %s", filename, err) return } pprof.WriteHeapProfile(f) f.Close() - logp.Info("Created memory profile file %s.", filename) + logger.Infof("Created memory profile file %s.", filename) }