From db993c1dcae6db13497980143f44e24029f70eb4 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 23 Oct 2020 16:59:02 +0530 Subject: [PATCH] fix(metrics): Show memory metrics for zero (#6743) We were monitoring memory usage only for Alpha and not for Zero. This PR enables it for Zero as well. This PR also moves the metrics-related functions from x/x.go and posting/lists.go to x/metrics.go. --- dgraph/cmd/zero/run.go | 3 + posting/lists.go | 93 +----------------------------- x/metrics.go | 126 +++++++++++++++++++++++++++++++++++++++++ x/x.go | 33 ----------- 4 files changed, 130 insertions(+), 125 deletions(-) diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 75437a92a96..581e82f554a 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -300,6 +300,9 @@ func run() { x.RemoveCidFile() }() + st.zero.closer.AddRunning(1) + go x.MonitorMemoryMetrics(st.zero.closer) + glog.Infoln("Running Dgraph Zero...") st.zero.closer.Wait() glog.Infoln("Closer closed.") diff --git a/posting/lists.go b/posting/lists.go index 5502935f5bc..0a5ec1d7f7a 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -19,12 +19,6 @@ package posting import ( "context" "fmt" - "io/ioutil" - "os" - "os/exec" - "runtime" - "strconv" - "strings" "sync" "time" @@ -34,7 +28,6 @@ import ( "github.com/dgraph-io/dgo/v200/protos/api" "github.com/dgraph-io/ristretto" "github.com/dgraph-io/ristretto/z" - "github.com/golang/glog" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -44,90 +37,6 @@ const ( mb = 1 << 20 ) -func getMemUsage() int { - if runtime.GOOS != "linux" { - pid := os.Getpid() - cmd := fmt.Sprintf("ps -ao rss,pid | grep %v", pid) - c1, err := exec.Command("bash", "-c", cmd).Output() - if err != nil { - // In case of error running the command, resort to go way - var ms runtime.MemStats - runtime.ReadMemStats(&ms) - megs := ms.Alloc - return int(megs) - } - - rss := strings.Split(string(c1), " ")[0] - kbs, err := strconv.Atoi(rss) - if err != nil { - return 
0 - } - - megs := kbs << 10 - return megs - } - - contents, err := ioutil.ReadFile("/proc/self/stat") - if err != nil { - glog.Errorf("Can't read the proc file. Err: %v\n", err) - return 0 - } - - cont := strings.Split(string(contents), " ") - // 24th entry of the file is the RSS which denotes the number of pages - // used by the process. - if len(cont) < 24 { - glog.Errorln("Error in RSS from stat") - return 0 - } - - rss, err := strconv.Atoi(cont[23]) - if err != nil { - glog.Errorln(err) - return 0 - } - - return rss * os.Getpagesize() -} - -func updateMemoryMetrics(lc *z.Closer) { - defer lc.Done() - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - - update := func() { - var ms runtime.MemStats - runtime.ReadMemStats(&ms) - - inUse := ms.HeapInuse + ms.StackInuse - // From runtime/mstats.go: - // HeapIdle minus HeapReleased estimates the amount of memory - // that could be returned to the OS, but is being retained by - // the runtime so it can grow the heap without requesting more - // memory from the OS. If this difference is significantly - // larger than the heap size, it indicates there was a recent - // transient spike in live heap size. - idle := ms.HeapIdle - ms.HeapReleased - - ostats.Record(context.Background(), - x.MemoryInUse.M(int64(inUse)), - x.MemoryIdle.M(int64(idle)), - x.MemoryProc.M(int64(getMemUsage()))) - } - // Call update immediately so that Dgraph reports memory stats without - // having to wait for the first tick. - update() - - for { - select { - case <-lc.HasBeenClosed(): - return - case <-ticker.C: - update() - } - } -} - var ( pstore *badger.DB closer *z.Closer @@ -138,7 +47,7 @@ var ( func Init(ps *badger.DB, cacheSize int64) { pstore = ps closer = z.NewCloser(1) - go updateMemoryMetrics(closer) + go x.MonitorMemoryMetrics(closer) // Initialize cache. 
if cacheSize == 0 { return diff --git a/x/metrics.go b/x/metrics.go index 25a08e264e8..78e728f4122 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -19,8 +19,15 @@ package x import ( "context" "expvar" + "fmt" + "io/ioutil" "log" "net/http" + "os" + "os/exec" + "runtime" + "strconv" + "strings" "time" "go.opencensus.io/trace" @@ -28,10 +35,13 @@ import ( "contrib.go.opencensus.io/exporter/jaeger" oc_prom "contrib.go.opencensus.io/exporter/prometheus" datadog "github.com/DataDog/opencensus-go-exporter-datadog" + "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/viper" "go.opencensus.io/stats" + ostats "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) @@ -400,3 +410,119 @@ func RegisterExporters(conf *viper.Viper, service string) { // glog.Fatalf("Unable to register OpenCensus stats: %v", err) // } } + +// MonitorCacheHealth periodically monitors the cache metrics and reports if +// there is high contention in the cache. 
+func MonitorCacheHealth(db *badger.DB, closer *z.Closer) { + defer closer.Done() + + record := func(ct string) { + switch ct { + case "pstore-block": + metrics := db.BlockCacheMetrics() + ostats.Record(context.Background(), PBlockHitRatio.M(metrics.Ratio())) + case "pstore-index": + metrics := db.IndexCacheMetrics() + ostats.Record(context.Background(), PIndexHitRatio.M(metrics.Ratio())) + default: + panic("invalid cache type") + } + } + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + record("pstore-block") + record("pstore-index") + case <-closer.HasBeenClosed(): + return + } + } +} + +func MonitorMemoryMetrics(lc *z.Closer) { + defer lc.Done() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + update := func() { + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + + inUse := ms.HeapInuse + ms.StackInuse + // From runtime/mstats.go: + // HeapIdle minus HeapReleased estimates the amount of memory + // that could be returned to the OS, but is being retained by + // the runtime so it can grow the heap without requesting more + // memory from the OS. If this difference is significantly + // larger than the heap size, it indicates there was a recent + // transient spike in live heap size. + idle := ms.HeapIdle - ms.HeapReleased + + ostats.Record(context.Background(), + MemoryInUse.M(int64(inUse)), + MemoryIdle.M(int64(idle)), + MemoryProc.M(int64(getMemUsage()))) + } + // Call update immediately so that Dgraph reports memory stats without + // having to wait for the first tick. 
+ update() + + for { + select { + case <-lc.HasBeenClosed(): + return + case <-ticker.C: + update() + } + } +} + +func getMemUsage() int { + if runtime.GOOS != "linux" { + pid := os.Getpid() + cmd := fmt.Sprintf("ps -ao rss,pid | grep %v", pid) + c1, err := exec.Command("bash", "-c", cmd).Output() + if err != nil { + // In case of error running the command, resort to go way + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + megs := ms.Alloc + return int(megs) + } + + rss := strings.Split(string(c1), " ")[0] + kbs, err := strconv.Atoi(rss) + if err != nil { + return 0 + } + + megs := kbs << 10 + return megs + } + + contents, err := ioutil.ReadFile("/proc/self/stat") + if err != nil { + glog.Errorf("Can't read the proc file. Err: %v\n", err) + return 0 + } + + cont := strings.Split(string(contents), " ") + // 24th entry of the file is the RSS which denotes the number of pages + // used by the process. + if len(cont) < 24 { + glog.Errorln("Error in RSS from stat") + return 0 + } + + rss, err := strconv.Atoi(cont[23]) + if err != nil { + glog.Errorln(err) + return 0 + } + + return rss * os.Getpagesize() +} diff --git a/x/x.go b/x/x.go index f8deb23288f..24029a2b762 100644 --- a/x/x.go +++ b/x/x.go @@ -51,7 +51,6 @@ import ( "github.com/pkg/errors" "github.com/spf13/viper" "go.opencensus.io/plugin/ocgrpc" - ostats "go.opencensus.io/stats" "go.opencensus.io/trace" "golang.org/x/crypto/ssh/terminal" "google.golang.org/grpc" @@ -1050,38 +1049,6 @@ func IsGuardian(groups []string) bool { return false } -// MonitorCacheHealth periodically monitors the cache metrics and reports if -// there is high contention in the cache. 
-func MonitorCacheHealth(db *badger.DB, closer *z.Closer) { - defer closer.Done() - - record := func(ct string) { - switch ct { - case "pstore-block": - metrics := db.BlockCacheMetrics() - ostats.Record(context.Background(), PBlockHitRatio.M(metrics.Ratio())) - case "pstore-index": - metrics := db.IndexCacheMetrics() - ostats.Record(context.Background(), PIndexHitRatio.M(metrics.Ratio())) - default: - panic("invalid cache type") - } - } - - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - record("pstore-block") - record("pstore-index") - case <-closer.HasBeenClosed(): - return - } - } -} - // RunVlogGC runs value log gc on store. It runs GC unconditionally after every 10 minutes. // Additionally it also runs GC if vLogSize has grown more than 1 GB in last minute. func RunVlogGC(store *badger.DB, closer *z.Closer) {