statsMgr information get for studio
xjlgod committed Mar 9, 2022
1 parent 1dd8e73 commit 7ed0efb
Showing 8 changed files with 178 additions and 52 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -21,3 +21,4 @@ nebula-importer

# IDE
.vscode/
.idea/
24 changes: 17 additions & 7 deletions pkg/base/stats.go
@@ -6,31 +6,35 @@ const (
SUCCESS StatType = 0
FAILURE StatType = 1
FILEDONE StatType = 2
OUTPUT StatType = 3
)

const STAT_FILEDONE string = "FILEDONE"

type Stats struct {
Type StatType
Latency int64
ReqTime int64
BatchSize int
Filename string
Type StatType
Latency int64
ReqTime int64
BatchSize int
ImportedBytes int64
Filename string
}

func NewSuccessStats(latency int64, reqTime int64, batchSize int) Stats {
func NewSuccessStats(latency int64, reqTime int64, batchSize int, importedBytes int64) Stats {
return Stats{
Type: SUCCESS,
Latency: latency,
ReqTime: reqTime,
BatchSize: batchSize,
ImportedBytes: importedBytes,
}
}

func NewFailureStats(batchSize int) Stats {
func NewFailureStats(batchSize int, importedBytes int64) Stats {
return Stats{
Type: FAILURE,
BatchSize: batchSize,
ImportedBytes: importedBytes,
}
}

@@ -43,3 +47,9 @@ func NewFileDoneStats(filename string) Stats {
Filename: fpath,
}
}

func NewOutputStats() Stats {
return Stats{
Type: OUTPUT,
}
}
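
The new OUTPUT stat type is a request rather than a measurement: a caller pushes it into the stats channel to ask the manager to publish a copy of its current counters (see QueryStats in pkg/cmd/runner.go below). The pkg/stats changes are not shown on this page, so the following is only a rough sketch of how such a manager could react to it; only the StatsCh, OutputStatsCh, and Stats names are taken from the runner.go diff, everything else is assumed.

```go
// Sketch only: the pkg/stats diff is not shown here; field and method names
// beyond StatsCh/OutputStatsCh/Stats are assumptions.
package stats

import "github.com/vesoft-inc/nebula-importer/pkg/base"

type Stats struct {
	NumFailed     int64 // referenced from runner.go
	NumReadFailed int64 // referenced from runner.go
	ImportedBytes int64 // assumed aggregate of base.Stats.ImportedBytes
}

type StatsMgr struct {
	StatsCh       chan base.Stats
	OutputStatsCh chan Stats
	Stats         Stats
}

func (m *StatsMgr) loop() {
	for s := range m.StatsCh {
		switch s.Type {
		case base.SUCCESS:
			m.Stats.ImportedBytes += s.ImportedBytes
		case base.FAILURE:
			m.Stats.NumFailed += int64(s.BatchSize)
			m.Stats.ImportedBytes += s.ImportedBytes
		case base.OUTPUT:
			// OUTPUT carries no data of its own; it only asks the manager
			// to hand a snapshot of the counters back to whoever sent it.
			m.OutputStatsCh <- m.Stats
		}
	}
}
```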
10 changes: 7 additions & 3 deletions pkg/base/types.go
@@ -34,26 +34,30 @@ func (op OpType) String() string {
type Data struct {
Type OpType
Record Record
Bytes int
}

func InsertData(record Record) Data {
func InsertData(record Record, bytes int) Data {
return Data{
Type: INSERT,
Record: record,
Bytes: bytes,
}
}

func DeleteData(record Record) Data {
func DeleteData(record Record, bytes int) Data {
return Data{
Type: DELETE,
Record: record,
Bytes: bytes,
}
}

func HeaderData(record Record) Data {
func HeaderData(record Record, bytes int) Data {
return Data{
Type: HEADER,
Record: record,
Bytes: bytes,
}
}

6 changes: 5 additions & 1 deletion pkg/client/clientpool.go
@@ -194,7 +194,11 @@ func (p *ClientPool) startWorker(i int) {
}
} else {
timeInMs := time.Since(now).Nanoseconds() / 1e3
p.statsCh <- base.NewSuccessStats(int64(resp.GetLatency()), timeInMs, len(data.Data))
var importedBytes int64
for _, d := range data.Data {
importedBytes += int64(d.Bytes)
}
p.statsCh <- base.NewSuccessStats(int64(resp.GetLatency()), timeInMs, len(data.Data), importedBytes)
}
}
}
34 changes: 29 additions & 5 deletions pkg/cmd/runner.go
@@ -16,6 +16,7 @@ import (
type Runner struct {
errs []error
Readers []*reader.FileReader
stataMgr *stats.StatsMgr
NumFailed int64
}

@@ -28,6 +29,10 @@ func (r *Runner) Error() error {
return r.errs[0]
}

func (r *Runner) Errors() []error {
return r.errs
}

func (r *Runner) Run(yaml *config.YAMLConfig) {
defer func() {
if re := recover(); re != nil {
@@ -39,7 +44,8 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
logger.Init(*yaml.LogPath)
}

statsMgr := stats.NewStatsMgr(len(yaml.Files))
statsMgr := stats.NewStatsMgr(yaml.Files)
r.stataMgr = statsMgr
defer statsMgr.Close()

clientMgr, err := client.NewNebulaClientMgr(yaml.NebulaClientSettings, statsMgr.StatsCh)
@@ -68,7 +74,7 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
} else {
go func(fr *reader.FileReader, filename string) {
numReadFailed, err := fr.Read()
statsMgr.NumReadFailed += numReadFailed
statsMgr.Stats.NumReadFailed += numReadFailed
if err != nil {
r.errs = append(r.errs, err)
statsMgr.StatsCh <- base.NewFileDoneStats(filename)
@@ -83,10 +89,28 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
<-statsMgr.DoneCh

r.Readers = nil
r.NumFailed = statsMgr.NumFailed
r.NumFailed = statsMgr.Stats.NumFailed

if statsMgr.NumFailed > 0 {
if statsMgr.Stats.NumFailed > 0 {
r.errs = append(r.errs, errors.Wrap(errors.NotCompleteError,
fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.NumFailed)))
fmt.Errorf("Total %d lines fail to insert into nebula graph database", statsMgr.Stats.NumFailed)))
}
}

func (r *Runner) QueryStats() *stats.Stats {
if r.stataMgr != nil {
if r.Readers == nil {
return &r.stataMgr.Stats
}
r.stataMgr.StatsCh <- base.NewOutputStats()
select {
case stats, ok := <- r.stataMgr.OutputStatsCh:
if !ok {
return nil
}
return &stats
}
} else {
return nil
}
}
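
QueryStats gives an embedding service (such as the studio named in the commit message) a way to read progress while an import is still running. Below is a minimal usage sketch, assuming the importer is driven in-process; the config.Parse helper, the ImportedBytes field on stats.Stats, and the polling loop are illustrative, only Runner.Run and Runner.QueryStats come from this commit.

```go
// Hypothetical caller; everything outside Runner.Run / Runner.QueryStats is assumed.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/vesoft-inc/nebula-importer/pkg/cmd"
	"github.com/vesoft-inc/nebula-importer/pkg/config"
)

func main() {
	yamlCfg, err := config.Parse("importer.yaml") // assumed config loader
	if err != nil {
		log.Fatal(err)
	}

	runner := &cmd.Runner{}
	done := make(chan struct{})
	go func() {
		runner.Run(yamlCfg)
		close(done)
	}()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			if s := runner.QueryStats(); s != nil {
				fmt.Printf("finished: failed=%d importedBytes=%d\n", s.NumFailed, s.ImportedBytes)
			}
			return
		case <-ticker.C:
			if s := runner.QueryStats(); s != nil {
				// ImportedBytes against the CountFileBytes totals could drive a progress bar.
				fmt.Printf("progress: failed=%d importedBytes=%d\n", s.NumFailed, s.ImportedBytes)
			}
		}
	}
}
```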
57 changes: 50 additions & 7 deletions pkg/csv/reader.go
@@ -4,7 +4,10 @@ import (
"bufio"
"encoding/csv"
"fmt"
"io"
"os"
"reflect"
"unsafe"

"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/config"
@@ -15,17 +18,37 @@ type CSVReader struct {
CSVConfig *config.CSVConfig
reader *csv.Reader
lineNum uint64
rr *recordReader
br *bufio.Reader
}

type recordReader struct {
io.Reader
remainingBytes int
}

func (r *recordReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.remainingBytes += n
return
}

func (r *CSVReader) InitReader(file *os.File) {
r.reader = csv.NewReader(bufio.NewReader(file))
r.rr = &recordReader{
Reader: bufio.NewReader(file),
}
r.reader = csv.NewReader(r.rr)
if r.CSVConfig.Delimiter != nil {
d := []rune(*r.CSVConfig.Delimiter)
if len(d) > 0 {
r.reader.Comma = d[0]
logger.Infof("The delimiter of %s is %#U", file.Name(), r.reader.Comma)
}
}
rf := reflect.ValueOf(r.reader).Elem().FieldByName("r")
rf = reflect.NewAt(rf.Type(), unsafe.Pointer(rf.UnsafeAddr())).Elem()
br := rf.Interface().(*bufio.Reader)
r.br = br
}

func (r *CSVReader) ReadLine() (base.Data, error) {
@@ -36,25 +59,45 @@ func (r *CSVReader) ReadLine() (base.Data, error) {
}

r.lineNum++
n := r.rr.remainingBytes - r.br.Buffered()
r.rr.remainingBytes -= n

if *r.CSVConfig.WithHeader && r.lineNum == 1 {
if *r.CSVConfig.WithLabel {
return base.HeaderData(line[1:]), nil
return base.HeaderData(line[1:], n), nil
} else {
return base.HeaderData(line), nil
return base.HeaderData(line, n), nil
}
}

if *r.CSVConfig.WithLabel {
switch line[0] {
case "+":
return base.InsertData(line[1:]), nil
return base.InsertData(line[1:], n), nil
case "-":
return base.DeleteData(line[1:]), nil
return base.DeleteData(line[1:], n), nil
default:
return base.Data{}, fmt.Errorf("Invalid label: %s", line[0])
return base.Data{
Bytes: n,
}, fmt.Errorf("Invalid label: %s", line[0])
}
} else {
return base.InsertData(line), nil
return base.InsertData(line, n), nil
}
}

func CountFileBytes(path string) (int64, error) {
file, err := os.Open(path)
defer file.Close()
if err != nil {
logger.Errorf("count bytes fail: %s", path)
return 0, err
}
stat, err := file.Stat()
if err != nil {
logger.Errorf("count bytes fail: %s", path)
return 0, err
}
bytesCount := stat.Size()
return bytesCount, nil
}
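
The per-record byte count in this reader comes from two pieces: a wrapping reader that counts every byte encoding/csv pulls from the file, and a look (via reflection and unsafe) at the csv.Reader's unexported bufio.Reader to subtract whatever is buffered but not yet part of a returned record. The standalone sketch below isolates that technique; it relies on the unexported field name "r" inside encoding/csv, which is an implementation detail that may change between Go versions.

```go
package main

import (
	"bufio"
	"encoding/csv"
	"fmt"
	"io"
	"reflect"
	"strings"
	"unsafe"
)

// countingReader tracks how many bytes csv.Reader has pulled from the
// underlying source but not yet attributed to a returned record.
type countingReader struct {
	io.Reader
	pending int
}

func (c *countingReader) Read(p []byte) (n int, err error) {
	n, err = c.Reader.Read(p)
	c.pending += n
	return
}

func main() {
	src := "id,name\n1,foo\n2,bar\n"
	cr := &countingReader{Reader: strings.NewReader(src)}
	r := csv.NewReader(cr)

	// encoding/csv keeps its own *bufio.Reader in the unexported field "r".
	// Reflection + unsafe exposes it so we can ask how much of the pulled
	// data is still buffered, i.e. belongs to records not read yet.
	f := reflect.ValueOf(r).Elem().FieldByName("r")
	br := reflect.NewAt(f.Type(), unsafe.Pointer(f.UnsafeAddr())).Elem().Interface().(*bufio.Reader)

	for {
		rec, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		// Bytes consumed by this record = everything pulled so far minus
		// what is still sitting in csv's internal buffer.
		n := cr.pending - br.Buffered()
		cr.pending -= n
		fmt.Printf("%v -> %d bytes\n", rec, n)
	}
}
```

The same subtraction appears in ReadLine above, where the result is attached to each base.Data so the client pool and error handler can report ImportedBytes per batch.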
6 changes: 5 additions & 1 deletion pkg/errhandler/handler.go
@@ -61,7 +61,11 @@ func (w *Handler) Init(file *config.File, concurrency int, cleanup bool) (chan b
} else {
dataWriter.Write(rawErr.Data)
logger.Error(rawErr.Error.Error())
w.statsCh <- base.NewFailureStats(len(rawErr.Data))
var importedBytes int64
for _, d := range rawErr.Data {
importedBytes += int64(d.Bytes)
}
w.statsCh <- base.NewFailureStats(len(rawErr.Data), importedBytes)
}
}
