Skip to content

Commit

Permalink
[chore][pkg/stanaza] Remove currentFps from fileconsumer.Manager (ope…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and RoryCrispin committed Nov 24, 2023
1 parent ffe2723 commit 4b91be0
Showing 1 changed file with 26 additions and 43 deletions.
69 changes: 26 additions & 43 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ type Manager struct {

previousPollFiles []*reader.Reader
knownFiles []*reader.Metadata

currentFps []*fingerprint.Fingerprint
}

func (m *Manager) Start(persister operator.Persister) error {
Expand Down Expand Up @@ -146,14 +144,8 @@ func (m *Manager) poll(ctx context.Context) {
}

func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files")
readers := make([]*reader.Reader, 0, len(paths))
for _, path := range paths {
r := m.makeReader(path)
if r != nil {
readers = append(readers, r)
}
}
m.Debug("Consuming files", zap.Strings("paths", paths))
readers := m.makeReaders(paths)

// take care of files which disappeared from the pattern since the last poll cycle
// this can mean either files which were removed, or rotated into a name not matching the pattern
Expand All @@ -173,7 +165,6 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
wg.Wait()

m.previousPollFiles = readers
m.clearCurrentFingerprints()
}

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
Expand Down Expand Up @@ -201,45 +192,37 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
return fp, file
}

func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
for i := 0; i < len(m.currentFps); i++ {
if fp.Equal(m.currentFps[i]) {
return true
}
}
return false
}

// makeReader take a file path, then creates reader,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReader(path string) *reader.Reader {
// Open the files first to minimize the time between listing and opening
fp, file := m.makeFingerprint(path)
if fp == nil {
return nil
}

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
if m.checkDuplicates(fp) {
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
func (m *Manager) makeReaders(paths []string) []*reader.Reader {
readers := make([]*reader.Reader, 0, len(paths))
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
continue
}
return nil
}

m.currentFps = append(m.currentFps, fp)
reader, err := m.newReader(file, fp)
if err != nil {
m.Errorw("Failed to create reader", zap.Error(err))
return nil
}
// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
for _, r := range readers {
if fp.Equal(r.Fingerprint) {
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
continue
}
}

return reader
}
r, err := m.newReader(file, fp)
if err != nil {
m.Errorw("Failed to create reader", zap.Error(err))
continue
}

func (m *Manager) clearCurrentFingerprints() {
m.currentFps = make([]*fingerprint.Fingerprint, 0)
readers = append(readers, r)
}
return readers
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
Expand Down

0 comments on commit 4b91be0

Please sign in to comment.