Skip to content

Commit

Permalink
statsMgr information get for studio
Browse files Browse the repository at this point in the history
  • Loading branch information
xjlgod committed Feb 28, 2022
1 parent 1dd8e73 commit bf87ee7
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ nebula-importer

# IDE
.vscode/
.idea/
25 changes: 20 additions & 5 deletions pkg/cmd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
// Runner drives one full import run: it owns the per-file readers, keeps a
// handle to the stats manager for progress queries, and accumulates every
// error seen during the run.
type Runner struct {
	errs      []error              // all errors collected during Run, in occurrence order
	Readers   []*reader.FileReader // active file readers; reset to nil when the run finishes
	stataMgr  *stats.StatsMgr      // NOTE(review): name looks like a typo for statsMgr — consider renaming in a follow-up
	NumFailed int64                // copy of the final failed-row count, taken when the run completes
}

Expand All @@ -28,6 +29,10 @@ func (r *Runner) Error() error {
return r.errs[0]
}

// Errors returns all errors collected during the run, in the order they
// were appended. It returns nil when no error occurred.
func (r *Runner) Errors() []error {
	return r.errs
}

func (r *Runner) Run(yaml *config.YAMLConfig) {
defer func() {
if re := recover(); re != nil {
Expand All @@ -39,7 +44,8 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
logger.Init(*yaml.LogPath)
}

statsMgr := stats.NewStatsMgr(len(yaml.Files))
statsMgr := stats.NewStatsMgr(yaml.Files)
r.stataMgr = statsMgr
defer statsMgr.Close()

clientMgr, err := client.NewNebulaClientMgr(yaml.NebulaClientSettings, statsMgr.StatsCh)
Expand Down Expand Up @@ -68,7 +74,7 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
} else {
go func(fr *reader.FileReader, filename string) {
numReadFailed, err := fr.Read()
statsMgr.NumReadFailed += numReadFailed
statsMgr.Stats.NumReadFailed += numReadFailed
if err != nil {
r.errs = append(r.errs, err)
statsMgr.StatsCh <- base.NewFileDoneStats(filename)
Expand All @@ -83,10 +89,19 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
<-statsMgr.DoneCh

r.Readers = nil
r.NumFailed = statsMgr.NumFailed
r.NumFailed = statsMgr.Stats.NumFailed

if statsMgr.NumFailed > 0 {
if statsMgr.Stats.NumFailed > 0 {
r.errs = append(r.errs, errors.Wrap(errors.NotCompleteError,
fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.NumFailed)))
fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.Stats.NumFailed)))
}
}

// QueryStats returns a snapshot of the current import statistics, or nil
// when the run has not started yet (no stats manager has been created).
func (r *Runner) QueryStats() *stats.Stats {
	// Guard clause instead of if/else keeps the happy path unindented,
	// and the local is named so it no longer shadows the stats package.
	if r.stataMgr == nil {
		return nil
	}
	snapshot := r.stataMgr.StatsQuery()
	return &snapshot
}
31 changes: 31 additions & 0 deletions pkg/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package csv

import (
"bufio"
"bytes"
"encoding/csv"
"fmt"
"io"
"os"

"github.com/vesoft-inc/nebula-importer/pkg/base"
Expand Down Expand Up @@ -58,3 +60,32 @@ func (r *CSVReader) ReadLine() (base.Data, error) {
return base.InsertData(line), nil
}
}

func CountLines(path string, withHeader bool) (int64, error) {
file, err := os.Open(path)
if err != nil {
logger.Errorf("count line fail: %s", path)
return 0, err
}
r := bufio.NewReader(file)
buf := make([]byte, 32*1024)
count := 0
lineSep := []byte{'\n'}
for {
n, err := r.Read(buf)
isLastEndline := n > 0 && '\n' == buf[n-1]
count += bytes.Count(buf[:n], lineSep)
switch {
case err == io.EOF:
if !isLastEndline {
count++
}
if withHeader && count > 0 {
count--
}
return int64(count), nil
case err != nil:
return 0, err
}
}
}
88 changes: 64 additions & 24 deletions pkg/stats/statsmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,62 @@ package stats

import (
"fmt"
"sync"
"time"

"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/config"
"github.com/vesoft-inc/nebula-importer/pkg/csv"
"github.com/vesoft-inc/nebula-importer/pkg/logger"
)

// StatsMgr aggregates per-batch import statistics reported by client
// workers over StatsCh and exposes a thread-safe snapshot via StatsQuery.
type StatsMgr struct {
	StatsCh chan base.Stats // incoming per-batch stats from workers
	DoneCh  chan bool       // signaled when the run completes — presumably by startWorker; body not visible here
	statsRW sync.RWMutex    // guards Stats against concurrent update/query
	Stats   Stats           // aggregated counters; read through StatsQuery, written under statsRW
}

