Wal/recover corruption (#3117)
* simplifies recovery code and continues after errors

* does not exit on wal recovery failure

* log line parity

* less scary error msgs

* more descriptive errors

* Update pkg/ingester/ingester.go

Co-authored-by: Ed Welch <ed@oqqer.com>

* Update pkg/ingester/ingester.go

Co-authored-by: Ed Welch <ed@oqqer.com>

Co-authored-by: Ed Welch <ed@oqqer.com>
owen-d and slim-bean authored Jan 5, 2021
1 parent 7682f13 commit ad803af
Showing 2 changed files with 62 additions and 61 deletions.
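
The change in both files follows one pattern: remember the first error, keep replaying, and report at the end instead of aborting. A minimal, self-contained Go sketch of that pattern in isolation (replayAll and apply are illustrative names, not Loki's actual API):

package main

import (
    "errors"
    "fmt"
)

// replayAll applies every record it can. The first failure is
// remembered and returned after the full pass, so one corrupt record
// no longer discards everything written after it.
func replayAll(records []string, apply func(string) error) error {
    var firstErr error
    for _, r := range records {
        if err := apply(r); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    return firstErr
}

func main() {
    err := replayAll([]string{"a", "corrupt", "b"}, func(r string) error {
        if r == "corrupt" {
            return errors.New("bad record: " + r)
        }
        fmt.Println("applied", r)
        return nil
    })
    fmt.Println("first error:", err) // "a" and "b" were still applied
}
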
34 changes: 25 additions & 9 deletions pkg/ingester/ingester.go
@@ -226,12 +226,20 @@ func (i *Ingester) starting(ctx context.Context) error {
     }
     defer checkpointCloser.Close()

-    if err = RecoverCheckpoint(checkpointReader, recoverer); err != nil {
+    checkpointRecoveryErr := RecoverCheckpoint(checkpointReader, recoverer)
+    if checkpointRecoveryErr != nil {
         i.metrics.walCorruptionsTotal.WithLabelValues(walTypeCheckpoint).Inc()
-        level.Error(util.Logger).Log("msg", "failed to recover from checkpoint", "elapsed", time.Since(start).String())
-        return err
+        level.Error(util.Logger).Log(
+            "msg",
+            `Recovered from checkpoint with errors. Some streams were likely not recovered due to WAL checkpoint file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.`,
+            "elapsed", time.Since(start).String(),
+        )
     }
-    level.Info(util.Logger).Log("msg", "recovered from checkpoint", "elapsed", time.Since(start).String())
+    level.Info(util.Logger).Log(
+        "msg", "WAL checkpoint recovery finished",
+        "elapsed", time.Since(start).String(),
+        "errors", checkpointRecoveryErr != nil,
+    )

     level.Info(util.Logger).Log("msg", "recovering from WAL")
     segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1)

@@ -240,16 +248,24 @@ func (i *Ingester) starting(ctx context.Context) error {
     }
     defer segmentCloser.Close()

-    if err = RecoverWAL(segmentReader, recoverer); err != nil {
+    segmentRecoveryErr := RecoverWAL(segmentReader, recoverer)
+    if segmentRecoveryErr != nil {
         i.metrics.walCorruptionsTotal.WithLabelValues(walTypeSegment).Inc()
-        level.Error(util.Logger).Log("msg", "failed to recover from WAL segments", "elapsed", time.Since(start).String())
-        return err
+        level.Error(util.Logger).Log(
+            "msg",
+            "Recovered from WAL segments with errors. Some streams and/or entries were likely not recovered due to WAL segment file corruptions (or WAL file deletions while Loki is running). No administrator action is needed and data loss is only a possibility if more than (replication factor / 2 + 1) ingesters suffer from this.",
+            "elapsed", time.Since(start).String(),
+        )
     }
-    level.Info(util.Logger).Log("msg", "recovered from WAL segments", "elapsed", time.Since(start).String())
+    level.Info(util.Logger).Log(
+        "msg", "WAL segment recovery finished",
+        "elapsed", time.Since(start).String(),
+        "errors", segmentRecoveryErr != nil,
+    )

     elapsed := time.Since(start)
     i.metrics.walReplayDuration.Set(elapsed.Seconds())
-    level.Info(util.Logger).Log("msg", "recovery completed", "time", elapsed.String())
+    level.Info(util.Logger).Log("msg", "recovery finished", "time", elapsed.String())

 }

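
The startup-path effect of the hunks above: a corrupt checkpoint or segment now degrades to an error log plus a corruption-counter bump, and replay moves on to the next phase instead of crash-looping the ingester. A rough sketch of that control flow, with stand-in names for the real metrics and recovery calls (recoverPhase is hypothetical, not Loki's API):

package main

import (
    "errors"
    "log"
    "time"
)

// recoverPhase runs one recovery step and always lets startup
// continue; the error only feeds a counter and the log lines,
// mirroring the ingester.go hunks above.
func recoverPhase(name string, run func() error, corruptions *int) {
    start := time.Now()
    err := run() // stand-in for RecoverCheckpoint / RecoverWAL
    if err != nil {
        *corruptions++ // stand-in for walCorruptionsTotal.Inc()
        log.Printf("%s recovered with errors: %v", name, err)
    }
    log.Printf("%s recovery finished elapsed=%s errors=%t",
        name, time.Since(start), err != nil)
}

func main() {
    var corruptions int
    recoverPhase("checkpoint", func() error { return nil }, &corruptions)
    recoverPhase("wal-segments", func() error { return errors.New("torn page") }, &corruptions)
    log.Printf("startup continues; corruptions=%d", corruptions)
}
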
89 changes: 37 additions & 52 deletions pkg/ingester/recovery.go
@@ -194,34 +194,32 @@ func (r *ingesterRecoverer) Done() <-chan struct{} {
 }

 func RecoverWAL(reader WALReader, recoverer Recoverer) error {
-    dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error {
+    dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput) error {
         rec := recordPool.GetRecord()
         if err := decodeWALRecord(b, rec); err != nil {
             return err
         }

         // First process all series to ensure we don't write entries to nonexistent series.
+        var firstErr error
         for _, s := range rec.Series {
             if err := recoverer.SetStream(rec.UserID, s); err != nil {
-                return err
+                if firstErr == nil {
+                    firstErr = err
+                }
             }
         }

         for _, entries := range rec.RefEntries {
             worker := int(entries.Ref % uint64(len(inputs)))
-            select {
-            case err := <-errCh:
-                return err
-
-            case inputs[worker] <- recoveryInput{
+            inputs[worker] <- recoveryInput{
                 userID: rec.UserID,
                 data:   entries,
-            }:
-            }
+            }
         }

-        return nil
+        return firstErr
     }

     process := func(recoverer Recoverer, input <-chan recoveryInput, errCh chan<- error) {
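
The reworked dispatch above does two things at once: it seeds streams first, collecting rather than returning per-stream errors, and then shards entry batches across workers by Ref modulo worker count, so a given ref always lands on the same worker. A sketch of that shape under assumed types (refEntry, setStream, and the channel layout here are illustrative, not the real signatures):

package main

import (
    "errors"
    "fmt"
)

type refEntry struct {
    Ref  uint64
    Data string
}

// dispatch seeds streams first (so entries never reference a missing
// stream), then shards entries across workers by Ref modulo; stream
// errors are collected, not fatal.
func dispatch(streams []string, entries []refEntry,
    inputs []chan refEntry, setStream func(string) error) error {

    var firstErr error
    for _, s := range streams {
        if err := setStream(s); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    for _, e := range entries {
        worker := int(e.Ref % uint64(len(inputs)))
        inputs[worker] <- e // the same Ref always lands on the same worker
    }
    return firstErr
}

func main() {
    inputs := []chan refEntry{make(chan refEntry, 4), make(chan refEntry, 4)}
    err := dispatch(
        []string{"s1", "s2"},
        []refEntry{{Ref: 7, Data: "line"}},
        inputs,
        func(s string) error {
            if s == "s2" {
                return errors.New("bad stream " + s)
            }
            return nil
        },
    )
    fmt.Println("first error:", err, "queued on worker 1:", len(inputs[1])) // 7 % 2 = 1
}
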
@@ -244,11 +242,7 @@ func RecoverWAL(reader WALReader, recoverer Recoverer) error {

                 // Pass the error back, but respect the quit signal.
                 if err != nil {
-                    select {
-                    case errCh <- err:
-                    case <-recoverer.Done():
-                    }
-                    return
+                    errCh <- err
                 }
             }
         }
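
With the quit-signal select gone, a worker's process loop simply asserts the payload's concrete type and forwards any failure on errCh; that plain send is only safe because the collector in recoverGeneric (last hunk below) keeps draining errCh until every worker exits. A sketch with stand-in types (entries, the recoveryInput fields, and push are assumptions, not the real signatures):

package main

import (
    "errors"
    "fmt"
)

type recoveryInput struct {
    userID string
    data   interface{} // *entries for WAL segments, *Series for checkpoints
}

type entries struct{ lines []string }

// process drains one worker's queue; a payload of the wrong concrete
// type is reported like any other recovery error, and the loop keeps
// going, mirroring the diff's plain errCh <- err with no early return.
func process(input <-chan recoveryInput, errCh chan<- error, push func(string, entries) error) {
    for next := range input {
        ents, ok := next.data.(*entries)
        var err error
        if !ok {
            err = errors.New("unexpected payload type")
        } else {
            err = push(next.userID, *ents)
        }
        if err != nil {
            errCh <- err
        }
    }
}

func main() {
    input := make(chan recoveryInput, 2)
    errCh := make(chan error, 2) // buffered here so the demo needs no collector
    input <- recoveryInput{userID: "tenant-a", data: &entries{lines: []string{"l1"}}}
    input <- recoveryInput{userID: "tenant-a", data: "garbage"}
    close(input)
    process(input, errCh, func(u string, e entries) error {
        fmt.Println("pushed", len(e.lines), "entries for", u)
        return nil
    })
    fmt.Println("errors queued:", len(errCh))
}
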
@@ -264,23 +258,17 @@ func RecoverWAL(reader WALReader, recoverer Recoverer) error {
 }

 func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error {
-    dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error {
+    dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput) error {
         s := &Series{}
         if err := decodeCheckpointRecord(b, s); err != nil {
             return err
         }

         worker := int(s.Fingerprint % uint64(len(inputs)))
-        select {
-        case err := <-errCh:
-            return err
-
-        case inputs[worker] <- recoveryInput{
+        inputs[worker] <- recoveryInput{
             userID: s.UserID,
             data:   s,
-        }:
-        }
-
+        }
         return nil
     }

@@ -302,13 +290,8 @@ func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error {
                     err = recoverer.Series(series)
                 }

-                // Pass the error back, but respect the quit signal.
                 if err != nil {
-                    select {
-                    case errCh <- err:
-                    case <-recoverer.Done():
-                    }
-                    return
+                    errCh <- err
                 }
             }
         }

@@ -335,11 +318,11 @@ type recoveryInput struct {
 func recoverGeneric(
     reader WALReader,
     recoverer Recoverer,
-    dispatch func(Recoverer, []byte, []chan recoveryInput, <-chan error) error,
+    dispatch func(Recoverer, []byte, []chan recoveryInput) error,
     process func(Recoverer, <-chan recoveryInput, chan<- error),
 ) error {
     var wg sync.WaitGroup
-    var lastErr error
+    var firstErr error
     nWorkers := recoverer.NumWorkers()

     if nWorkers < 1 {
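
recoverGeneric can stay a single driver for both checkpoint and segment replay because the varying behavior arrives as function values; only the dispatch signature needed to change. A minimal sketch of that higher-order shape (replay and its types are hypothetical stand-ins):

package main

import (
    "fmt"
    "strings"
)

// replay is a recoverGeneric-style driver: channel setup, fan-out, and
// shutdown live here once, while the per-record behavior arrives as a
// dispatch function value, so WAL and checkpoint replay can share it.
func replay(records []string, dispatch func(string, []chan string) error) error {
    inputs := []chan string{make(chan string, 8), make(chan string, 8)}
    var firstErr error
    for _, r := range records {
        if err := dispatch(r, inputs); err != nil && firstErr == nil {
            firstErr = err
        }
    }
    for _, in := range inputs {
        close(in)
    }
    return firstErr
}

func main() {
    walDispatch := func(r string, inputs []chan string) error {
        inputs[len(r)%len(inputs)] <- strings.ToUpper(r) // pretend decode + shard
        return nil
    }
    fmt.Println(replay([]string{"a", "bb"}, walDispatch))
}
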
@@ -359,37 +342,39 @@ func recoverGeneric(

     }

-outer:
-    for reader.Next() {
-        b := reader.Record()
-        if lastErr = reader.Err(); lastErr != nil {
-            break outer
-        }
-
-        if lastErr = dispatch(recoverer, b, inputs, errCh); lastErr != nil {
-            break outer
-        }
-    }
-
-    for _, w := range inputs {
-        close(w)
-    }
-
-    // may have broken loop early
-    if lastErr != nil {
-        return lastErr
-    }
+    go func() {
+        for reader.Next() {
+            b := reader.Record()
+            if err := reader.Err(); err != nil {
+                errCh <- err
+                continue
+            }
+
+            if err := dispatch(recoverer, b, inputs); err != nil {
+                errCh <- err
+                continue
+            }
+        }
+
+        for _, w := range inputs {
+            close(w)
+        }
+    }()

     finished := make(chan struct{})
     go func(finished chan<- struct{}) {
         wg.Wait()
         finished <- struct{}{}
     }(finished)

-    select {
-    case <-finished:
-    case lastErr = <-errCh:
-    }
-
-    return lastErr
+    for {
+        select {
+        case <-finished:
+            return firstErr
+        case err := <-errCh:
+            if firstErr == nil {
+                firstErr = err
+            }
+        }
+    }
 }
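
The rewritten recoverGeneric above splits into three cooperating pieces: a producer goroutine that reads and dispatches records (reporting, never aborting), the worker goroutines, and a collector loop that drains errCh until the workers finish, which is what makes the unbuffered errCh <- err sends in dispatch and process deadlock-free. A self-contained sketch of that orchestration (run and its toy record encoding are illustrative only):

package main

import (
    "errors"
    "fmt"
    "sync"
)

// run mirrors the reshaped recoverGeneric: a producer feeds the
// workers and closes their inputs when the input is exhausted,
// workers report failures on errCh, and the caller keeps receiving
// from errCh until every worker exits, remembering the first error.
func run(records []int) error {
    const nWorkers = 2
    inputs := make([]chan int, nWorkers)
    errCh := make(chan error)
    var wg sync.WaitGroup

    for i := range inputs {
        inputs[i] = make(chan int)
        wg.Add(1)
        go func(in <-chan int) {
            defer wg.Done()
            for r := range in {
                if r < 0 { // stand-in for a corrupt record
                    errCh <- errors.New("corrupt record")
                }
            }
        }(inputs[i])
    }

    // Producer: feed the workers, then close their inputs so the
    // range loops above drain and exit.
    go func() {
        for i, r := range records {
            inputs[i%nWorkers] <- r
        }
        for _, in := range inputs {
            close(in)
        }
    }()

    finished := make(chan struct{})
    go func() {
        wg.Wait()
        close(finished)
    }()

    // Drain errors until all workers are done; because this loop is
    // always receiving, a blocked errCh send cannot deadlock a worker.
    var firstErr error
    for {
        select {
        case <-finished:
            return firstErr
        case err := <-errCh:
            if firstErr == nil {
                firstErr = err
            }
        }
    }
}

func main() {
    fmt.Println("first error:", run([]int{1, -1, 2, -2, 3}))
}
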
