Skip to content

Commit

Permalink
fix(metrics): Show memory metrics for zero (#6743)
Browse files Browse the repository at this point in the history
We were monitoring memory only for Alpha and not for Zero. This PR enables it for Zero as well.
This PR also moves the metrics-related functions from the x/x.go file to x/metrics.go.
  • Loading branch information
Ibrahim Jarif authored Oct 23, 2020
1 parent bb1adbf commit db993c1
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 125 deletions.
3 changes: 3 additions & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ func run() {
x.RemoveCidFile()
}()

st.zero.closer.AddRunning(1)
go x.MonitorMemoryMetrics(st.zero.closer)

glog.Infoln("Running Dgraph Zero...")
st.zero.closer.Wait()
glog.Infoln("Closer closed.")
Expand Down
93 changes: 1 addition & 92 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ package posting
import (
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -34,7 +28,6 @@ import (
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand All @@ -44,90 +37,6 @@ const (
mb = 1 << 20
)

// getMemUsage returns the resident set size (RSS) of the current process in
// bytes. It returns 0 when the value cannot be determined.
//
// On Linux the RSS page count is read from /proc/self/stat and multiplied by
// the page size. On every other OS the value is obtained from `ps`; if `ps`
// cannot be run, the function falls back to runtime.MemStats.Alloc (bytes of
// allocated heap objects), which is not the RSS but is the best available
// approximation.
func getMemUsage() int {
	if runtime.GOOS != "linux" {
		// Ask ps for exactly our own process (-p) and suppress the header
		// ("rss="). Piping a full process listing through grep could match
		// other lines (the pid as a substring of another pid or of an RSS
		// value) and would miss us entirely if ps did not list this process.
		pid := fmt.Sprint(os.Getpid())
		out, err := exec.Command("ps", "-o", "rss=", "-p", pid).Output()
		if err != nil {
			// In case of error running the command, resort to go way.
			var ms runtime.MemStats
			runtime.ReadMemStats(&ms)
			return int(ms.Alloc)
		}

		// ps pads the column with spaces and terminates it with a newline,
		// so trim before parsing instead of splitting on a single space.
		kbs, err := strconv.Atoi(strings.TrimSpace(string(out)))
		if err != nil {
			glog.Errorf("Can't parse RSS from ps output %q. Err: %v\n", out, err)
			return 0
		}

		// ps reports RSS in kilobytes; convert to bytes.
		return kbs << 10
	}

	contents, err := ioutil.ReadFile("/proc/self/stat")
	if err != nil {
		glog.Errorf("Can't read the proc file. Err: %v\n", err)
		return 0
	}

	// The second field (comm) is the executable name wrapped in parentheses
	// and may itself contain spaces or parentheses. Skip past the last ')' so
	// that the remaining fields can be split on whitespace reliably.
	stat := string(contents)
	end := strings.LastIndexByte(stat, ')')
	if end < 0 {
		glog.Errorln("Error in RSS from stat")
		return 0
	}
	fields := strings.Fields(stat[end+1:])

	// 24th entry of the file is the RSS which denotes the number of pages
	// used by the process. fields[0] is the 3rd entry (state), hence the
	// offset.
	const rssIdx = 24 - 3
	if len(fields) <= rssIdx {
		glog.Errorln("Error in RSS from stat")
		return 0
	}

	rss, err := strconv.Atoi(fields[rssIdx])
	if err != nil {
		glog.Errorln(err)
		return 0
	}

	return rss * os.Getpagesize()
}

// updateMemoryMetrics records the process memory statistics once right away
// and then once per minute, until lc is signalled to close. It calls lc.Done
// when it returns.
func updateMemoryMetrics(lc *z.Closer) {
	defer lc.Done()

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	// report takes one snapshot of the Go runtime memory statistics and
	// records it together with the process memory usage.
	report := func() {
		var stats runtime.MemStats
		runtime.ReadMemStats(&stats)

		// Memory actively used by heap spans and stacks.
		used := int64(stats.HeapInuse + stats.StackInuse)

		// From runtime/mstats.go:
		// HeapIdle minus HeapReleased estimates the amount of memory
		// that could be returned to the OS, but is being retained by
		// the runtime so it can grow the heap without requesting more
		// memory from the OS. If this difference is significantly
		// larger than the heap size, it indicates there was a recent
		// transient spike in live heap size.
		retained := int64(stats.HeapIdle - stats.HeapReleased)

		ostats.Record(context.Background(),
			x.MemoryInUse.M(used),
			x.MemoryIdle.M(retained),
			x.MemoryProc.M(int64(getMemUsage())))
	}

	// Report once up front so that the stats are available without having to
	// wait for the first tick.
	report()

	for {
		select {
		case <-tick.C:
			report()
		case <-lc.HasBeenClosed():
			return
		}
	}
}

var (
pstore *badger.DB
closer *z.Closer
Expand All @@ -138,7 +47,7 @@ var (
func Init(ps *badger.DB, cacheSize int64) {
pstore = ps
closer = z.NewCloser(1)
go updateMemoryMetrics(closer)
go x.MonitorMemoryMetrics(closer)
// Initialize cache.
if cacheSize == 0 {
return
Expand Down
126 changes: 126 additions & 0 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@ package x
import (
"context"
"expvar"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"time"

"go.opencensus.io/trace"

"contrib.go.opencensus.io/exporter/jaeger"
oc_prom "contrib.go.opencensus.io/exporter/prometheus"
datadog "github.com/DataDog/opencensus-go-exporter-datadog"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
"go.opencensus.io/stats"
ostats "go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -400,3 +410,119 @@ func RegisterExporters(conf *viper.Viper, service string) {
// glog.Fatalf("Unable to register OpenCensus stats: %v", err)
// }
}

// MonitorCacheHealth records the hit ratios of the block cache and the index
// cache of db every 10 seconds, until closer is signalled to close. It calls
// closer.Done when it returns.
func MonitorCacheHealth(db *badger.DB, closer *z.Closer) {
	defer closer.Done()

	// report records the hit ratio of the cache identified by cacheType and
	// panics on an unknown cache type.
	report := func(cacheType string) {
		if cacheType == "pstore-block" {
			blockMetrics := db.BlockCacheMetrics()
			ostats.Record(context.Background(), PBlockHitRatio.M(blockMetrics.Ratio()))
			return
		}
		if cacheType == "pstore-index" {
			indexMetrics := db.IndexCacheMetrics()
			ostats.Record(context.Background(), PIndexHitRatio.M(indexMetrics.Ratio()))
			return
		}
		panic("invalid cache type")
	}

	// Caches to report on every tick, in this order.
	cacheTypes := [...]string{"pstore-block", "pstore-index"}

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-closer.HasBeenClosed():
			return
		case <-tick.C:
			for _, cacheType := range cacheTypes {
				report(cacheType)
			}
		}
	}
}

// MonitorMemoryMetrics records the process memory statistics once right away
// and then once per minute, until lc is signalled to close. It calls lc.Done
// when it returns.
func MonitorMemoryMetrics(lc *z.Closer) {
	defer lc.Done()

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	// report takes one snapshot of the Go runtime memory statistics and
	// records it together with the process memory usage.
	report := func() {
		var stats runtime.MemStats
		runtime.ReadMemStats(&stats)

		// Memory actively used by heap spans and stacks.
		used := int64(stats.HeapInuse + stats.StackInuse)

		// From runtime/mstats.go:
		// HeapIdle minus HeapReleased estimates the amount of memory
		// that could be returned to the OS, but is being retained by
		// the runtime so it can grow the heap without requesting more
		// memory from the OS. If this difference is significantly
		// larger than the heap size, it indicates there was a recent
		// transient spike in live heap size.
		retained := int64(stats.HeapIdle - stats.HeapReleased)

		ostats.Record(context.Background(),
			MemoryInUse.M(used),
			MemoryIdle.M(retained),
			MemoryProc.M(int64(getMemUsage())))
	}

	// Report once up front so that the stats are available without having to
	// wait for the first tick.
	report()

	for {
		select {
		case <-tick.C:
			report()
		case <-lc.HasBeenClosed():
			return
		}
	}
}

// getMemUsage returns the resident set size (RSS) of the current process in
// bytes. It returns 0 when the value cannot be determined.
//
// On Linux the RSS page count is read from /proc/self/stat and multiplied by
// the page size. On every other OS the value is obtained from `ps`; if `ps`
// cannot be run, the function falls back to runtime.MemStats.Alloc (bytes of
// allocated heap objects), which is not the RSS but is the best available
// approximation.
func getMemUsage() int {
	if runtime.GOOS != "linux" {
		// Ask ps for exactly our own process (-p) and suppress the header
		// ("rss="). Piping a full process listing through grep could match
		// other lines (the pid as a substring of another pid or of an RSS
		// value) and would miss us entirely if ps did not list this process.
		pid := fmt.Sprint(os.Getpid())
		out, err := exec.Command("ps", "-o", "rss=", "-p", pid).Output()
		if err != nil {
			// In case of error running the command, resort to go way.
			var ms runtime.MemStats
			runtime.ReadMemStats(&ms)
			return int(ms.Alloc)
		}

		// ps pads the column with spaces and terminates it with a newline,
		// so trim before parsing instead of splitting on a single space.
		kbs, err := strconv.Atoi(strings.TrimSpace(string(out)))
		if err != nil {
			glog.Errorf("Can't parse RSS from ps output %q. Err: %v\n", out, err)
			return 0
		}

		// ps reports RSS in kilobytes; convert to bytes.
		return kbs << 10
	}

	contents, err := ioutil.ReadFile("/proc/self/stat")
	if err != nil {
		glog.Errorf("Can't read the proc file. Err: %v\n", err)
		return 0
	}

	// The second field (comm) is the executable name wrapped in parentheses
	// and may itself contain spaces or parentheses. Skip past the last ')' so
	// that the remaining fields can be split on whitespace reliably.
	stat := string(contents)
	end := strings.LastIndexByte(stat, ')')
	if end < 0 {
		glog.Errorln("Error in RSS from stat")
		return 0
	}
	fields := strings.Fields(stat[end+1:])

	// 24th entry of the file is the RSS which denotes the number of pages
	// used by the process. fields[0] is the 3rd entry (state), hence the
	// offset.
	const rssIdx = 24 - 3
	if len(fields) <= rssIdx {
		glog.Errorln("Error in RSS from stat")
		return 0
	}

	rss, err := strconv.Atoi(fields[rssIdx])
	if err != nil {
		glog.Errorln(err)
		return 0
	}

	return rss * os.Getpagesize()
}
33 changes: 0 additions & 33 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.opencensus.io/plugin/ocgrpc"
ostats "go.opencensus.io/stats"
"go.opencensus.io/trace"
"golang.org/x/crypto/ssh/terminal"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1050,38 +1049,6 @@ func IsGuardian(groups []string) bool {
return false
}

// MonitorCacheHealth records the hit ratios of the block cache and the index
// cache of db every 10 seconds, until closer is signalled to close. It calls
// closer.Done when it returns.
func MonitorCacheHealth(db *badger.DB, closer *z.Closer) {
	defer closer.Done()

	// report records the hit ratio of the cache identified by cacheType and
	// panics on an unknown cache type.
	report := func(cacheType string) {
		if cacheType == "pstore-block" {
			blockMetrics := db.BlockCacheMetrics()
			ostats.Record(context.Background(), PBlockHitRatio.M(blockMetrics.Ratio()))
			return
		}
		if cacheType == "pstore-index" {
			indexMetrics := db.IndexCacheMetrics()
			ostats.Record(context.Background(), PIndexHitRatio.M(indexMetrics.Ratio()))
			return
		}
		panic("invalid cache type")
	}

	// Caches to report on every tick, in this order.
	cacheTypes := [...]string{"pstore-block", "pstore-index"}

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-closer.HasBeenClosed():
			return
		case <-tick.C:
			for _, cacheType := range cacheTypes {
				report(cacheType)
			}
		}
	}
}

// RunVlogGC runs value log gc on store. It runs GC unconditionally after every 10 minutes.
// Additionally it also runs GC if vLogSize has grown more than 1 GB in last minute.
func RunVlogGC(store *badger.DB, closer *z.Closer) {
Expand Down

0 comments on commit db993c1

Please sign in to comment.