func NewStatsMgr(numReadingFiles int) *StatsMgr {
// Stats is a point-in-time snapshot of import progress. It is
// JSON-serializable so external consumers can expose it directly.
type Stats struct {
	NumFailed     int64 `json:"numFailed"`     // rows in batches that failed to insert
	NumReadFailed int64 `json:"numReadFailed"` // rows that could not be read from source files
	TotalLine     int64 `json:"totalLine"`     // total lines across all input files, counted up front
	TotalCount    int64 `json:"totalCount"`    // rows processed so far (successful + failed batches)
	TotalBatches  int64 `json:"totalBatches"`  // batches processed so far
	TotalLatency  int64 `json:"totalLatency"`  // summed batch latency, microseconds (logged as "us")
	TotalReqTime  int64 `json:"totalReqTime"`  // summed request time, microseconds (logged as "us")
}

func NewStatsMgr(files []*config.File) *StatsMgr {
numReadingFiles := len(files)
totalLine := int64(0)
var wg sync.WaitGroup
wg.Add(numReadingFiles)
for _, file := range files {
path := file.Path
withHeader := file.CSV.WithHeader
go func(path string) {
lines, err := csv.CountLines(path, *withHeader)
if err != nil {
logger.Errorf("count line fail: %s, %s", path, err.Error())
}
totalLine += lines
defer wg.Done()
}(*path)
}
wg.Wait()
stats := Stats {
NumFailed: 0,
TotalLine: totalLine,
TotalCount: 0,
TotalLatency: 0,
TotalBatches: 0,
TotalReqTime: 0.0,
}
m := StatsMgr{
StatsCh: make(chan base.Stats),
DoneCh: make(chan bool),
NumFailed: 0,
totalCount: 0,
totalLatency: 0,
totalBatches: 0,
totalReqTime: 0.0,
Stats: stats,
}
go m.startWorker(numReadingFiles)
return &m
Expand All @@ -39,28 +69,38 @@ func (s *StatsMgr) Close() {
}

// updateStat folds one successful batch result into the aggregate
// counters, holding the write lock for the whole update.
func (s *StatsMgr) updateStat(stat base.Stats) {
	s.statsRW.Lock()
	defer s.statsRW.Unlock()

	batchRows := int64(stat.BatchSize)
	s.Stats.TotalCount += batchRows
	s.Stats.TotalBatches++
	s.Stats.TotalLatency += stat.Latency
	s.Stats.TotalReqTime += stat.ReqTime
}

// updateFailed records a failed batch: the whole batch counts toward both
// the processed totals and the failure counter.
func (s *StatsMgr) updateFailed(stat base.Stats) {
	s.statsRW.Lock()
	defer s.statsRW.Unlock()

	batchRows := int64(stat.BatchSize)
	s.Stats.NumFailed += batchRows
	s.Stats.TotalCount += batchRows
	s.Stats.TotalBatches++
}

func (s *StatsMgr) print(prefix string, now time.Time) {
if s.totalCount == 0 {
if s.Stats.TotalCount == 0 {
return
}
secs := time.Since(now).Seconds()
avgLatency := s.totalLatency / s.totalBatches
avgReq := s.totalReqTime / s.totalBatches
rps := float64(s.totalCount) / secs
avgLatency := s.Stats.TotalLatency / s.Stats.TotalBatches
avgReq := s.Stats.TotalReqTime / s.Stats.TotalBatches
rps := float64(s.Stats.TotalCount) / secs
logger.Infof("%s: Time(%.2fs), Finished(%d), Failed(%d), Read Failed(%d), Latency AVG(%dus), Batches Req AVG(%dus), Rows AVG(%.2f/s)",
prefix, secs, s.totalCount, s.NumFailed, s.NumReadFailed, avgLatency, avgReq, rps)
prefix, secs, s.Stats.TotalCount, s.Stats.NumFailed, s.Stats.NumReadFailed, avgLatency, avgReq, rps)
}

// StatsQuery returns a consistent copy of the current statistics.
// It is safe to call concurrently with updateStat and updateFailed.
func (s *StatsMgr) StatsQuery() Stats {
	s.statsRW.RLock()
	defer s.statsRW.RUnlock()
	return s.Stats
}

func (s *StatsMgr) startWorker(numReadingFiles int) {
Expand Down

0 comments on commit bf87ee7

Please sign in to comment.