Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsMgr information get for studio #194

Merged
merged 8 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ nebula-importer

# IDE
.vscode/
.idea/
36 changes: 23 additions & 13 deletions pkg/base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,35 @@ const (
SUCCESS StatType = 0
FAILURE StatType = 1
FILEDONE StatType = 2
OUTPUT StatType = 3
)

const STAT_FILEDONE string = "FILEDONE"

type Stats struct {
Type StatType
Latency int64
ReqTime int64
BatchSize int
Filename string
Type StatType
Latency int64
ReqTime int64
BatchSize int
ImportedBytes int64
Filename string
}

func NewSuccessStats(latency int64, reqTime int64, batchSize int) Stats {
func NewSuccessStats(latency int64, reqTime int64, batchSize int, importedBytes int64) Stats {
return Stats{
Type: SUCCESS,
Latency: latency,
ReqTime: reqTime,
BatchSize: batchSize,
Type: SUCCESS,
Latency: latency,
ReqTime: reqTime,
BatchSize: batchSize,
ImportedBytes: importedBytes,
}
}

func NewFailureStats(batchSize int) Stats {
func NewFailureStats(batchSize int, importedBytes int64) Stats {
return Stats{
Type: FAILURE,
BatchSize: batchSize,
Type: FAILURE,
BatchSize: batchSize,
ImportedBytes: importedBytes,
}
}

Expand All @@ -43,3 +47,9 @@ func NewFileDoneStats(filename string) Stats {
Filename: fpath,
}
}

func NewOutputStats() Stats {
return Stats{
Type: OUTPUT,
}
}
10 changes: 7 additions & 3 deletions pkg/base/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,30 @@ func (op OpType) String() string {
type Data struct {
Type OpType
Record Record
Bytes int
}

func InsertData(record Record) Data {
func InsertData(record Record, bytes int) Data {
return Data{
Type: INSERT,
Record: record,
Bytes: bytes,
}
}

func DeleteData(record Record) Data {
func DeleteData(record Record, bytes int) Data {
return Data{
Type: DELETE,
Record: record,
Bytes: bytes,
}
}

func HeaderData(record Record) Data {
func HeaderData(record Record, bytes int) Data {
return Data{
Type: HEADER,
Record: record,
Bytes: bytes,
}
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/client/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ func (p *ClientPool) startWorker(i int) {
}
} else {
timeInMs := time.Since(now).Nanoseconds() / 1e3
p.statsCh <- base.NewSuccessStats(int64(resp.GetLatency()), timeInMs, len(data.Data))
var importedBytes int64
for _, d := range data.Data {
importedBytes += int64(d.Bytes)
}
p.statsCh <- base.NewSuccessStats(int64(resp.GetLatency()), timeInMs, len(data.Data), importedBytes)
}
}
}
34 changes: 29 additions & 5 deletions pkg/cmd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type Runner struct {
errs []error
Readers []*reader.FileReader
stataMgr *stats.StatsMgr
NumFailed int64
}

Expand All @@ -28,6 +29,10 @@ func (r *Runner) Error() error {
return r.errs[0]
}

func (r *Runner) Errors() []error {
return r.errs
}

func (r *Runner) Run(yaml *config.YAMLConfig) {
defer func() {
if re := recover(); re != nil {
Expand All @@ -39,7 +44,8 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
logger.Init(*yaml.LogPath)
}

statsMgr := stats.NewStatsMgr(len(yaml.Files))
statsMgr := stats.NewStatsMgr(yaml.Files)
r.stataMgr = statsMgr
defer statsMgr.Close()

clientMgr, err := client.NewNebulaClientMgr(yaml.NebulaClientSettings, statsMgr.StatsCh)
Expand Down Expand Up @@ -68,7 +74,7 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
} else {
go func(fr *reader.FileReader, filename string) {
numReadFailed, err := fr.Read()
statsMgr.NumReadFailed += numReadFailed
statsMgr.Stats.NumReadFailed += numReadFailed
if err != nil {
r.errs = append(r.errs, err)
statsMgr.StatsCh <- base.NewFileDoneStats(filename)
Expand All @@ -83,10 +89,28 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
<-statsMgr.DoneCh

r.Readers = nil
r.NumFailed = statsMgr.NumFailed
r.NumFailed = statsMgr.Stats.NumFailed

if statsMgr.NumFailed > 0 {
if statsMgr.Stats.NumFailed > 0 {
r.errs = append(r.errs, errors.Wrap(errors.NotCompleteError,
fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.NumFailed)))
fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.Stats.NumFailed)))
}
}

func (r *Runner) QueryStats() *stats.Stats {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about returning stats.Stats instead of *stats.Stats?

if r.stataMgr != nil {
if r.Readers == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to check r.Readers here?

return &r.stataMgr.Stats
}
r.stataMgr.StatsCh <- base.NewOutputStats()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add a channel round-trip when the stats can be read directly?

select {
case stats, ok := <-r.stataMgr.OutputStatsCh:
if !ok {
return nil
}
return &stats
}
} else {
return nil
}
}
57 changes: 50 additions & 7 deletions pkg/csv/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"bufio"
"encoding/csv"
"fmt"
"io"
"os"
"reflect"
"unsafe"

"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/config"
Expand All @@ -15,17 +18,37 @@ type CSVReader struct {
CSVConfig *config.CSVConfig
reader *csv.Reader
lineNum uint64
rr *recordReader
br *bufio.Reader
}

type recordReader struct {
io.Reader
remainingBytes int
}

func (r *recordReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.remainingBytes += n
return
}

func (r *CSVReader) InitReader(file *os.File) {
r.reader = csv.NewReader(bufio.NewReader(file))
r.rr = &recordReader{
Reader: bufio.NewReader(file),
}
r.reader = csv.NewReader(r.rr)
if r.CSVConfig.Delimiter != nil {
d := []rune(*r.CSVConfig.Delimiter)
if len(d) > 0 {
r.reader.Comma = d[0]
logger.Infof("The delimiter of %s is %#U", file.Name(), r.reader.Comma)
}
}
rf := reflect.ValueOf(r.reader).Elem().FieldByName("r")
rf = reflect.NewAt(rf.Type(), unsafe.Pointer(rf.UnsafeAddr())).Elem()
br := rf.Interface().(*bufio.Reader)
r.br = br
}

func (r *CSVReader) ReadLine() (base.Data, error) {
Expand All @@ -36,25 +59,45 @@ func (r *CSVReader) ReadLine() (base.Data, error) {
}

r.lineNum++
n := r.rr.remainingBytes - r.br.Buffered()
r.rr.remainingBytes -= n

if *r.CSVConfig.WithHeader && r.lineNum == 1 {
if *r.CSVConfig.WithLabel {
return base.HeaderData(line[1:]), nil
return base.HeaderData(line[1:], n), nil
} else {
return base.HeaderData(line), nil
return base.HeaderData(line, n), nil
}
}

if *r.CSVConfig.WithLabel {
switch line[0] {
case "+":
return base.InsertData(line[1:]), nil
return base.InsertData(line[1:], n), nil
case "-":
return base.DeleteData(line[1:]), nil
return base.DeleteData(line[1:], n), nil
default:
return base.Data{}, fmt.Errorf("Invalid label: %s", line[0])
return base.Data{
Bytes: n,
}, fmt.Errorf("Invalid label: %s", line[0])
}
} else {
return base.InsertData(line), nil
return base.InsertData(line, n), nil
}
}

func CountFileBytes(path string) (int64, error) {
file, err := os.Open(path)
Copy link
Contributor

@veezhang veezhang Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The statistical methods of the two functions are different.

  • countLineBytes is the sum of bytes of the csv field.
  • CountFileBytes is to get the file size and then subtract the header bytes.

You can simply count the bytes as they are read, without special-casing the header:

  • countLineBytes gets the number of bytes read.
  • CountFileBytes gets the file size.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var _ io.Reader = (*recordReader)(nil)

type recordReader struct {
	io.Reader
	lastReadBytes int
}

func (r *recordReader) Read(p []byte) (n int, err error) {
	n, err = r.Reader.Read(p)
	r.lastReadBytes = n
	return
}
func (r *CSVReader) InitReader(file *os.File) {
	r.rr = &recordReader{Reader: bufio.NewReader(file)}
	r.reader = csv.NewReader(r.rr)
	...
}

func (r *CSVReader) ReadLine() (base.Data, error) {
    line, err := r.reader.Read()
    // r.rr.lastReadBytes // get the last read bytes 
    ...
}

defer file.Close()
if err != nil {
logger.Errorf("count bytes fail: %s", path)
return 0, err
}
stat, err := file.Stat()
if err != nil {
logger.Errorf("count bytes fail: %s", path)
return 0, err
}
bytesCount := stat.Size()
return bytesCount, nil
}
6 changes: 5 additions & 1 deletion pkg/errhandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (w *Handler) Init(file *config.File, concurrency int, cleanup bool) (chan b
} else {
dataWriter.Write(rawErr.Data)
logger.Error(rawErr.Error.Error())
w.statsCh <- base.NewFailureStats(len(rawErr.Data))
var importedBytes int64
for _, d := range rawErr.Data {
importedBytes += int64(d.Bytes)
}
w.statsCh <- base.NewFailureStats(len(rawErr.Data), importedBytes)
}
}

Expand Down
Loading