diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index be635014373d..214dc8c187e5 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -226,12 +226,20 @@ func (i *Ingester) starting(ctx context.Context) error { } defer checkpointCloser.Close() - if err = RecoverCheckpoint(checkpointReader, recoverer); err != nil { + checkpointRecoveryErr := RecoverCheckpoint(checkpointReader, recoverer) + if checkpointRecoveryErr != nil { i.metrics.walCorruptionsTotal.WithLabelValues(walTypeCheckpoint).Inc() - level.Error(util.Logger).Log("msg", "failed to recover from checkpoint", "elapsed", time.Since(start).String()) - return err + level.Error(util.Logger).Log( + "msg", + `Recovered from checkpoint with errors. Some streams were likely not recovered due to WAL checkpoint file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.`, + "elapsed", time.Since(start).String(), + ) } - level.Info(util.Logger).Log("msg", "recovered from checkpoint", "elapsed", time.Since(start).String()) + level.Info(util.Logger).Log( + "msg", "WAL checkpoint recovery finished", + "elapsed", time.Since(start).String(), + "errors", checkpointRecoveryErr != nil, + ) level.Info(util.Logger).Log("msg", "recovering from WAL") segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1) @@ -240,16 +248,24 @@ func (i *Ingester) starting(ctx context.Context) error { } defer segmentCloser.Close() - if err = RecoverWAL(segmentReader, recoverer); err != nil { + segmentRecoveryErr := RecoverWAL(segmentReader, recoverer) + if segmentRecoveryErr != nil { i.metrics.walCorruptionsTotal.WithLabelValues(walTypeSegment).Inc() - level.Error(util.Logger).Log("msg", "failed to recover from WAL segments", "elapsed", time.Since(start).String()) - return err + level.Error(util.Logger).Log( + "msg", + "Recovered from WAL segments with 
errors. Some streams and/or entries were likely not recovered due to WAL segment file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.", + "elapsed", time.Since(start).String(), + ) } - level.Info(util.Logger).Log("msg", "recovered from WAL segments", "elapsed", time.Since(start).String()) + level.Info(util.Logger).Log( + "msg", "WAL segment recovery finished", + "elapsed", time.Since(start).String(), + "errors", segmentRecoveryErr != nil, + ) elapsed := time.Since(start) i.metrics.walReplayDuration.Set(elapsed.Seconds()) - level.Info(util.Logger).Log("msg", "recovery completed", "time", elapsed.String()) + level.Info(util.Logger).Log("msg", "recovery finished", "time", elapsed.String()) } diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index 8f0c540951f9..fb2b355a687c 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -194,34 +194,32 @@ func (r *ingesterRecoverer) Done() <-chan struct{} { } func RecoverWAL(reader WALReader, recoverer Recoverer) error { - dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error { + dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput) error { rec := recordPool.GetRecord() if err := decodeWALRecord(b, rec); err != nil { return err } // First process all series to ensure we don't write entries to nonexistant series. 
+ var firstErr error for _, s := range rec.Series { if err := recoverer.SetStream(rec.UserID, s); err != nil { - return err + if firstErr == nil { + firstErr = err + } } } for _, entries := range rec.RefEntries { worker := int(entries.Ref % uint64(len(inputs))) - select { - case err := <-errCh: - return err - - case inputs[worker] <- recoveryInput{ + inputs[worker] <- recoveryInput{ userID: rec.UserID, data: entries, - }: } } - return nil + return firstErr } process := func(recoverer Recoverer, input <-chan recoveryInput, errCh chan<- error) { @@ -244,11 +242,7 @@ func RecoverWAL(reader WALReader, recoverer Recoverer) error { // Pass the error back, but respect the quit signal. if err != nil { - select { - case errCh <- err: - case <-recoverer.Done(): - } - return + errCh <- err } } } @@ -264,23 +258,17 @@ func RecoverWAL(reader WALReader, recoverer Recoverer) error { } func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error { - dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error { + dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput) error { s := &Series{} if err := decodeCheckpointRecord(b, s); err != nil { return err } worker := int(s.Fingerprint % uint64(len(inputs))) - select { - case err := <-errCh: - return err - - case inputs[worker] <- recoveryInput{ + inputs[worker] <- recoveryInput{ userID: s.UserID, data: s, - }: } - return nil } @@ -302,13 +290,8 @@ func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error { err = recoverer.Series(series) } - // Pass the error back, but respect the quit signal. 
if err != nil { - select { - case errCh <- err: - case <-recoverer.Done(): - } - return + errCh <- err } } } @@ -335,11 +318,11 @@ type recoveryInput struct { func recoverGeneric( reader WALReader, recoverer Recoverer, - dispatch func(Recoverer, []byte, []chan recoveryInput, <-chan error) error, + dispatch func(Recoverer, []byte, []chan recoveryInput) error, process func(Recoverer, <-chan recoveryInput, chan<- error), ) error { var wg sync.WaitGroup - var lastErr error + var firstErr error nWorkers := recoverer.NumWorkers() if nWorkers < 1 { @@ -359,26 +342,24 @@ func recoverGeneric( } -outer: - for reader.Next() { - b := reader.Record() - if lastErr = reader.Err(); lastErr != nil { - break outer - } + go func() { + for reader.Next() { + b := reader.Record() + if err := reader.Err(); err != nil { + errCh <- err + continue + } - if lastErr = dispatch(recoverer, b, inputs, errCh); lastErr != nil { - break outer + if err := dispatch(recoverer, b, inputs); err != nil { + errCh <- err + continue + } } - } - for _, w := range inputs { - close(w) - } - - // may have broken loop early - if lastErr != nil { - return lastErr - } + for _, w := range inputs { + close(w) + } + }() finished := make(chan struct{}) go func(finished chan<- struct{}) { @@ -386,10 +367,14 @@ outer: finished <- struct{}{} }(finished) - select { - case <-finished: - case lastErr = <-errCh: + for { + select { + case <-finished: + return firstErr + case err := <-errCh: + if firstErr == nil { + firstErr = err + } + } } - - return lastErr }