From 437feee9c26b9aee48ff4d633a533260981c9e45 Mon Sep 17 00:00:00 2001 From: shreyas-s-rao Date: Fri, 1 Mar 2019 14:39:43 +0530 Subject: [PATCH] wal: add Verify function to perform corruption check on wal contents Signed-off-by: Shreyas Rao --- wal/wal.go | 149 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 123 insertions(+), 26 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index 96d01a23af69..c2dd13ae9e7b 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -223,17 +223,55 @@ func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) { } func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { - names, err := readWalNames(dirpath) + names, nameIndex, err := selectWALFiles(dirpath, snap) if err != nil { return nil, err } + rs, ls, closer, err := openWALFiles(dirpath, names, nameIndex, write) + if err != nil { + return nil, err + } + + // create a WAL ready for reading + w := &WAL{ + dir: dirpath, + start: snap, + decoder: newDecoder(rs...), + readClose: closer, + locks: ls, + } + + if write { + // write reuses the file descriptors from read; don't close so + // WAL can append without dropping the file lock + w.readClose = nil + if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil { + closer() + return nil, err + } + w.fp = newFilePipeline(w.dir, SegmentSizeBytes) + } + + return w, nil +} + +func selectWALFiles(dirpath string, snap walpb.Snapshot) ([]string, int, error) { + names, err := readWALNames(dirpath) + if err != nil { + return nil, -1, err + } + nameIndex, ok := searchIndex(names, snap.Index) if !ok || !isValidSeq(names[nameIndex:]) { - return nil, ErrFileNotFound + err = ErrFileNotFound + return nil, -1, err } - // open the wal files + return names, nameIndex, nil +} + +func openWALFiles(dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) { rcs := make([]io.ReadCloser, 0) rs := make([]io.Reader, 0) ls := make([]*fileutil.LockedFile, 0) @@ -243,7 +281,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode) if err != nil { closeAll(rcs...) - return nil, err + return nil, nil, nil, err } ls = append(ls, l) rcs = append(rcs, l) @@ -251,7 +289,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode) if err != nil { closeAll(rcs...) - return nil, err + return nil, nil, nil, err } ls = append(ls, nil) rcs = append(rcs, rf) @@ -261,27 +299,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) closer := func() error { return closeAll(rcs...) } - // create a WAL ready for reading - w := &WAL{ - dir: dirpath, - start: snap, - decoder: newDecoder(rs...), - readClose: closer, - locks: ls, - } - - if write { - // write reuses the file descriptors from read; don't close so - // WAL can append without dropping the file lock - w.readClose = nil - if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil { - closer() - return nil, err - } - w.fp = newFilePipeline(w.dir, SegmentSizeBytes) - } - - return w, nil + return rs, ls, closer, nil } // ReadAll reads out records of the current WAL. @@ -398,6 +416,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return metadata, state, ents, err } +// Verify reads through the given WAL and verifies that it is not corrupted. +// It creates a new decoder to read through the records of the given WAL. +// It does not conflict with any open WAL, but it is recommended not to +// call this function after opening the WAL for writing. +// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. +// If the loaded snap doesn't match with the expected one, it will +// return error ErrSnapshotMismatch. +func Verify(walDir string, snap walpb.Snapshot) error { + var metadata []byte + var err error + var match bool + + rec := &walpb.Record{} + + names, nameIndex, err := selectWALFiles(walDir, snap) + if err != nil { + return err + } + + // open wal files in read mode, so that there is no conflict + // when the same WAL is opened elsewhere in write mode + rs, _, closer, err := openWALFiles(walDir, names, nameIndex, false) + if err != nil { + return err + } + + // create a new decoder from the readers on the WAL files + decoder := newDecoder(rs...) + + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case metadataType: + if metadata != nil && !bytes.Equal(metadata, rec.Data) { + return ErrMetadataConflict + } + metadata = rec.Data + case crcType: + crc := decoder.crc.Sum32() + // Current crc of decoder must match the crc of the record. + // We need not match 0 crc, since the decoder is a new one at this point. + if crc != 0 && rec.Validate(crc) != nil { + return ErrCRCMismatch + } + decoder.updateCRC(rec.Crc) + case snapshotType: + var loadedSnap walpb.Snapshot + pbutil.MustUnmarshal(&loadedSnap, rec.Data) + if loadedSnap.Index == snap.Index { + if loadedSnap.Term != snap.Term { + return ErrSnapshotMismatch + } + match = true + } + // We ignore all entry and state type records as these + // are not necessary for validating the WAL contents + case entryType: + case stateType: + default: + return fmt.Errorf("unexpected block type %d", rec.Type) + } + } + + if closer != nil { + closer() + } + + // We do not have to read out all the WAL entries + // as the decoder is opened in read mode. + if err != io.EOF && err != io.ErrUnexpectedEOF { + return err + } + + if !match { + return ErrSnapshotNotFound + } + + return nil +} + // cut closes current file written and creates a new one ready to append. // cut first creates a temp wal file and writes necessary headers into it. // Then cut atomically rename temp wal file to a wal file.