diff --git a/pkg/logging/cri_logger.go b/pkg/logging/cri_logger.go index 081d896668b..c47606b2aa0 100644 --- a/pkg/logging/cri_logger.go +++ b/pkg/logging/cri_logger.go @@ -25,7 +25,6 @@ package logging import ( "bufio" "bytes" - "context" "errors" "fmt" "io" @@ -74,13 +73,13 @@ func viewLogsCRI(lvopts LogViewOptions, stdout, stderr io.Writer, stopChannel ch return fmt.Errorf("logpath is nil ") } - return ReadLogs(context.Background(), &lvopts, stdout, stderr) + return ReadLogs(&lvopts, stdout, stderr, stopChannel) } // ReadLogs read the container log and redirect into stdout and stderr. // Note that containerID is only needed when following the log, or else // just pass in empty string "". -func ReadLogs(ctx context.Context, opts *LogViewOptions, stdout, stderr io.Writer) error { +func ReadLogs(opts *LogViewOptions, stdout, stderr io.Writer, stopChannel chan os.Signal) error { var logPath = opts.LogPath evaluated, err := filepath.EvalSymlinks(logPath) if err != nil { @@ -113,56 +112,62 @@ func ReadLogs(ctx context.Context, opts *LogViewOptions, stdout, stderr io.Write writer := newLogWriter(stdout, stderr, opts) msg := &logMessage{} for { - if stop || (limitedMode && limitedNum == 0) { - logrus.Debugf("Finished parsing log file, path; %s", logPath) + select { + case <-stopChannel: + logrus.Debugf("received stop signal while reading cri logfile, returning") return nil - } - l, err := r.ReadBytes(eol[0]) - if err != nil { - if err != io.EOF { // This is an real error - return fmt.Errorf("failed to read log file %q: %v", logPath, err) + default: + if stop || (limitedMode && limitedNum == 0) { + logrus.Debugf("finished parsing log file, path: %s", logPath) + return nil } - if opts.Follow { + l, err := r.ReadBytes(eol[0]) + if err != nil { + if err != io.EOF { // This is an real error + return fmt.Errorf("failed to read log file %q: %v", logPath, err) + } + if opts.Follow { + + // Reset seek so that if this is an incomplete line, + // it will be read again. + if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil { + return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err) + } - // Reset seek so that if this is an incomplete line, - // it will be read again. - if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil { - return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err) + // If the container exited consume data until the next EOF + continue } + // Should stop after writing the remaining content. + stop = true + if len(l) == 0 { + continue + } + logrus.Debugf("incomplete line in log file, path: %s, line: %s", logPath, l) + } - // If the container exited consume data until the next EOF + // Parse the log line. + msg.reset() + if err := ParseCRILog(l, msg); err != nil { + logrus.WithError(err).Errorf("failed when parsing line in log file, path: %s, line: %s", logPath, l) continue } - // Should stop after writing the remaining content. - stop = true - if len(l) == 0 { - continue + // Write the log line into the stream. + if err := writer.write(msg, isNewLine); err != nil { + if err == errMaximumWrite { + logrus.Debugf("finished parsing log file, hit bytes limit path: %s", logPath) + return nil + } + logrus.WithError(err).Errorf("failed when writing line to log file, path: %s, line: %s", logPath, l) + return err } - logrus.Debugf("Incomplete line in log file, path: %s line: %s", logPath, l) - } - - // Parse the log line. - msg.reset() - if err := ParseCRILog(l, msg); err != nil { - logrus.WithError(err).Errorf("Failed when parsing line in log file, path: %s line: %s", logPath, l) - continue - } - // Write the log line into the stream. - if err := writer.write(msg, isNewLine); err != nil { - if err == errMaximumWrite { - logrus.Debugf("Finished parsing log file, hit bytes limit path: %s", logPath) - return nil + if limitedMode { + limitedNum-- + } + if len(msg.log) > 0 { + isNewLine = msg.log[len(msg.log)-1] == eol[0] + } else { + isNewLine = true } - logrus.WithError(err).Errorf("Failed when writing line to log file, path: %s line: %s", logPath, l) - return err - } - if limitedMode { - limitedNum-- - } - if len(msg.log) > 0 { - isNewLine = msg.log[len(msg.log)-1] == eol[0] - } else { - isNewLine = true } } } diff --git a/pkg/logging/cri_logger_test.go b/pkg/logging/cri_logger_test.go index daa469d1aab..fff79506ae6 100644 --- a/pkg/logging/cri_logger_test.go +++ b/pkg/logging/cri_logger_test.go @@ -25,7 +25,6 @@ package logging import ( "bufio" "bytes" - "context" "fmt" "io" "os" @@ -43,6 +42,8 @@ func TestReadLogs(t *testing.T) { file.WriteString(`2016-10-06T00:17:09.669794202Z stdout F line1` + "\n") file.WriteString(`2016-10-06T00:17:10.669794202Z stdout F line2` + "\n") file.WriteString(`2016-10-06T00:17:11.669794202Z stdout F line3` + "\n") + + stopChan := make(chan os.Signal) testCases := []struct { name string logViewOptions LogViewOptions @@ -85,7 +86,7 @@ func TestReadLogs(t *testing.T) { t.Run(tc.name, func(t *testing.T) { stdoutBuf := bytes.NewBuffer(nil) stderrBuf := bytes.NewBuffer(nil) - err = ReadLogs(context.TODO(), &tc.logViewOptions, stdoutBuf, stderrBuf) + err = ReadLogs(&tc.logViewOptions, stdoutBuf, stderrBuf, stopChan) if err != nil { t.Fatalf(err.Error()) @@ -177,6 +178,8 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) { t.Fatalf("unable to create temp file") } + stopChan := make(chan os.Signal) + count := 10000 for i := 0; i < count; i++ { @@ -198,7 +201,7 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) { var buf bytes.Buffer w := io.MultiWriter(&buf) - err = ReadLogs(context.Background(), &LogViewOptions{LogPath: tmpfile.Name(), Tail: 0, Timestamps: true}, w, w) + err = ReadLogs(&LogViewOptions{LogPath: tmpfile.Name(), Tail: 0, Timestamps: true}, w, w, stopChan) if err != nil { t.Errorf("ReadLogs file %s failed %s", tmpfile.Name(), err.Error()) }