Skip to content

Commit

Permalink
[chore][pkg/stanza] Skip persister operations if nil (open-telemetry#…
Browse files Browse the repository at this point in the history
…28580)

Although the persister is generally expected, we can easily protect
against cases where it is not provided and save some work as well. This
becomes more important with open-telemetry#27823 which interacts with the persister
during the Stop function.
  • Loading branch information
djaglowski authored and RoryCrispin committed Nov 24, 2023
1 parent a0addfc commit 210d94b
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ func (m *Manager) Start(persister operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

m.persister = persister

offsets, err := checkpoint.Load(ctx, m.persister)
if err != nil {
return fmt.Errorf("read known files from database: %w", err)
}
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
if persister != nil {
m.persister = persister
offsets, err := checkpoint.Load(ctx, m.persister)
if err != nil {
return fmt.Errorf("read known files from database: %w", err)
}
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
}
}
}

Expand Down Expand Up @@ -169,12 +170,14 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

m.saveCurrent(readers)

rmds := make([]*reader.Metadata, 0, len(readers))
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
m.Errorw("save offsets", zap.Error(err))
if m.persister != nil {
rmds := make([]*reader.Metadata, 0, len(readers))
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
if err := checkpoint.Save(ctx, m.persister, rmds); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}

m.clearCurrentFingerprints()
Expand Down

0 comments on commit 210d94b

Please sign in to comment.