Skip to content

Commit

Permalink
[pkg/stanza] Major overhaul of how we manage readers
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Oct 25, 2023
1 parent 3eb1507 commit 72f421f
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 103 deletions.
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
knownFiles: make([]*reader.Metadata, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
}, nil
}
Expand Down
88 changes: 36 additions & 52 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Manager struct {
maxBatchFiles int

previousPollFiles []*reader.Reader
knownFiles []*reader.Reader
knownFiles []*reader.Metadata
seenPaths map[string]struct{}

currentFps []*fingerprint.Fingerprint
Expand All @@ -52,9 +52,7 @@ func (m *Manager) Start(persister operator.Persister) error {
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
}
m.knownFiles = append(m.knownFiles, offsets...)
}
}

Expand All @@ -68,20 +66,25 @@ func (m *Manager) Start(persister operator.Persister) error {
return nil
}

func (m *Manager) closeFiles() {
for _, r := range m.previousPollFiles {
r.Close()
func (m *Manager) closePreviousFiles() {
if forgetNum := len(m.previousPollFiles) + len(m.knownFiles) - cap(m.knownFiles); forgetNum > 0 {
m.knownFiles = m.knownFiles[forgetNum:]
}
for _, r := range m.knownFiles {
r.Close()
for _, r := range m.previousPollFiles {
m.knownFiles = append(m.knownFiles, r.Close())
}
}

// Stop will stop the file monitoring process
func (m *Manager) Stop() error {
m.cancel()
m.wg.Wait()
m.closeFiles()
m.closePreviousFiles()
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
m.cancel = nil
return nil
}
Expand Down Expand Up @@ -136,6 +139,11 @@ func (m *Manager) poll(ctx context.Context) {

// Any new files that appear should be consumed entirely
m.readerFactory.FromBeginning = true
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.knownFiles); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
}

func (m *Manager) consume(ctx context.Context, paths []string) {
Expand All @@ -152,7 +160,9 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
m.readLostFiles(ctx, readers)
m.closePreviousFiles()

// read new readers to end
var wg sync.WaitGroup
for _, r := range readers {
wg.Add(1)
Expand All @@ -163,23 +173,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

for _, r := range m.previousPollFiles {
r.Close()
}
m.previousPollFiles = readers

m.saveCurrent(readers)

if m.persister != nil {
rmds := make([]*reader.Metadata, 0, len(readers))
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}

m.clearCurrentFingerprints()
}

Expand Down Expand Up @@ -257,38 +251,28 @@ func (m *Manager) clearCurrentFingerprints() {
m.currentFps = make([]*fingerprint.Fingerprint, 0)
}

// saveCurrent adds the readers from this polling interval to this list of
// known files, then increments the generation of all tracked old readers
// before clearing out readers that have existed for 3 generations.
func (m *Manager) saveCurrent(readers []*reader.Reader) {
forgetNum := len(m.knownFiles) + len(readers) - cap(m.knownFiles)
if forgetNum > 0 {
m.knownFiles = append(m.knownFiles[forgetNum:], readers...)
return
}
m.knownFiles = append(m.knownFiles, readers...)
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check if the new path has the same fingerprint as an old path
if oldReader, ok := m.findFingerprintMatch(fp); ok {
return m.readerFactory.Copy(oldReader, file)
// Check previous poll cycle for match
for i := 0; i < len(m.previousPollFiles); i++ {
oldReader := m.previousPollFiles[i]
if fp.StartsWith(oldReader.Fingerprint) {
// Keep the new reader and discard the old. This ensures that if the file was
// copied to another location and truncated, our handle is updated.
m.previousPollFiles = append(m.previousPollFiles[:i], m.previousPollFiles[i+1:]...)
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}
}

// If we don't match any previously known files, create a new reader from scratch
return m.readerFactory.NewReader(file, fp)
}

func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) {
// Iterate backwards to match newest first
for i := len(m.knownFiles) - 1; i >= 0; i-- {
oldReader := m.knownFiles[i]
if fp.StartsWith(oldReader.Fingerprint) {
// Remove the old reader from the list of known files. We will
// add it back in saveCurrent if it is still alive.
oldMetadata := m.knownFiles[i]
if fp.StartsWith(oldMetadata.Fingerprint) {
// Remove the old metadata from the list. We will keep updating it and save it again later.
m.knownFiles = append(m.knownFiles[:i], m.knownFiles[i+1:]...)
return oldReader, true
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
}
}
return nil, false

// If we don't match any previously known files, create a new reader from scratch
return m.readerFactory.NewReader(file, fp)
}
42 changes: 14 additions & 28 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)
Expand All @@ -32,40 +31,27 @@ type Factory struct {
TrimFunc trim.Func
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
return fingerprint.New(file, f.Config.FingerprintSize)
}

func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
m := &Metadata{
Fingerprint: fp,
FileAttributes: map[string]any{},
}
m := &Metadata{Fingerprint: fp, FileAttributes: map[string]any{}}
if f.Config.FlushTimeout > 0 {
m.FlushState = &flush.State{LastDataChange: time.Now()}
}
return f.build(file, m)
}

// copy creates a deep copy of a reader
func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) {
return f.build(newFile, &Metadata{
Fingerprint: old.Fingerprint.Copy(),
Offset: old.Offset,
FileAttributes: util.MapCopy(old.FileAttributes),
HeaderFinalized: old.HeaderFinalized,
FlushState: old.FlushState.Copy(),
})
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
return fingerprint.New(file, f.Config.FingerprintSize)
return f.NewReaderFromMetadata(file, m)
}

func (f *Factory) build(file *os.File, m *Metadata) (r *Reader, err error) {
func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
r = &Reader{
Config: f.Config,
Metadata: m,
file: file,
fileName: file.Name(),
logger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
Config: f.Config,
Metadata: m,
file: file,
fileName: file.Name(),
logger: f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
lineSplitFunc: f.SplitFunc,
}

flushFunc := m.FlushState.Func(f.SplitFunc, f.Config.FlushTimeout)
Expand Down
7 changes: 5 additions & 2 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func (r *Reader) delete() {
}
}

// Close will close the file
func (r *Reader) Close() {
// Close will close the file and return the metadata
func (r *Reader) Close() *Metadata {
if r.file != nil {
if err := r.file.Close(); err != nil {
r.logger.Debugw("Problem closing reader", zap.Error(err))
Expand All @@ -160,6 +160,9 @@ func (r *Reader) Close() {
r.logger.Errorw("Failed to stop header pipeline", zap.Error(err))
}
}
m := r.Metadata
r.Metadata = nil
return m
}

// Read from the file and update the fingerprint if necessary
Expand Down
17 changes: 1 addition & 16 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func TestCopyReaderWithoutFlusher(t *testing.T) {
f, _ := testReaderFactory(t, split.Config{}, defaultMaxLogSize, 0)

temp := openTemp(t, t.TempDir())
fp, err := f.NewFingerprint(temp)
require.NoError(t, err)

r, err := f.NewReader(temp, fp)
require.NoError(t, err)

// A copy of the reader should not panic
_, err = f.Copy(r, temp)
assert.NoError(t, err)
}

func TestPersistFlusher(t *testing.T) {
flushPeriod := 100 * time.Millisecond
f, emitChan := testReaderFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod)
Expand All @@ -60,7 +45,7 @@ func TestPersistFlusher(t *testing.T) {
expectNoTokensUntil(t, emitChan, 2*flushPeriod)

// A copy of the reader should remember that we last emitted about 200ms ago.
copyReader, err := f.Copy(r, temp)
copyReader, err := f.NewReaderFromMetadata(temp, r.Metadata)
assert.NoError(t, err)

// This time, the flusher will kick in and we should emit the unfinished log.
Expand Down
4 changes: 1 addition & 3 deletions pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ func buildTestManager(t *testing.T, cfg *Config, opts ...testManagerOption) (*Ma
}
input, err := cfg.Build(testutil.Logger(t), testEmitFunc(tmc.emitChan))
require.NoError(t, err)
t.Cleanup(func() {
input.closeFiles()
})
t.Cleanup(func() { input.closePreviousFiles() })
return input, tmc.emitChan
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/trim/trim.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Whitespace(data []byte) []byte {
}

func ToLength(splitFunc bufio.SplitFunc, maxLength int) bufio.SplitFunc {
if maxLength == 0 {
if maxLength <= 0 {
return splitFunc
}
return func(data []byte, atEOF bool) (int, []byte, error) {
Expand Down

0 comments on commit 72f421f

Please sign in to comment.