Skip to content

Commit

Permalink
optimize count file bytes and isDone
Browse files Browse the repository at this point in the history
  • Loading branch information
xjlgod committed Mar 11, 2022
1 parent ca82699 commit 06d7eb2
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 12 deletions.
8 changes: 6 additions & 2 deletions pkg/cmd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {

r.Readers = freaders
r.stataMgr = statsMgr
r.stataMgr.CountFileBytes(r.Readers)

<-statsMgr.DoneCh

r.stataMgr.CountFileBytes(r.Readers)
r.Readers = nil
r.NumFailed = statsMgr.Stats.NumFailed

Expand All @@ -99,7 +100,10 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {

func (r *Runner) QueryStats() *stats.Stats {
if r.stataMgr != nil {
if r.Readers == nil {
if r.Readers != nil {
r.stataMgr.CountFileBytes(r.Readers)
}
if r.stataMgr.Done == true {
return &r.stataMgr.Stats
}
r.stataMgr.StatsCh <- base.NewOutputStats()
Expand Down
20 changes: 12 additions & 8 deletions pkg/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package csv
import (
"bufio"
"encoding/csv"
"errors"
"fmt"
"io"
"os"

"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/config"
"github.com/vesoft-inc/nebula-importer/pkg/logger"
"io"
"os"
)

type CSVReader struct {
Expand All @@ -19,6 +21,7 @@ type CSVReader struct {
br *bufio.Reader
totalBytes int64
initComplete bool

}

type recordReader struct {
Expand Down Expand Up @@ -50,7 +53,9 @@ func (r *CSVReader) InitReader(file *os.File) {
logger.Infof("The stat of %s is wrong, %s", file.Name(), err)
}
r.totalBytes = stat.Size()
r.initComplete = true
defer func() {
r.initComplete = true
}()
}

func (r *CSVReader) ReadLine() (base.Data, error) {
Expand Down Expand Up @@ -88,10 +93,9 @@ func (r *CSVReader) ReadLine() (base.Data, error) {
}
}

func (r *CSVReader) TotalBytes() (int64) {
for {
if r.initComplete {
return r.totalBytes
}
func (r *CSVReader) TotalBytes() (int64, error) {
if r.initComplete {
return r.totalBytes, nil
}
return 0, errors.New("init not complete")
}
2 changes: 1 addition & 1 deletion pkg/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
type DataFileReader interface {
InitReader(*os.File)
ReadLine() (base.Data, error)
TotalBytes() (int64)
TotalBytes() (int64, error)
}

// FIXME: private fields
Expand Down
20 changes: 19 additions & 1 deletion pkg/stats/statsmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type StatsMgr struct {
StatsCh chan base.Stats
DoneCh chan bool
Stats Stats
Done bool
CountFileDone bool
}

type Stats struct {
Expand Down Expand Up @@ -52,6 +54,8 @@ func NewStatsMgr(files []*config.File) *StatsMgr {
func (s *StatsMgr) Close() {
close(s.StatsCh)
close(s.DoneCh)
close(s.OutputStatsCh)
s.Done = true
}

func (s *StatsMgr) updateStat(stat base.Stats) {
Expand Down Expand Up @@ -86,11 +90,25 @@ func (s *StatsMgr) print(prefix string, now time.Time) {
}

func (s *StatsMgr) CountFileBytes(freaders []*reader.FileReader) {
if s.CountFileDone {
return
}
flag := true
s.Stats.TotalCount = 0
for _, r := range freaders {
if r == nil {
continue
}
s.Stats.TotalBytes += r.DataReader.TotalBytes()
bytes, err := r.DataReader.TotalBytes()
if err != nil {
logger.Infof("file %s count file bytes fail, $s", *r.File.Path, err)
flag = false
continue
}
s.Stats.TotalBytes += bytes
}
if flag {
s.CountFileDone = true
}
}

Expand Down

0 comments on commit 06d7eb2

Please sign in to comment.