From 11abb982fafeea36879767bbd7df95cba549fd20 Mon Sep 17 00:00:00 2001 From: cfbber <4050@xx.d> Date: Tue, 12 Nov 2024 15:44:04 +0800 Subject: [PATCH 1/7] support customer line delimeter --- main.go | 12 +++++++++--- reader/reader.go | 9 +++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index ecff344..3b473ba 100644 --- a/main.go +++ b/main.go @@ -81,8 +81,8 @@ var ( retryInfo map[int]int showVersion bool queueSize int - - bufferPool = sync.Pool{ + line_delimiter byte + bufferPool = sync.Pool{ New: func() interface{} { return make([]byte, 0, bufferSize) }, @@ -186,6 +186,7 @@ func initFlags() { fmt.Println("retry_times: ", maxRetryTimes) fmt.Println("retry_interval: ", retryInterval) fmt.Println("queue_size: ", queueSize) + fmt.Println("LineDelimiter: ", string(line_delimiter)) } utils.InitLog(logLevel) @@ -253,6 +254,11 @@ func paramCheck() { if strings.ToLower(kv[0]) == "format" && strings.ToLower(kv[1]) != "csv" { enableConcurrency = false } + + if strings.ToLower(kv[0]) == "line_delimiter" && len(kv[1]) == 1 { + line_delimiter = kv[1][0] + } + if len(kv) > 2 { headers[kv[0]] = strings.Join(kv[1:], ":") } else { @@ -369,7 +375,7 @@ func main() { streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask, &retryInfo) reporter.Report() defer reporter.CloseWait() - reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount) + reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, line_delimiter) reader.Close() streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime) diff --git a/reader/reader.go b/reader/reader.go index 634b9f0..288290f 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -27,9 +27,9 @@ import ( "sync/atomic" "time" - log "github.com/sirupsen/logrus" - report "doris-streamloader/report" loader "doris-streamloader/loader" + report "doris-streamloader/report" + log "github.com/sirupsen/logrus" ) type FileReader struct { @@ -108,7 +108,8 @@ func NewFileReader(filePaths string, batchRows int, batchBytes int, bufferSize i } // Read File -func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int, loadResp *loader.Resp, retryCount int) { +func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTask int, retryInfo *map[int]int, + loadResp *loader.Resp, retryCount int, lineDelimiter byte) { index := 0 data := f.pool.Get().([]byte) count := f.batchRows @@ -129,7 +130,7 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas if atomic.LoadUint64(&reporter.FinishedWorkers) == atomic.LoadUint64(&reporter.TotalWorkers) { return } - line, err := reader.ReadBytes('\n') + line, err := reader.ReadBytes(lineDelimiter) if err == io.EOF { file.Close() break From 344476d2f3bf3cddc14ade977453177815ee72db Mon Sep 17 00:00:00 2001 From: cfbber Date: Tue, 12 Nov 2024 16:07:49 +0800 Subject: [PATCH 2/7] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=88=86=E5=89=B2?= =?UTF-8?q?=E7=AC=A6=E4=B8=8D=E4=B8=BA=E6=8D=A2=E8=A1=8C=E7=AC=A6=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E6=BC=8F=E6=95=B0=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 10 ++++++---- reader/reader.go | 8 +++++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/main.go b/main.go index 3b473ba..088ff4c 100644 --- a/main.go +++ b/main.go @@ -186,7 +186,6 @@ func initFlags() { fmt.Println("retry_times: ", maxRetryTimes) fmt.Println("retry_interval: ", retryInterval) fmt.Println("queue_size: ", queueSize) - fmt.Println("LineDelimiter: ", string(line_delimiter)) } utils.InitLog(logLevel) @@ -255,9 +254,12 @@ func paramCheck() { enableConcurrency = false } - if strings.ToLower(kv[0]) == "line_delimiter" && len(kv[1]) == 1 { - line_delimiter = kv[1][0] - } + if strings.ToLower(kv[0]) == "line_delimiter" { + if len(kv[1]) == 1 { + line_delimiter = kv[1][0] + } else { + log.Errorf("line_delimiter invalid: %s", line_delimiter) + } if len(kv) > 2 { headers[kv[0]] = strings.Join(kv[1:], ":") diff --git a/reader/reader.go b/reader/reader.go index 288290f..55d40b4 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -126,14 +126,20 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas for _, file := range f.files { loadResp.LoadFiles = append(loadResp.LoadFiles, file.Name()) reader := bufio.NewReaderSize(file, f.bufferSize) + + var breakFlag bool = false for { + if breakFlag { + break + } + if atomic.LoadUint64(&reporter.FinishedWorkers) == atomic.LoadUint64(&reporter.TotalWorkers) { return } line, err := reader.ReadBytes(lineDelimiter) if err == io.EOF { file.Close() - break + breakFlag = true } else if err != nil { log.Errorf("Read file failed, error message: %v, before retrying, we suggest:\n1.Check the input data files and fix if there is any problem.\n2.Do select count(*) to check whether data is partially loaded.\n3.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n4.Otherwise, just retry.\n", err) os.Exit(1) From 83f56ed0ea2c9a0db8b2ef5fbb2b998658f031b9 Mon Sep 17 00:00:00 2001 From: cfbber Date: Tue, 12 Nov 2024 16:27:04 +0800 Subject: [PATCH 3/7] 1 --- main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 088ff4c..b28b270 100644 --- a/main.go +++ b/main.go @@ -254,12 +254,13 @@ func paramCheck() { enableConcurrency = false } - if strings.ToLower(kv[0]) == "line_delimiter" { + if strings.ToLower(kv[0]) == "line_delimiter" { if len(kv[1]) == 1 { line_delimiter = kv[1][0] } else { log.Errorf("line_delimiter invalid: %s", line_delimiter) } + } if len(kv) > 2 { headers[kv[0]] = strings.Join(kv[1:], ":") From 33750a32927dc85fc9cf33836a6fb4c0f8e4834f Mon Sep 17 00:00:00 2001 From: cfbber Date: Fri, 15 Nov 2024 16:24:00 +0800 Subject: [PATCH 4/7] Supports escape characters --- main.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index b28b270..115dff5 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "os" + "regexp" "strconv" "strings" "sync" @@ -191,6 +192,21 @@ func initFlags() { utils.InitLog(logLevel) } +// Restore hex escape sequences like \xNN to their corresponding characters +func restoreHexEscapes(s1 string) (string, error) { + + re := regexp.MustCompile(`\\x([0-9A-Fa-f]{2})`) + + return re.ReplaceAllStringFunc(s1, func(match string) string { + hexValue := match[2:] // Remove the \x prefix + decValue, err := strconv.ParseInt(hexValue, 16, 0) + if err != nil { + return match + } + return string(rune(decValue)) + }), nil +} + //go:generate go run gen_version.go func paramCheck() { if showVersion { @@ -255,11 +271,15 @@ func paramCheck() { } if strings.ToLower(kv[0]) == "line_delimiter" { - if len(kv[1]) == 1 { - line_delimiter = kv[1][0] + + restored, err := restoreHexEscapes(kv[1]) + if err != nil || len(restored) != 1 { + line_delimiter = '\n' + log.Errorf("line_delimiter invalid: %s", kv[1]) } else { - log.Errorf("line_delimiter invalid: %s", line_delimiter) + line_delimiter = restored[0] } + } if len(kv) > 2 { From b80bcb17c6b1d2ce384a37bf5e2a8635f0b963aa Mon Sep 17 00:00:00 2001 From: cfbber Date: Fri, 15 Nov 2024 17:01:46 +0800 Subject: [PATCH 5/7] Fix TotalRows count issue --- reader/reader.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/reader/reader.go b/reader/reader.go index 55d40b4..a62f5af 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -29,6 +29,7 @@ import ( loader "doris-streamloader/loader" report "doris-streamloader/report" + log "github.com/sirupsen/logrus" ) @@ -127,19 +128,14 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas loadResp.LoadFiles = append(loadResp.LoadFiles, file.Name()) reader := bufio.NewReaderSize(file, f.bufferSize) - var breakFlag bool = false for { - if breakFlag { - break - } - if atomic.LoadUint64(&reporter.FinishedWorkers) == atomic.LoadUint64(&reporter.TotalWorkers) { return } line, err := reader.ReadBytes(lineDelimiter) - if err == io.EOF { + if err == io.EOF && len(line) == 0 { file.Close() - breakFlag = true + break } else if err != nil { log.Errorf("Read file failed, error message: %v, before retrying, we suggest:\n1.Check the input data files and fix if there is any problem.\n2.Do select count(*) to check whether data is partially loaded.\n3.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n4.Otherwise, just retry.\n", err) os.Exit(1) From 339e52c3e1d73adbfd6d24df272017c6ebec37a5 Mon Sep 17 00:00:00 2001 From: cfbber Date: Mon, 18 Nov 2024 09:55:13 +0800 Subject: [PATCH 6/7] Standardized Naming --- main.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 115dff5..93930a4 100644 --- a/main.go +++ b/main.go @@ -82,7 +82,7 @@ var ( retryInfo map[int]int showVersion bool queueSize int - line_delimiter byte + lineDelimiter byte bufferPool = sync.Pool{ New: func() interface{} { return make([]byte, 0, bufferSize) @@ -274,10 +274,10 @@ func paramCheck() { restored, err := restoreHexEscapes(kv[1]) if err != nil || len(restored) != 1 { - line_delimiter = '\n' + lineDelimiter = '\n' log.Errorf("line_delimiter invalid: %s", kv[1]) } else { - line_delimiter = restored[0] + lineDelimiter = restored[0] } } @@ -398,7 +398,7 @@ func main() { streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask, &retryInfo) reporter.Report() defer reporter.CloseWait() - reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, line_delimiter) + reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, lineDelimiter) reader.Close() streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime) From 59fe6805c0b5cffe08fb28925b2a7154635fa58f Mon Sep 17 00:00:00 2001 From: cfbber Date: Thu, 21 Nov 2024 15:47:34 +0800 Subject: [PATCH 7/7] Specify a default delimiter and optimize error messages. --- main.go | 7 +++++-- reader/reader.go | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 93930a4..38d8799 100644 --- a/main.go +++ b/main.go @@ -82,7 +82,7 @@ var ( retryInfo map[int]int showVersion bool queueSize int - lineDelimiter byte + lineDelimiter byte = '\n' bufferPool = sync.Pool{ New: func() interface{} { return make([]byte, 0, bufferSize) @@ -194,6 +194,9 @@ func initFlags() { // Restore hex escape sequences like \xNN to their corresponding characters func restoreHexEscapes(s1 string) (string, error) { + if s1 == `\n` { + return "\n", nil + } re := regexp.MustCompile(`\\x([0-9A-Fa-f]{2})`) @@ -274,8 +277,8 @@ func paramCheck() { restored, err := restoreHexEscapes(kv[1]) if err != nil || len(restored) != 1 { - lineDelimiter = '\n' log.Errorf("line_delimiter invalid: %s", kv[1]) + os.Exit(1) } else { lineDelimiter = restored[0] } diff --git a/reader/reader.go b/reader/reader.go index a62f5af..08f24e7 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -138,6 +138,9 @@ func (f *FileReader) Read(reporter *report.Reporter, workers int, maxBytesPerTas break } else if err != nil { log.Errorf("Read file failed, error message: %v, before retrying, we suggest:\n1.Check the input data files and fix if there is any problem.\n2.Do select count(*) to check whether data is partially loaded.\n3.If the data is partially loaded and duplication is unacceptable, consider dropping the table (with caution that all data in the table will be lost) and retry.\n4.Otherwise, just retry.\n", err) + if len(line) !=0 { + log.Error("5.When using a specified line delimiter, the file must end with that delimiter.") + } os.Exit(1) }