Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(metrics): Show memory metrics for zero #6743

Merged
merged 2 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ func run() {
x.RemoveCidFile()
}()

st.zero.closer.AddRunning(1)
go x.MonitorMemoryMetrics(st.zero.closer)

glog.Infoln("Running Dgraph Zero...")
st.zero.closer.Wait()
glog.Infoln("Closer closed.")
Expand Down
93 changes: 1 addition & 92 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ package posting
import (
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -34,7 +28,6 @@ import (
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand All @@ -44,90 +37,6 @@ const (
mb = 1 << 20
)

// getMemUsage returns the resident set size (RSS) of the current process in
// bytes, or 0 when it cannot be determined.
//
// On Linux the value is read from /proc/self/stat (RSS page count multiplied
// by the OS page size). On every other OS it asks `ps` for the RSS of this
// pid; if the command cannot be run it falls back to runtime.MemStats.Alloc.
func getMemUsage() int {
	if runtime.GOOS != "linux" {
		// Ask ps for this pid only and suppress the header ("rss=") so the
		// output is a single number. Listing every process and grepping for
		// the pid can match unrelated rows whose pid or rss merely contains
		// the same digits.
		pid := fmt.Sprintf("%d", os.Getpid())
		out, err := exec.Command("ps", "-o", "rss=", "-p", pid).Output()
		if err != nil {
			// In case of error running the command, resort to go way
			var ms runtime.MemStats
			runtime.ReadMemStats(&ms)
			return int(ms.Alloc)
		}

		// ps pads its columns with spaces, so trim before parsing; splitting
		// on a single space would yield an empty first element.
		kbs, err := strconv.Atoi(strings.TrimSpace(string(out)))
		if err != nil {
			return 0
		}

		// ps reports RSS in units of 1024 bytes; convert to bytes.
		return kbs << 10
	}

	contents, err := ioutil.ReadFile("/proc/self/stat")
	if err != nil {
		glog.Errorf("Can't read the proc file. Err: %v\n", err)
		return 0
	}

	// The second field (comm) is the executable name wrapped in parentheses
	// and may itself contain spaces, so a plain split of the whole line can
	// shift every later field. Everything after the last ')' is reliably
	// whitespace separated and starts with field 3 (state).
	const rssField = 24 // 1-based position of rss in /proc/[pid]/stat
	stat := string(contents)
	end := strings.LastIndexByte(stat, ')')
	if end < 0 {
		glog.Errorln("Error in RSS from stat")
		return 0
	}

	// fields[0] is field 3, hence rss sits at index rssField-3.
	fields := strings.Fields(stat[end+1:])
	if len(fields) < rssField-2 {
		glog.Errorln("Error in RSS from stat")
		return 0
	}

	// rss is the number of pages the process has in real memory.
	rss, err := strconv.Atoi(fields[rssField-3])
	if err != nil {
		glog.Errorln(err)
		return 0
	}

	return rss * os.Getpagesize()
}

// updateMemoryMetrics records the x.MemoryInUse, x.MemoryIdle and
// x.MemoryProc measures once immediately and then once per minute. It returns
// when the channel from lc.HasBeenClosed() becomes readable, and calls
// lc.Done() on the way out.
func updateMemoryMetrics(lc *z.Closer) {
	defer lc.Done()

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	// report takes one sample of the Go runtime statistics and publishes it.
	report := func() {
		var mem runtime.MemStats
		runtime.ReadMemStats(&mem)

		// Bytes in in-use heap spans plus bytes in in-use stack spans.
		heapAndStack := int64(mem.HeapInuse + mem.StackInuse)

		// From runtime/mstats.go:
		// HeapIdle minus HeapReleased estimates the amount of memory
		// that could be returned to the OS, but is being retained by
		// the runtime so it can grow the heap without requesting more
		// memory from the OS. If this difference is significantly
		// larger than the heap size, it indicates there was a recent
		// transient spike in live heap size.
		retained := int64(mem.HeapIdle - mem.HeapReleased)

		procBytes := int64(getMemUsage())

		ostats.Record(context.Background(),
			x.MemoryInUse.M(heapAndStack),
			x.MemoryIdle.M(retained),
			x.MemoryProc.M(procBytes))
	}

	// Publish a first sample right away instead of waiting a full minute for
	// the first tick.
	report()

	for {
		select {
		case <-tick.C:
			report()
		case <-lc.HasBeenClosed():
			return
		}
	}
}

var (
pstore *badger.DB
closer *z.Closer
Expand All @@ -138,7 +47,7 @@ var (
func Init(ps *badger.DB, cacheSize int64) {
pstore = ps
closer = z.NewCloser(1)
go updateMemoryMetrics(closer)
go x.MonitorMemoryMetrics(closer)
// Initialize cache.
if cacheSize == 0 {
return
Expand Down
126 changes: 126 additions & 0 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@ package x
import (
"context"
"expvar"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"time"

"go.opencensus.io/trace"

"contrib.go.opencensus.io/exporter/jaeger"
oc_prom "contrib.go.opencensus.io/exporter/prometheus"
datadog "github.com/DataDog/opencensus-go-exporter-datadog"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
"go.opencensus.io/stats"
ostats "go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -400,3 +410,119 @@ func RegisterExporters(conf *viper.Viper, service string) {
// glog.Fatalf("Unable to register OpenCensus stats: %v", err)
// }
}

// MonitorCacheHealth samples the hit ratio of db's block cache and index
// cache every 10 seconds and records them as the PBlockHitRatio and
// PIndexHitRatio measures. It returns when the channel from
// closer.HasBeenClosed() becomes readable, calling closer.Done() on the way
// out.
func MonitorCacheHealth(db *badger.DB, closer *z.Closer) {
	defer closer.Done()

	// report publishes the hit ratio of the named cache and panics on any
	// name other than "pstore-block" or "pstore-index".
	report := func(cacheType string) {
		if cacheType == "pstore-block" {
			ratio := db.BlockCacheMetrics().Ratio()
			ostats.Record(context.Background(), PBlockHitRatio.M(ratio))
			return
		}
		if cacheType == "pstore-index" {
			ratio := db.IndexCacheMetrics().Ratio()
			ostats.Record(context.Background(), PIndexHitRatio.M(ratio))
			return
		}
		panic("invalid cache type")
	}

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-closer.HasBeenClosed():
			return
		case <-tick.C:
			// Block cache first, then index cache.
			for _, cacheType := range []string{"pstore-block", "pstore-index"} {
				report(cacheType)
			}
		}
	}
}

// MonitorMemoryMetrics records the MemoryInUse, MemoryIdle and MemoryProc
// measures once immediately and then once per minute. It returns when the
// channel from lc.HasBeenClosed() becomes readable, and calls lc.Done() on
// the way out.
func MonitorMemoryMetrics(lc *z.Closer) {
	defer lc.Done()

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	// report takes one sample of the Go runtime statistics and publishes it.
	report := func() {
		var mem runtime.MemStats
		runtime.ReadMemStats(&mem)

		// Bytes in in-use heap spans plus bytes in in-use stack spans.
		heapAndStack := int64(mem.HeapInuse + mem.StackInuse)

		// From runtime/mstats.go:
		// HeapIdle minus HeapReleased estimates the amount of memory
		// that could be returned to the OS, but is being retained by
		// the runtime so it can grow the heap without requesting more
		// memory from the OS. If this difference is significantly
		// larger than the heap size, it indicates there was a recent
		// transient spike in live heap size.
		retained := int64(mem.HeapIdle - mem.HeapReleased)

		procBytes := int64(getMemUsage())

		ostats.Record(context.Background(),
			MemoryInUse.M(heapAndStack),
			MemoryIdle.M(retained),
			MemoryProc.M(procBytes))
	}

	// Publish a first sample right away instead of waiting a full minute for
	// the first tick.
	report()

	for {
		select {
		case <-tick.C:
			report()
		case <-lc.HasBeenClosed():
			return
		}
	}
}

// getMemUsage returns the resident set size (RSS) of the current process in
// bytes, or 0 when it cannot be determined.
//
// On Linux the value is read from /proc/self/stat (RSS page count multiplied
// by the OS page size). On every other OS it asks `ps` for the RSS of this
// pid; if the command cannot be run it falls back to runtime.MemStats.Alloc.
func getMemUsage() int {
	if runtime.GOOS != "linux" {
		// Ask ps for this pid only and suppress the header ("rss=") so the
		// output is a single number. Listing every process and grepping for
		// the pid can match unrelated rows whose pid or rss merely contains
		// the same digits.
		pid := fmt.Sprintf("%d", os.Getpid())
		out, err := exec.Command("ps", "-o", "rss=", "-p", pid).Output()
		if err != nil {
			// In case of error running the command, resort to go way
			var ms runtime.MemStats
			runtime.ReadMemStats(&ms)
			return int(ms.Alloc)
		}

		// ps pads its columns with spaces, so trim before parsing; splitting
		// on a single space would yield an empty first element.
		kbs, err := strconv.Atoi(strings.TrimSpace(string(out)))
		if err != nil {
			return 0
		}

		// ps reports RSS in units of 1024 bytes; convert to bytes.
		return kbs << 10
	}

	contents, err := ioutil.ReadFile("/proc/self/stat")
	if err != nil {
		glog.Errorf("Can't read the proc file. Err: %v\n", err)
		return 0
	}

	// The second field (comm) is the executable name wrapped in parentheses
	// and may itself contain spaces, so a plain split of the whole line can
	// shift every later field. Everything after the last ')' is reliably
	// whitespace separated and starts with field 3 (state).
	const rssField = 24 // 1-based position of rss in /proc/[pid]/stat
	stat := string(contents)
	end := strings.LastIndexByte(stat, ')')
	if end < 0 {
		glog.Errorln("Error in RSS from stat")
		return 0
	}

	// fields[0] is field 3, hence rss sits at index rssField-3.
	fields := strings.Fields(stat[end+1:])
	if len(fields) < rssField-2 {
		glog.Errorln("Error in RSS from stat")
		return 0
	}

	// rss is the number of pages the process has in real memory.
	rss, err := strconv.Atoi(fields[rssField-3])
	if err != nil {
		glog.Errorln(err)
		return 0
	}

	return rss * os.Getpagesize()
}
33 changes: 0 additions & 33 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.opencensus.io/plugin/ocgrpc"
ostats "go.opencensus.io/stats"
"go.opencensus.io/trace"
"golang.org/x/crypto/ssh/terminal"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1044,38 +1043,6 @@ func IsGuardian(groups []string) bool {
return false
}

// MonitorCacheHealth samples the hit ratio of db's block cache and index
// cache every 10 seconds and records them as the PBlockHitRatio and
// PIndexHitRatio measures. It returns when the channel from
// closer.HasBeenClosed() becomes readable, calling closer.Done() on the way
// out.
func MonitorCacheHealth(db *badger.DB, closer *z.Closer) {
	defer closer.Done()

	// report publishes the hit ratio of the named cache and panics on any
	// name other than "pstore-block" or "pstore-index".
	report := func(cacheType string) {
		if cacheType == "pstore-block" {
			ratio := db.BlockCacheMetrics().Ratio()
			ostats.Record(context.Background(), PBlockHitRatio.M(ratio))
			return
		}
		if cacheType == "pstore-index" {
			ratio := db.IndexCacheMetrics().Ratio()
			ostats.Record(context.Background(), PIndexHitRatio.M(ratio))
			return
		}
		panic("invalid cache type")
	}

	tick := time.NewTicker(10 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-closer.HasBeenClosed():
			return
		case <-tick.C:
			// Block cache first, then index cache.
			for _, cacheType := range []string{"pstore-block", "pstore-index"} {
				report(cacheType)
			}
		}
	}
}

// RunVlogGC runs value log gc on store. It runs GC unconditionally after every 10 minutes.
// Additionally it also runs GC if vLogSize has grown more than 1 GB in last minute.
func RunVlogGC(store *badger.DB, closer *z.Closer) {
Expand Down