diff --git a/.gitignore b/.gitignore
index 50f6a154..5e8af774 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,4 @@ nebula-importer
 
 # IDE
 .vscode/
+.idea/
diff --git a/pkg/cmd/runner.go b/pkg/cmd/runner.go
index 4702584a..01f93d03 100644
--- a/pkg/cmd/runner.go
+++ b/pkg/cmd/runner.go
@@ -16,6 +16,7 @@ import (
 type Runner struct {
 	errs      []error
 	Readers   []*reader.FileReader
+	statsMgr  *stats.StatsMgr
 	NumFailed int64
 }
 
@@ -28,6 +29,10 @@ func (r *Runner) Error() error {
 	return r.errs[0]
 }
 
+func (r *Runner) Errors() []error {
+	return r.errs
+}
+
 func (r *Runner) Run(yaml *config.YAMLConfig) {
 	defer func() {
 		if re := recover(); re != nil {
@@ -39,7 +44,8 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
 		logger.Init(*yaml.LogPath)
 	}
 
-	statsMgr := stats.NewStatsMgr(len(yaml.Files))
+	statsMgr := stats.NewStatsMgr(yaml.Files)
+	r.statsMgr = statsMgr
 	defer statsMgr.Close()
 
 	clientMgr, err := client.NewNebulaClientMgr(yaml.NebulaClientSettings, statsMgr.StatsCh)
@@ -68,7 +74,7 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
 		} else {
 			go func(fr *reader.FileReader, filename string) {
 				numReadFailed, err := fr.Read()
-				statsMgr.NumReadFailed += numReadFailed
+				statsMgr.Stats.NumReadFailed += numReadFailed
 				if err != nil {
 					r.errs = append(r.errs, err)
 					statsMgr.StatsCh <- base.NewFileDoneStats(filename)
@@ -83,10 +89,19 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
 
 	<-statsMgr.DoneCh
 	r.Readers = nil
-	r.NumFailed = statsMgr.NumFailed
+	r.NumFailed = statsMgr.Stats.NumFailed
 
-	if statsMgr.NumFailed > 0 {
+	if statsMgr.Stats.NumFailed > 0 {
 		r.errs = append(r.errs, errors.Wrap(errors.NotCompleteError,
-			fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.NumFailed)))
+			fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.Stats.NumFailed)))
+	}
+}
+
+// QueryStats returns a snapshot of the current import statistics, or nil
+// when Run has not been started yet.
+func (r *Runner) QueryStats() *stats.Stats {
+	if r.statsMgr != nil {
+		s := r.statsMgr.StatsQuery()
+		return &s
+	} else {
+		return nil
 	}
 }
diff --git a/pkg/csv/reader.go b/pkg/csv/reader.go
index 30ee3209..b0f7800b 100644
--- a/pkg/csv/reader.go
+++ b/pkg/csv/reader.go
@@ -2,8 +2,10 @@ package csv
 
 import (
 	"bufio"
+	"bytes"
 	"encoding/csv"
 	"fmt"
+	"io"
 	"os"
 
 	"github.com/vesoft-inc/nebula-importer/pkg/base"
@@ -58,3 +60,44 @@ func (r *CSVReader) ReadLine() (base.Data, error) {
 		return base.InsertData(line), nil
 	}
 }
+
+// CountLines returns the number of lines in the file at path.
+// A final line that lacks a trailing '\n' is counted as a line; when
+// withHeader is true one line (the CSV header) is subtracted.
+func CountLines(path string, withHeader bool) (int64, error) {
+	file, err := os.Open(path)
+	if err != nil {
+		logger.Errorf("count line fail: %s", path)
+		return 0, err
+	}
+	defer file.Close()
+	r := bufio.NewReader(file)
+	buf := make([]byte, 32*1024)
+	count := 0
+	empty := true
+	// The trailing-newline flag must survive across reads: the final
+	// Read typically returns (0, io.EOF) after the data has already
+	// been consumed, so it cannot be derived from the last chunk alone.
+	endsWithNewline := false
+	lineSep := []byte{'\n'}
+	for {
+		n, err := r.Read(buf)
+		if n > 0 {
+			empty = false
+			endsWithNewline = buf[n-1] == '\n'
+		}
+		count += bytes.Count(buf[:n], lineSep)
+		switch {
+		case err == io.EOF:
+			if !empty && !endsWithNewline {
+				count++
+			}
+			if withHeader && count > 0 {
+				count--
+			}
+			return int64(count), nil
+		case err != nil:
+			return 0, err
+		}
+	}
+}
diff --git a/pkg/stats/statsmgr.go b/pkg/stats/statsmgr.go
index e905482c..a809fcda 100644
--- a/pkg/stats/statsmgr.go
+++ b/pkg/stats/statsmgr.go
@@ -2,32 +2,68 @@ package stats
 
 import (
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/vesoft-inc/nebula-importer/pkg/base"
+	"github.com/vesoft-inc/nebula-importer/pkg/config"
+	"github.com/vesoft-inc/nebula-importer/pkg/csv"
 	"github.com/vesoft-inc/nebula-importer/pkg/logger"
 )
 
 type StatsMgr struct {
 	StatsCh chan base.Stats
 	DoneCh  chan bool
-	NumFailed int64
-	NumReadFailed int64
-	totalCount   int64
-	totalBatches int64
-	totalLatency int64
-	totalReqTime int64
+	statsRW sync.RWMutex
+	Stats   Stats
 }
 
-func NewStatsMgr(numReadingFiles int) *StatsMgr {
+type Stats struct {
+	NumFailed     int64 `json:"numFailed"`
+	NumReadFailed int64 `json:"numReadFailed"`
+	TotalLine     int64 `json:"totalLine"`
+	TotalCount    int64 `json:"totalCount"`
+	TotalBatches  int64 `json:"totalBatches"`
+	TotalLatency  int64 `json:"totalLatency"`
+	TotalReqTime  int64 `json:"totalReqTime"`
+}
+
+// NewStatsMgr counts the total lines of all input files (concurrently)
+// and starts the stats worker for numReadingFiles readers.
+func NewStatsMgr(files []*config.File) *StatsMgr {
+	numReadingFiles := len(files)
+	totalLine := int64(0)
+	var mu sync.Mutex
+	var wg sync.WaitGroup
+	wg.Add(numReadingFiles)
+	for _, file := range files {
+		path := file.Path
+		withHeader := file.CSV.WithHeader
+		go func(path string, withHeader bool) {
+			defer wg.Done()
+			lines, err := csv.CountLines(path, withHeader)
+			if err != nil {
+				logger.Errorf("count line fail: %s, %s", path, err.Error())
+			}
+			// totalLine is shared by all counting goroutines.
+			mu.Lock()
+			totalLine += lines
+			mu.Unlock()
+		}(*path, *withHeader)
+	}
+	wg.Wait()
+	stats := Stats{
+		NumFailed:    0,
+		TotalLine:    totalLine,
+		TotalCount:   0,
+		TotalLatency: 0,
+		TotalBatches: 0,
+		TotalReqTime: 0,
+	}
 	m := StatsMgr{
 		StatsCh: make(chan base.Stats),
 		DoneCh:  make(chan bool),
-		NumFailed:    0,
-		totalCount:   0,
-		totalLatency: 0,
-		totalBatches: 0,
-		totalReqTime: 0.0,
+		Stats: stats,
 	}
 	go m.startWorker(numReadingFiles)
 	return &m
@@ -39,28 +75,38 @@ func (s *StatsMgr) Close() {
 }
 
 func (s *StatsMgr) updateStat(stat base.Stats) {
-	s.totalBatches++
-	s.totalCount += int64(stat.BatchSize)
-	s.totalReqTime += stat.ReqTime
-	s.totalLatency += stat.Latency
+	s.statsRW.Lock()
+	s.Stats.TotalBatches++
+	s.Stats.TotalCount += int64(stat.BatchSize)
+	s.Stats.TotalReqTime += stat.ReqTime
+	s.Stats.TotalLatency += stat.Latency
+	s.statsRW.Unlock()
 }
 
 func (s *StatsMgr) updateFailed(stat base.Stats) {
-	s.totalBatches++
-	s.totalCount += int64(stat.BatchSize)
-	s.NumFailed += int64(stat.BatchSize)
+	s.statsRW.Lock()
+	s.Stats.TotalBatches++
+	s.Stats.TotalCount += int64(stat.BatchSize)
+	s.Stats.NumFailed += int64(stat.BatchSize)
+	s.statsRW.Unlock()
 }
 
 func (s *StatsMgr) print(prefix string, now time.Time) {
-	if s.totalCount == 0 {
+	if s.Stats.TotalCount == 0 {
 		return
 	}
 	secs := time.Since(now).Seconds()
-	avgLatency := s.totalLatency / s.totalBatches
-	avgReq := s.totalReqTime / s.totalBatches
-	rps := float64(s.totalCount) / secs
+	avgLatency := s.Stats.TotalLatency / s.Stats.TotalBatches
+	avgReq := s.Stats.TotalReqTime / s.Stats.TotalBatches
+	rps := float64(s.Stats.TotalCount) / secs
 	logger.Infof("%s: Time(%.2fs), Finished(%d), Failed(%d), Read Failed(%d), Latency AVG(%dus), Batches Req AVG(%dus), Rows AVG(%.2f/s)",
-		prefix, secs, s.totalCount, s.NumFailed, s.NumReadFailed, avgLatency, avgReq, rps)
+		prefix, secs, s.Stats.TotalCount, s.Stats.NumFailed, s.Stats.NumReadFailed, avgLatency, avgReq, rps)
+}
+
+// StatsQuery returns a consistent copy of the current statistics.
+func (s *StatsMgr) StatsQuery() Stats {
+	s.statsRW.RLock()
+	defer s.statsRW.RUnlock()
+	return s.Stats
 }
 
 func (s *StatsMgr) startWorker(numReadingFiles int) {