diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index b368bc20dda4..0da539a08bad 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -182,7 +182,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli maxBatchFiles: c.MaxConcurrentFiles / 2, maxBatches: c.MaxBatches, previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2), - knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles), + knownFiles: make([]*reader.Metadata, 0, 10*c.MaxConcurrentFiles), seenPaths: make(map[string]struct{}, 100), }, nil } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index acb0dec0c4ba..3179363a313b 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -33,7 +33,7 @@ type Manager struct { maxBatchFiles int previousPollFiles []*reader.Reader - knownFiles []*reader.Reader + knownFiles []*reader.Metadata seenPaths map[string]struct{} currentFps []*fingerprint.Fingerprint @@ -52,9 +52,7 @@ func (m *Manager) Start(persister operator.Persister) error { if len(offsets) > 0 { m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.") m.readerFactory.FromBeginning = true - for _, offset := range offsets { - m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset}) - } + m.knownFiles = append(m.knownFiles, offsets...) } } @@ -68,12 +66,12 @@ func (m *Manager) Start(persister operator.Persister) error { return nil } -func (m *Manager) closeFiles() { - for _, r := range m.previousPollFiles { - r.Close() +func (m *Manager) closePreviousFiles() { + if forgetNum := len(m.previousPollFiles) + len(m.knownFiles) - cap(m.knownFiles); forgetNum > 0 { + m.knownFiles = m.knownFiles[forgetNum:] } - for _, r := range m.knownFiles { - r.Close() + for _, r := range m.previousPollFiles { + m.knownFiles = append(m.knownFiles, r.Close()) } } @@ -81,7 +79,12 @@ func (m *Manager) closeFiles() { func (m *Manager) Stop() error { m.cancel() m.wg.Wait() - m.closeFiles() + m.closePreviousFiles() + if m.persister != nil { + if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil { + m.Errorw("save offsets", zap.Error(err)) + } + } m.cancel = nil return nil } @@ -136,6 +139,11 @@ func (m *Manager) poll(ctx context.Context) { // Any new files that appear should be consumed entirely m.readerFactory.FromBeginning = true + if m.persister != nil { + if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil { + m.Errorw("save offsets", zap.Error(err)) + } + } } func (m *Manager) consume(ctx context.Context, paths []string) { @@ -152,7 +160,9 @@ func (m *Manager) consume(ctx context.Context, paths []string) { // this can mean either files which were removed, or rotated into a name not matching the pattern // we do this before reading existing files to ensure we emit older log lines before newer ones m.readLostFiles(ctx, readers) + m.closePreviousFiles() + // read new readers to end var wg sync.WaitGroup for _, r := range readers { wg.Add(1) @@ -163,23 +173,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { } wg.Wait() - for _, r := range m.previousPollFiles { - r.Close() - } m.previousPollFiles = readers - - m.saveCurrent(readers) - - if m.persister != nil { - rmds := make([]*reader.Metadata, 0, len(readers)) - for _, r := range readers { - rmds = append(rmds, r.Metadata) - } - if err := checkpoint.Save(ctx, m.persister, rmds); err != nil { - m.Errorw("save offsets", zap.Error(err)) - } - } - m.clearCurrentFingerprints() } @@ -257,38 +251,28 @@ func (m *Manager) clearCurrentFingerprints() { m.currentFps = make([]*fingerprint.Fingerprint, 0) } -// saveCurrent adds the readers from this polling interval to this list of -// known files, then increments the generation of all tracked old readers -// before clearing out readers that have existed for 3 generations. -func (m *Manager) saveCurrent(readers []*reader.Reader) { - forgetNum := len(m.knownFiles) + len(readers) - cap(m.knownFiles) - if forgetNum > 0 { - m.knownFiles = append(m.knownFiles[forgetNum:], readers...) - return - } - m.knownFiles = append(m.knownFiles, readers...) -} - func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) { - // Check if the new path has the same fingerprint as an old path - if oldReader, ok := m.findFingerprintMatch(fp); ok { - return m.readerFactory.Copy(oldReader, file) + // Check previous poll cycle for match + for i := 0; i < len(m.previousPollFiles); i++ { + oldReader := m.previousPollFiles[i] + if fp.StartsWith(oldReader.Fingerprint) { + // Keep the new reader and discard the old. This ensures that if the file was + // copied to another location and truncated, our handle is updated. + m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...) + return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close()) + } } - // If we don't match any previously known files, create a new reader from scratch - return m.readerFactory.NewReader(file, fp) -} - -func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) { // Iterate backwards to match newest first for i := len(m.knownFiles) - 1; i >= 0; i-- { - oldReader := m.knownFiles[i] - if fp.StartsWith(oldReader.Fingerprint) { - // Remove the old reader from the list of known files. We will - // add it back in saveCurrent if it is still alive. + oldMetadata := m.knownFiles[i] + if fp.StartsWith(oldMetadata.Fingerprint) { + // Remove the old metadata from the list. We will keep updating it and save it again later. m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...) - return oldReader, true + return m.readerFactory.NewReaderFromMetadata(file, oldMetadata) } } - return nil, false + + // If we don't match any previously known files, create a new reader from scratch + return m.readerFactory.NewReader(file, fp) } diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index c206fcc86a3c..c083122e42fe 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -17,7 +17,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) @@ -32,40 +31,27 @@ type Factory struct { TrimFunc trim.Func } +func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) { + return fingerprint.New(file, f.Config.FingerprintSize) +} + func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) { - m := &Metadata{ - Fingerprint: fp, - FileAttributes: map[string]any{}, - } + m := &Metadata{Fingerprint: fp, FileAttributes: map[string]any{}} if f.Config.FlushTimeout > 0 { m.FlushState = &flush.State{LastDataChange: time.Now()} } - return f.build(file, m) -} - -// copy creates a deep copy of a reader -func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) { - return f.build(newFile, &Metadata{ - Fingerprint: old.Fingerprint.Copy(), - Offset: old.Offset, - FileAttributes: util.MapCopy(old.FileAttributes), - HeaderFinalized: old.HeaderFinalized, - FlushState: old.FlushState.Copy(), - }) -} - -func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) { - return fingerprint.New(file, f.Config.FingerprintSize) + return f.NewReaderFromMetadata(file, m) } -func (f *Factory) build(file *os.File, m *Metadata) (r *Reader, err error) { +func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) { r = &Reader{ - Config: f.Config, - Metadata: m, - file: file, - fileName: file.Name(), - logger: f.SugaredLogger.With("path", file.Name()), - decoder: decode.New(f.Encoding), + Config: f.Config, + Metadata: m, + file: file, + fileName: file.Name(), + logger: f.SugaredLogger.With("path", file.Name()), + decoder: decode.New(f.Encoding), + lineSplitFunc: f.SplitFunc, } flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout) diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index c2782dec46dd..720f50b9d9d5 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -146,8 +146,8 @@ func (r *Reader) delete() { } } -// Close will close the file -func (r *Reader) Close() { +// Close will close the file and return the metadata +func (r *Reader) Close() *Metadata { if r.file != nil { if err := r.file.Close(); err != nil { r.logger.Debugw("Problem closing reader", zap.Error(err)) @@ -160,6 +160,9 @@ func (r *Reader) Close() { r.logger.Errorw("Failed to stop header pipeline", zap.Error(err)) } } + m := r.Metadata + r.Metadata = nil + return m } // Read from the file and update the fingerprint if necessary diff --git a/pkg/stanza/fileconsumer/internal/splitter/factory_test.go b/pkg/stanza/fileconsumer/internal/splitter/factory_test.go deleted file mode 100644 index 83628cff78fa..000000000000 --- a/pkg/stanza/fileconsumer/internal/splitter/factory_test.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package splitter - -import ( - "bufio" - "testing" - "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split/splittest" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" -) - -func TestFactorySplitFunc(t *testing.T) { - testCases := []struct { - name string - baseFunc bufio.SplitFunc - flushPeriod time.Duration - maxLength int - trimFunc trim.Func - input []byte - steps []splittest.Step - }{ - { - name: "ScanLinesStrict", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(len(" hello \n"), " hello "), - splittest.ExpectAdvanceToken(len(" world \n"), " world "), - }, - }, - { - name: "ScanLinesStrictWithFlush", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - flushPeriod: 100 * time.Millisecond, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(len(" hello \n"), " hello "), - splittest.ExpectAdvanceToken(len(" world \n"), " world "), - splittest.ExpectReadMore(), - splittest.Eventually(splittest.ExpectAdvanceToken(len(" extra "), " extra "), - 200*time.Millisecond, 10*time.Millisecond), - }, - }, - { - name: "ScanLinesStrictWithMaxLength", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - maxLength: 4, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(4, " hel"), - splittest.ExpectAdvanceToken(4, "lo "), - splittest.ExpectAdvanceToken(4, " wor"), - splittest.ExpectAdvanceToken(4, "ld "), - splittest.ExpectAdvanceToken(4, " ext"), - }, - }, - { - name: "ScanLinesStrictWithTrim", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - trimFunc: trim.Whitespace, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(len(" hello \n"), "hello"), - splittest.ExpectAdvanceToken(len(" world \n"), "world"), - }, - }, - { - name: "ScanLinesStrictWithFlushAndMaxLength", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - flushPeriod: 100 * time.Millisecond, - maxLength: 4, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(4, " hel"), - splittest.ExpectAdvanceToken(4, "lo "), - splittest.ExpectAdvanceToken(4, " wor"), - splittest.ExpectAdvanceToken(4, "ld "), - splittest.ExpectAdvanceToken(4, " ext"), - splittest.ExpectReadMore(), - splittest.Eventually(splittest.ExpectAdvanceToken(3, "ra "), - 200*time.Millisecond, 10*time.Millisecond), - }, - }, - { - name: "ScanLinesStrictWithFlushAndTrim", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - flushPeriod: 100 * time.Millisecond, - trimFunc: trim.Whitespace, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(len(" hello \n"), "hello"), - splittest.ExpectAdvanceToken(len(" world \n"), "world"), - splittest.ExpectReadMore(), - splittest.Eventually(splittest.ExpectAdvanceToken(len(" extra "), "extra"), - 200*time.Millisecond, 10*time.Millisecond), - }, - }, - { - name: "ScanLinesStrictWithMaxLengthAndTrim", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - flushPeriod: 0, - maxLength: 4, - trimFunc: trim.Whitespace, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(4, "hel"), // trimmed to length before whitespace - splittest.ExpectAdvanceToken(4, "lo"), - splittest.ExpectAdvanceToken(4, "wor"), - splittest.ExpectAdvanceToken(4, "ld"), - splittest.ExpectAdvanceToken(4, "ext"), - }, - }, - { - name: "ScanLinesStrictWithFlushAndMaxLengthAndTrim", - input: []byte(" hello \n world \n extra "), - baseFunc: splittest.ScanLinesStrict, - flushPeriod: 100 * time.Millisecond, - maxLength: 4, - trimFunc: trim.Whitespace, - steps: []splittest.Step{ - splittest.ExpectAdvanceToken(4, "hel"), - splittest.ExpectAdvanceToken(4, "lo"), - splittest.ExpectAdvanceToken(4, "wor"), - splittest.ExpectAdvanceToken(4, "ld"), - splittest.ExpectAdvanceToken(4, "ext"), - splittest.ExpectReadMore(), - splittest.Eventually(splittest.ExpectAdvanceToken(3, "ra"), - 200*time.Millisecond, 10*time.Millisecond), - }, - }, - } - - for _, tc := range testCases { - var splitFunc bufio.SplitFunc - if tc.flushPeriod > 0 { - s := &flush.State{LastDataChange: time.Now()} - splitFunc = Func(s.Func(tc.baseFunc, tc.flushPeriod), tc.maxLength, tc.trimFunc) - } else { - splitFunc = Func(tc.baseFunc, tc.maxLength, tc.trimFunc) - } - t.Run(tc.name, splittest.New(splitFunc, tc.input, tc.steps...)) - } -} diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index cc19ab44d5c3..723c8a31de54 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -22,21 +22,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -func TestCopyReaderWithoutFlusher(t *testing.T) { - f, _ := testReaderFactory(t, split.Config{}, defaultMaxLogSize, 0) - - temp := openTemp(t, t.TempDir()) - fp, err := f.NewFingerprint(temp) - require.NoError(t, err) - - r, err := f.NewReader(temp, fp) - require.NoError(t, err) - - // A copy of the reader should not panic - _, err = f.Copy(r, temp) - assert.NoError(t, err) -} - func TestPersistFlusher(t *testing.T) { flushPeriod := 100 * time.Millisecond f, emitChan := testReaderFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod) @@ -60,7 +45,7 @@ func TestPersistFlusher(t *testing.T) { expectNoTokensUntil(t, emitChan, 2*flushPeriod) // A copy of the reader should remember that we last emitted about 200ms ago. - copyReader, err := f.Copy(r, temp) + copyReader, err := f.NewReaderFromMetadata(temp, r.Metadata) assert.NoError(t, err) // This time, the flusher will kick in and we should emit the unfinished log. diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 44d061dcb85c..0407333c7f4e 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -91,9 +91,7 @@ func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Ma } input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan)) require.NoError(t, err) - t.Cleanup(func() { - input.closeFiles() - }) + t.Cleanup(func() { input.closePreviousFiles() }) return input, tmc.emitChan } diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go index 36d49e1a3b26..1906aed08b3f 100644 --- a/pkg/stanza/trim/trim.go +++ b/pkg/stanza/trim/trim.go @@ -61,7 +61,7 @@ func Whitespace(data []byte) []byte { } func ToLength(splitFunc bufio.SplitFunc, maxLength int) bufio.SplitFunc { - if maxLength == 0 { + if maxLength <= 0 { return splitFunc } return func(data []byte, atEOF bool) (int, []byte, error) {