Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsMgr information get for studio #194

Merged
merged 8 commits into from
Mar 14, 2022
Merged

statsMgr information get for studio #194

merged 8 commits into from
Mar 14, 2022

Conversation

xjlgod
Copy link
Contributor

@xjlgod xjlgod commented Jan 26, 2022

At present, Nebula Studio is doing multi task import. It need to know the total number of imported lines and the number of imported lines of each task.

@CLAassistant
Copy link

CLAassistant commented Jan 26, 2022

CLA assistant check
All committers have signed the CLA.

.idea/.gitignore Outdated
@@ -0,0 +1,8 @@
# Default ignored files
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove files in .idea directory.

Comment on lines 38 to 48
var wg sync.WaitGroup
wg.Add(numReadingFiles)
for _, file := range files {
path := file.FailDataPath
go func(path string) {
totalLine += countLines(path)
defer wg.Done()
}(*path)
}
wg.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be no need for multiple goroutine here?

@@ -92,3 +133,16 @@ func (s *StatsMgr) startWorker(numReadingFiles int) {
}
}
}

func countLines(path string) int64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause the file to be read twice, will it affect performance?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the size of the file be considered, not the number of lines in the file? Add TotalSize, ProcessedSize?

func countLines(path string) int64 {
file, err := os.Open(path)
if err != nil {
return 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't eat err?

totalCount int64
totalBatches int64
totalLatency int64
totalReqTime int64
}

func NewStatsMgr(numReadingFiles int) *StatsMgr {
type StatsQuery struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about define StatsMgr as follows?

type StatsMgr struct {
	StatsCh       chan base.Stats
	DoneCh        chan bool
	statsRW       sync.RWMutex
  stats         Stats
}

type Stats struct {
	NumFailed     int64
	NumReadFailed int64
	TotalLine     int64
	TotalCount    int64
	TotalBatches  int64
	TotalLatency  int64
	TotalReqTime  int64
}

Then, you can define a function func (s *StatsMgr) Stats() Stats replace the current StatsQuery function.

Copy link
Contributor

@jievince jievince left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job!

.gitignore Outdated
@@ -21,3 +21,4 @@ nebula-importer

# IDE
.vscode/
.idea/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please configure your ide/editor to auto add a new blank line at the end of a file

Comment on lines 144 to 147
lineCount := int64(0)
for fileScanner.Scan() {
lineCount++
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jievince jievince marked this pull request as ready for review January 26, 2022 10:06
@veezhang veezhang self-requested a review February 21, 2022 06:53
Comment on lines 151 to 152
// There is no \n in the last line
count := 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you know?
Maybe ?

a
b

or

a
b

@@ -92,3 +139,27 @@ func (s *StatsMgr) startWorker(numReadingFiles int) {
}
}
}

func countLines(path string) (int64, error) {
Copy link
Contributor

@veezhang veezhang Feb 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be considered?

type CSVConfig struct {
	WithHeader *bool   `json:"withHeader" yaml:"withHeader"`
	WithLabel  *bool   `json:"withLabel" yaml:"withLabel"`
	Delimiter  *string `json:"delimiter" yaml:"delimiter"`
}

https://github.com/vesoft-inc/nebula-importer/blob/master/pkg/config/config.go#L86-L90

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jievince How do you think?

veezhang
veezhang previously approved these changes Feb 28, 2022
Copy link
Contributor

@veezhang veezhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

path := file.Path
withHeader := file.CSV.WithHeader
go func(path string) {
lines, err := csv.CountLines(path, *withHeader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think counting file lines is impractical for huge file in new coroutine. Why not to do this when reading these data files?

@xjlgod
Copy link
Contributor Author

xjlgod commented Mar 1, 2022

Because Studio needs to know in advance how many lines the file needs to be imported , before importing.

}

func CountFileBytes(path string, withHeader bool) (int64, error) {
file, err := os.Open(path)
Copy link
Contributor

@veezhang veezhang Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The statistical methods of the two functions are different.

  • countLineBytes is the sum of bytes of the csv field.
  • CountFileBytes is to get the file size and then subtract the header bytes.

You can take statistics bytes and don't care about headers.

  • countLineBytes get the bytes read.
  • CountFileBytes get file size.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var _ io.Reader = (*recordReader)(nil)

type recordReader struct {
	io.Reader
	lastReadBytes int
}

func (r *recordReader) Read(p []byte) (n int, err error) {
	n, err = r.Reader.Read(p)
	r.lastReadBytes = n
	return
}
func (r *CSVReader) InitReader(file *os.File) {
	r.rr = &recordReader{Reader: bufio.NewReader(file)}
	r.reader = csv.NewReader(r.rr)
	...
}

func (r *CSVReader) ReadLine() (base.Data, error) {
    line, err := r.reader.Read()
    // r.rr.lastReadBytes // get the last read bytes 
    ...
}

Latency int64
ReqTime int64
BatchSize int
ImportedBytesNum int64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use ImportedBytes ?

@@ -34,26 +34,30 @@ func (op OpType) String() string {
type Data struct {
Type OpType
Record Record
BytesNum int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use Bytes ?

veezhang
veezhang previously approved these changes Mar 9, 2022
Copy link
Contributor

@veezhang veezhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

s.totalCount += int64(stat.BatchSize)
s.totalReqTime += stat.ReqTime
s.totalLatency += stat.Latency
s.statsRW.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this lock? Are these stats not be sent by one channel?

@yixinglu
Copy link
Contributor

yixinglu commented Mar 9, 2022

please sign the CLA firstly!

@xjlgod
Copy link
Contributor Author

xjlgod commented Mar 9, 2022

please sign the CLA firstly!

Ok, I hava signed it.

yixinglu
yixinglu previously approved these changes Mar 9, 2022
@veezhang
Copy link
Contributor

veezhang commented Mar 10, 2022

It's need not to use reflect as follows:

package main

import (
	"bufio"
	"encoding/csv"
	"fmt"
	"io"
	"os"
)

type ProgressReader struct {
	r              io.Reader
	totalBytes     int64
	bytes          int64
	remainingBytes int64
}

func NewProgressReader(totalBytes int64, r io.Reader) *ProgressReader {
	if totalBytes <= 0 {
		panic("the totalBytes must greater than 0")
	}
	return &ProgressReader{
		r:          r,
		totalBytes: totalBytes,
	}
}

func (r *ProgressReader) Read(p []byte) (n int, err error) {
	n, err = r.r.Read(p)
	r.bytes += int64(n)
	r.remainingBytes += int64(n)
	return n, err
}

func (r *ProgressReader) GetCurrentBytes(offset int64) int64 {
	n := r.remainingBytes + offset
	r.remainingBytes -= n
	return n
}

func (r *ProgressReader) Percentage() float64 {
	return 100 * float64(r.bytes) / float64(r.totalBytes)
}

func (r *ProgressReader) PercentageOffset(offset int64) float64 {
	return 100 * float64(r.bytes+offset) / float64(r.totalBytes)
}

func (r *ProgressReader) Bytes() int64 {
	return r.bytes
}

func main() {
	f, err := os.Open("/Users/veezhang/Downloads/annual-enterprise-survey-2020-financial-year-provisional-csv.csv")
	if err != nil {
		panic(err)
	}
	stat, err := f.Stat()
	if err != nil {
		panic(err)
	}

	totalBytes := stat.Size()
	pr := NewProgressReader(totalBytes, f)
	br := bufio.NewReader(pr)
	r := csv.NewReader(br)

	for {
		_, err := r.Read()
		if err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}

		fmt.Printf(
			"%.3f %.3f %d %d\n",
			pr.Percentage(),
			pr.PercentageOffset(int64(-br.Buffered())),
			pr.GetCurrentBytes(int64(-br.Buffered())),
			-br.Buffered(),
		)
	}
}

The output :

...
100.000 99.979 192 -1245
100.000 99.982 186 -1059
100.000 99.985 170 -889
100.000 99.988 168 -721
100.000 99.991 192 -529
100.000 99.994 173 -356
100.000 99.997 178 -178
100.000 100.000 178 0

"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/config"
"github.com/vesoft-inc/nebula-importer/pkg/logger"
"io"
"os"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sort it ?

Comment on lines 91 to 95
func (r *CSVReader) TotalBytes() (int64) {
for {
if r.initComplete {
return r.totalBytes
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's inappropriate?

if r.Readers == nil {
return &r.stataMgr.Stats
}
r.stataMgr.StatsCh <- base.NewOutputStats()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add chan when it can be read directly?


func (r *Runner) QueryStats() *stats.Stats {
if r.stataMgr != nil {
if r.Readers == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to check the r.Readers?

}
}

func (r *Runner) QueryStats() *stats.Stats {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about return stats.Stats nor *stats.Stats?

Copy link
Contributor

@veezhang veezhang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@yixinglu yixinglu merged commit ece1a79 into vesoft-inc:master Mar 14, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants