diff --git a/task/handler/runner.go b/task/handler/runner.go index cc58bad..c0c8609 100644 --- a/task/handler/runner.go +++ b/task/handler/runner.go @@ -4,7 +4,6 @@ import ( "bufio" "context" "encoding/json" - "fmt" "github.com/apex/log" "github.com/cenkalti/backoff/v4" "github.com/crawlab-team/crawlab-core/constants" @@ -17,7 +16,6 @@ import ( "github.com/crawlab-team/crawlab-core/models/models" "github.com/crawlab-team/crawlab-core/sys_exec" "github.com/crawlab-team/crawlab-core/task/fs" - "github.com/crawlab-team/crawlab-core/utils" "github.com/crawlab-team/crawlab-db/mongo" grpc "github.com/crawlab-team/crawlab-grpc" "github.com/crawlab-team/go-trace" @@ -28,6 +26,7 @@ import ( "go.uber.org/dig" "os" "os/exec" + "strings" "time" ) @@ -38,6 +37,7 @@ type Runner struct { // settings subscribeTimeout time.Duration + bufferSize int // internals cmd *exec.Cmd // process command instance @@ -53,8 +53,8 @@ type Runner struct { sub grpc.TaskService_SubscribeClient // grpc task service stream client // log internals - scannerStdout *bufio.Scanner - scannerStderr *bufio.Scanner + scannerStdout *bufio.Reader + scannerStderr *bufio.Reader logBatchSize int } @@ -251,11 +251,11 @@ func (r *Runner) configureCmd() { func (r *Runner) configureLogging() { // set stdout reader stdout, _ := r.cmd.StdoutPipe() - r.scannerStdout = bufio.NewScanner(stdout) + r.scannerStdout = bufio.NewReaderSize(stdout, r.bufferSize) // set stderr reader stderr, _ := r.cmd.StderrPipe() - r.scannerStderr = bufio.NewScanner(stderr) + r.scannerStderr = bufio.NewReaderSize(stderr, r.bufferSize) } func (r *Runner) startLogging() { @@ -267,44 +267,25 @@ func (r *Runner) startLogging() { } func (r *Runner) startLoggingReaderStdout() { - utils.LogDebug("begin startLoggingReaderStdout") - var lines []string - for r.scannerStdout.Scan() { - line := r.scannerStdout.Text() - lines = append(lines, line) - if len(lines) >= r.logBatchSize { - r.writeLogLines(lines) - utils.LogDebug(fmt.Sprintf("scannerStdout lines: %s", lines)) - lines = []string{} + for { + line, err := r.scannerStdout.ReadString(byte('\n')) + if err != nil { + break } + line = strings.TrimSuffix(line, "\n") + r.writeLogLines([]string{line}) } - if len(lines) > 0 { - r.writeLogLines(lines) - utils.LogDebug(fmt.Sprintf("scannerStdout lines: %s", lines)) - - } - // reach end - utils.LogDebug("scannerStdout reached end") } func (r *Runner) startLoggingReaderStderr() { - utils.LogDebug("begin startLoggingReaderStderr") - var lines []string - for r.scannerStderr.Scan() { - line := r.scannerStderr.Text() - lines = append(lines, line) - if len(lines) >= r.logBatchSize { - r.writeLogLines(lines) - utils.LogDebug(fmt.Sprintf("scannerStderr lines: %s", lines)) - lines = []string{} + for { + line, err := r.scannerStderr.ReadString(byte('\n')) + if err != nil { + break } + line = strings.TrimSuffix(line, "\n") + r.writeLogLines([]string{line}) } - if len(lines) > 0 { - r.writeLogLines(lines) - utils.LogDebug(fmt.Sprintf("scannerStderr lines: %s", lines)) - } - // reach end - utils.LogDebug("scannerStderr reached end") } func (r *Runner) startHealthCheck() { @@ -603,6 +584,7 @@ func NewTaskRunner(id primitive.ObjectID, svc interfaces.TaskHandlerService, opt // runner r := &Runner{ subscribeTimeout: 30 * time.Second, + bufferSize: 1024 * 1024, svc: svc, tid: id, ch: make(chan constants.TaskSignal), diff --git a/task/log/file_driver.go b/task/log/file_driver.go index 0024738..f127c79 100644 --- a/task/log/file_driver.go +++ b/task/log/file_driver.go @@ -88,10 +88,16 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li } defer f.Close() - sc := bufio.NewScanner(f) + sc := bufio.NewReaderSize(f, 1024*1024*10) i := -1 - for sc.Scan() { + for { + line, err := sc.ReadString(byte('\n')) + if err != nil { + break + } + line = strings.TrimSuffix(line, "\n") + i++ if i < skip { @@ -102,12 +108,8 @@ func (d *FileLogDriver) Find(id string, pattern string, skip int, limit int) (li break } - line := sc.Text() lines = append(lines, line) } - if err := sc.Err(); err != nil { - return nil, trace.TraceError(err) - } return lines, nil }